1. 消息重复消费的根源剖析

想象你在餐厅点了份牛排,服务员记下订单后转身走向厨房。这时候可能发生两种意外:服务员担心自己没记清楚,又回来跟你确认一遍(生产者重复发送);或者厨师做完菜发现传菜员没响应,于是重新做了一份(消费者重复处理)。RabbitMQ的消息重复消费正是类似的场景,主要源于:

  1. 生产者重复投递(网络闪断触发重试机制)
  2. 消费者处理失败后重新入队(手动ACK未正确使用)
  3. 集群环境下多个消费者同时消费(竞争消费)

笔者曾遇到线上事故:某电商平台在促销期间,因消息重复导致同一用户收到多张满减券,直接经济损失超百万。这让我们意识到正确处理重复消息的重要性。

2. 消息去重表方案(MySQL实现)

2.1 实现原理

建立消息指纹存储表,处理前先校验消息是否已存在,类似快递柜的取件码机制。

// Spring Boot + MyBatis 示例
@Service
public class OrderService {
    @Autowired
    private MessageRecordMapper messageRecordMapper;

    public void handleOrder(OrderMessage message) {
        // 生成消息指纹:业务类型+唯一ID
        String messageId = "ORDER_" + message.getOrderId();
        
        // 检查是否存在记录(带数据库行级锁)
        MessageRecord record = messageRecordMapper.selectForUpdate(messageId);
        if (record != null) {
            log.warn("重复消息已拦截:{}", messageId);
            return;
        }
        
        // 处理核心业务逻辑
        processOrder(message);
        
        // 插入处理记录
        messageRecordMapper.insert(new MessageRecord(messageId, LocalDateTime.now()));
    }
}

应用场景:适用于低频关键业务(如金融交易),需要与数据库事务结合使用

优点:实现直观,可靠性高
缺点:增加数据库压力,高并发场景可能成为瓶颈
注意:需使用SELECT FOR UPDATE避免并发问题,消息ID生成策略要全局唯一

3. 幂等性设计(业务层解决方案)

3.1 状态机模式

以订单系统为例,通过状态流转确保操作幂等:

// 订单状态枚举
public enum OrderStatus {
    CREATED, PAID, SHIPPED, COMPLETED, CANCELED
}

@Service
public class OrderServiceImpl {
    @Transactional
    public void handlePayment(String orderId) {
        Order order = orderDao.selectById(orderId);
        
        // 状态检查
        if (order.getStatus() != OrderStatus.CREATED) {
            log.info("订单[{}]当前状态[{}]无需重复处理", orderId, order.getStatus());
            return;
        }
        
        // 扣款业务逻辑
        paymentService.charge(order);
        
        // 原子更新状态
        int affected = orderDao.updateStatus(orderId, OrderStatus.CREATED, OrderStatus.PAID);
        if (affected == 0) {
            throw new ConcurrentUpdateException("订单状态已被修改");
        }
    }
}

3.2 版本号控制

在数据库记录中增加版本号字段:

CREATE TABLE orders (
    id VARCHAR(32) PRIMARY KEY,
    amount DECIMAL(10,2) NOT NULL,
    version INT DEFAULT 0,
    status VARCHAR(20)
);
@Update("UPDATE orders SET status = #{status}, version = version + 1 
         WHERE id = #{id} AND version = #{version}")
int optimisticUpdate(Order order);

应用场景:高频更新的业务场景(库存扣减、票务系统)
优点:不依赖中间件,业务自洽
缺点:需要改造现有业务逻辑
注意:要覆盖所有可能产生副作用的操作

4. 消息确认机制(RabbitMQ原生方案)

正确配置消费者应答模式是防御重复消费的第一道防线:

// Spring Boot配置示例
@Configuration
public class RabbitConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动ACK
        factory.setPrefetchCount(50); // 合理设置预取数量
        return factory;
    }
}

@Component
public class OrderListener {
    
    @RabbitListener(queues = "order.queue")
    public void processOrder(OrderMessage message, Channel channel, 
                            @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            // 业务处理
            orderService.handlePayment(message);
            
            // 成功处理时确认
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 根据异常类型决定重试策略
            if (e instanceof BusinessException) {
                channel.basicReject(tag, false); // 直接丢弃
            } else {
                channel.basicNack(tag, false, true); // 重新入队
            }
        }
    }
}

应用场景:所有RabbitMQ消费场景的基础配置
优点:利用消息队列自身机制
缺点:无法解决生产者重复投递问题
注意:正确处理NACK后的重试次数,避免无限循环

5. 消息过期+死信队列

通过TTL(Time-To-Live)控制消息生命周期:

// 创建带TTL的主队列
@Bean
public Queue orderQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000); // 1分钟过期
    args.put("x-dead-letter-exchange", "dead.letter.exchange");
    args.put("x-dead-letter-routing-key", "dead.order");
    return new Queue("order.queue", true, false, false, args);
}

// 死信处理器
@RabbitListener(queues = "dead.order.queue")
public void handleDeadLetter(OrderMessage message) {
    log.error("消息处理超时:{}", message);
    // 执行补偿操作:通知人工处理/记录异常等
}

应用场景:时效性敏感的业务(秒杀订单、限时优惠)
优点:自动清理过期消息
缺点:需要额外处理死信消息
注意:合理设置TTL时长,避免正常消息被误杀

6. 分布式锁方案(Redis实现)

在消费前获取锁,确保全局唯一处理:

@Service
public class OrderConsumer {
    
    @Autowired
    private RedissonClient redissonClient;

    public void processOrder(OrderMessage message) {
        String lockKey = "order_lock:" + message.getOrderId();
        RLock lock = redissonClient.getLock(lockKey);
        
        try {
            // 尝试加锁,最多等待100ms,锁定后30秒自动释放
            if (lock.tryLock(100, 30000, TimeUnit.MILLISECONDS)) {
                orderService.handlePayment(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

应用场景:分布式集群环境下的高并发处理
优点:保证强一致性
缺点:增加系统复杂度,可能产生死锁
注意:设置合理的锁超时时间,建议使用Redisson的看门狗机制

7. 消息版本号方案

在消息体中嵌入版本信息:

@Data
public class OrderMessage {
    private String orderId;
    private BigDecimal amount;
    private int version; // 每次修改递增
}

// 消费者处理逻辑
@Transactional
public void processOrder(OrderMessage message) {
    Order order = orderDao.selectById(message.getOrderId());
    
    if (order.getVersion() >= message.getVersion()) {
        log.info("忽略旧版本消息,当前版本:{},消息版本:{}", 
                order.getVersion(), message.getVersion());
        return;
    }
    
    // 处理业务...
    orderDao.updateVersion(message.getOrderId(), message.getVersion());
}

应用场景:需要保证消息顺序的业务(状态同步、数据聚合)
优点:天然支持版本控制
缺点:需要维护版本号的一致性
注意:版本号生成必须全局有序

8. 方案选型与综合实践

方案 适用场景 QPS支持 实施成本 可靠性
消息去重表 低频金融交易 1k以下 ★★★★☆
幂等性设计 所有业务场景 无限制 ★★★★★
手动ACK 基础防护 无限制 ★★☆☆☆
TTL+死信队列 时效敏感型业务 10k+ ★★★☆☆
分布式锁 分布式高并发 5k+ ★★★★☆
版本号控制 状态同步业务 10k+ ★★★★☆

黄金实践原则

  1. 必做:所有消费者必须实现幂等性
  2. 必做:生产端添加唯一消息ID
  3. 推荐:关键业务组合使用去重表+版本号
  4. 监控:实现消息轨迹追踪系统

9. 总结与展望

在分布式系统中,消息重复就像空气中的尘埃无法完全避免。通过本文介绍的七种武器,我们可以将重复消费的概率降到最低。实际应用中,往往需要根据业务特点组合多种方案,比如:幂等性设计+版本号控制+分布式锁的三重防护。

随着技术演进,Service Mesh等新技术为消息治理提供了新思路。但无论技术如何发展,理解业务本质、设计合理的消息处理流程,才是解决重复消费问题的根本之道。就像优秀的厨师不会因为多收到一张订单就多做一份牛排,我们的系统也应该具备这种"智能过滤"的能力。