还是拿最经典的有君转账给芋芫100元为例来说明事务问题吧,所谓事务,即要保证 “有君账户的钱减100“和”芋芫账户的钱加100”这两个操作要么同时成功,要么同时失败,这在单机情况下很好实现。如果有君账户减100元的操作是在AAA应用里完成的,芋芫账户加100的操作是在BBB应用里完成的,这个事务要怎么保证呢,分布式系统设计领域有一些办法来实现(什么两阶段提交、paxos,raft等,对后两者我一点都不懂,不瞎BB了,说的好像你对两阶段提交很懂一样。。。)。
我们这里只关注 小事务 + 异步 这种方式。所谓小事务 + 异步, 就是指 【有君+100】、【芋芫-100】这两个本地事务加一个 AAA 发给 BBB 的异步消息,如图:
事务消息要解决的问题就是,保证 【有君+100】和 发送MQ 这两个操作要么同时成功,要么同时失败。现在问题来了,这两个操作谁先谁后呢:
- 先【有君+100】后发MQ,前者成功,后者失败了怎么办?
- 先发MQ后【有君+100】,前者成功,后者失败了怎么办?
所以,RocketMQ 是如何来解决这个问题的呢?
1 | public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg){ |
第一阶段发送的 PREPARED 消息会被 Broker 保存到 commitLog 中,但是不会构建对应的 ConsumeQueue,自然也是不能被消费的。1
2
3
4
5
6
7
8
9
10
11
12
13public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
// 这两种类型的消息不会构建 ConsumeQueue
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
第三阶段,会发送 RequestCode为END_TRANSACTION 的请求,不同本地事务状态会发送不同类型的消息:1
2
3
4
5
6
7
8
9
10
11switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
}
Broker的 EndTransactionProcessor 会去做处理,
1 | ... |
这里有个问题,如果第三阶段发送失败,或者发送的是TRANSACTION_NOT_TYPE的消息,那么 broker 里的消息一直是 Prepared ,一直不能被消费。这种情况该怎么办呢,broker 端会定期扫描这些消息(我在我看的这个tag里并没有找到这部分代码),发送 RequestCode 为CHECK_TRANSACTION_STATE给 Producer来询问事务状态。Producer会调用应用代码注册的Listener去决定状态,并告知broker.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
public void run() {
// 1. 获取应用代码注册的 TransactionCheckListener
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
if (transactionCheckListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
// 2. 应用代码决定事务状态
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} catch (Throwable e) {
....
}
// 3. 这里面会根据状态发送该消息是COMMIT还是ROLLBACK还是继续UNKONWN
this.processTransactionState(localTransactionState, group, exception);
} else {
...
}
}
}
...
}
消息被COMMIT后,BBB 就可以消费了,然后就可以执行 【芋芫-100】的操作了。 BBB 消费有异常情况会不停的重试,如果最终还是消费失败,就只能人工介入了。
谢晞鸣的思考
以上是 RocketMQ 处理事务性消息的过程,这种方案和下面这种方案相比,优势在哪里呢?下面这种方案有什么缺点吗?
先【有君+100】后发MQ,然后把这两个操作放在一个本地事务里(这里假设用的是Spring事务模板)),如果 【有君+100】失败,事务直接回滚,消息自然也不会发送,如果 【有君+100】 成功,消息发送失败,Spring捕捉到异常后会回滚事务,也没问题。
Reference
-
以上所有扯淡都是基于源码 https://github.com/apache/incubator-rocketmq (tag:rocketmq-all-4.1.0-incubating) 所贴代码有所删减。