一、为什么需要数据迁移?
上周五凌晨3点,我接到某电商平台运维负责人的紧急电话:"我们的ES集群磁盘即将爆满,新采购的服务器已经到货,必须在天亮前完成数据迁移!"这样的场景在数据量激增的互联网公司中并不罕见。以下是典型的数据迁移需求场景:
- 硬件升级:从机械硬盘升级到NVMe固态
- 架构调整:单集群改造成多可用区部署
- 版本迭代:ES 6.x升级到7.x版本
- 成本优化:从自建机房迁移到云服务商
二、环境检查与准备工作
1. 新旧集群环境比对(以ES 7.17.9为例)
curl -XGET 'http://old-node:9200/_cluster/settings?include_defaults=true'
# 新集群健康检查
curl -XGET 'http://new-node:9200/_cluster/health?pretty'
# 版本兼容性验证(必须确保主版本号一致)
grep "version" /var/lib/elasticsearch/nodes/0/*.json
2. 磁盘容量黄金法则
总数据量 × 1.5 ≤ 新集群总存储
(保留50%空间用于副本和写入缓冲)
三、四类迁移方案深度解析
方案1:快照恢复(推荐方案)
适用场景:跨机房/跨云迁移、版本升级
# 创建共享文件仓库(需提前配置所有节点)
PUT _snapshot/migration_repo
{
"type": "fs",
"settings": {
"location": "/mnt/nfs_share/es_backup",
"max_snapshot_bytes_per_sec": "100mb",
"max_restore_bytes_per_sec": "200mb"
}
}
# 创建快照(含所有索引)
PUT _snapshot/migration_repo/snapshot_202308?wait_for_completion=true
# 新集群恢复(注意分片分配规则)
POST _snapshot/migration_repo/snapshot_202308/_restore
{
"indices": "*",
"include_global_state": false,
"index_settings": {
"index.number_of_replicas": 1
},
"ignore_index_settings": ["index.routing.allocation.require._name"]
}
避坑指南:
- 文件仓库路径需要777权限
- 网络带宽需≥迁移数据量/(业务允许停机时间×0.7)
- 快照版本必须≤ES主版本
方案2:Reindex API远程操作
适用场景:小规模索引迁移、字段结构调整
# 使用Python客户端执行跨集群reindex(ES 7.x+)
from elasticsearch import Elasticsearch
old_es = Elasticsearch(["http://old-node:9200"], timeout=30)
new_es = Elasticsearch(["http://new-node:9200"], timeout=30)
body = {
"source": {
"remote": {
"host": "http://old-node:9200",
"username": "admin",
"password": "pass123"
},
"index": "order_index",
"size": 5000,
"query": {
"range": {"@timestamp": {"gte": "now-30d"}}
}
},
"dest": {
"index": "order_index_new",
"op_type": "create"
}
}
response = new_es.reindex(body=body, requests_per_second=1000)
print(f"迁移进度:{response['task']}")
性能优化技巧:
- 设置
scroll_size=5000-10000
平衡吞吐与内存 - 使用
slice
参数并行处理 - 关闭目标索引的refresh_interval
方案3:Logstash管道迁移
适用场景:异构数据源同步、复杂ETL处理
# logstash_es_to_es.conf
input {
elasticsearch {
hosts => ["http://old-node:9200"]
index => "user_behavior"
query => '{ "query": { "range": { "timestamp": { "gte": "now-7d" } } } }'
size => 1000
scroll => "5m"
docinfo => true
}
}
filter {
# 示例:转换字段类型
mutate {
convert => { "user_id" => "integer" }
remove_field => ["@version", "@timestamp"]
}
}
output {
elasticsearch {
hosts => ["http://new-node:9200"]
index => "%{[@metadata][_index]}_v2"
document_id => "%{[@metadata][_id]}"
retry_on_conflict => 3
}
}
方案4:Segment文件直拷(高阶操作)
适用场景:完全同版本、同配置的集群
# 旧集群执行(确保无写入操作)
curl -XPOST 'http://old-node:9200/_flush/synced'
# 物理拷贝数据文件
rsync -avz --progress /old_path/data /new_path/data
# 新集群重启前的必要操作
chown -R elasticsearch:elasticsearch /new_path/data
rm -rf /new_path/data/*.lock
危险操作预警:
- 必须确保JVM版本完全一致
- 集群名称需要修改
- 需要重建索引元数据
四、迁移后的关键验证步骤
- 文档数比对
# 新旧集群文档数校验
diff <(curl -s old-node:9200/_cat/count?v) <(curl -s new-node:9200/_cat/count?v)
- 字段映射验证
GET /_mapping
{
"user_index": {
"mappings": {
"properties": {
"geoip": {
"type": "geo_point" // 确保地理位置字段类型正确
}
}
}
}
}
- 查询一致性测试
# 使用DSL查询对比结果
from deepdiff import DeepDiff
old_result = old_es.search(index="log_index", body={"query":{"match_all":{}}})
new_result = new_es.search(index="log_index", body={"query":{"match_all":{}}})
diff = DeepDiff(old_result['hits'], new_result['hits'], ignore_order=True)
print(f"数据差异:{diff if diff else '无差异'}")
五、技术方案对比矩阵
维度 | 快照恢复 | Reindex | Logstash | 文件拷贝 |
---|---|---|---|---|
迁移速度 | ★★★★☆ | ★★☆☆☆ | ★★☆☆☆ | ★★★★★ |
网络消耗 | 低 | 高 | 高 | 无 |
版本兼容性 | 严格 | 宽松 | 宽松 | 极严格 |
可中断恢复 | 支持 | 不支持 | 支持 | 不支持 |
适用数据量 | TB级 | GB级 | GB级 | 任意 |
六、血泪教训总结
- 带宽陷阱:某次迁移预估需4小时,实际花费20小时(发现IDC交换机限速)
- 版本灾难:ES 7.8→7.10升级导致分词器不兼容
- 认证漏洞:忘记同步X-Pack角色配置导致生产权限泄露
- 监控盲区:未设置
indices.recovery.max_bytes_per_sec
引发业务抖动