【RocketMQ源码学习】1-整体结构

1.为什么是RocketMQ

为什么是 RocketMQ,而不是 ActiveMQ/RabbitMQ/Kafka 呢?这不是技术选型,我只是想找一个业界比较好的、开源的 MQ 系统,学习一下 MQ 的工作原理。所以首选 Java 的(虽然语言对我来说不是问题,然还是有点学习成本的),这就只剩下 RocketMQ 和 ActiveMQ 了,这两个那就肯定 选RocketMQ 了,毕竟人家是这么吹牛逼的: “万亿级数据洪峰下的分布式消息引擎”。

从xxl-job看分布式调度中心的设计

xxl-job是xuxueli同学开源的XXL系列中的一员,是一个分布式任务调度平台。我在撸完了TCP/IP协议栈源码(netX, 非开源,讲真,里面关于TCP拥塞控制那部分没看明白)、Tomcat源码(JSP/Servlet容器)、Motan源码(RPC框架)之后。想研究一下调度中心的实现, 也没有仔细比较过就选择了XXL_JOB. 撸完一遍源码后,发现是一个比较小的项目,所以就不打算搞一个系列了,就整一篇博客总结一下,还是有所收获的。

关于xxl_job的功能介绍以及使用方法,作者写的文档很详细,也很容易上手,这里就不追溯了。代码相对简单,没有复杂的算法、数据结构、线程模型等,我就不抠细节了,从大的方面来总结下。

1. 项目结构

【Motan源码学习】10-编码&序列化

编解码是通过在Netty中添加Pipeline来实现的。

1
2
3
4
5
6
7
8
9
10
11
//com.weibo.api.motan.transport.netty.NettyServer#initServerBootstrap
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("channel_manage", channelManage);
pipeline.addLast("decoder", new NettyDecoder(codec, NettyServer.this, maxContentLength));
pipeline.addLast("encoder", new NettyEncoder(codec, NettyServer.this));
pipeline.addLast("handler", handler);
return pipeline;
}
});

1
2
3
4
5
6
//com.weibo.api.motan.codec.Codec
@Spi(scope=Scope.PROTOTYPE)
public interface Codec {
byte[] encode(Channel channel, Object message) throws IOException;
Object decode(Channel channel, String remoteIp, byte[] buffer) throws IOException;
}

Codec是一个扩展点,提供encode/decode两个方法。motan-core 实现了 DefaultRpcCodec 和 CompressRpcCodec, 后者和前者差不多,就是编码后又用GZIP压缩了一下。motan-extensios 里还实现了处理 protobuf 的 Codec. 下面以DefaultRpcCodec为例了解下Motan的编码机制,解码机制类似,就是反过来,就不细说了。

【Motan源码学习】9-负载均衡

Motan中的LoadBalance就是实现按一定的策略选择Refer去调用服务。这是一个扩展点,用户可以方便的扩展,按自己的需求实现策略。Motan支持以下策略。
1. RandomLoadBalance
没什么可说的,随机从数组里取一个Refer.
2. RoundRobinLoadBalance
也没啥可说的,就是一个数组循环取。
3. LocalFirstLoadBalance
本地服务优先,对referers根据ip顺序查找本地服务,多存在多个本地服务,获取Active最小的本地服务进行服务。当不存在本地服务,但是存在远程RPC服务,则根据ActivWeight获取远程RPC服务。当两者都存在,所有本地服务都应优先于远程服务,本地RPC服务与远程RPC服务内部则根据ActiveWeight进行
4. ActiveWeightLoadBalance
按活跃权重(即Refer被在使用的count),被用的少的优先。由于Referer List可能很多,比如上百台,如果每次都要从这上百个Referer或者最低并发的几个,性能有些损耗,因此 random.nextInt(list.size()) 获取一个起始的index,然后获取最多不超过MAX_REFERER_COUNT的,状态是isAvailable的referer进行判断activeCount.

【Motan源码学习】8-高可用策略

Motan 实现了 FailFast、FailOver、FailBack 这几种HA策略。
1.FailFast
Fail-fast is a property of a system or module with respect to its response to failures. A fail-fast system is designed to immediately report at its interface anyfailure or condition that is likely to lead to failure. Fail-fast systems are usually designed to stop normal operation rather than attempt to continue a possibly flawed process. Such designs often check the system’s state at several points in an operation, so any failures can be detected early. A fail-fast module passes the responsibility for handling errors, but not detecting them, to the next-higher system design level.

这是维基百科关于FailFast的描述,就是错误要尽早发现,失败立即报错结束。 Motan 的 FailFast 实现的比较简单,和简单的调用一样。这让我想起了Hystrix这个熔断框架,里面也有FailFast的机制,可以去了解一下。

【Motan源码学习】7-Shutdown机制

Motan中很多地方用了线程池,有一些还是按一定频率一直在run的任务,比如心跳、Failback机制中的一些重试线程。这样会有一个问题,当把应用部署在Tomcat中的时候,用shutdown.sh关闭Tomcat的时候,这些线程池就关不掉,导致通过只能kill进程的方式来关闭。

在 0.3.1 中,Motan 实现了 ServletContextListener 接口和 ShutdownHook,在创建任务的时候,会去 ShutdownHook 中注册, 当 Context 关闭的时候,会通知该 Listener,然后该 Listener 就会去 shutdown 已经注册的这些任务。

1
2
3
4
5
6
public class ShutDownHookListener implements ServletContextListener {
@Override
public void contextDestroyed(ServletContextEvent sce) {
ShutDownHook.runHook(true);
}
}

【Motan源码学习】6-异步调用

Motan支持异步调用,在需要支持异步调用的接口上加 @MotanAsync 注解,在编译时就会自动生成一个Async结尾的异步接口。例如:

1
2
3
4
@MotanAsync
public interface MotanDemoService {
String hello(String name);
}

编译后会生成

1
2
3
public interface MotanDemoServiceAsync extends MotanDemoService {
ResponseFuture helloAsync(String name);
}

服务调用者就可以用这个接口来配置引用,用带有Async后缀的方法返回的ResponseFuture去异步的获得结果了。

这里用到了编译时处理注解的技术,Motan 中实现了 com.weibo.api.motan.transport.async.MotanAsyncProcessor 用来处理 @MotanAsync 注解,自动生成代码。并在 motan-core META-INF/services 的 javax.annotation.processing.Processor 文件中添加该Processor,这样在编译的时候就会去处理需要异步的接口了。