1. 为什么需要数据同步?

当我们在电商平台使用Elasticsearch(ES)实现商品搜索时,原始数据可能存储在MySQL中。但ES的索引数据需要定期更新,这时候就会遇到数据同步问题。常见场景包括:

  • 数据分析:需要将ES的聚合结果写入Hive做离线分析
  • 灾备恢复:建立MySQL的实时副本
  • 合规审计:同步操作日志到关系型数据库
  • 多系统协作:将ES中的用户行为数据同步到Redis做实时推荐

去年我们团队就遇到过这样的问题:由于ES和MySQL的数据不同步,导致用户看到的库存信息和实际数据库相差20%,直接引发订单纠纷。这促使我们深入研究数据同步的各种方案。


2. 同步方案全景图(技术栈:Logstash)

input {
  elasticsearch {
    hosts => ["http://es-node1:9200"]
    index => "products"
    query => '{ "query": { "range": { "@timestamp": { "gte": "now-1h" }}}}'
  }
}

filter {
  # 转换价格字段格式
  mutate {
    convert => { "price" => "float" }
  }
  
  # 处理空值
  if [description] == "" {
    mutate {
      replace => { "description" => "暂无描述" }
    }
  }
}

output {
  jdbc {
    driver_class => "com.mysql.cj.jdbc.Driver"
    connection_string => "jdbc:mysql://mysql01:3306/warehouse?useSSL=false"
    username => "sync_user"
    password => "Sync@2023"
    statement => [
      "INSERT INTO product_sync (id,name,price) VALUES (?,?,?) 
       ON DUPLICATE KEY UPDATE name=VALUES(name), price=VALUES(price)",
      "id", "name", "price"
    ]
  }
}

▲ 注释说明:

  1. 每小时增量同步ES的products索引
  2. 自动转换价格字段类型
  3. 使用MySQL的UPSERT语法避免重复数据
  4. 需要提前在MySQL创建product_sync表

3. 实时同步的进阶方案(技术栈:Kafka+Debezium)

// Kafka生产者配置类
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                       StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                       StringSerializer.class);
        // 开启GZIP压缩
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); 
        return new DefaultKafkaProducerFactory<>(configProps);
    }
}

// ES监听器
@ElasticsearchListener(topics = "product_change")
public void handleChange(ConsumerRecord<String, ChangeEvent> record) {
    ChangeEvent event = record.value();
    if ("UPDATE".equals(event.getOp())) {
        mysqlTemplate.update(
            "UPDATE products SET stock=? WHERE id=?",
            event.getAfter().getStock(),
            event.getAfter().getId()
        );
    }
}

▲ 注释说明:

  1. 使用Debezium捕获ES数据变更
  2. Kafka消息采用GZIP压缩降低带宽消耗
  3. 通过注解方式实现事件监听
  4. 保证最终一致性而非强一致性

4. 同步架构的"隐藏BOSS"——Canal方案

# canal.properties
canal.serverMode = kafka
canal.mq.topic=es_to_mysql
canal.mq.partition=0

# ES连接配置
canal.es.cluster.name=production
canal.es.nodes=es01:9300,es02:9300

# MySQL连接池
canal.instance.mysql.slaveId=1234
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@2023
canal.instance.connectionCharset=UTF-8
canal.instance.enableDruid=true

▲ 注释说明:

  1. 采用Kafka作为消息中间件
  2. 支持多ES节点集群
  3. 使用Druid连接池管理MySQL连接
  4. 需要配置MySQL的binlog格式为ROW

5. 各方案对比评测

指标 Logstash方案 Kafka方案 Canal方案
实时性 分钟级 秒级 秒级
资源消耗 高(JVM)
数据一致性 最终 最终
开发复杂度
运维成本

典型故障案例:某金融系统使用Logstash同步交易记录时,因网络抖动导致数据序列错乱,最终通过以下方案修复:

-- 数据修复SQL示例
UPDATE transactions t
JOIN (
    SELECT id, MAX(version) as max_version 
    FROM transaction_sync 
    GROUP BY id
) tmp ON t.id = tmp.id
SET t.amount = tmp.amount
WHERE t.version < tmp.max_version;

6. 血泪经验总结

  1. 数据一致性:采用版本号校验机制
// 版本冲突处理
try {
    esClient.index(request);
} catch (VersionConflictEngineException e) {
    log.warn("版本冲突: {}", e.getMessage());
    // 从MySQL重新获取最新数据
    Product latest = mysqlMapper.selectById(product.getId());
    esClient.index(updateRequest(latest)); 
}
  1. 性能调优:批量操作提升10倍效率
# Python批量写入示例(使用elasticsearch-py)
actions = [
    {
        "_op_type": "update",
        "_index": "products",
        "_id": p["id"],
        "doc": {"stock": p["stock"]}
    } for p in changed_products
]

helpers.bulk(es, actions, max_retries=3)
  1. 监控预警:配置Prometheus监控指标
# prometheus配置片段
- job_name: 'es_sync'
  metrics_path: '/_prometheus/metrics'
  static_configs:
    - targets: ['sync-server:9200']
  params:
    group_by: [index]

7. 未来演进方向

  1. 基于Flink的流批一体同步架构
  2. 智能路由:根据数据类型选择最佳存储
  3. 自动修复:利用机器学习检测数据异常