消息重试
Consumer 端消息消费失败后,会send message back to broker.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// 将消费失败的消息发回给Broker
boolean result = this.sendMessageBack(msg, context);
if (!result) {
// 消息消费次数递增
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
// 重新提交ConsumeRequest到队列里
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
发回去的requestHeader里有两个和重试相关的关键字段 delayLevel(决定了延迟的时间) 和 maxConsumeRetryTimes。
Broker 在接收到消息后,将消息的 topic 改成 “%RETRY% + consumerGroup”, 将真正的 topic 放到消息的RETRY_TOPIC属性中。
然后交给Store去存储.1
2
3String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
msgInner.setTopic(newTopic);
store 先判断它的 delayLevel,如果大于0,证明要重试,又将消息的 topic 改成 “SCHEDULE_TOPIC_XXXX”,并把 “%RETRY% + consumerGroup” 保存在消息的 REAL_TOPIC 属性中。然后去写 commitLog,构建 ConsumeQueuqe. 这个时候,图中的
SCHEDULE_TOPIC_XXXX 目录下就会有 ConsumeQueuqe 了.
1 | topic = ScheduleMessageService.SCHEDULE_TOPIC; |
ScheduleMessageService 是 broker 中一个单独的线程,它会不停的去扫描 SCHEDULE_TOPIC_XXXX 里的 ConsumeQueuqe,根据里面tagsCode字段记录的时间戳判断是否到了delay的时间, 如果到了,就根据 ConsumeQueuqe 找到对应的 CommitLog里的消息,并将它的 topic 改回 “%RETRY% + consumerGroup”, 重新交给 store 去做 putMessage,这个时候图中的 %RETRY%xxxxx 之类的目录下就会有 ConsumeQueue了。1
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
每个delayLevel对应要delay的时间是通过这个变量关联起来的,这个变量在 MessageStoreConfig 中,是可配置的。1
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
Consumer端在启动的时候,就订阅了 %RETRY%+consumerGroup 这个 topic 的消息,所以它会一直去长轮询这类消息,拿到消息后,将Topic 从 “%RETRY% + consumerGroup” 改回 RETRY_TOPIC属性中的真正的Topic。1
2
3
4
5
6
7
8
9public void resetRetryTopic(final List<MessageExt> msgs) {
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
msg.setTopic(retryTopic);
}
}
}
定时消息
讲完了消息的重试之后,定时消息原理也就很简单了,Broker先将消息的 消息的 topic 改成 “SCHEDULE_TOPIC_XXXX”,并把真正的Topic保存在消息的 REAL_TOPIC 属性中. ScheduleMessageService 在时间到了之后,会将它捞出来,恢复Topic, 并开始写 CommitLog 和 ConsumeQueue.