一、为什么需要数据迁移?

上周五凌晨3点,我接到某电商平台运维负责人的紧急电话:"我们的ES集群磁盘即将爆满,新采购的服务器已经到货,必须在天亮前完成数据迁移!"这样的场景在数据量激增的互联网公司中并不罕见。以下是典型的数据迁移需求场景:

  • 硬件升级:从机械硬盘升级到NVMe固态
  • 架构调整:单集群改造成多可用区部署
  • 版本迭代:ES 6.x升级到7.x版本
  • 成本优化:从自建机房迁移到云服务商

二、环境检查与准备工作

1. 新旧集群环境比对(以ES 7.17.9为例)

curl -XGET 'http://old-node:9200/_cluster/settings?include_defaults=true'

# 新集群健康检查
curl -XGET 'http://new-node:9200/_cluster/health?pretty'

# 版本兼容性验证(必须确保主版本号一致)
grep "version" /var/lib/elasticsearch/nodes/0/*.json

2. 磁盘容量黄金法则

总数据量 × 1.5 ≤ 新集群总存储
(保留50%空间用于副本和写入缓冲)

三、四类迁移方案深度解析

方案1:快照恢复(推荐方案)

适用场景:跨机房/跨云迁移、版本升级

# 创建共享文件仓库(需提前配置所有节点)
PUT _snapshot/migration_repo
{
  "type": "fs",
  "settings": {
    "location": "/mnt/nfs_share/es_backup",
    "max_snapshot_bytes_per_sec": "100mb",
    "max_restore_bytes_per_sec": "200mb"
  }
}

# 创建快照(含所有索引)
PUT _snapshot/migration_repo/snapshot_202308?wait_for_completion=true

# 新集群恢复(注意分片分配规则)
POST _snapshot/migration_repo/snapshot_202308/_restore
{
  "indices": "*",
  "include_global_state": false,
  "index_settings": {
    "index.number_of_replicas": 1
  },
  "ignore_index_settings": ["index.routing.allocation.require._name"]
}

避坑指南

  1. 文件仓库路径需要777权限
  2. 网络带宽需≥迁移数据量/(业务允许停机时间×0.7)
  3. 快照版本必须≤ES主版本

方案2:Reindex API远程操作

适用场景:小规模索引迁移、字段结构调整

# 使用Python客户端执行跨集群reindex(ES 7.x+)
from elasticsearch import Elasticsearch

old_es = Elasticsearch(["http://old-node:9200"], timeout=30)
new_es = Elasticsearch(["http://new-node:9200"], timeout=30)

body = {
  "source": {
    "remote": {
      "host": "http://old-node:9200",
      "username": "admin",
      "password": "pass123"
    },
    "index": "order_index",
    "size": 5000,
    "query": {
      "range": {"@timestamp": {"gte": "now-30d"}}
    }
  },
  "dest": {
    "index": "order_index_new",
    "op_type": "create"
  }
}

response = new_es.reindex(body=body, requests_per_second=1000)
print(f"迁移进度:{response['task']}")

性能优化技巧

  • 设置scroll_size=5000-10000平衡吞吐与内存
  • 使用slice参数并行处理
  • 关闭目标索引的refresh_interval

方案3:Logstash管道迁移

适用场景:异构数据源同步、复杂ETL处理

# logstash_es_to_es.conf
input {
  elasticsearch {
    hosts => ["http://old-node:9200"]
    index => "user_behavior"
    query => '{ "query": { "range": { "timestamp": { "gte": "now-7d" } } } }'
    size => 1000
    scroll => "5m"
    docinfo => true
  }
}

filter {
  # 示例:转换字段类型
  mutate {
    convert => { "user_id" => "integer" }
    remove_field => ["@version", "@timestamp"]
  }
}

output {
  elasticsearch {
    hosts => ["http://new-node:9200"]
    index => "%{[@metadata][_index]}_v2"
    document_id => "%{[@metadata][_id]}"
    retry_on_conflict => 3
  }
}

方案4:Segment文件直拷(高阶操作)

适用场景:完全同版本、同配置的集群

# 旧集群执行(确保无写入操作)
curl -XPOST 'http://old-node:9200/_flush/synced'

# 物理拷贝数据文件
rsync -avz --progress /old_path/data /new_path/data

# 新集群重启前的必要操作
chown -R elasticsearch:elasticsearch /new_path/data
rm -rf /new_path/data/*.lock

危险操作预警

  1. 必须确保JVM版本完全一致
  2. 集群名称需要修改
  3. 需要重建索引元数据

四、迁移后的关键验证步骤

  1. 文档数比对
# 新旧集群文档数校验
diff <(curl -s old-node:9200/_cat/count?v) <(curl -s new-node:9200/_cat/count?v)
  1. 字段映射验证
GET /_mapping
{
  "user_index": {
    "mappings": {
      "properties": {
        "geoip": {
          "type": "geo_point"  // 确保地理位置字段类型正确
        }
      }
    }
  }
}
  1. 查询一致性测试
# 使用DSL查询对比结果
from deepdiff import DeepDiff

old_result = old_es.search(index="log_index", body={"query":{"match_all":{}}})
new_result = new_es.search(index="log_index", body={"query":{"match_all":{}}})

diff = DeepDiff(old_result['hits'], new_result['hits'], ignore_order=True)
print(f"数据差异:{diff if diff else '无差异'}")

五、技术方案对比矩阵

维度 快照恢复 Reindex Logstash 文件拷贝
迁移速度 ★★★★☆ ★★☆☆☆ ★★☆☆☆ ★★★★★
网络消耗
版本兼容性 严格 宽松 宽松 极严格
可中断恢复 支持 不支持 支持 不支持
适用数据量 TB级 GB级 GB级 任意

六、血泪教训总结

  1. 带宽陷阱:某次迁移预估需4小时,实际花费20小时(发现IDC交换机限速)
  2. 版本灾难:ES 7.8→7.10升级导致分词器不兼容
  3. 认证漏洞:忘记同步X-Pack角色配置导致生产权限泄露
  4. 监控盲区:未设置indices.recovery.max_bytes_per_sec引发业务抖动