1. 为什么需要数据同步?
当我们在电商平台使用Elasticsearch(ES)实现商品搜索时,原始数据可能存储在MySQL中。但ES的索引数据需要定期更新,这时候就会遇到数据同步问题。常见场景包括:
- 数据分析:需要将ES的聚合结果写入Hive做离线分析
- 灾备恢复:建立MySQL的实时副本
- 合规审计:同步操作日志到关系型数据库
- 多系统协作:将ES中的用户行为数据同步到Redis做实时推荐
去年我们团队就遇到过这样的问题:由于ES和MySQL的数据不同步,导致用户看到的库存信息和实际数据库相差20%,直接引发订单纠纷。这促使我们深入研究数据同步的各种方案。
2. 同步方案全景图(技术栈:Logstash)
input {
elasticsearch {
hosts => ["http://es-node1:9200"]
index => "products"
query => '{ "query": { "range": { "@timestamp": { "gte": "now-1h" }}}}'
}
}
filter {
# 转换价格字段格式
mutate {
convert => { "price" => "float" }
}
# 处理空值
if [description] == "" {
mutate {
replace => { "description" => "暂无描述" }
}
}
}
output {
jdbc {
driver_class => "com.mysql.cj.jdbc.Driver"
connection_string => "jdbc:mysql://mysql01:3306/warehouse?useSSL=false"
username => "sync_user"
password => "Sync@2023"
statement => [
"INSERT INTO product_sync (id,name,price) VALUES (?,?,?)
ON DUPLICATE KEY UPDATE name=VALUES(name), price=VALUES(price)",
"id", "name", "price"
]
}
}
▲ 注释说明:
- 每小时增量同步ES的products索引
- 自动转换价格字段类型
- 使用MySQL的UPSERT语法避免重复数据
- 需要提前在MySQL创建product_sync表
3. 实时同步的进阶方案(技术栈:Kafka+Debezium)
// Kafka生产者配置类
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
// 开启GZIP压缩
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
return new DefaultKafkaProducerFactory<>(configProps);
}
}
// ES监听器
@ElasticsearchListener(topics = "product_change")
public void handleChange(ConsumerRecord<String, ChangeEvent> record) {
ChangeEvent event = record.value();
if ("UPDATE".equals(event.getOp())) {
mysqlTemplate.update(
"UPDATE products SET stock=? WHERE id=?",
event.getAfter().getStock(),
event.getAfter().getId()
);
}
}
▲ 注释说明:
- 使用Debezium捕获ES数据变更
- Kafka消息采用GZIP压缩降低带宽消耗
- 通过注解方式实现事件监听
- 保证最终一致性而非强一致性
4. 同步架构的"隐藏BOSS"——Canal方案
# canal.properties
canal.serverMode = kafka
canal.mq.topic=es_to_mysql
canal.mq.partition=0
# ES连接配置
canal.es.cluster.name=production
canal.es.nodes=es01:9300,es02:9300
# MySQL连接池
canal.instance.mysql.slaveId=1234
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@2023
canal.instance.connectionCharset=UTF-8
canal.instance.enableDruid=true
▲ 注释说明:
- 采用Kafka作为消息中间件
- 支持多ES节点集群
- 使用Druid连接池管理MySQL连接
- 需要配置MySQL的binlog格式为ROW
5. 各方案对比评测
指标 | Logstash方案 | Kafka方案 | Canal方案 |
---|---|---|---|
实时性 | 分钟级 | 秒级 | 秒级 |
资源消耗 | 高(JVM) | 中 | 低 |
数据一致性 | 最终 | 最终 | 强 |
开发复杂度 | 低 | 高 | 中 |
运维成本 | 中 | 高 | 低 |
典型故障案例:某金融系统使用Logstash同步交易记录时,因网络抖动导致数据序列错乱,最终通过以下方案修复:
-- 数据修复SQL示例
UPDATE transactions t
JOIN (
SELECT id, MAX(version) as max_version
FROM transaction_sync
GROUP BY id
) tmp ON t.id = tmp.id
SET t.amount = tmp.amount
WHERE t.version < tmp.max_version;
6. 血泪经验总结
- 数据一致性:采用版本号校验机制
// 版本冲突处理
try {
esClient.index(request);
} catch (VersionConflictEngineException e) {
log.warn("版本冲突: {}", e.getMessage());
// 从MySQL重新获取最新数据
Product latest = mysqlMapper.selectById(product.getId());
esClient.index(updateRequest(latest));
}
- 性能调优:批量操作提升10倍效率
# Python批量写入示例(使用elasticsearch-py)
actions = [
{
"_op_type": "update",
"_index": "products",
"_id": p["id"],
"doc": {"stock": p["stock"]}
} for p in changed_products
]
helpers.bulk(es, actions, max_retries=3)
- 监控预警:配置Prometheus监控指标
# prometheus配置片段
- job_name: 'es_sync'
metrics_path: '/_prometheus/metrics'
static_configs:
- targets: ['sync-server:9200']
params:
group_by: [index]
7. 未来演进方向
- 基于Flink的流批一体同步架构
- 智能路由:根据数据类型选择最佳存储
- 自动修复:利用机器学习检测数据异常