1. 当消息重复时会发生什么?

想象这样一个场景:你正在开发一个电商平台的支付系统。当用户点击支付按钮时,系统通过RabbitMQ发送支付请求到结算服务。但网络抖动导致生产者重复发送了两条相同消息,结算服务连续扣款两次——这绝对是个灾难!

这就是典型的幂等性问题。消息中间件虽然提供At Least Once的可靠性保证,但恰恰是这种机制可能导致消费者收到重复消息。就像网购时的重复下单,我们需要确保系统具备"天然免疫力"。

2. 消息幂等的核心逻辑

实现消息幂等性的核心在于唯一标识+状态记录。具体流程就像超市的快递柜:

// C#伪代码示例
public void ProcessMessage(Message message)
{
    // 获取消息指纹(类似取件码)
    var messageId = GetMessageId(message);
    
    // 检查快递柜是否已有包裹
    if (!_processedMessages.Contains(messageId)) 
    {
        // 执行业务逻辑
        HandleBusiness(message);
        
        // 存入处理记录(放入快递柜)
        _processedMessages.Add(messageId);
    }
}

3. 四种实用实现方案

3.1 消费者ACK机制

就像外卖小哥确认送达一样,正确处理后再签收:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    try
    {
        // 业务处理...
        channel.BasicAck(ea.DeliveryTag, false);
    }
    catch
    {
        channel.BasicNack(ea.DeliveryTag, false, true);
    }
};

注意点:必须关闭自动ACK,手动确认才能保证可靠性

3.2 唯一ID方案

为每条消息打造身份证:

// 发送端生成唯一ID
var properties = channel.CreateBasicProperties();
properties.MessageId = Guid.NewGuid().ToString();

// 接收端检查
using var connection = new SqlConnection(_dbConnection);
var exists = connection.QueryFirst<bool>(
    "SELECT COUNT(1) FROM MessageLog WHERE MessageId=@Id", 
    new { Id = messageId });

3.3 版本号控制

适合状态变更类操作,像软件版本更新:

UPDATE products 
SET stock = stock - 1, version = version + 1 
WHERE id = 1001 AND version = 5

3.4 业务状态检查

类似快递查询物流信息:

var order = _orderRepository.Get(orderId);
if (order.Status != OrderStatus.Pending)
{
    _logger.LogWarning("订单{OrderId}已处理,跳过执行", orderId);
    return;
}

4. 不同场景的选型指南

  • 支付系统:推荐唯一ID+数据库记录方案(金融级可靠性)
  • 库存扣减:版本号控制是最佳拍档(天然防超卖)
  • 日志收集:内存去重足够(允许少量重复)
  • IM消息推送:Redis过期方案(兼顾性能与时效)

5. 方案优缺点大比拼

方案 优点 缺点 适用场景
唯一ID 实现简单,通用性强 存储成本高 金融交易类
版本号 无额外存储 仅适用状态变更 库存/积分系统
业务状态检查 零成本 依赖业务设计 订单类系统
Redis过期 高性能 可能短暂重复 时效性要求不高

6. 避坑指南

  1. ID生成器要可靠:推荐Snowflake算法,避免UUID重复
  2. 存储选择要明智:高频场景用Redis,重要数据用数据库
  3. 过期时间设置:根据业务最长时间间隔设置(如支付系统设24小时)
  4. 版本号设计:建议使用long类型避免溢出
  5. 清理策略:定期归档处理记录,防止数据膨胀

7. 实战中的常见问题

案例1:某社交平台使用内存记录ID,重启服务后重复消费 → 解决方案:改用Redis持久化存储

案例2:电商系统使用数据库唯一索引,导致高并发时性能瓶颈 → 解决方案:改用Redis+异步落库

案例3:物流系统误用时间戳做唯一ID,导致批量重复 → 解决方案:改用Snowflake算法

8. 最佳实践总结

  1. 分级处理:核心业务用严格方案,次要业务适当放宽
  2. 监控报警:对重复消息率设置阈值告警
  3. 压力测试:模拟极端情况下的重复消息冲击
  4. 熔断机制:当重复率异常时启动熔断
  5. 文档规范:在消息协议中明确幂等要求
// 综合方案示例:Redis+数据库
public class IdempotentConsumer
{
    private readonly IDatabase _redis;
    private readonly AppDbContext _db;

    public async Task ProcessAsync(Message message)
    {
        var messageId = message.Headers["MessageId"].ToString();
        
        // 第一重检查:Redis快速过滤
        if (await _redis.KeyExistsAsync(messageId))
            return;

        // 第二重检查:数据库兜底
        if (await _db.ProcessedMessages.AnyAsync(m => m.MessageId == messageId))
            return;

        // 处理业务...
        
        // 更新存储(先Redis后数据库)
        await _redis.StringSetAsync(messageId, "1", TimeSpan.FromDays(1));
        await _db.ProcessedMessages.AddAsync(new MessageRecord(messageId));
        await _db.SaveChangesAsync();
    }
}

9. 未来展望

随着Serverless架构兴起,幂等性设计面临新挑战。建议关注:

  • 云服务商提供的原生幂等API
  • 事务型消息中间件的发展
  • 基于区块链的不可重复机制
  • 机器学习预测重复模式

消息幂等性就像系统的免疫系统,平时可能感受不到它的存在,但关键时刻能避免系统"生病"。选择适合的方案,让我们的系统在面对任何消息风暴时都能从容应对!