RocketMQ 顺序消息深度解析
面试官:订单状态流转(待支付→已支付→已发货→已完成)需要顺序处理,如何保证消息的顺序消费?
你:我们使用 RocketMQ 的分区顺序消息。通过 MessageQueueSelector 将同一订单的所有消息路由到同一个 MessageQueue,消费时使用 MessageListenerOrderly(顺序消费监听器),保证同一订单的消息被单线程串行处理。
面试官:为什么要用分区顺序而不是全局顺序?
这个问题很多人只能说”用 MessageListenerOrderly”,但能讲清楚分区顺序 vs 全局顺序、顺序消费的代价、Broker 故障时如何保证不乱序的,才能真正体现技术深度。
链式追问一:顺序消息基础
Section titled “链式追问一:顺序消息基础”Q1:全局顺序消息和分区顺序消息的区别?必考
Section titled “Q1:全局顺序消息和分区顺序消息的区别?”对比表格:
| 维度 | 全局顺序消息 | 分区顺序消息(推荐) |
|---|---|---|
| 顺序范围 | 所有消息严格有序(跨 Queue、跨 Broker) | 同一分区(MessageQueue)内有序 |
| 实现方式 | 1 个 Topic 只能有 1 个 MessageQueue | 同一 ID 的消息路由到同一 Queue |
| 吞吐量 | 极低(单 Queue,无并行,单线程写入/消费) | 高(多 Queue 并行,互不干扰) |
| 高可用性 | 差(Broker 故障则全部阻塞) | 好(只影响部分 Queue) |
| 扩展性 | 差(无法水平扩展) | 好(增加 Queue 数量即可) |
| 适用场景 | 几乎不用(通常是设计问题) | 订单状态流转、用户操作序列、数据库 Binlog 同步 |
核心原则:大多数场景只需要”同一业务实体的消息有序”(同一订单、同一用户),不需要所有消息全局有序。
全局顺序消息的代价:
全局顺序 = 单 Queue = 单点瓶颈
写入性能:- 单 Queue 单线程写入:1 万条/秒- 多 Queue 并行写入:10 万条/秒(提升 10 倍)
消费性能:- 单 Queue 单线程消费:1 万条/秒- 多 Queue 并行消费:10 万条/秒(提升 10 倍)
高可用:- 单 Queue 在 Broker-a- Broker-a 故障 → 所有消息无法消费(阻塞)
结论:全局顺序消息的代价太大,除非业务必须,否则不推荐分区顺序消息的优势:
分区顺序 = 多 Queue = 并行 + 有序
示例:订单状态流转(4 个 Queue)
Queue 0:订单 A 的所有消息(待支付→已支付→已发货)Queue 1:订单 B 的所有消息Queue 2:订单 C 的所有消息Queue 3:订单 D 的所有消息
写入性能:- 4 个 Queue 并行写入:4 万条/秒(提升 4 倍)
消费性能:- 4 个 Queue 并行消费(每个 Queue 单线程):4 万条/秒(提升 4 倍)
高可用:- Queue 0 在 Broker-a,Queue 1/2/3 在 Broker-b- Broker-a 故障 → 只影响订单 A,订单 B/C/D 正常消费
结论:分区顺序在保证业务有序的前提下,兼顾了性能和高可用Q2:如何实现分区顺序消息?必考
Section titled “Q2:如何实现分区顺序消息?”实现步骤:
第一步:生产者通过 MessageQueueSelector 路由到固定 Queue第二步:消费者使用 MessageListenerOrderly(顺序消费)生产者代码示例:
DefaultMQProducer producer = new DefaultMQProducer("ORDER_PRODUCER_GROUP");producer.start();
// 发送订单状态变更消息public void sendOrderStatusMessage(String orderId, OrderStatus status) { Message message = new Message( "ORDER_STATUS_TOPIC", status.name().getBytes(StandardCharsets.UTF_8) );
// 使用 MessageQueueSelector 路由到固定 Queue SendResult result = producer.send( message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { String orderId = (String) arg;
// 哈希取模,保证同一 orderId 路由到同一 Queue int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index); } }, orderId // arg = orderId );
log.info("发送成功:orderId={}, queue={}, status={}", orderId, result.getMessageQueue(), status);}路由算法详解:
// 假设 Topic 有 4 个 Queue:[Queue0, Queue1, Queue2, Queue3]List<MessageQueue> mqs = producer.fetchPublishMessageQueues("ORDER_STATUS_TOPIC");
// 订单 A 的所有消息String orderIdA = "order_123";int indexA = Math.abs(orderIdA.hashCode()) % 4; // 假设结果为 0// 所有消息都路由到 Queue0
// 订单 B 的所有消息String orderIdB = "order_456";int indexB = Math.abs(orderIdB.hashCode()) % 4; // 假设结果为 2// 所有消息都路由到 Queue2
// 结果:// - 订单 A 的消息 → Queue0(严格有序)// - 订单 B 的消息 → Queue2(严格有序)// - Queue0 和 Queue2 可以并行消费(互不影响)消费者代码示例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ORDER_CONSUMER_GROUP");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("ORDER_STATUS_TOPIC", "*");
// 使用 MessageListenerOrderly(顺序消费)consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { String orderId = msg.getKeys(); // 获取订单 ID OrderStatus status = OrderStatus.valueOf(new String(msg.getBody()));
// 处理订单状态变更(单线程串行处理) orderService.updateStatus(orderId, status);
log.info("订单状态更新:orderId={}, status={}", orderId, status); } return ConsumeOrderlyStatus.SUCCESS; }});
consumer.start();MessageListenerOrderly vs MessageListenerConcurrently:
| 维度 | MessageListenerConcurrently | MessageListenerOrderly |
|---|---|---|
| 并发方式 | 多线程并发消费(线程池) | 每个 Queue 单线程串行消费 |
| 吞吐量 | 高(多线程并行) | 低(单线程处理每个 Queue) |
| 顺序保证 | ❌ 不保证顺序 | ✅ 同一 Queue 严格有序 |
| 锁机制 | 无 | 消费者对 Queue 加分布式锁 |
| 失败处理 | 重试,不阻塞其他消息 | 阻塞:必须等当前消息处理成功才处理下一条 |
| 重试机制 | 进入重试队列(延迟重试) | 原地重试(不进重试队列) |
| 适用场景 | 普通消息,对顺序无要求 | 订单状态流转、用户操作序列 |
顺序消费的锁机制:
消费者启动时:┌─────────────────────────────────────────────────────────────┐│ Consumer 实例 1 │└─────────────────────────────────────────────────────────────┘ │ ├── 向 Broker 申请 Queue 0 的锁(分布式锁) │ └── 锁定成功 → 单线程消费 Queue 0 │ └── 向 Broker 申请 Queue 1 的锁 └── 锁定失败(被 Consumer 实例 2 锁定) → 放弃
┌─────────────────────────────────────────────────────────────┐│ Consumer 实例 2 │└─────────────────────────────────────────────────────────────┘ │ └── 向 Broker 申请 Queue 1 的锁 └── 锁定成功 → 单线程消费 Queue 1
结果:- Consumer 实例 1 消费 Queue 0(单线程)- Consumer 实例 2 消费 Queue 1(单线程)- 保证同一 Queue 被单线程消费,从而保证顺序顺序消费的阻塞问题:
Queue 0: [消息1(待支付), 消息2(已支付), 消息3(已发货)]
消息1 处理失败 → 原地重试(最多 16 次) → 在消息1 成功之前,消息2 和消息3 被阻塞,无法消费!
这是顺序消息的代价:为了保证顺序,失败时必须阻塞
实测数据:- 消息1 处理耗时 100ms,成功 → 消息2 延迟 100ms- 消息1 处理失败,重试 16 次(每次延迟 10s)→ 消息2 延迟 160sQ3:顺序消息消费失败时如何处理?高频
Section titled “Q3:顺序消息消费失败时如何处理?”顺序消息 vs 普通消息的失败处理对比:
普通消息消费失败:消息1 → 消费失败 → 进入重试队列(%RETRY%consumerGroup) → 延迟 10s → 重新投递 → 消费成功 → 消息2 开始消费 → 消息2 和消息3 不受影响,可以继续消费
顺序消息消费失败:消息1 → 消费失败 → 原地重试(不进入重试队列!) → 重试间隔:10s → 30s → 1m → 2m → ... → 2h(共 16 次) → 在消息1 成功之前,消息2 和消息3 被阻塞!对比表格:
| 维度 | 普通消息(Concurrently) | 顺序消息(Orderly) |
|---|---|---|
| 失败处理 | 进入重试队列 | 原地重试(不进重试队列) |
| 重试次数 | 16 次 | 16 次 |
| 重试间隔 | 延迟级别:10s → 30s → 1m → 2m → … → 2h | 固定间隔:1s(默认) |
| 对其他消息的影响 | 无影响(其他消息继续消费) | 阻塞后续消息(必须等当前消息成功) |
| 死信队列 | 16 次重试后进入 DLQ | 16 次重试后进入 DLQ |
顺序消息失败处理代码:
consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { String orderId = msg.getKeys();
try { // 处理订单状态变更 orderService.updateStatus(orderId, status);
} catch (Exception e) { log.error("订单状态更新失败:orderId={}", orderId, e);
// 返回 SUSPEND_CURRENT_QUEUE_A_MOMENT,原地重试 // 下次重试间隔 = context.getSuspendCurrentQueueTimeMillis(),默认 1s context.setSuspendCurrentQueueTimeMillis(10000); // 设置重试间隔 10s return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } return ConsumeOrderlyStatus.SUCCESS; }});顺序消息的最佳实践:
1. 业务层幂等处理:
// 订单状态流转必须是幂等的public void updateStatus(String orderId, OrderStatus newStatus) { Order order = orderRepository.findById(orderId);
// 幂等检查:订单状态已经是 newStatus,直接返回 if (order.getStatus() == newStatus) { log.info("订单状态已是 {},跳过更新:orderId={}", newStatus, orderId); return; }
// 状态流转检查:只能从 UNPAID → PAID,不能跳跃 if (!isValidTransition(order.getStatus(), newStatus)) { log.warn("订单状态流转不合法:orderId={}, currentStatus={}, newStatus={}", orderId, order.getStatus(), newStatus); return; // 直接返回成功,避免阻塞 }
// 更新订单状态 order.setStatus(newStatus); orderRepository.save(order);}2. 设置合理的重试间隔:
@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { try { // 处理消息 process(msgs); return ConsumeOrderlyStatus.SUCCESS;
} catch (RetryableException e) { // 可重试异常(如数据库连接超时),设置较短的重试间隔 context.setSuspendCurrentQueueTimeMillis(1000); // 1s 后重试 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
} catch (NonRetryableException e) { // 不可重试异常(如业务校验失败),直接返回成功,避免阻塞 log.error("业务异常,不再重试:{}", e.getMessage()); return ConsumeOrderlyStatus.SUCCESS; // 直接成功,消息不再重试 }}3. 监控和告警:
@Componentpublic class OrderMessageMonitor {
private final AtomicInteger retryCount = new AtomicInteger(0); private final AtomicInteger dlqCount = new AtomicInteger(0);
// 监听死信队列 @PostConstruct public void startDLQConsumer() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "ORDER_DLQ_CONSUMER_GROUP" ); consumer.subscribe("%DLQ%ORDER_CONSUMER_GROUP", "*");
consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { // 死信消息告警 alertService.alert("订单顺序消息进入死信队列:" + msg.getKeys()); dlqCount.incrementAndGet();
// 记录到数据库,人工干预 deadLetterService.record(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });
consumer.start(); }
// 每分钟输出统计 @Scheduled(cron = "0 * * * * ?") public void logStats() { log.info("顺序消息统计:retry={}, dlq={}", retryCount.getAndSet(0), dlqCount.getAndSet(0)); }}实战案例:订单状态流转的失败处理:
业务场景:订单 A 的状态流转:UNPAID → PAID → SHIPPED → COMPLETED
场景 1:PAID 状态处理失败(数据库连接超时)处理:原地重试,SHIPPED 消息阻塞建议:设置重试间隔 1s,最多重试 3 次,3 次失败后记录日志并跳过
场景 2:SHIPPED 状态处理失败(订单状态已是 CANCELLED)原因:用户在 PAID 后取消了订单处理:直接返回成功,避免阻塞建议:在业务层检查订单状态,如果状态不合法直接返回成功
场景 3:COMPLETED 状态处理失败,重试 16 次仍失败处理:消息进入死信队列建议:监听死信队列,人工干预或补偿任务处理链式追问二:顺序消息的保障与代价
Section titled “链式追问二:顺序消息的保障与代价”Q4:顺序消息如何保证 Broker 故障时不乱序?高频
Section titled “Q4:顺序消息如何保证 Broker 故障时不乱序?”问题场景:
Topic: ORDER_STATUS_TOPICQueue 0: 在 Broker-a 上,包含订单 A 的消息Queue 1: 在 Broker-b 上,包含订单 B 的消息
场景 1:Broker-a 故障问题:Queue 0 的消息如何处理?能切换到 Queue 1 吗?答案:不能!切换 Queue 会导致顺序混乱
场景 2:消费者宕机问题:Consumer 实例 1 宕机,Queue 0 的锁如何释放?答案:Broker 检测到消费者宕机,释放锁,Consumer 实例 2 接管RocketMQ 的处理机制:
┌─────────────────────────────────────────────────────────────┐│ 正常情况:Broker-a 正常 │└─────────────────────────────────────────────────────────────┘
Consumer 实例 1 ←── 锁定 Queue 0 ──→ Broker-aConsumer 实例 2 ←── 锁定 Queue 1 ──→ Broker-b
处理:- Consumer 实例 1 单线程消费 Queue 0(订单 A 的消息)- Consumer 实例 2 单线程消费 Queue 1(订单 B 的消息)
┌─────────────────────────────────────────────────────────────┐│ 异常情况 1:Broker-a 故障 │└─────────────────────────────────────────────────────────────┘
Broker-a 宕机 └── Queue 0 不可用 └── Consumer 实例 1 无法拉取消息 └── 订单 A 的消息暂时无法消费(阻塞)
处理:- ❌ 不切换到其他 Queue(否则订单 A 的消息会乱序)- ✅ 等待 Broker-a 恢复或主从切换- ✅ 如果 Broker-a 有 Slave,切换到 Slave 继续消费(保持顺序)
┌─────────────────────────────────────────────────────────────┐│ 异常情况 2:Consumer 实例 1 宕机 │└─────────────────────────────────────────────────────────────┘
Consumer 实例 1 宕机 └── Broker-a 检测到连接断开(心跳超时 30s) └── 释放 Queue 0 的锁 └── Consumer 实例 2 申请 Queue 0 的锁 └── 锁定成功 → Consumer 实例 2 接管 Queue 0
处理:- ✅ Broker 自动释放锁(30s 心跳超时)- ✅ 其他消费者实例接管 Queue 0(继续消费,保持顺序)- ⚠️ 切换期间有 30s 的阻塞(心跳超时时间)顺序消费的分布式锁机制:
// Broker 端的锁管理(简化版)public class LockManager {
// 锁表:QueueId -> ConsumerId private ConcurrentHashMap<String, String> lockTable = new ConcurrentHashMap<>();
/** * 消费者申请锁 */ public boolean tryLock(String queueId, String consumerId) { String currentOwner = lockTable.get(queueId);
if (currentOwner == null) { // 锁未被占用,加锁成功 lockTable.put(queueId, consumerId); return true; } else if (currentOwner.equals(consumerId)) { // 已经是锁的持有者,续期成功 return true; } else { // 锁被其他消费者占用 return false; } }
/** * 心跳续期 */ public void renewLock(String queueId, String consumerId) { if (consumerId.equals(lockTable.get(queueId))) { // 续期成功,更新心跳时间 lockHeartbeatTime.put(queueId, System.currentTimeMillis()); } }
/** * 检测锁超时 */ @Scheduled(fixedRate = 5000) // 每 5s 检查一次 public void checkLockTimeout() { long now = System.currentTimeMillis(); for (Map.Entry<String, Long> entry : lockHeartbeatTime.entrySet()) { if (now - entry.getValue() > 30000) { // 30s 超时 // 释放锁 lockTable.remove(entry.getKey()); lockHeartbeatTime.remove(entry.getKey()); log.info("锁超时释放:queueId={}", entry.getKey()); } } }}顺序消息 vs 普通消息的高可用对比:
| 维度 | 普通消息(Concurrently) | 顺序消息(Orderly) |
|---|---|---|
| Broker 故障 | 切换到其他 Broker,继续消费 | Queue 不可用时阻塞(等待 Broker 恢复) |
| 消费者宕机 | Broker 立即切换到其他消费者 | Broker 30s 后释放锁,其他消费者接管 |
| 主从切换 | 立即切换,无阻塞 | 切换到 Slave,保持顺序(需同步复制) |
| 高可用性 | 好(故障快速切换) | 差(故障时有阻塞) |
这就是顺序消息的代价:为了保序,牺牲了高可用性。
Q5:顺序消息和消息幂等的关系?中频
Section titled “Q5:顺序消息和消息幂等的关系?”核心观点:顺序消息保证处理顺序,但不保证消息不重复,消费者依然需要幂等。
为什么顺序消息还需要幂等:
场景 1:消费者处理成功,ACK 网络超时消息1(UNPAID → PAID)→ 消费者处理成功 → ACK 超时 → Broker 未收到 ACK → 重新投递消息1 → 消费者再次收到消息1(重复消息)
场景 2:Broker 主从切换Master Broker 投递消息1 → 消费者处理成功 → Master 宕机 → Slave 升级为 Master → 消息1 的 Offset 未同步 → 新 Master 重新投递消息1(重复消息)
场景 3:消费者宕机Consumer 实例 1 处理消息1 → 宕机(处理到一半) → Broker 切换到 Consumer 实例 2 → Consumer 实例 2 重新消费消息1(可能重复)顺序消息 + 幂等消费的正确实现:
consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { String orderId = msg.getKeys(); OrderStatus newStatus = OrderStatus.valueOf(new String(msg.getBody()));
// 查询订单当前状态 Order order = orderRepository.findById(orderId);
// 幂等检查:订单状态已是 newStatus,直接返回成功 if (order.getStatus() == newStatus) { log.info("订单状态已更新,跳过:orderId={}, status={}", orderId, newStatus); continue; }
// 状态流转检查:只能从 UNPAID → PAID,不能跳跃 if (!isValidTransition(order.getStatus(), newStatus)) { log.warn("订单状态流转不合法:orderId={}, current={}, new={}", orderId, order.getStatus(), newStatus); continue; // 直接返回成功,避免阻塞 }
// 更新订单状态(幂等) orderRepository.updateStatus(orderId, newStatus);
log.info("订单状态更新成功:orderId={}, status={}", orderId, newStatus); } return ConsumeOrderlyStatus.SUCCESS; }});
// 数据库层幂等(乐观锁)public void updateStatus(String orderId, OrderStatus newStatus) { int rows = orderRepository.updateStatus( orderId, newStatus, OrderStatus.UNPAID // 期望的旧状态 );
if (rows == 0) { // 更新失败,说明订单状态已被其他线程更新(幂等) log.info("订单状态更新失败,可能已更新:orderId={}", orderId); }}
// SQLUPDATE ordersSET status = ?WHERE id = ? AND status = ?; -- 乐观锁条件顺序消息的幂等策略:
| 策略 | 实现方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 数据库状态机 | UPDATE ... WHERE status=OLD | 天然幂等,性能好 | 需要状态字段 | 订单状态流转 |
| 数据库唯一索引 | INSERT 时唯一索引约束 | 简单可靠 | 需要设计表 | 扣款、扣库存 |
| Redis 去重 | SETNX 记录已处理消息 | 性能极高 | Redis 故障丢数据 | 高并发场景 |
| Token 机制 | 生产者生成 Token | 灵活 | 需要额外存储 | 通用场景 |
最佳实践:数据库状态机(订单状态流转):
// 订单状态枚举public enum OrderStatus { UNPAID, // 待支付 PAID, // 已支付 SHIPPED, // 已发货 COMPLETED, // 已完成 CANCELLED // 已取消}
// 状态流转规则public boolean isValidTransition(OrderStatus current, OrderStatus newStatus) { switch (current) { case UNPAID: return newStatus == OrderStatus.PAID || newStatus == OrderStatus.CANCELLED; case PAID: return newStatus == OrderStatus.SHIPPED || newStatus == OrderStatus.CANCELLED; case SHIPPED: return newStatus == OrderStatus.COMPLETED; default: return false; // COMPLETED 和 CANCELLED 不能再流转 }}
// 消费者处理consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { String orderId = msg.getKeys(); OrderStatus newStatus = OrderStatus.valueOf(new String(msg.getBody()));
try { // 状态流转处理(幂等) boolean success = orderService.transitionStatus(orderId, newStatus);
if (!success) { // 状态流转失败(可能已流转或流转不合法) log.warn("订单状态流转失败:orderId={}, newStatus={}", orderId, newStatus); }
} catch (Exception e) { log.error("订单状态流转异常:orderId={}", orderId, e); // 返回 SUSPEND_CURRENT_QUEUE_A_MOMENT,原地重试 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } return ConsumeOrderlyStatus.SUCCESS; }});实战案例:订单状态流转完整方案
Section titled “实战案例:订单状态流转完整方案”完整架构图:
┌─────────────────────────────────────────────────────────────┐│ Order Service(订单服务) │└─────────────────────────────────────────────────────────────┘ │ ├── 用户支付成功 │ └── 发送消息:orderId, PAID │ ├── MessageQueueSelector 路由到 Queue N │ └── 同一 orderId 的消息路由到同一 Queue │ ├── 商家发货 │ └── 发送消息:orderId, SHIPPED │ └── 用户确认收货 └── 发送消息:orderId, COMPLETED
┌─────────────────────────────────────────────────────────────┐│ Broker(消息队列) │└─────────────────────────────────────────────────────────────┘ │ ├── Queue 0: 订单 A 的消息(UNPAID → PAID → SHIPPED) ├── Queue 1: 订单 B 的消息 ├── Queue 2: 订单 C 的消息 └── Queue 3: 订单 D 的消息
┌─────────────────────────────────────────────────────────────┐│ Order Status Consumer(订单状态消费者) │└─────────────────────────────────────────────────────────────┘ │ ├── Consumer 实例 1 │ ├── 锁定 Queue 0 │ └── 单线程串行消费 Queue 0 的消息 │ └── Consumer 实例 2 ├── 锁定 Queue 1 └── 单线程串行消费 Queue 1 的消息完整代码实现:
// ==================== 生产者:发送订单状态变更消息 ====================
@Servicepublic class OrderStatusProducer {
@Autowired private DefaultMQProducer producer;
/** * 发送订单状态变更消息(分区顺序) */ public void sendOrderStatusMessage(String orderId, OrderStatus status) { Message message = new Message( "ORDER_STATUS_TOPIC", status.name().getBytes(StandardCharsets.UTF_8) ); message.setKeys(orderId); // 设置消息 Key(订单 ID)
try { // 使用 MessageQueueSelector 路由到固定 Queue SendResult result = producer.send( message, (mqs, msg, arg) -> { String id = (String) arg; // 哈希取模,保证同一 orderId 路由到同一 Queue int index = Math.abs(id.hashCode()) % mqs.size(); return mqs.get(index); }, orderId );
log.info("发送订单状态消息成功:orderId={}, status={}, queue={}", orderId, status, result.getMessageQueue().getQueueId());
} catch (Exception e) { log.error("发送订单状态消息失败:orderId={}, status={}", orderId, status, e); throw new RuntimeException("发送消息失败"); } }}
// ==================== 消费者:订单状态流转处理 ====================
@Servicepublic class OrderStatusConsumer {
@Autowired private OrderService orderService;
@PostConstruct public void startConsumer() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "ORDER_STATUS_CONSUMER_GROUP" ); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("ORDER_STATUS_TOPIC", "*");
// 使用 MessageListenerOrderly(顺序消费) consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { String orderId = msg.getKeys(); OrderStatus newStatus = OrderStatus.valueOf( new String(msg.getBody(), StandardCharsets.UTF_8) );
try { // 状态流转处理(幂等) boolean success = orderService.transitionStatus(orderId, newStatus);
if (!success) { // 状态流转失败(可能已流转或流转不合法),直接返回成功 log.warn("订单状态流转失败,跳过:orderId={}, newStatus={}", orderId, newStatus); } else { log.info("订单状态流转成功:orderId={}, newStatus={}", orderId, newStatus); }
} catch (Exception e) { log.error("订单状态流转异常:orderId={}, newStatus={}", orderId, newStatus, e);
// 设置重试间隔 10s context.setSuspendCurrentQueueTimeMillis(10000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } return ConsumeOrderlyStatus.SUCCESS; } });
consumer.start(); log.info("订单状态消费者启动成功"); }}
// ==================== 业务层:订单状态流转(幂等)====================
@Servicepublic class OrderService {
@Autowired private OrderRepository orderRepository;
/** * 订单状态流转(幂等) */ @Transactional public boolean transitionStatus(String orderId, OrderStatus newStatus) { // 查询订单 Order order = orderRepository.findById(orderId); if (order == null) { log.warn("订单不存在:orderId={}", orderId); return false; }
// 幂等检查:订单状态已是 newStatus if (order.getStatus() == newStatus) { log.info("订单状态已是 {},跳过:orderId={}", newStatus, orderId); return true; // 返回 true,避免重试 }
// 状态流转检查:只能从 UNPAID → PAID,不能跳跃 if (!isValidTransition(order.getStatus(), newStatus)) { log.warn("订单状态流转不合法:orderId={}, current={}, new={}", orderId, order.getStatus(), newStatus); return true; // 返回 true,避免重试 }
// 更新订单状态(乐观锁) int rows = orderRepository.updateStatus(orderId, newStatus, order.getStatus());
if (rows > 0) { log.info("订单状态更新成功:orderId={}, status={}", orderId, newStatus);
// 发送通知(如短信、推送) sendNotification(orderId, newStatus);
return true; } else { // 更新失败,说明订单状态已被其他线程更新(幂等) log.info("订单状态更新失败,可能已更新:orderId={}", orderId); return true; // 返回 true,避免重试 } }
/** * 状态流转规则 */ private boolean isValidTransition(OrderStatus current, OrderStatus newStatus) { switch (current) { case UNPAID: return newStatus == OrderStatus.PAID || newStatus == OrderStatus.CANCELLED; case PAID: return newStatus == OrderStatus.SHIPPED || newStatus == OrderStatus.CANCELLED; case SHIPPED: return newStatus == OrderStatus.COMPLETED; default: return false; } }}
// ==================== 数据库层:订单状态更新(乐观锁)====================
@Repositorypublic class OrderRepository {
@Autowired private JdbcTemplate jdbcTemplate;
/** * 更新订单状态(乐观锁) */ public int updateStatus(String orderId, OrderStatus newStatus, OrderStatus expectedOldStatus) { String sql = "UPDATE orders SET status = ?, update_time = NOW() " + "WHERE id = ? AND status = ?";
return jdbcTemplate.update(sql, newStatus.name(), orderId, expectedOldStatus.name()); }}
// ==================== 监控:死信队列监听 ====================
@Servicepublic class OrderDLQConsumer {
@PostConstruct public void startDLQConsumer() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "ORDER_DLQ_CONSUMER_GROUP" ); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("%DLQ%ORDER_STATUS_CONSUMER_GROUP", "*");
consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { // 死信消息告警 String orderId = msg.getKeys(); alertService.alert("订单状态消息进入死信队列:orderId=" + orderId);
// 记录到数据库,人工干预 deadLetterService.record(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });
consumer.start(); }}
// ==================== 监控:统计和告警 ====================
@Componentpublic class OrderMessageMonitor {
private final AtomicInteger successCount = new AtomicInteger(0); private final AtomicInteger retryCount = new AtomicInteger(0); private final AtomicInteger dlqCount = new AtomicInteger(0);
// 每分钟输出统计 @Scheduled(cron = "0 * * * * ?") public void logStats() { int success = successCount.getAndSet(0); int retry = retryCount.getAndSet(0); int dlq = dlqCount.getAndSet(0);
log.info("订单状态消息统计:success={}, retry={}, dlq={}", success, retry, dlq);
// 告警:重试率 > 5% if (success > 0 && retry * 100 / (success + retry) > 5) { alertService.alert("订单状态消息重试率过高:" + retry * 100 / (success + retry) + "%"); } }}