Kafka Vs RocketMQ

以下所有的分析都是基于 RocketMQ (tag rocketmq-all-4.1.0-incubating) 和 Kafka (tag 0.10.0.0),对于 Kafka,涉及 Producer 和 Consumer 的,都是基于该版本的 Java API.
过去大半年时间断断续续看了很多 RocketMQ 和 Kafka 的源码,下面从源码的角度分析下两者的共同点/不同点。

部署和存储模型

RocketMQ 的部署是主从架构,可以一主无备,一主一备,一主多备,多主多备,可靠性依次递增。
Kafka 的部署是互为主备的架构,一个 Broker 即可以是某个分区的主副本,又可以是其他分区的从副本。

【RocketMQ源码学习】9-事务消息

还是拿最经典的有君转账给芋芫100元为例来说明事务问题吧,所谓事务,即要保证 “有君账户的钱减100“和”芋芫账户的钱加100”这两个操作要么同时成功,要么同时失败,这在单机情况下很好实现。如果有君账户减100元的操作是在AAA应用里完成的,芋芫账户加100的操作是在BBB应用里完成的,这个事务要怎么保证呢,分布式系统设计领域有一些办法来实现(什么两阶段提交、paxos,raft等,对后两者我一点都不懂,不瞎BB了,说的好像你对两阶段提交很懂一样。。。)。
我们这里只关注 小事务 + 异步 这种方式。所谓小事务 + 异步, 就是指 【有君+100】、【芋芫-100】这两个本地事务加一个 AAA 发给 BBB 的异步消息,如图:
500

【RocketMQ源码学习】8-消息定时与重试

消息重试

Consumer 端消息消费失败后,会send message back to broker.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// 将消费失败的消息发回给Broker
boolean result = this.sendMessageBack(msg, context);
if (!result) {
// 消息消费次数递增
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
// 重新提交ConsumeRequest到队列里
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}

【RocketMQ源码学习】7-消息过滤

Broker端的过滤

Broker端的过滤支持 TAG 和 SQL92 两种方式:
1. TAG方式
消息除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。Consumer 端在订阅消息的时候,可以指定消费哪种TAG的消息,比如:

1
2
// 表示订阅TAG为AAA和BBB的消息,如果传 * ,表示订阅该Topic的所有消息
consumer.subscribe("TopicTest", "AAA||BBB");

Consumer 会将这个订阅请求构建成一个 SubscriptionData,并告知 Broker.

【RocketMQ源码学习】6-消息存储

Store 在写消息之前会做一些校验(broker的角色、Topic的长度、消息Properties的长度等),然后交给 CommitLog去处理,CommitLog的整体流程如下:

  1. 设置一些信息到 MessageExtBrokerInner 对象中(storeTimestamp、message body CRC等)
  2. 获取 mappedFile,并 append消息到 mappedFile
  3. handleDiskFlush,刷盘
  4. handleHA
  5. deReput, 一个单独的线程,会按一定的频率去构建 consumeQueue 和 index

【RocketMQ源码学习】5-消息消费

1. Client端,两种消费方式

  • Push 模式
    1
    2
    3
    4
    5
    6
    7
    8
    consumer.subscribe("TopicTest1", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });

应用代码注册一个监听器,Client 在拿到消息后主动 call 这个listener.

【RocketMQ源码学习】4-消息发送

1. Client端,三种发送方式

RocketMQ 支持常见的三种发送方式,

  • SYNC
    1
    producer.send(msg)

同步的发送方式,会等待发送结果后才返回。可以用 send(msg, timeout) 的方式指定等待时间,如果不指定,就是默认的 3000ms. 这个timeout 最终会被设置到 ResponseFuture 里,再发送完消息后,用 countDownLatch 去 await timeout的时间,如果过期,就会抛出异常。

  • ASYNC