Skip to content

RocketMQ 事务消息深度解析

面试官:下单时需要同时扣库存,如何保证两个操作的原子性?

:我们使用 RocketMQ 事务消息实现。下单操作和消息发送通过 Half 消息机制绑定,保证要么下单成功且库存消息被消费,要么下单失败消息被回滚,从而实现最终一致性。

面试官:Half 消息是什么?整个流程是怎样的?

这个问题很多人只能说”事务消息保证一致性”,但能讲清楚 Half 消息、两阶段提交、事务回查这三个核心概念的,才能真正体现技术深度。


Q1:RocketMQ 事务消息的完整流程是什么?必考

Section titled “Q1:RocketMQ 事务消息的完整流程是什么?”

核心思想:将本地事务与消息发送”绑定”,通过两阶段提交保证原子性。

完整流程图

┌─────────────────────────────────────────────────────────────┐
│ 第一阶段:发送 Half 消息(预提交) │
└─────────────────────────────────────────────────────────────┘
Producer Broker
│ │
├── 1. 发送 Half 消息 ────→│
│ (PREPARE 状态) │
│ ├── 存入特殊 Topic:
│ │ RMQ_SYS_TRANS_HALF_TOPIC
│ │ (对消费者不可见!)
│ │
│←── 2. 返回发送成功 ───────┤
│ (拿到消息 offset) │
┌─────────────────────────────────────────────────────────────┐
│ 第二阶段:执行本地事务 │
└─────────────────────────────────────────────────────────────┘
Producer
├── 3. executeLocalTransaction()
│ ├── 开启数据库事务
│ ├── INSERT INTO orders ...
│ ├── INSERT INTO inventory_deduct ...
│ ├── COMMIT;
│ │
│ └── 根据本地事务结果,决定消息状态:
│ ├── 成功 → COMMIT_MESSAGE
│ ├── 失败 → ROLLBACK_MESSAGE
│ └── 不确定 → UNKNOWN
┌─────────────────────────────────────────────────────────────┐
│ 第三阶段:提交或回滚消息 │
└─────────────────────────────────────────────────────────────┘
Producer Broker
│ │
├── 4. 提交结果 ──────────→│
│ (COMMIT/ROLLBACK) │
│ │
│ ├── COMMIT:
│ │ ├── 从 RMQ_SYS_TRANS_HALF_TOPIC 删除
│ │ ├── 转移到真实 Topic(如 ORDER_TOPIC)
│ │ └── 消费者可见,开始消费
│ │
│ └── ROLLBACK:
│ ├── 从 RMQ_SYS_TRANS_HALF_TOPIC 删除
│ └── 消费者不可见,相当于丢弃
┌─────────────────────────────────────────────────────────────┐
│ 异常情况:事务回查(补偿机制) │
└─────────────────────────────────────────────────────────────┘
Broker Producer
│ │
├── 5. 定时扫描 ──────────→│(60s 一次)
│ Half 消息 │
│ (未 COMMIT/ROLLBACK) │
│ │
│ ├── 6. checkLocalTransaction()
│ │ ├── 查询订单表:SELECT * FROM orders
│ │ ├── 订单存在 → 返回 COMMIT
│ │ └── 订单不存在 → 返回 ROLLBACK
│ │
│←── 7. 返回事务状态 ───────┤
│ │
└── 执行 COMMIT/ROLLBACK ──┘

关键概念解释

1. Half 消息(半消息)

Half 消息 = 预提交的消息,对消费者不可见
特征:
- 消息已写入 CommitLog(持久化)
- Topic 被替换为 RMQ_SYS_TRANS_HALF_TOPIC(内部 Topic)
- 消费者订阅的 Topic 不是 RMQ_SYS_TRANS_HALF_TOPIC,所以看不到
- 等待生产者提交 COMMIT 或 ROLLBACK

2. 两阶段提交

第一阶段(预提交):
- 发送 Half 消息
- Broker 返回发送成功(消息已持久化)
第二阶段(提交/回滚):
- 执行本地事务
- 根据本地事务结果,决定消息命运
- COMMIT → 消息对消费者可见
- ROLLBACK → 消息被丢弃

3. 事务回查(补偿机制)

场景:生产者执行本地事务后,发送 COMMIT/ROLLBACK 失败
原因:网络抖动、生产者宕机、Broker 宕机
解决方案:Broker 定时扫描 Half 消息
- 扫描间隔:60s(默认)
- 回查生产者:checkLocalTransaction()
- 生产者查询本地数据库,判断事务是否已提交
- 返回 COMMIT/ROLLBACK/UNKNOWN

Q2:代码层面如何实现事务消息?高频

Section titled “Q2:代码层面如何实现事务消息?”

完整代码示例

// ==================== 第一步:实现事务监听器 ====================
public class OrderTransactionListener implements TransactionListener {
@Autowired
private OrderService orderService;
@Autowired
private OrderRepository orderRepository;
/**
* 执行本地事务
* 在这里完成下单操作(订单表 + 库存表)
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String orderId = new String(msg.getBody(), StandardCharsets.UTF_8);
CreateOrderRequest request = (CreateOrderRequest) arg;
// 执行本地事务(下单 + 扣库存)
orderService.createOrder(orderId, request);
// 本地事务成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("本地事务执行失败", e);
// 本地事务失败,回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 事务回查
* Broker 定期调用此方法,询问本地事务是否已提交
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = new String(msg.getBody(), StandardCharsets.UTF_8);
// 查询本地数据库,判断订单是否已创建
Order order = orderRepository.findById(orderId);
if (order != null) {
// 订单已创建,说明本地事务已提交
log.info("事务回查:订单存在,提交消息,orderId={}", orderId);
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 订单不存在,说明本地事务已回滚
log.info("事务回查:订单不存在,回滚消息,orderId={}", orderId);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
// ==================== 第二步:发送事务消息 ====================
@Service
public class OrderService {
private TransactionMQProducer producer;
@PostConstruct
public void init() throws MQClientException {
// 创建事务消息生产者
producer = new TransactionMQProducer("ORDER_PRODUCER_GROUP");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
producer.setTransactionListener(new OrderTransactionListener());
// 设置事务回查线程池
producer.setCheckThreadPoolMinSize(2);
producer.setCheckThreadPoolMaxSize(5);
producer.start();
}
/**
* 创建订单(事务消息)
*/
@Transactional
public void createOrderWithTransaction(CreateOrderRequest request) {
String orderId = UUID.randomUUID().toString();
// 构建消息
Message message = new Message(
"INVENTORY_DEDUCT_TOPIC", // Topic:库存扣减
"TAG_DEDUCT", // Tag
orderId.getBytes(StandardCharsets.UTF_8) // 消息体:订单 ID
);
try {
// 发送事务消息(半消息 + 本地事务 + 提交/回滚)
TransactionSendResult result = producer.sendMessageInTransaction(
message, // 消息
request // 本地事务参数
);
log.info("事务消息发送结果:orderId={}, state={}",
orderId, result.getLocalTransactionState());
} catch (Exception e) {
log.error("事务消息发送失败:orderId={}", orderId, e);
throw new RuntimeException("下单失败");
}
}
/**
* 执行本地事务(订单 + 库存)
*/
@Transactional
public void createOrder(String orderId, CreateOrderRequest request) {
// 1. 创建订单
Order order = new Order();
order.setId(orderId);
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.CREATED);
orderRepository.save(order);
// 2. 扣库存(本地数据库事务)
Inventory inventory = inventoryRepository.findByProductId(request.getProductId());
if (inventory.getStock() < request.getAmount()) {
throw new RuntimeException("库存不足");
}
inventory.setStock(inventory.getStock() - request.getAmount());
inventoryRepository.save(inventory);
log.info("订单创建成功:orderId={}", orderId);
}
}
// ==================== 第三步:消费者处理库存扣减 ====================
@Service
public class InventoryConsumer {
@PostConstruct
public void startConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"INVENTORY_CONSUMER_GROUP"
);
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("INVENTORY_DEDUCT_TOPIC", "TAG_DEDUCT");
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
String orderId = new String(msg.getBody());
// 查询订单,获取商品 ID 和数量
Order order = orderRepository.findById(orderId);
if (order == null) {
log.warn("订单不存在:orderId={}", orderId);
continue;
}
// 幂等处理:检查是否已扣库存
if (order.getInventoryDeducted()) {
log.info("库存已扣减,跳过:orderId={}", orderId);
continue;
}
// 扣库存(消费者端,可能是另一个服务)
inventoryService.deduct(order.getProductId(), order.getAmount());
// 标记库存已扣减
order.setInventoryDeducted(true);
orderRepository.save(order);
log.info("库存扣减成功:orderId={}, productId={}, amount={}",
orderId, order.getProductId(), order.getAmount());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}

关键代码解析

1. TransactionListener 接口

public interface TransactionListener {
/**
* 执行本地事务
* 在 Half 消息发送成功后立即调用
* 返回值:
* - COMMIT_MESSAGE:提交消息,消费者可见
* - ROLLBACK_MESSAGE:回滚消息,消费者不可见
* - UNKNOWN:不确定,等待事务回查
*/
LocalTransactionState executeLocalTransaction(Message msg, Object arg);
/**
* 事务回查
* Broker 定期扫描未确认的 Half 消息,调用此方法
* 返回值同上
*/
LocalTransactionState checkLocalTransaction(MessageExt msg);
}

2. sendMessageInTransaction() 执行流程

// RocketMQ 源码(简化版)
public TransactionSendResult sendMessageInTransaction(Message msg, Object arg) {
// 1. 发送 Half 消息
SendResult sendResult = this.send(msg);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// Half 消息发送失败,直接返回
return new TransactionSendResult(LocalTransactionState.ROLLBACK_MESSAGE);
}
// 2. 执行本地事务
LocalTransactionState localTransactionState =
transactionListener.executeLocalTransaction(msg, arg);
// 3. 提交或回滚消息
if (localTransactionState == LocalTransactionState.COMMIT_MESSAGE) {
this.endTransaction(msg, sendResult, LocalTransactionState.COMMIT_MESSAGE);
} else if (localTransactionState == LocalTransactionState.ROLLBACK_MESSAGE) {
this.endTransaction(msg, sendResult, LocalTransactionState.ROLLBACK_MESSAGE);
}
// 如果是 UNKNOWN,不发送任何请求,等待 Broker 回查
return new TransactionSendResult(localTransactionState);
}

3. 事务回查机制

// Broker 端定时任务(每 60s 执行一次)
public class TransactionalMessageService {
public void checkTransaction() {
// 1. 扫描 RMQ_SYS_TRANS_HALF_TOPIC 中未确认的消息
List<MessageExt> halfMessages = scanHalfMessages();
for (MessageExt msg : halfMessages) {
// 2. 回查生产者
LocalTransactionState state = producer.checkLocalTransaction(msg);
// 3. 根据回查结果提交或回滚
if (state == LocalTransactionState.COMMIT_MESSAGE) {
commitMessage(msg);
} else if (state == LocalTransactionState.ROLLBACK_MESSAGE) {
rollbackMessage(msg);
}
// 如果是 UNKNOWN,继续等待下次回查
}
}
}

Q3:事务消息如何保证消费者的幂等性?必考

Section titled “Q3:事务消息如何保证消费者的幂等性?”

为什么需要幂等

事务消息只能保证"至少一次"投递,不能保证"恰好一次":
场景 1:消费者处理成功,ACK 网络超时 → Broker 重新投递
场景 2:Broker 主从切换,消息重复投递
场景 3:消费者宕机,Broker 切换到其他消费者,消息重复投递
结果:消费者可能收到重复消息,必须幂等处理

幂等方案对比

方案实现方式优点缺点适用场景
数据库唯一索引INSERT 时唯一索引约束简单可靠,性能好需要设计表结构扣库存、扣款
Redis 去重SETNX 记录已处理消息 ID性能极高Redis 故障丢数据高并发、允许少量重复
数据库状态机UPDATE ... WHERE status=OLD天然幂等需要状态字段订单状态流转
Token 机制生产者生成 Token,消费者去重灵活需要额外存储通用场景

方案一:数据库唯一索引(推荐)

// 消费者:扣库存
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
String orderId = new String(msg.getBody());
Order order = orderRepository.findById(orderId);
try {
// 插入库存扣减记录(order_id + product_id 唯一索引)
InventoryDeductLog log = new InventoryDeductLog();
log.setOrderId(orderId);
log.setProductId(order.getProductId());
log.setAmount(order.getAmount());
inventoryDeductLogRepository.save(log); // 唯一索引约束
// 扣库存
inventoryService.deduct(order.getProductId(), order.getAmount());
} catch (DuplicateKeyException e) {
// 唯一索引冲突,说明已处理,直接返回成功
log.info("库存已扣减,跳过:orderId={}", orderId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

数据库表设计

CREATE TABLE inventory_deduct_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_id VARCHAR(64) NOT NULL COMMENT '订单 ID',
product_id BIGINT NOT NULL COMMENT '商品 ID',
amount INT NOT NULL COMMENT '扣减数量',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_order_product (order_id, product_id) -- 唯一索引
) COMMENT '库存扣减日志表';

方案二:Redis 去重

consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId(); // RocketMQ 生成的唯一消息 ID
// Redis SETNX:key 不存在时设置成功,返回 true
// key 已存在时设置失败,返回 false
Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(
"msg:" + msgId,
"1",
24, TimeUnit.HOURS // 过期时间 24 小时
);
if (Boolean.TRUE.equals(isFirst)) {
// 首次处理
String orderId = new String(msg.getBody());
inventoryService.deduct(orderId);
} else {
// 重复消息,直接返回成功
log.info("重复消息,跳过:msgId={}", msgId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

方案三:数据库状态机(订单状态流转)

// 场景:订单状态流转(待支付 → 已支付 → 已发货 → 已完成)
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
String orderId = new String(msg.getBody());
// 乐观锁 + 状态机:只有状态为 UNPAID 时才能更新为 PAID
int rows = orderRepository.updateStatus(
orderId,
OrderStatus.UNPAID, // 期望的旧状态
OrderStatus.PAID // 新状态
);
if (rows > 0) {
// 更新成功,执行业务逻辑
log.info("订单状态更新成功:orderId={}", orderId);
} else {
// 更新失败,说明订单状态已变更(幂等)
log.info("订单状态已变更,跳过:orderId={}", orderId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// SQL
UPDATE orders
SET status = 'PAID'
WHERE id = ? AND status = 'UNPAID'; -- 乐观锁条件

幂等方案性能对比

方案QPS延迟可靠性实现复杂度
数据库唯一索引50005ms高(持久化)
Redis 去重500001ms中(Redis 故障丢数据)
数据库状态机30008ms
Token 机制100003ms

最佳实践:数据库唯一索引 + Redis 去重(双重保障)

consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
// 第一层:Redis 去重(快速判断)
if (!redisTemplate.opsForValue().setIfAbsent("msg:" + msgId, "1", 24, TimeUnit.HOURS)) {
log.info("Redis 去重:重复消息,跳过,msgId={}", msgId);
continue;
}
try {
// 第二层:数据库唯一索引(持久化保障)
String orderId = new String(msg.getBody());
inventoryService.deductWithLog(orderId); // 扣库存 + 插入日志
} catch (DuplicateKeyException e) {
// 数据库去重:说明已处理
log.info("数据库去重:重复消息,跳过,msgId={}", msgId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

链式追问二:事务消息与其他方案对比

Section titled “链式追问二:事务消息与其他方案对比”

Q4:RocketMQ 事务消息和 TCC 分布式事务有什么区别?高频

Section titled “Q4:RocketMQ 事务消息和 TCC 分布式事务有什么区别?”

对比表格

维度RocketMQ 事务消息TCC(Try-Confirm-Cancel)
一致性级别最终一致(异步,有时间窗口)强一致(Try 阶段锁定资源)
实现复杂度低(消息 + 幂等消费)高(三个接口:Try/Confirm/Cancel)
业务侵入低(只需实现 TransactionListener)高(业务代码需实现三个方法)
性能高(异步,无资源锁定)低(同步,资源锁定时间长)
可靠性中(依赖 MQ,消息可能丢失)高(本地事务 + 补偿)
适用场景下单扣库存、发送通知(允许延迟)转账、扣款(必须强一致)
补偿机制事务回查Cancel 接口回滚
典型框架RocketMQSeata TCC、Hmily

TCC 实现示例(转账场景)

// Try:冻结资金(预扣)
public void tryTransfer(String fromAccount, String toAccount, BigDecimal amount) {
// 冻结转出账户资金
accountRepository.freeze(fromAccount, amount);
}
// Confirm:确认转账(扣款)
public void confirmTransfer(String fromAccount, String toAccount, BigDecimal amount) {
// 扣除冻结资金
accountRepository.deductFrozen(fromAccount, amount);
// 增加转入账户资金
accountRepository.add(toAccount, amount);
}
// Cancel:取消转账(解冻)
public void cancelTransfer(String fromAccount, String toAccount, BigDecimal amount) {
// 释放冻结资金
accountRepository.unfreeze(fromAccount, amount);
}

RocketMQ 事务消息示例(下单扣库存)

// 本地事务:下单 + 扣库存
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
orderService.createOrder((CreateOrderRequest) arg); // 本地数据库事务
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 消费者:异步扣库存(允许延迟)
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
inventoryService.deduct(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

选型建议

场景 1:下单扣库存
需求:用户下单后,扣减库存
特点:允许异步处理(订单创建成功后,库存可以延迟几秒扣减)
推荐:RocketMQ 事务消息(实现简单,性能高)
场景 2:账户转账
需求:A 转账给 B 100 元
特点:必须强一致(A 的钱扣了,B 的钱必须到账,不能有时间窗口)
推荐:TCC 或 Seata AT 模式(保证强一致)
场景 3:发送通知
需求:用户注册后,发送欢迎邮件
特点:允许异步,失败了可以重试
推荐:RocketMQ 事务消息(简单可靠)

Q5:RocketMQ 事务消息能保证”恰好一次”语义吗?中频

Section titled “Q5:RocketMQ 事务消息能保证”恰好一次”语义吗?”

答案:不能,RocketMQ 事务消息只能保证”至少一次”(At-Least-Once)。

为什么不能保证”恰好一次”

消息投递流程:
Broker → Consumer
场景 1:消费者处理成功,ACK 网络超时
├── 消费者:已处理成功
└── Broker:未收到 ACK,认为消费失败 → 重新投递
场景 2:Broker 主从切换
├── Master Broker:消息已投递,但未同步到 Slave
└── Slave 升级为 Master → 消息重新投递
场景 3:消费者宕机
├── 消费者 A:处理到一半宕机
└── Broker 切换到消费者 B → 消息重新投递
结果:消费者收到重复消息

事务消息的正确理解

保证不保证
生产端原子性:本地事务与消息发送的原子性消费端恰好一次:消息可能重复投递
消息不丢失:Half 消息持久化 + 事务回查消息不重复:消费者可能收到重复消息
最终一致性:本地事务成功 → 消息最终被消费强一致性:消息立即被消费

正确的事务消息语义

事务消息 = At-Least-Once Delivery + Idempotent Consumer
翻译:
- At-Least-Once Delivery:消息至少投递一次(可能多次)
- Idempotent Consumer:消费者必须幂等(重复消费不影响结果)
结论:
事务消息 + 消费者幂等 = 最终一致性

实战案例

// ❌ 错误理解:事务消息保证不重复
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
// 直接扣库存,没有幂等处理
inventoryService.deduct(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// ✅ 正确实现:事务消息 + 消费者幂等
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
// Redis 去重
if (redisTemplate.opsForValue().setIfAbsent("msg:" + msgId, "1", 24, TimeUnit.HOURS)) {
// 首次处理,扣库存
inventoryService.deductWithLog(msg); // 数据库唯一索引保证幂等
} else {
// 重复消息,跳过
log.info("重复消息,跳过:msgId={}", msgId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

实战案例:下单扣库存完整方案

Section titled “实战案例:下单扣库存完整方案”

完整架构图

┌─────────────────────────────────────────────────────────────┐
│ Order Service(订单服务) │
└─────────────────────────────────────────────────────────────┘
├── 1. 发送 Half 消息 → Broker
│ (Topic: INVENTORY_DEDUCT_TOPIC)
├── 2. 执行本地事务
│ ├── INSERT INTO orders ...
│ └── COMMIT
├── 3. 提交/回滚消息 → Broker
└── 4. 事务回查(异常情况)
└── SELECT * FROM orders WHERE id=?
┌─────────────────────────────────────────────────────────────┐
│ Inventory Service(库存服务) │
└─────────────────────────────────────────────────────────────┘
├── 1. 接收消息 → 消费者
├── 2. 幂等检查(Redis + 数据库唯一索引)
├── 3. 扣库存
│ ├── INSERT INTO inventory_deduct_log ...
│ └── UPDATE inventory SET stock=stock-? ...
└── 4. 兜底补偿(定时任务)
└── 扫描未扣库存的订单 → 重新扣库存

完整代码实现

// ==================== Order Service(订单服务)====================
@Service
public class OrderService {
@Autowired
private TransactionMQProducer producer;
@Autowired
private OrderRepository orderRepository;
/**
* 创建订单(事务消息)
*/
public String createOrder(CreateOrderRequest request) {
String orderId = UUID.randomUUID().toString();
// 构建消息
Message message = new Message(
"INVENTORY_DEDUCT_TOPIC",
orderId.getBytes(StandardCharsets.UTF_8)
);
try {
// 发送事务消息
TransactionSendResult result = producer.sendMessageInTransaction(
message,
request
);
if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
return orderId;
} else {
throw new RuntimeException("创建订单失败");
}
} catch (Exception e) {
log.error("创建订单失败", e);
throw new RuntimeException("创建订单失败");
}
}
/**
* 事务监听器
*/
public static class OrderTransactionListener implements TransactionListener {
@Autowired
private OrderRepository orderRepository;
@Override
@Transactional
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String orderId = new String(msg.getBody());
CreateOrderRequest request = (CreateOrderRequest) arg;
// 创建订单
Order order = new Order();
order.setId(orderId);
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.CREATED);
order.setInventoryDeducted(false); // 库存未扣减
orderRepository.save(order);
log.info("订单创建成功:orderId={}", orderId);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("订单创建失败", e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = new String(msg.getBody());
Order order = orderRepository.findById(orderId);
if (order != null) {
log.info("事务回查:订单存在,提交消息,orderId={}", orderId);
return LocalTransactionState.COMMIT_MESSAGE;
} else {
log.info("事务回查:订单不存在,回滚消息,orderId={}", orderId);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
}
// ==================== Inventory Service(库存服务)====================
@Service
public class InventoryService {
@Autowired
private InventoryRepository inventoryRepository;
@Autowired
private InventoryDeductLogRepository deductLogRepository;
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 扣库存(幂等)
*/
@Transactional
public void deduct(String orderId, Long productId, Integer amount) {
try {
// 插入扣减日志(唯一索引保证幂等)
InventoryDeductLog log = new InventoryDeductLog();
log.setOrderId(orderId);
log.setProductId(productId);
log.setAmount(amount);
deductLogRepository.save(log);
// 扣库存
Inventory inventory = inventoryRepository.findByProductId(productId);
if (inventory.getStock() < amount) {
throw new RuntimeException("库存不足");
}
inventory.setStock(inventory.getStock() - amount);
inventoryRepository.save(inventory);
log.info("库存扣减成功:orderId={}, productId={}, amount={}",
orderId, productId, amount);
} catch (DataIntegrityViolationException e) {
// 唯一索引冲突,说明已扣减
log.info("库存已扣减,跳过:orderId={}", orderId);
}
}
}
// ==================== Inventory Consumer(库存消费者)====================
@Service
public class InventoryConsumer {
@Autowired
private InventoryService inventoryService;
@Autowired
private OrderRepository orderRepository;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@PostConstruct
public void startConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"INVENTORY_CONSUMER_GROUP"
);
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("INVENTORY_DEDUCT_TOPIC", "*");
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
String orderId = new String(msg.getBody());
// 第一层:Redis 去重(快速判断)
if (!redisTemplate.opsForValue().setIfAbsent(
"msg:" + msgId, "1", 24, TimeUnit.HOURS)) {
log.info("Redis 去重:重复消息,跳过,msgId={}", msgId);
continue;
}
try {
// 查询订单
Order order = orderRepository.findById(orderId);
if (order == null) {
log.warn("订单不存在:orderId={}", orderId);
continue;
}
// 幂等检查:订单是否已扣库存
if (order.getInventoryDeducted()) {
log.info("库存已扣减,跳过:orderId={}", orderId);
continue;
}
// 第二层:数据库唯一索引(持久化保障)
inventoryService.deduct(orderId, order.getProductId(), order.getAmount());
// 标记库存已扣减
order.setInventoryDeducted(true);
orderRepository.save(order);
} catch (DataIntegrityViolationException e) {
// 数据库去重:说明已扣减
log.info("数据库去重:库存已扣减,跳过,msgId={}", msgId);
} catch (Exception e) {
log.error("库存扣减失败:orderId={}", orderId, e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
// ==================== 补偿任务(兜底方案)====================
@Service
public class InventoryCompensationTask {
@Autowired
private OrderRepository orderRepository;
@Autowired
private InventoryService inventoryService;
// 每 10 分钟执行一次
@Scheduled(cron = "0 */10 * * * ?")
public void compensate() {
log.info("开始库存补偿任务");
// 查询创建时间 > 1 小时且库存未扣减的订单
List<Order> orders = orderRepository.findByInventoryDeductedAndCreateTimeBefore(
false,
LocalDateTime.now().minusHours(1)
);
int count = 0;
for (Order order : orders) {
try {
inventoryService.deduct(order.getId(), order.getProductId(), order.getAmount());
order.setInventoryDeducted(true);
orderRepository.save(order);
count++;
} catch (Exception e) {
log.error("库存补偿失败:orderId={}", order.getId(), e);
}
}
log.info("库存补偿任务完成:处理数量={}", count);
}
}