RocketMQ 事务消息深度解析
面试官:下单时需要同时扣库存,如何保证两个操作的原子性?
你:我们使用 RocketMQ 事务消息实现。下单操作和消息发送通过 Half 消息机制绑定,保证要么下单成功且库存消息被消费,要么下单失败消息被回滚,从而实现最终一致性。
面试官:Half 消息是什么?整个流程是怎样的?
这个问题很多人只能说”事务消息保证一致性”,但能讲清楚 Half 消息、两阶段提交、事务回查这三个核心概念的,才能真正体现技术深度。
链式追问一:事务消息流程
Section titled “链式追问一:事务消息流程”Q1:RocketMQ 事务消息的完整流程是什么?必考
Section titled “Q1:RocketMQ 事务消息的完整流程是什么?”核心思想:将本地事务与消息发送”绑定”,通过两阶段提交保证原子性。
完整流程图:
┌─────────────────────────────────────────────────────────────┐│ 第一阶段:发送 Half 消息(预提交) │└─────────────────────────────────────────────────────────────┘
Producer Broker │ │ ├── 1. 发送 Half 消息 ────→│ │ (PREPARE 状态) │ │ ├── 存入特殊 Topic: │ │ RMQ_SYS_TRANS_HALF_TOPIC │ │ (对消费者不可见!) │ │ │←── 2. 返回发送成功 ───────┤ │ (拿到消息 offset) │
┌─────────────────────────────────────────────────────────────┐│ 第二阶段:执行本地事务 │└─────────────────────────────────────────────────────────────┘
Producer │ ├── 3. executeLocalTransaction() │ ├── 开启数据库事务 │ ├── INSERT INTO orders ... │ ├── INSERT INTO inventory_deduct ... │ ├── COMMIT; │ │ │ └── 根据本地事务结果,决定消息状态: │ ├── 成功 → COMMIT_MESSAGE │ ├── 失败 → ROLLBACK_MESSAGE │ └── 不确定 → UNKNOWN
┌─────────────────────────────────────────────────────────────┐│ 第三阶段:提交或回滚消息 │└─────────────────────────────────────────────────────────────┘
Producer Broker │ │ ├── 4. 提交结果 ──────────→│ │ (COMMIT/ROLLBACK) │ │ │ │ ├── COMMIT: │ │ ├── 从 RMQ_SYS_TRANS_HALF_TOPIC 删除 │ │ ├── 转移到真实 Topic(如 ORDER_TOPIC) │ │ └── 消费者可见,开始消费 │ │ │ └── ROLLBACK: │ ├── 从 RMQ_SYS_TRANS_HALF_TOPIC 删除 │ └── 消费者不可见,相当于丢弃
┌─────────────────────────────────────────────────────────────┐│ 异常情况:事务回查(补偿机制) │└─────────────────────────────────────────────────────────────┘
Broker Producer │ │ ├── 5. 定时扫描 ──────────→│(60s 一次) │ Half 消息 │ │ (未 COMMIT/ROLLBACK) │ │ │ │ ├── 6. checkLocalTransaction() │ │ ├── 查询订单表:SELECT * FROM orders │ │ ├── 订单存在 → 返回 COMMIT │ │ └── 订单不存在 → 返回 ROLLBACK │ │ │←── 7. 返回事务状态 ───────┤ │ │ └── 执行 COMMIT/ROLLBACK ──┘关键概念解释:
1. Half 消息(半消息):
Half 消息 = 预提交的消息,对消费者不可见
特征:- 消息已写入 CommitLog(持久化)- Topic 被替换为 RMQ_SYS_TRANS_HALF_TOPIC(内部 Topic)- 消费者订阅的 Topic 不是 RMQ_SYS_TRANS_HALF_TOPIC,所以看不到- 等待生产者提交 COMMIT 或 ROLLBACK2. 两阶段提交:
第一阶段(预提交):- 发送 Half 消息- Broker 返回发送成功(消息已持久化)
第二阶段(提交/回滚):- 执行本地事务- 根据本地事务结果,决定消息命运- COMMIT → 消息对消费者可见- ROLLBACK → 消息被丢弃3. 事务回查(补偿机制):
场景:生产者执行本地事务后,发送 COMMIT/ROLLBACK 失败原因:网络抖动、生产者宕机、Broker 宕机
解决方案:Broker 定时扫描 Half 消息- 扫描间隔:60s(默认)- 回查生产者:checkLocalTransaction()- 生产者查询本地数据库,判断事务是否已提交- 返回 COMMIT/ROLLBACK/UNKNOWNQ2:代码层面如何实现事务消息?高频
Section titled “Q2:代码层面如何实现事务消息?”完整代码示例:
// ==================== 第一步:实现事务监听器 ====================public class OrderTransactionListener implements TransactionListener {
@Autowired private OrderService orderService;
@Autowired private OrderRepository orderRepository;
/** * 执行本地事务 * 在这里完成下单操作(订单表 + 库存表) */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { String orderId = new String(msg.getBody(), StandardCharsets.UTF_8); CreateOrderRequest request = (CreateOrderRequest) arg;
// 执行本地事务(下单 + 扣库存) orderService.createOrder(orderId, request);
// 本地事务成功,提交消息 return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) { log.error("本地事务执行失败", e);
// 本地事务失败,回滚消息 return LocalTransactionState.ROLLBACK_MESSAGE; } }
/** * 事务回查 * Broker 定期调用此方法,询问本地事务是否已提交 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = new String(msg.getBody(), StandardCharsets.UTF_8);
// 查询本地数据库,判断订单是否已创建 Order order = orderRepository.findById(orderId);
if (order != null) { // 订单已创建,说明本地事务已提交 log.info("事务回查:订单存在,提交消息,orderId={}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } else { // 订单不存在,说明本地事务已回滚 log.info("事务回查:订单不存在,回滚消息,orderId={}", orderId); return LocalTransactionState.ROLLBACK_MESSAGE; } }}
// ==================== 第二步:发送事务消息 ====================@Servicepublic class OrderService {
private TransactionMQProducer producer;
@PostConstruct public void init() throws MQClientException { // 创建事务消息生产者 producer = new TransactionMQProducer("ORDER_PRODUCER_GROUP"); producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器 producer.setTransactionListener(new OrderTransactionListener());
// 设置事务回查线程池 producer.setCheckThreadPoolMinSize(2); producer.setCheckThreadPoolMaxSize(5);
producer.start(); }
/** * 创建订单(事务消息) */ @Transactional public void createOrderWithTransaction(CreateOrderRequest request) { String orderId = UUID.randomUUID().toString();
// 构建消息 Message message = new Message( "INVENTORY_DEDUCT_TOPIC", // Topic:库存扣减 "TAG_DEDUCT", // Tag orderId.getBytes(StandardCharsets.UTF_8) // 消息体:订单 ID );
try { // 发送事务消息(半消息 + 本地事务 + 提交/回滚) TransactionSendResult result = producer.sendMessageInTransaction( message, // 消息 request // 本地事务参数 );
log.info("事务消息发送结果:orderId={}, state={}", orderId, result.getLocalTransactionState());
} catch (Exception e) { log.error("事务消息发送失败:orderId={}", orderId, e); throw new RuntimeException("下单失败"); } }
/** * 执行本地事务(订单 + 库存) */ @Transactional public void createOrder(String orderId, CreateOrderRequest request) { // 1. 创建订单 Order order = new Order(); order.setId(orderId); order.setUserId(request.getUserId()); order.setProductId(request.getProductId()); order.setAmount(request.getAmount()); order.setStatus(OrderStatus.CREATED); orderRepository.save(order);
// 2. 扣库存(本地数据库事务) Inventory inventory = inventoryRepository.findByProductId(request.getProductId()); if (inventory.getStock() < request.getAmount()) { throw new RuntimeException("库存不足"); } inventory.setStock(inventory.getStock() - request.getAmount()); inventoryRepository.save(inventory);
log.info("订单创建成功:orderId={}", orderId); }}
// ==================== 第三步:消费者处理库存扣减 ====================@Servicepublic class InventoryConsumer {
@PostConstruct public void startConsumer() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "INVENTORY_CONSUMER_GROUP" ); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("INVENTORY_DEDUCT_TOPIC", "TAG_DEDUCT");
consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { String orderId = new String(msg.getBody());
// 查询订单,获取商品 ID 和数量 Order order = orderRepository.findById(orderId); if (order == null) { log.warn("订单不存在:orderId={}", orderId); continue; }
// 幂等处理:检查是否已扣库存 if (order.getInventoryDeducted()) { log.info("库存已扣减,跳过:orderId={}", orderId); continue; }
// 扣库存(消费者端,可能是另一个服务) inventoryService.deduct(order.getProductId(), order.getAmount());
// 标记库存已扣减 order.setInventoryDeducted(true); orderRepository.save(order);
log.info("库存扣减成功:orderId={}, productId={}, amount={}", orderId, order.getProductId(), order.getAmount()); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });
consumer.start(); }}关键代码解析:
1. TransactionListener 接口:
public interface TransactionListener { /** * 执行本地事务 * 在 Half 消息发送成功后立即调用 * 返回值: * - COMMIT_MESSAGE:提交消息,消费者可见 * - ROLLBACK_MESSAGE:回滚消息,消费者不可见 * - UNKNOWN:不确定,等待事务回查 */ LocalTransactionState executeLocalTransaction(Message msg, Object arg);
/** * 事务回查 * Broker 定期扫描未确认的 Half 消息,调用此方法 * 返回值同上 */ LocalTransactionState checkLocalTransaction(MessageExt msg);}2. sendMessageInTransaction() 执行流程:
// RocketMQ 源码(简化版)public TransactionSendResult sendMessageInTransaction(Message msg, Object arg) { // 1. 发送 Half 消息 SendResult sendResult = this.send(msg);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) { // Half 消息发送失败,直接返回 return new TransactionSendResult(LocalTransactionState.ROLLBACK_MESSAGE); }
// 2. 执行本地事务 LocalTransactionState localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
// 3. 提交或回滚消息 if (localTransactionState == LocalTransactionState.COMMIT_MESSAGE) { this.endTransaction(msg, sendResult, LocalTransactionState.COMMIT_MESSAGE); } else if (localTransactionState == LocalTransactionState.ROLLBACK_MESSAGE) { this.endTransaction(msg, sendResult, LocalTransactionState.ROLLBACK_MESSAGE); } // 如果是 UNKNOWN,不发送任何请求,等待 Broker 回查
return new TransactionSendResult(localTransactionState);}3. 事务回查机制:
// Broker 端定时任务(每 60s 执行一次)public class TransactionalMessageService {
public void checkTransaction() { // 1. 扫描 RMQ_SYS_TRANS_HALF_TOPIC 中未确认的消息 List<MessageExt> halfMessages = scanHalfMessages();
for (MessageExt msg : halfMessages) { // 2. 回查生产者 LocalTransactionState state = producer.checkLocalTransaction(msg);
// 3. 根据回查结果提交或回滚 if (state == LocalTransactionState.COMMIT_MESSAGE) { commitMessage(msg); } else if (state == LocalTransactionState.ROLLBACK_MESSAGE) { rollbackMessage(msg); } // 如果是 UNKNOWN,继续等待下次回查 } }}Q3:事务消息如何保证消费者的幂等性?必考
Section titled “Q3:事务消息如何保证消费者的幂等性?”为什么需要幂等:
事务消息只能保证"至少一次"投递,不能保证"恰好一次":
场景 1:消费者处理成功,ACK 网络超时 → Broker 重新投递场景 2:Broker 主从切换,消息重复投递场景 3:消费者宕机,Broker 切换到其他消费者,消息重复投递
结果:消费者可能收到重复消息,必须幂等处理幂等方案对比:
| 方案 | 实现方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 数据库唯一索引 | INSERT 时唯一索引约束 | 简单可靠,性能好 | 需要设计表结构 | 扣库存、扣款 |
| Redis 去重 | SETNX 记录已处理消息 ID | 性能极高 | Redis 故障丢数据 | 高并发、允许少量重复 |
| 数据库状态机 | UPDATE ... WHERE status=OLD | 天然幂等 | 需要状态字段 | 订单状态流转 |
| Token 机制 | 生产者生成 Token,消费者去重 | 灵活 | 需要额外存储 | 通用场景 |
方案一:数据库唯一索引(推荐):
// 消费者:扣库存consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { String orderId = new String(msg.getBody()); Order order = orderRepository.findById(orderId);
try { // 插入库存扣减记录(order_id + product_id 唯一索引) InventoryDeductLog log = new InventoryDeductLog(); log.setOrderId(orderId); log.setProductId(order.getProductId()); log.setAmount(order.getAmount()); inventoryDeductLogRepository.save(log); // 唯一索引约束
// 扣库存 inventoryService.deduct(order.getProductId(), order.getAmount());
} catch (DuplicateKeyException e) { // 唯一索引冲突,说明已处理,直接返回成功 log.info("库存已扣减,跳过:orderId={}", orderId); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});数据库表设计:
CREATE TABLE inventory_deduct_log ( id BIGINT PRIMARY KEY AUTO_INCREMENT, order_id VARCHAR(64) NOT NULL COMMENT '订单 ID', product_id BIGINT NOT NULL COMMENT '商品 ID', amount INT NOT NULL COMMENT '扣减数量', create_time DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE KEY uk_order_product (order_id, product_id) -- 唯一索引) COMMENT '库存扣减日志表';方案二:Redis 去重:
consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { String msgId = msg.getMsgId(); // RocketMQ 生成的唯一消息 ID
// Redis SETNX:key 不存在时设置成功,返回 true // key 已存在时设置失败,返回 false Boolean isFirst = redisTemplate.opsForValue().setIfAbsent( "msg:" + msgId, "1", 24, TimeUnit.HOURS // 过期时间 24 小时 );
if (Boolean.TRUE.equals(isFirst)) { // 首次处理 String orderId = new String(msg.getBody()); inventoryService.deduct(orderId); } else { // 重复消息,直接返回成功 log.info("重复消息,跳过:msgId={}", msgId); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});方案三:数据库状态机(订单状态流转):
// 场景:订单状态流转(待支付 → 已支付 → 已发货 → 已完成)
consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { String orderId = new String(msg.getBody());
// 乐观锁 + 状态机:只有状态为 UNPAID 时才能更新为 PAID int rows = orderRepository.updateStatus( orderId, OrderStatus.UNPAID, // 期望的旧状态 OrderStatus.PAID // 新状态 );
if (rows > 0) { // 更新成功,执行业务逻辑 log.info("订单状态更新成功:orderId={}", orderId); } else { // 更新失败,说明订单状态已变更(幂等) log.info("订单状态已变更,跳过:orderId={}", orderId); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});
// SQLUPDATE ordersSET status = 'PAID'WHERE id = ? AND status = 'UNPAID'; -- 乐观锁条件幂等方案性能对比:
| 方案 | QPS | 延迟 | 可靠性 | 实现复杂度 |
|---|---|---|---|---|
| 数据库唯一索引 | 5000 | 5ms | 高(持久化) | 低 |
| Redis 去重 | 50000 | 1ms | 中(Redis 故障丢数据) | 低 |
| 数据库状态机 | 3000 | 8ms | 高 | 低 |
| Token 机制 | 10000 | 3ms | 中 | 中 |
最佳实践:数据库唯一索引 + Redis 去重(双重保障):
consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { String msgId = msg.getMsgId();
// 第一层:Redis 去重(快速判断) if (!redisTemplate.opsForValue().setIfAbsent("msg:" + msgId, "1", 24, TimeUnit.HOURS)) { log.info("Redis 去重:重复消息,跳过,msgId={}", msgId); continue; }
try { // 第二层:数据库唯一索引(持久化保障) String orderId = new String(msg.getBody()); inventoryService.deductWithLog(orderId); // 扣库存 + 插入日志
} catch (DuplicateKeyException e) { // 数据库去重:说明已处理 log.info("数据库去重:重复消息,跳过,msgId={}", msgId); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});链式追问二:事务消息与其他方案对比
Section titled “链式追问二:事务消息与其他方案对比”Q4:RocketMQ 事务消息和 TCC 分布式事务有什么区别?高频
Section titled “Q4:RocketMQ 事务消息和 TCC 分布式事务有什么区别?”对比表格:
| 维度 | RocketMQ 事务消息 | TCC(Try-Confirm-Cancel) |
|---|---|---|
| 一致性级别 | 最终一致(异步,有时间窗口) | 强一致(Try 阶段锁定资源) |
| 实现复杂度 | 低(消息 + 幂等消费) | 高(三个接口:Try/Confirm/Cancel) |
| 业务侵入 | 低(只需实现 TransactionListener) | 高(业务代码需实现三个方法) |
| 性能 | 高(异步,无资源锁定) | 低(同步,资源锁定时间长) |
| 可靠性 | 中(依赖 MQ,消息可能丢失) | 高(本地事务 + 补偿) |
| 适用场景 | 下单扣库存、发送通知(允许延迟) | 转账、扣款(必须强一致) |
| 补偿机制 | 事务回查 | Cancel 接口回滚 |
| 典型框架 | RocketMQ | Seata TCC、Hmily |
TCC 实现示例(转账场景):
// Try:冻结资金(预扣)public void tryTransfer(String fromAccount, String toAccount, BigDecimal amount) { // 冻结转出账户资金 accountRepository.freeze(fromAccount, amount);}
// Confirm:确认转账(扣款)public void confirmTransfer(String fromAccount, String toAccount, BigDecimal amount) { // 扣除冻结资金 accountRepository.deductFrozen(fromAccount, amount); // 增加转入账户资金 accountRepository.add(toAccount, amount);}
// Cancel:取消转账(解冻)public void cancelTransfer(String fromAccount, String toAccount, BigDecimal amount) { // 释放冻结资金 accountRepository.unfreeze(fromAccount, amount);}RocketMQ 事务消息示例(下单扣库存):
// 本地事务:下单 + 扣库存public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { orderService.createOrder((CreateOrderRequest) arg); // 本地数据库事务 return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { return LocalTransactionState.ROLLBACK_MESSAGE; }}
// 消费者:异步扣库存(允许延迟)consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { inventoryService.deduct(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});选型建议:
场景 1:下单扣库存需求:用户下单后,扣减库存特点:允许异步处理(订单创建成功后,库存可以延迟几秒扣减)推荐:RocketMQ 事务消息(实现简单,性能高)
场景 2:账户转账需求:A 转账给 B 100 元特点:必须强一致(A 的钱扣了,B 的钱必须到账,不能有时间窗口)推荐:TCC 或 Seata AT 模式(保证强一致)
场景 3:发送通知需求:用户注册后,发送欢迎邮件特点:允许异步,失败了可以重试推荐:RocketMQ 事务消息(简单可靠)Q5:RocketMQ 事务消息能保证”恰好一次”语义吗?中频
Section titled “Q5:RocketMQ 事务消息能保证”恰好一次”语义吗?”答案:不能,RocketMQ 事务消息只能保证”至少一次”(At-Least-Once)。
为什么不能保证”恰好一次”:
消息投递流程:Broker → Consumer
场景 1:消费者处理成功,ACK 网络超时 ├── 消费者:已处理成功 └── Broker:未收到 ACK,认为消费失败 → 重新投递
场景 2:Broker 主从切换 ├── Master Broker:消息已投递,但未同步到 Slave └── Slave 升级为 Master → 消息重新投递
场景 3:消费者宕机 ├── 消费者 A:处理到一半宕机 └── Broker 切换到消费者 B → 消息重新投递
结果:消费者收到重复消息事务消息的正确理解:
| 保证 | 不保证 |
|---|---|
| 生产端原子性:本地事务与消息发送的原子性 | 消费端恰好一次:消息可能重复投递 |
| 消息不丢失:Half 消息持久化 + 事务回查 | 消息不重复:消费者可能收到重复消息 |
| 最终一致性:本地事务成功 → 消息最终被消费 | 强一致性:消息立即被消费 |
正确的事务消息语义:
事务消息 = At-Least-Once Delivery + Idempotent Consumer
翻译:- At-Least-Once Delivery:消息至少投递一次(可能多次)- Idempotent Consumer:消费者必须幂等(重复消费不影响结果)
结论:事务消息 + 消费者幂等 = 最终一致性实战案例:
// ❌ 错误理解:事务消息保证不重复consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { // 直接扣库存,没有幂等处理 inventoryService.deduct(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});
// ✅ 正确实现:事务消息 + 消费者幂等consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { String msgId = msg.getMsgId();
// Redis 去重 if (redisTemplate.opsForValue().setIfAbsent("msg:" + msgId, "1", 24, TimeUnit.HOURS)) { // 首次处理,扣库存 inventoryService.deductWithLog(msg); // 数据库唯一索引保证幂等 } else { // 重复消息,跳过 log.info("重复消息,跳过:msgId={}", msgId); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});实战案例:下单扣库存完整方案
Section titled “实战案例:下单扣库存完整方案”完整架构图:
┌─────────────────────────────────────────────────────────────┐│ Order Service(订单服务) │└─────────────────────────────────────────────────────────────┘ │ ├── 1. 发送 Half 消息 → Broker │ (Topic: INVENTORY_DEDUCT_TOPIC) │ ├── 2. 执行本地事务 │ ├── INSERT INTO orders ... │ └── COMMIT │ ├── 3. 提交/回滚消息 → Broker │ └── 4. 事务回查(异常情况) └── SELECT * FROM orders WHERE id=?
┌─────────────────────────────────────────────────────────────┐│ Inventory Service(库存服务) │└─────────────────────────────────────────────────────────────┘ │ ├── 1. 接收消息 → 消费者 │ ├── 2. 幂等检查(Redis + 数据库唯一索引) │ ├── 3. 扣库存 │ ├── INSERT INTO inventory_deduct_log ... │ └── UPDATE inventory SET stock=stock-? ... │ └── 4. 兜底补偿(定时任务) └── 扫描未扣库存的订单 → 重新扣库存完整代码实现:
// ==================== Order Service(订单服务)====================
@Servicepublic class OrderService {
@Autowired private TransactionMQProducer producer;
@Autowired private OrderRepository orderRepository;
/** * 创建订单(事务消息) */ public String createOrder(CreateOrderRequest request) { String orderId = UUID.randomUUID().toString();
// 构建消息 Message message = new Message( "INVENTORY_DEDUCT_TOPIC", orderId.getBytes(StandardCharsets.UTF_8) );
try { // 发送事务消息 TransactionSendResult result = producer.sendMessageInTransaction( message, request );
if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) { return orderId; } else { throw new RuntimeException("创建订单失败"); }
} catch (Exception e) { log.error("创建订单失败", e); throw new RuntimeException("创建订单失败"); } }
/** * 事务监听器 */ public static class OrderTransactionListener implements TransactionListener {
@Autowired private OrderRepository orderRepository;
@Override @Transactional public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { String orderId = new String(msg.getBody()); CreateOrderRequest request = (CreateOrderRequest) arg;
// 创建订单 Order order = new Order(); order.setId(orderId); order.setUserId(request.getUserId()); order.setProductId(request.getProductId()); order.setAmount(request.getAmount()); order.setStatus(OrderStatus.CREATED); order.setInventoryDeducted(false); // 库存未扣减 orderRepository.save(order);
log.info("订单创建成功:orderId={}", orderId); return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) { log.error("订单创建失败", e); return LocalTransactionState.ROLLBACK_MESSAGE; } }
@Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = new String(msg.getBody()); Order order = orderRepository.findById(orderId);
if (order != null) { log.info("事务回查:订单存在,提交消息,orderId={}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } else { log.info("事务回查:订单不存在,回滚消息,orderId={}", orderId); return LocalTransactionState.ROLLBACK_MESSAGE; } } }}
// ==================== Inventory Service(库存服务)====================
@Servicepublic class InventoryService {
@Autowired private InventoryRepository inventoryRepository;
@Autowired private InventoryDeductLogRepository deductLogRepository;
@Autowired private RedisTemplate<String, String> redisTemplate;
/** * 扣库存(幂等) */ @Transactional public void deduct(String orderId, Long productId, Integer amount) { try { // 插入扣减日志(唯一索引保证幂等) InventoryDeductLog log = new InventoryDeductLog(); log.setOrderId(orderId); log.setProductId(productId); log.setAmount(amount); deductLogRepository.save(log);
// 扣库存 Inventory inventory = inventoryRepository.findByProductId(productId); if (inventory.getStock() < amount) { throw new RuntimeException("库存不足"); } inventory.setStock(inventory.getStock() - amount); inventoryRepository.save(inventory);
log.info("库存扣减成功:orderId={}, productId={}, amount={}", orderId, productId, amount);
} catch (DataIntegrityViolationException e) { // 唯一索引冲突,说明已扣减 log.info("库存已扣减,跳过:orderId={}", orderId); } }}
// ==================== Inventory Consumer(库存消费者)====================
@Servicepublic class InventoryConsumer {
@Autowired private InventoryService inventoryService;
@Autowired private OrderRepository orderRepository;
@Autowired private RedisTemplate<String, String> redisTemplate;
@PostConstruct public void startConsumer() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "INVENTORY_CONSUMER_GROUP" ); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("INVENTORY_DEDUCT_TOPIC", "*");
consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { String msgId = msg.getMsgId(); String orderId = new String(msg.getBody());
// 第一层:Redis 去重(快速判断) if (!redisTemplate.opsForValue().setIfAbsent( "msg:" + msgId, "1", 24, TimeUnit.HOURS)) { log.info("Redis 去重:重复消息,跳过,msgId={}", msgId); continue; }
try { // 查询订单 Order order = orderRepository.findById(orderId); if (order == null) { log.warn("订单不存在:orderId={}", orderId); continue; }
// 幂等检查:订单是否已扣库存 if (order.getInventoryDeducted()) { log.info("库存已扣减,跳过:orderId={}", orderId); continue; }
// 第二层:数据库唯一索引(持久化保障) inventoryService.deduct(orderId, order.getProductId(), order.getAmount());
// 标记库存已扣减 order.setInventoryDeducted(true); orderRepository.save(order);
} catch (DataIntegrityViolationException e) { // 数据库去重:说明已扣减 log.info("数据库去重:库存已扣减,跳过,msgId={}", msgId); } catch (Exception e) { log.error("库存扣减失败:orderId={}", orderId, e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });
consumer.start(); }}
// ==================== 补偿任务(兜底方案)====================
@Servicepublic class InventoryCompensationTask {
@Autowired private OrderRepository orderRepository;
@Autowired private InventoryService inventoryService;
// 每 10 分钟执行一次 @Scheduled(cron = "0 */10 * * * ?") public void compensate() { log.info("开始库存补偿任务");
// 查询创建时间 > 1 小时且库存未扣减的订单 List<Order> orders = orderRepository.findByInventoryDeductedAndCreateTimeBefore( false, LocalDateTime.now().minusHours(1) );
int count = 0; for (Order order : orders) { try { inventoryService.deduct(order.getId(), order.getProductId(), order.getAmount()); order.setInventoryDeducted(true); orderRepository.save(order); count++; } catch (Exception e) { log.error("库存补偿失败:orderId={}", order.getId(), e); } }
log.info("库存补偿任务完成:处理数量={}", count); }}