1. 基本架构
RocketMQ
架构上主要分为四部分,如下图所示:
Producer
:消息发布的角色,支持分布式集群方式部署。Producer
通过MQ
的负载均衡模块选择相应的Broker
集群队列进行消息投递,投递的过程支持快速失败并且低延迟。Consumer
:消息消费的角色,支持分布式集群方式部署。支持以push
推,pull
拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。NameServer
:NameServer
是一个非常简单的Topic
路由注册中心,其角色类似Dubbo
中的zookeeper
,支持Broker
的动态注册与发现。主要包括两个功能:Broker
管理,NameServer
接受Broker
集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker
是否还存活;- 路由信息管理,每个
NameServer
将保存关于Broker
集群的整个路由信息和用于客户端查询的队列信息。然后Producer
和Conumser
通过NameServer
就可以知道整个Broker
集群的路由信息,从而进行消息的投递和消费。
NameServer
通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker
是向每一台NameServer
注册自己的路由信息,所以每一个NameServer
实例上面都保存一份完整的路由信息。当某个NameServer
因某种原因下线了,Broker
仍然可以向其它NameServer
同步其路由信息,Producer
,Consumer
仍然可以动态感知Broker
的路由的信息。BrokerServer
:Broker
主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker
包含了以下几个重要子模块:Remoting Module
:整个Broker
的实体,负责处理来自clients
端的请求。Client Manager
:负责管理客户端(Producer
/Consumer
)和维护Consumer
的Topic
订阅信息Store Service
:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。HA Service
:高可用服务,提供Master Broker
和Slave Broker
之间的数据同步功能。Index Service
:根据特定的Message key
对投递到Broker
的消息进行索引服务,以提供消息的快速查询。
2. 获取源码
rocketMq项目的github
仓库为github.com/apache/rock…,由于网络原因,我们并不会直接使用github
仓库,而是将其导入到gitee
上,只需在gitee
创建新仓库时,选择导入已有仓库即可:
导入到gitee
后,就可以进行checkout
了,本文对应的gitee仓库为gitee.com/funcy/rocke…。
checkout
源码到本地后,默认是master
分支,本人习惯基于tag
创建自己的分支,然后在自己的分支上进行分析,rocketMq
的tag
如下:
最新版本是4.8.0
,我们将基于此tag创建新分支,使用的命令如下:
# 切换到 rocketmq-all-4.8.0
git checkout rocketmq-all-4.8.0
# 基于 rocketmq-all-4.8.0 创建自己的分析,名称为 rocketmq-all-4.8.0-LEARN
git checkout -b rocketmq-all-4.8.0-LEARN
# 将 rocketmq-all-4.8.0-LEARN 分支推送到远程仓库
git push -u origin rocketmq-all-4.8.0-LEARN
复制代码
接下来,我们所有的操作都是在rocketmq-all-4.8.0-LEARN
分支上进行了。
3. 本地启动
拿到代码后,我们就开始进行本地启动了,没错,就是在idea中进行启动。
3.1 复制conf
目录
在启动项目前,我们需要进行一些配置,rocketMq
项目的配置文件位于rocketmq/distribution
模块下的conf
目录中,直接整个复制到rocketmq
目录下:
也不需要改动,复制出来就行了,这些配置的内容后面分析源码时再讲解吧。
3.2 启动nameServer
nameServer
的主类为org.apache.rocketmq.namesrv.NamesrvStartup
:
如果我们直接运行main()
方法,会报错:
报错信息已经很明确了,需要我们配置ROCKETMQ_HOME
目录,我们在idea
中进行配置即可:
打开配置界面:
填写ROCKETMQ_HOME
配置:
这里我填写的是ROCKETMQ_HOME=/Users/chengyan/IdeaProjects/myproject/rocketmq
,这个ROCKETMQ_HOME
路径就是conf
文件夹所在的目录。
填写好后,就可以启动了:
3.3 启动broker
broker
的主类为org.apache.rocketmq.broker.BrokerStartup
,启动方式与nameServer
很相似,启动前也要配置ROCKETMQ_HOME
路径:
相比于nameServer
,这里多配置了启动参数:
-n localhost:9876 autoCreateTopicEnable=true
复制代码
这个启动参数是指定nameServer
的地址,以及开启自动创建topic
的功能。
配置完成之后就可以启动了:
3.4 启动管理后台
rocketMq
的管理后台在另一个仓库github.com/apache/rock…,除了后台,这个仓库还包含了许多的其他模块:
我们并不需要分析这个项目,源码本可以不必下载,但我在找这个项目的release
版本时,发现并没有提供已编译好的jar包,需要自己构建代码,因此我就再次下载了这个代码源码。当然,由于网络的原因,这个项目的源码也被我导入到了gitee
上,地址为gitee.com/funcy/rocke….
这个项目的代码我们并不分析,因此直接在master
分支上操作即可,
管理后台项目为rocketmq-console
,主类为org.apache.rocketmq.console.App
:
在启动前,我们需要修改下application.properties
的配置,找到rocketmq.config.namesrvAddr
配置,添加nameServer
的ip与端口,这里我们连接的是本地应用,直接填写localhost:9876
:
...
rocketmq.config.namesrvAddr=localhost:9876
...
复制代码
启动,结果如下:
访问http://localhost:8080
,结果如下:
可以看到broker
已经出现在cluster
列表中了,这就表明启动成功了。
4. 收发消息测试
rocketMq
项目的example
模块下有大量的测试示例,我们选择其一进行消息收发测试。
4.1 启动Consumer
我们先找到org.apache.rocketmq.example.simple.PushConsumer
,代码如下:
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
String nameServer = "localhost:9876";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
复制代码
这个Consumer
监听的topic
是TopicTest
,后面我们就会往这个topic
发送消息。另外,需要注意nameServer
的配置,我们是在本地启动的nameServer
,因此这里配置的是localhost:9876
。
运行main()
方法,结果如下:
4.2 启动Producer
我们找到 org.apache.rocketmq.example.simple.Producer
类,代码如下:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
String nameServer = "localhost:9876";
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 10; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
复制代码
同样地,这里使用的是的nameServer
地址是localhost:9876
,topic
是TopicTest
,运行,结果如下:
再回过头看看PushConsumer
的控制台:
可以看到,Producer
发送消息成功了,PushConsumer
也成功获取到消息了。
4.3 异常分析
如图所示:
如果出现异常:
org.apache.rocketmq.client.exception.MQClientException:
No route info of this topic: TopicTest
复制代码
这表明当前broker
中没有TopicTest
的topic
,这时我们可以手动创建topic
,也可以在启动时指定autoCreateTopicEnable=true
.
如果是按上面步骤进行的,请确认下org.apache.rocketmq.broker.BrokerStartup
是否配置启动参数
-n localhost:9876 autoCreateTopicEnable=true
复制代码
配置方式就按3.3节
的方式配置就行了。
5. 总结
本文主要介绍了rocketMq
的基本架构,通过源码展示了rocketMq
的启动方式,最后通过rocketMq
项目下example
模块中的测试代码展示了消息的收发过程。
总的来说,本文还是在准备源码分析的环境,下篇文章开始,我们就正式开始rocketMq
的源码分析了。