Skip to content

RocketMQ 顺序消息深度解析

面试官:订单状态流转(待支付→已支付→已发货→已完成)需要顺序处理,如何保证消息的顺序消费?

:我们使用 RocketMQ 的分区顺序消息。通过 MessageQueueSelector 将同一订单的所有消息路由到同一个 MessageQueue,消费时使用 MessageListenerOrderly(顺序消费监听器),保证同一订单的消息被单线程串行处理。

面试官:为什么要用分区顺序而不是全局顺序?

这个问题很多人只能说”用 MessageListenerOrderly”,但能讲清楚分区顺序 vs 全局顺序、顺序消费的代价、Broker 故障时如何保证不乱序的,才能真正体现技术深度。


Q1:全局顺序消息和分区顺序消息的区别?必考

Section titled “Q1:全局顺序消息和分区顺序消息的区别?”

对比表格

维度全局顺序消息分区顺序消息(推荐)
顺序范围所有消息严格有序(跨 Queue、跨 Broker)同一分区(MessageQueue)内有序
实现方式1 个 Topic 只能有 1 个 MessageQueue同一 ID 的消息路由到同一 Queue
吞吐量极低(单 Queue,无并行,单线程写入/消费)高(多 Queue 并行,互不干扰)
高可用性差(Broker 故障则全部阻塞)好(只影响部分 Queue)
扩展性差(无法水平扩展)好(增加 Queue 数量即可)
适用场景几乎不用(通常是设计问题)订单状态流转、用户操作序列、数据库 Binlog 同步

核心原则:大多数场景只需要”同一业务实体的消息有序”(同一订单、同一用户),不需要所有消息全局有序。

全局顺序消息的代价

全局顺序 = 单 Queue = 单点瓶颈
写入性能:
- 单 Queue 单线程写入:1 万条/秒
- 多 Queue 并行写入:10 万条/秒(提升 10 倍)
消费性能:
- 单 Queue 单线程消费:1 万条/秒
- 多 Queue 并行消费:10 万条/秒(提升 10 倍)
高可用:
- 单 Queue 在 Broker-a
- Broker-a 故障 → 所有消息无法消费(阻塞)
结论:全局顺序消息的代价太大,除非业务必须,否则不推荐

分区顺序消息的优势

分区顺序 = 多 Queue = 并行 + 有序
示例:订单状态流转(4 个 Queue)
Queue 0:订单 A 的所有消息(待支付→已支付→已发货)
Queue 1:订单 B 的所有消息
Queue 2:订单 C 的所有消息
Queue 3:订单 D 的所有消息
写入性能:
- 4 个 Queue 并行写入:4 万条/秒(提升 4 倍)
消费性能:
- 4 个 Queue 并行消费(每个 Queue 单线程):4 万条/秒(提升 4 倍)
高可用:
- Queue 0 在 Broker-a,Queue 1/2/3 在 Broker-b
- Broker-a 故障 → 只影响订单 A,订单 B/C/D 正常消费
结论:分区顺序在保证业务有序的前提下,兼顾了性能和高可用

Q2:如何实现分区顺序消息?必考

Section titled “Q2:如何实现分区顺序消息?”

实现步骤

第一步:生产者通过 MessageQueueSelector 路由到固定 Queue
第二步:消费者使用 MessageListenerOrderly(顺序消费)

生产者代码示例

DefaultMQProducer producer = new DefaultMQProducer("ORDER_PRODUCER_GROUP");
producer.start();
// 发送订单状态变更消息
public void sendOrderStatusMessage(String orderId, OrderStatus status) {
Message message = new Message(
"ORDER_STATUS_TOPIC",
status.name().getBytes(StandardCharsets.UTF_8)
);
// 使用 MessageQueueSelector 路由到固定 Queue
SendResult result = producer.send(
message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg;
// 哈希取模,保证同一 orderId 路由到同一 Queue
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
},
orderId // arg = orderId
);
log.info("发送成功:orderId={}, queue={}, status={}",
orderId, result.getMessageQueue(), status);
}

路由算法详解

// 假设 Topic 有 4 个 Queue:[Queue0, Queue1, Queue2, Queue3]
List<MessageQueue> mqs = producer.fetchPublishMessageQueues("ORDER_STATUS_TOPIC");
// 订单 A 的所有消息
String orderIdA = "order_123";
int indexA = Math.abs(orderIdA.hashCode()) % 4; // 假设结果为 0
// 所有消息都路由到 Queue0
// 订单 B 的所有消息
String orderIdB = "order_456";
int indexB = Math.abs(orderIdB.hashCode()) % 4; // 假设结果为 2
// 所有消息都路由到 Queue2
// 结果:
// - 订单 A 的消息 → Queue0(严格有序)
// - 订单 B 的消息 → Queue2(严格有序)
// - Queue0 和 Queue2 可以并行消费(互不影响)

消费者代码示例

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ORDER_CONSUMER_GROUP");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("ORDER_STATUS_TOPIC", "*");
// 使用 MessageListenerOrderly(顺序消费)
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
String orderId = msg.getKeys(); // 获取订单 ID
OrderStatus status = OrderStatus.valueOf(new String(msg.getBody()));
// 处理订单状态变更(单线程串行处理)
orderService.updateStatus(orderId, status);
log.info("订单状态更新:orderId={}, status={}", orderId, status);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();

MessageListenerOrderly vs MessageListenerConcurrently

维度MessageListenerConcurrentlyMessageListenerOrderly
并发方式多线程并发消费(线程池)每个 Queue 单线程串行消费
吞吐量高(多线程并行)低(单线程处理每个 Queue)
顺序保证❌ 不保证顺序✅ 同一 Queue 严格有序
锁机制消费者对 Queue 加分布式锁
失败处理重试,不阻塞其他消息阻塞:必须等当前消息处理成功才处理下一条
重试机制进入重试队列(延迟重试)原地重试(不进重试队列)
适用场景普通消息,对顺序无要求订单状态流转、用户操作序列

顺序消费的锁机制

消费者启动时:
┌─────────────────────────────────────────────────────────────┐
│ Consumer 实例 1 │
└─────────────────────────────────────────────────────────────┘
├── 向 Broker 申请 Queue 0 的锁(分布式锁)
│ └── 锁定成功 → 单线程消费 Queue 0
└── 向 Broker 申请 Queue 1 的锁
└── 锁定失败(被 Consumer 实例 2 锁定) → 放弃
┌─────────────────────────────────────────────────────────────┐
│ Consumer 实例 2 │
└─────────────────────────────────────────────────────────────┘
└── 向 Broker 申请 Queue 1 的锁
└── 锁定成功 → 单线程消费 Queue 1
结果:
- Consumer 实例 1 消费 Queue 0(单线程)
- Consumer 实例 2 消费 Queue 1(单线程)
- 保证同一 Queue 被单线程消费,从而保证顺序

顺序消费的阻塞问题

Queue 0: [消息1(待支付), 消息2(已支付), 消息3(已发货)]
消息1 处理失败 → 原地重试(最多 16 次)
→ 在消息1 成功之前,消息2 和消息3 被阻塞,无法消费!
这是顺序消息的代价:为了保证顺序,失败时必须阻塞
实测数据:
- 消息1 处理耗时 100ms,成功 → 消息2 延迟 100ms
- 消息1 处理失败,重试 16 次(每次延迟 10s)→ 消息2 延迟 160s

Q3:顺序消息消费失败时如何处理?高频

Section titled “Q3:顺序消息消费失败时如何处理?”

顺序消息 vs 普通消息的失败处理对比

普通消息消费失败:
消息1 → 消费失败 → 进入重试队列(%RETRY%consumerGroup)
→ 延迟 10s → 重新投递 → 消费成功 → 消息2 开始消费
→ 消息2 和消息3 不受影响,可以继续消费
顺序消息消费失败:
消息1 → 消费失败 → 原地重试(不进入重试队列!)
→ 重试间隔:10s → 30s → 1m → 2m → ... → 2h(共 16 次)
→ 在消息1 成功之前,消息2 和消息3 被阻塞!

对比表格

维度普通消息(Concurrently)顺序消息(Orderly)
失败处理进入重试队列原地重试(不进重试队列)
重试次数16 次16 次
重试间隔延迟级别:10s → 30s → 1m → 2m → … → 2h固定间隔:1s(默认)
对其他消息的影响无影响(其他消息继续消费)阻塞后续消息(必须等当前消息成功)
死信队列16 次重试后进入 DLQ16 次重试后进入 DLQ

顺序消息失败处理代码

consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
String orderId = msg.getKeys();
try {
// 处理订单状态变更
orderService.updateStatus(orderId, status);
} catch (Exception e) {
log.error("订单状态更新失败:orderId={}", orderId, e);
// 返回 SUSPEND_CURRENT_QUEUE_A_MOMENT,原地重试
// 下次重试间隔 = context.getSuspendCurrentQueueTimeMillis(),默认 1s
context.setSuspendCurrentQueueTimeMillis(10000); // 设置重试间隔 10s
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

顺序消息的最佳实践

1. 业务层幂等处理

// 订单状态流转必须是幂等的
public void updateStatus(String orderId, OrderStatus newStatus) {
Order order = orderRepository.findById(orderId);
// 幂等检查:订单状态已经是 newStatus,直接返回
if (order.getStatus() == newStatus) {
log.info("订单状态已是 {},跳过更新:orderId={}", newStatus, orderId);
return;
}
// 状态流转检查:只能从 UNPAID → PAID,不能跳跃
if (!isValidTransition(order.getStatus(), newStatus)) {
log.warn("订单状态流转不合法:orderId={}, currentStatus={}, newStatus={}",
orderId, order.getStatus(), newStatus);
return; // 直接返回成功,避免阻塞
}
// 更新订单状态
order.setStatus(newStatus);
orderRepository.save(order);
}

2. 设置合理的重试间隔

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
try {
// 处理消息
process(msgs);
return ConsumeOrderlyStatus.SUCCESS;
} catch (RetryableException e) {
// 可重试异常(如数据库连接超时),设置较短的重试间隔
context.setSuspendCurrentQueueTimeMillis(1000); // 1s 后重试
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
} catch (NonRetryableException e) {
// 不可重试异常(如业务校验失败),直接返回成功,避免阻塞
log.error("业务异常,不再重试:{}", e.getMessage());
return ConsumeOrderlyStatus.SUCCESS; // 直接成功,消息不再重试
}
}

3. 监控和告警

@Component
public class OrderMessageMonitor {
private final AtomicInteger retryCount = new AtomicInteger(0);
private final AtomicInteger dlqCount = new AtomicInteger(0);
// 监听死信队列
@PostConstruct
public void startDLQConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"ORDER_DLQ_CONSUMER_GROUP"
);
consumer.subscribe("%DLQ%ORDER_CONSUMER_GROUP", "*");
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
// 死信消息告警
alertService.alert("订单顺序消息进入死信队列:" + msg.getKeys());
dlqCount.incrementAndGet();
// 记录到数据库,人工干预
deadLetterService.record(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
// 每分钟输出统计
@Scheduled(cron = "0 * * * * ?")
public void logStats() {
log.info("顺序消息统计:retry={}, dlq={}",
retryCount.getAndSet(0), dlqCount.getAndSet(0));
}
}

实战案例:订单状态流转的失败处理

业务场景:
订单 A 的状态流转:UNPAID → PAID → SHIPPED → COMPLETED
场景 1:PAID 状态处理失败(数据库连接超时)
处理:原地重试,SHIPPED 消息阻塞
建议:设置重试间隔 1s,最多重试 3 次,3 次失败后记录日志并跳过
场景 2:SHIPPED 状态处理失败(订单状态已是 CANCELLED)
原因:用户在 PAID 后取消了订单
处理:直接返回成功,避免阻塞
建议:在业务层检查订单状态,如果状态不合法直接返回成功
场景 3:COMPLETED 状态处理失败,重试 16 次仍失败
处理:消息进入死信队列
建议:监听死信队列,人工干预或补偿任务处理

链式追问二:顺序消息的保障与代价

Section titled “链式追问二:顺序消息的保障与代价”

Q4:顺序消息如何保证 Broker 故障时不乱序?高频

Section titled “Q4:顺序消息如何保证 Broker 故障时不乱序?”

问题场景

Topic: ORDER_STATUS_TOPIC
Queue 0: 在 Broker-a 上,包含订单 A 的消息
Queue 1: 在 Broker-b 上,包含订单 B 的消息
场景 1:Broker-a 故障
问题:Queue 0 的消息如何处理?能切换到 Queue 1 吗?
答案:不能!切换 Queue 会导致顺序混乱
场景 2:消费者宕机
问题:Consumer 实例 1 宕机,Queue 0 的锁如何释放?
答案:Broker 检测到消费者宕机,释放锁,Consumer 实例 2 接管

RocketMQ 的处理机制

┌─────────────────────────────────────────────────────────────┐
│ 正常情况:Broker-a 正常 │
└─────────────────────────────────────────────────────────────┘
Consumer 实例 1 ←── 锁定 Queue 0 ──→ Broker-a
Consumer 实例 2 ←── 锁定 Queue 1 ──→ Broker-b
处理:
- Consumer 实例 1 单线程消费 Queue 0(订单 A 的消息)
- Consumer 实例 2 单线程消费 Queue 1(订单 B 的消息)
┌─────────────────────────────────────────────────────────────┐
│ 异常情况 1:Broker-a 故障 │
└─────────────────────────────────────────────────────────────┘
Broker-a 宕机
└── Queue 0 不可用
└── Consumer 实例 1 无法拉取消息
└── 订单 A 的消息暂时无法消费(阻塞)
处理:
- ❌ 不切换到其他 Queue(否则订单 A 的消息会乱序)
- ✅ 等待 Broker-a 恢复或主从切换
- ✅ 如果 Broker-a 有 Slave,切换到 Slave 继续消费(保持顺序)
┌─────────────────────────────────────────────────────────────┐
│ 异常情况 2:Consumer 实例 1 宕机 │
└─────────────────────────────────────────────────────────────┘
Consumer 实例 1 宕机
└── Broker-a 检测到连接断开(心跳超时 30s)
└── 释放 Queue 0 的锁
└── Consumer 实例 2 申请 Queue 0 的锁
└── 锁定成功 → Consumer 实例 2 接管 Queue 0
处理:
- ✅ Broker 自动释放锁(30s 心跳超时)
- ✅ 其他消费者实例接管 Queue 0(继续消费,保持顺序)
- ⚠️ 切换期间有 30s 的阻塞(心跳超时时间)

顺序消费的分布式锁机制

// Broker 端的锁管理(简化版)
public class LockManager {
// 锁表:QueueId -> ConsumerId
private ConcurrentHashMap<String, String> lockTable = new ConcurrentHashMap<>();
/**
* 消费者申请锁
*/
public boolean tryLock(String queueId, String consumerId) {
String currentOwner = lockTable.get(queueId);
if (currentOwner == null) {
// 锁未被占用,加锁成功
lockTable.put(queueId, consumerId);
return true;
} else if (currentOwner.equals(consumerId)) {
// 已经是锁的持有者,续期成功
return true;
} else {
// 锁被其他消费者占用
return false;
}
}
/**
* 心跳续期
*/
public void renewLock(String queueId, String consumerId) {
if (consumerId.equals(lockTable.get(queueId))) {
// 续期成功,更新心跳时间
lockHeartbeatTime.put(queueId, System.currentTimeMillis());
}
}
/**
* 检测锁超时
*/
@Scheduled(fixedRate = 5000) // 每 5s 检查一次
public void checkLockTimeout() {
long now = System.currentTimeMillis();
for (Map.Entry<String, Long> entry : lockHeartbeatTime.entrySet()) {
if (now - entry.getValue() > 30000) { // 30s 超时
// 释放锁
lockTable.remove(entry.getKey());
lockHeartbeatTime.remove(entry.getKey());
log.info("锁超时释放:queueId={}", entry.getKey());
}
}
}
}

顺序消息 vs 普通消息的高可用对比

维度普通消息(Concurrently)顺序消息(Orderly)
Broker 故障切换到其他 Broker,继续消费Queue 不可用时阻塞(等待 Broker 恢复)
消费者宕机Broker 立即切换到其他消费者Broker 30s 后释放锁,其他消费者接管
主从切换立即切换,无阻塞切换到 Slave,保持顺序(需同步复制)
高可用性好(故障快速切换)差(故障时有阻塞)

这就是顺序消息的代价:为了保序,牺牲了高可用性。


Q5:顺序消息和消息幂等的关系?中频

Section titled “Q5:顺序消息和消息幂等的关系?”

核心观点:顺序消息保证处理顺序,但不保证消息不重复,消费者依然需要幂等。

为什么顺序消息还需要幂等

场景 1:消费者处理成功,ACK 网络超时
消息1(UNPAID → PAID)→ 消费者处理成功 → ACK 超时
→ Broker 未收到 ACK → 重新投递消息1
→ 消费者再次收到消息1(重复消息)
场景 2:Broker 主从切换
Master Broker 投递消息1 → 消费者处理成功 → Master 宕机
→ Slave 升级为 Master → 消息1 的 Offset 未同步
→ 新 Master 重新投递消息1(重复消息)
场景 3:消费者宕机
Consumer 实例 1 处理消息1 → 宕机(处理到一半)
→ Broker 切换到 Consumer 实例 2
→ Consumer 实例 2 重新消费消息1(可能重复)

顺序消息 + 幂等消费的正确实现

consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
String orderId = msg.getKeys();
OrderStatus newStatus = OrderStatus.valueOf(new String(msg.getBody()));
// 查询订单当前状态
Order order = orderRepository.findById(orderId);
// 幂等检查:订单状态已是 newStatus,直接返回成功
if (order.getStatus() == newStatus) {
log.info("订单状态已更新,跳过:orderId={}, status={}",
orderId, newStatus);
continue;
}
// 状态流转检查:只能从 UNPAID → PAID,不能跳跃
if (!isValidTransition(order.getStatus(), newStatus)) {
log.warn("订单状态流转不合法:orderId={}, current={}, new={}",
orderId, order.getStatus(), newStatus);
continue; // 直接返回成功,避免阻塞
}
// 更新订单状态(幂等)
orderRepository.updateStatus(orderId, newStatus);
log.info("订单状态更新成功:orderId={}, status={}", orderId, newStatus);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 数据库层幂等(乐观锁)
public void updateStatus(String orderId, OrderStatus newStatus) {
int rows = orderRepository.updateStatus(
orderId,
newStatus,
OrderStatus.UNPAID // 期望的旧状态
);
if (rows == 0) {
// 更新失败,说明订单状态已被其他线程更新(幂等)
log.info("订单状态更新失败,可能已更新:orderId={}", orderId);
}
}
// SQL
UPDATE orders
SET status = ?
WHERE id = ? AND status = ?; -- 乐观锁条件

顺序消息的幂等策略

策略实现方式优点缺点适用场景
数据库状态机UPDATE ... WHERE status=OLD天然幂等,性能好需要状态字段订单状态流转
数据库唯一索引INSERT 时唯一索引约束简单可靠需要设计表扣款、扣库存
Redis 去重SETNX 记录已处理消息性能极高Redis 故障丢数据高并发场景
Token 机制生产者生成 Token灵活需要额外存储通用场景

最佳实践:数据库状态机(订单状态流转)

// 订单状态枚举
public enum OrderStatus {
UNPAID, // 待支付
PAID, // 已支付
SHIPPED, // 已发货
COMPLETED, // 已完成
CANCELLED // 已取消
}
// 状态流转规则
public boolean isValidTransition(OrderStatus current, OrderStatus newStatus) {
switch (current) {
case UNPAID:
return newStatus == OrderStatus.PAID || newStatus == OrderStatus.CANCELLED;
case PAID:
return newStatus == OrderStatus.SHIPPED || newStatus == OrderStatus.CANCELLED;
case SHIPPED:
return newStatus == OrderStatus.COMPLETED;
default:
return false; // COMPLETED 和 CANCELLED 不能再流转
}
}
// 消费者处理
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
String orderId = msg.getKeys();
OrderStatus newStatus = OrderStatus.valueOf(new String(msg.getBody()));
try {
// 状态流转处理(幂等)
boolean success = orderService.transitionStatus(orderId, newStatus);
if (!success) {
// 状态流转失败(可能已流转或流转不合法)
log.warn("订单状态流转失败:orderId={}, newStatus={}", orderId, newStatus);
}
} catch (Exception e) {
log.error("订单状态流转异常:orderId={}", orderId, e);
// 返回 SUSPEND_CURRENT_QUEUE_A_MOMENT,原地重试
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

实战案例:订单状态流转完整方案

Section titled “实战案例:订单状态流转完整方案”

完整架构图

┌─────────────────────────────────────────────────────────────┐
│ Order Service(订单服务) │
└─────────────────────────────────────────────────────────────┘
├── 用户支付成功
│ └── 发送消息:orderId, PAID
│ ├── MessageQueueSelector 路由到 Queue N
│ └── 同一 orderId 的消息路由到同一 Queue
├── 商家发货
│ └── 发送消息:orderId, SHIPPED
└── 用户确认收货
└── 发送消息:orderId, COMPLETED
┌─────────────────────────────────────────────────────────────┐
│ Broker(消息队列) │
└─────────────────────────────────────────────────────────────┘
├── Queue 0: 订单 A 的消息(UNPAID → PAID → SHIPPED)
├── Queue 1: 订单 B 的消息
├── Queue 2: 订单 C 的消息
└── Queue 3: 订单 D 的消息
┌─────────────────────────────────────────────────────────────┐
│ Order Status Consumer(订单状态消费者) │
└─────────────────────────────────────────────────────────────┘
├── Consumer 实例 1
│ ├── 锁定 Queue 0
│ └── 单线程串行消费 Queue 0 的消息
└── Consumer 实例 2
├── 锁定 Queue 1
└── 单线程串行消费 Queue 1 的消息

完整代码实现

// ==================== 生产者:发送订单状态变更消息 ====================
@Service
public class OrderStatusProducer {
@Autowired
private DefaultMQProducer producer;
/**
* 发送订单状态变更消息(分区顺序)
*/
public void sendOrderStatusMessage(String orderId, OrderStatus status) {
Message message = new Message(
"ORDER_STATUS_TOPIC",
status.name().getBytes(StandardCharsets.UTF_8)
);
message.setKeys(orderId); // 设置消息 Key(订单 ID)
try {
// 使用 MessageQueueSelector 路由到固定 Queue
SendResult result = producer.send(
message,
(mqs, msg, arg) -> {
String id = (String) arg;
// 哈希取模,保证同一 orderId 路由到同一 Queue
int index = Math.abs(id.hashCode()) % mqs.size();
return mqs.get(index);
},
orderId
);
log.info("发送订单状态消息成功:orderId={}, status={}, queue={}",
orderId, status, result.getMessageQueue().getQueueId());
} catch (Exception e) {
log.error("发送订单状态消息失败:orderId={}, status={}", orderId, status, e);
throw new RuntimeException("发送消息失败");
}
}
}
// ==================== 消费者:订单状态流转处理 ====================
@Service
public class OrderStatusConsumer {
@Autowired
private OrderService orderService;
@PostConstruct
public void startConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"ORDER_STATUS_CONSUMER_GROUP"
);
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("ORDER_STATUS_TOPIC", "*");
// 使用 MessageListenerOrderly(顺序消费)
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
String orderId = msg.getKeys();
OrderStatus newStatus = OrderStatus.valueOf(
new String(msg.getBody(), StandardCharsets.UTF_8)
);
try {
// 状态流转处理(幂等)
boolean success = orderService.transitionStatus(orderId, newStatus);
if (!success) {
// 状态流转失败(可能已流转或流转不合法),直接返回成功
log.warn("订单状态流转失败,跳过:orderId={}, newStatus={}",
orderId, newStatus);
} else {
log.info("订单状态流转成功:orderId={}, newStatus={}",
orderId, newStatus);
}
} catch (Exception e) {
log.error("订单状态流转异常:orderId={}, newStatus={}",
orderId, newStatus, e);
// 设置重试间隔 10s
context.setSuspendCurrentQueueTimeMillis(10000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
log.info("订单状态消费者启动成功");
}
}
// ==================== 业务层:订单状态流转(幂等)====================
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
/**
* 订单状态流转(幂等)
*/
@Transactional
public boolean transitionStatus(String orderId, OrderStatus newStatus) {
// 查询订单
Order order = orderRepository.findById(orderId);
if (order == null) {
log.warn("订单不存在:orderId={}", orderId);
return false;
}
// 幂等检查:订单状态已是 newStatus
if (order.getStatus() == newStatus) {
log.info("订单状态已是 {},跳过:orderId={}", newStatus, orderId);
return true; // 返回 true,避免重试
}
// 状态流转检查:只能从 UNPAID → PAID,不能跳跃
if (!isValidTransition(order.getStatus(), newStatus)) {
log.warn("订单状态流转不合法:orderId={}, current={}, new={}",
orderId, order.getStatus(), newStatus);
return true; // 返回 true,避免重试
}
// 更新订单状态(乐观锁)
int rows = orderRepository.updateStatus(orderId, newStatus, order.getStatus());
if (rows > 0) {
log.info("订单状态更新成功:orderId={}, status={}", orderId, newStatus);
// 发送通知(如短信、推送)
sendNotification(orderId, newStatus);
return true;
} else {
// 更新失败,说明订单状态已被其他线程更新(幂等)
log.info("订单状态更新失败,可能已更新:orderId={}", orderId);
return true; // 返回 true,避免重试
}
}
/**
* 状态流转规则
*/
private boolean isValidTransition(OrderStatus current, OrderStatus newStatus) {
switch (current) {
case UNPAID:
return newStatus == OrderStatus.PAID
|| newStatus == OrderStatus.CANCELLED;
case PAID:
return newStatus == OrderStatus.SHIPPED
|| newStatus == OrderStatus.CANCELLED;
case SHIPPED:
return newStatus == OrderStatus.COMPLETED;
default:
return false;
}
}
}
// ==================== 数据库层:订单状态更新(乐观锁)====================
@Repository
public class OrderRepository {
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* 更新订单状态(乐观锁)
*/
public int updateStatus(String orderId, OrderStatus newStatus, OrderStatus expectedOldStatus) {
String sql = "UPDATE orders SET status = ?, update_time = NOW() " +
"WHERE id = ? AND status = ?";
return jdbcTemplate.update(sql, newStatus.name(), orderId, expectedOldStatus.name());
}
}
// ==================== 监控:死信队列监听 ====================
@Service
public class OrderDLQConsumer {
@PostConstruct
public void startDLQConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"ORDER_DLQ_CONSUMER_GROUP"
);
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("%DLQ%ORDER_STATUS_CONSUMER_GROUP", "*");
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
// 死信消息告警
String orderId = msg.getKeys();
alertService.alert("订单状态消息进入死信队列:orderId=" + orderId);
// 记录到数据库,人工干预
deadLetterService.record(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
// ==================== 监控:统计和告警 ====================
@Component
public class OrderMessageMonitor {
private final AtomicInteger successCount = new AtomicInteger(0);
private final AtomicInteger retryCount = new AtomicInteger(0);
private final AtomicInteger dlqCount = new AtomicInteger(0);
// 每分钟输出统计
@Scheduled(cron = "0 * * * * ?")
public void logStats() {
int success = successCount.getAndSet(0);
int retry = retryCount.getAndSet(0);
int dlq = dlqCount.getAndSet(0);
log.info("订单状态消息统计:success={}, retry={}, dlq={}", success, retry, dlq);
// 告警:重试率 > 5%
if (success > 0 && retry * 100 / (success + retry) > 5) {
alertService.alert("订单状态消息重试率过高:" + retry * 100 / (success + retry) + "%");
}
}
}