简介

RocketMQ是一款分布式、队列模型的消息中间件。基于发布订阅模式,有Push和Pull两种消费方式,支持严格的消息顺序,亿级别的堆积能力,支持消息回溯和多个维度的消息查询。

RocketMQ官方文档

架构设计

架构设计

主要组成成分

  • NameServer:简单的注册中心,用户broker(主机)的动态注册与发现。保存Broker集群的注册信息(路由信息),并有心跳机制(每10s一次检测Broker是否存活)。NameServer集群之间不进行信息通讯,是一个几乎无状态节点,每个实例都保存完整的路由信息。

  • Broker: MQ服务器,负责消息队列主体功能,有主从两种身份,通过配置文件brokerId=0(master)或非0(slave)指定。每个Broker与NameServer集群所有节点建立连接,每隔30s注册Topic信息到NameServer所有节点。注:Master与Slave的关系是一对多,但只有BrokerId=1的从服务器才会参与消息的读负载。

Remoting Module:整个Broker的实体,负责处理来自clients端的请求。

Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息

Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。

HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。

Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

  • Producer: 消息生产者,随机选择NameServer建立长连接并定期(每隔30s)获取Topic路由信息。与master建立长连接,保持心跳。

  • Consumer:消息消费者,随机选择NameServer建立长连接并定期获取Topic路由信息。与master、slave建立长连接,保持心跳。支持推(实时性好,但需注意消费者消费能力)、拉两种消费模式,集群消费与广播消费两种消费方式。

其他重要概念

  • Topic:主题用于表示一类消息的集合,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。生产消费者与toipc都是1对n关系,Topic是一个上逻辑概念。消息存储不会按Topic分开。
  • Group:组区分,标识一类应用,分生产者组和消费者组,属于同一组的生产/消费者生产/消费同一类消息且处理逻辑一致。组的区分使消费者实现负载均衡和容错的目标或广播变得非常容易。发送分布式事务消息时,如果Producer中途意外宕机,Broker会主动回调同一组内的任意一台Producer机器来确认事务状态。
  • Message:消息载体,生产和消费数据的最小单位,可通过唯一MeaasgeID或业务标识key查询消息。MessageId的长度总共有16字节,其中包含了消息存储主机地址(IP地址和端口),消息Commit Log offset
  • Tag:标签,用于同一主题区分不同消息,消费者可以根据Tag实现对不同子主题的不同消费逻辑,可用于消息过滤,如:consumer.subscribe(topic, “tag1 || tag2”)订阅消费topic主题tag1或tag2的消息。*传入 null,””,”“都是表示订阅全部的作用**。
  • Queue: Topic与queue是一对多关系,主要用于负载均衡。
  • Offset: 存储消息时会为每个topic下的每个Queue生成一个消息的索引文件,每个queue都对应一个offset记录当前queue中消息条数。存储消息数据的log文件是顺序存储的,通过二分查找定位数据,时间复杂度可降到logN。

工作流程

  1. 启动NameServer监听端口,等待Broker、Producer、Consumer连接。
  2. 启动Broker与所有NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。 (下面源码分析RouteInfoManager有介绍)
  3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  5. Consumer跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

特性

顺序消费

  • 全局顺序

    对于指定topic,全部消息都按照严格的先入先出(FIFO)的顺序进行发布和消费。 (性能低)

  • 局部顺序

    通过是实现MessageQueueSelector重写select方法,对于业务要求顺便的的消息发送到到相同分区(局部顺序性)。

消息过滤

服务端过滤

tag:大多数业务使用场景

bySql:部分业务线使用,支持SQL92语法,通过MessageSelector.bySql(“sql语句”)编写。

优点:减少了对于Consumer无用消息的网络传输。

缺点:增加了Broker的负担、而且实现相对复杂。

定时消息

延时等级(从1开始,小于0不重试,直接存入死信队列,18个level,通过msg.setDelayLevel(level)设置):“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”

原理:定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

消息存储

存储结构

1. CommitLog: 消息主体以及元数据储存主体,默认大小为1G。文件名以20位数字组成(右边为起偏移量,左边补0)如:00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。

2. ConsumeQueue:消费者队列(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。引入的目的主要是提高消息消费的性能。consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。

结构如下:

img

3. IndexFile: 索引文件,用于通过key或msgId来查询消息。单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,底层存储设计为在文件系统中实现HashMap结构即hash索引。RocketMQ的混合型存储结构(统一broker的多个Topic的消息实体内容都存储于一个CommitLog中),当消费者无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

数据存取优化

内存映射

RocketMQ通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(减少了内核缓冲区和用户缓冲区之间数据拷贝),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存

页缓存

页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。OS访问物理磁盘上文件时,会顺序对其他相邻块的数据文件进行预读取。涉及局部性原理。对于数据的顺序写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,读取性能几乎接近内存,CommitLog文件会有较多的随机访问,所以才应生出index用于加速查找。

消息过滤

Tag过滤方式

Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash(后8字节)值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。

思考:在服务端实现精准过滤会增加服务端压力,不过滤直接交给客户端又有网络传输浪费,hash过滤是中间的权衡。

SQL92的过滤方式

这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。

注:过滤相关类:org.apache.rocketmq.store.MessageFilter

负载均衡

producer

发送端先通过topic找到对应路由信息:TopicPublishInfo,再调用selectOneMessageQueue()方法选择具体队列MessageQueue发送消息。

consumer

负载均衡的核心—RebalanceImpl类的rebalanceByTopic方法。

Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。(org.apache.rocketmq.client.impl.consumer.RebalanceImpl#dispatchPullRequest)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
//广播模式
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
//获取该topic下的所有MessageQueue
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//向Broker获取该消费组下消费者Id列表(通过org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync
//发送rpc请求,Broker端基于Consumer端上报的心跳包数据而构建的consumerTable做出响应返回,业务请求码:GET_CONSUMER_LIST_BY_GROUP);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}

if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}

if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);

//排序
Collections.sort(mqAll);
Collections.sort(cidAll);
//消息队列分配策略算法(默认为:消息队列的平均分配算法AllocateMessageQueueAveragely),计算出待拉取的消息队列。
// 这里的平均分配算法,类似于分页的算法,将所有MessageQueue排好序类似于记录,将所有消费端Consumer排好序类似页数,
// 并求出每一页需要包含的平均size和每个页面记录的范围range,
// 最后遍历整个range而计算出当前Consumer端应该分配到的记录(这里即为:MessageQueue)。
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}

Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
//将分配到的消息队列集合(mqSet)与processQueueTable做一个过滤比对。
//该方法最后会执行dispatchPullRequest()方法,
//push模式会将Pull消息的请求对象PullRequest依次放入PullMessageService服务线程的阻塞队列pullRequestQueue中,待该服务线程取出后向Broker端发起Pull消息的请求。
//pull模式为空实现
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}

默认分配策略:org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely#allocate

同一组的消费者数量大于队列数,会有消费者无法消费到数据!注释说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}

List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}

int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
//计算消费队列数
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
//开始索引,注意!!!若消费者队列数小于同一组的消费者数,并且索引大于mod(此时等于队列数)
//index * averageSize + mod 会 大于mqAll.size(),
//下面range会小于0,所以没有分配到队列,会无法消费到消息
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}

负载均衡执行过程简要描述:

RebalanceService线程run方法(默认20秒执行一次)最终会调用org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic,在updateProcessQueueTableInRebalance方法中会为新分配的MessageQueue 会创建一个 PullRequest 对象,然后通过 dispatchPullRequest方法(pull模式为空实现)调用defaultMQPushConsumerImpl的executePullRequestImmediately方法 将 PullRequest放入到 PullMessageService 线程的 LinkedBlockingQueue, 进而唤醒 queue.take()方法,然后执行 DefaultMQPushConsumerImpl 的 pullMessage, 通过网络从broker端拉取消息,一次最多拉取的消息条数可配置,默认为32条,然后然后将拉取的消息,执行过滤等,然后封装成任务(ConsumeRequest), 提交到消费者的线程池去执行,每次消费消息后,又将该 PullRequest 放入到 PullMessageService中,(DefaultMQPushConsumerImpl)的延迟的机制就是pullInterval 为 0;

事务消息

事务消息

源码地址

https://github.com/apache/rocketmq

主要目录简介

  • acl 权限模块
  • broker broker模块,核心任务就是持久化消息
  • client 消息客户端,包括生产与消费者
  • common 公共依赖包,通用的常量枚举、基类方法或者数据结构。
  • dev 开发者信息(就一个py文件)
  • distribution 部署脚本、配置模块
  • docs 开发者文档
  • example 示例模块
  • filter 消息过滤器
  • namesrv NameServer模块
  • openmessaging 消息开放标准
  • remoting 远程通信模块
  • serutil 服务工具类
  • store 消息存储模块

主要模块类功能分析

版本:4.9.1-SNAPSHOT

namesrv模块

NamesrvStartup:启动类,主要是调用NamesrvController的初始化及开始方法。

NamesrvController:核心控制类。

核心初始化方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public boolean initialize() {
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

// 初始化负责处理Netty网络交互数据的线程池,
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

// 注册负责处理Netty网络交互数据的DefaultRequestProcessor,客户端请求会由这个Processor来处理
this.registerProcessor();
// 每10s扫描存活的Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
//通过比较最近心跳时间距今时间是否超过两分钟决定
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 每10分钟打印kv配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
//注册tls文件变更的监听器
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}

​ NamesrvConfig:NameServer相关属性配置类,如配置文件路径。

​ NettyServerConfig、RemotingServer 、ExecutorService:网络通讯类与Netty密切相关,相关配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
//NettyServerConfig默认字段配置
//NamesrvStartup#createNamesrvController将该值设置为9876
private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
//单向请求的信号量(并发度),一般用在发送心跳包场景下
private int serverOnewaySemaphoreValue = 256;
//异步调用的信号量(并发度)
private int serverAsyncSemaphoreValue = 64;
// 通道空闲时间
private int serverChannelMaxIdleTimeSeconds = 120;

KVConfigManager :读取或变更NameServer的配置属性,load方法加载 NamesrvConfig 中配置的配置文件到内存。

请求处理核心逻辑:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest

RouteInfoManager : 用HashMap保存/管理Broker、Topic等信息。内部使用ReadWriteLock保证线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//空闲时长,超过两分钟没收到Broker的心跳包则,断开连接
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
//路由信息保存在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager的以下字段里
private final ReadWriteLock lock = new ReentrantReadWriteLock();
//主题与队列关系,记录分布在哪些Broker上,Broker上存在该主题的队列个数,QueueData队列描述信息
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//BrokerData信息描述每一个 broker信息(所属集群,broker名,还有以brokerId为key地址为value的map集合)(broker主从集合)。
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//集群包含的brokerName
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//当前存活的broker,NameServer10秒扫描一次,非实时有误差
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//过滤服务列表
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

BrokerHousekeepingService:实现ChannelEventListener接口,处理与Broker连接通道关闭,异常,空闲的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

FileWatchService : 监控tls文件的改动。使用MessageDigest类,用MD5加密算法提取概要信息存ArrayList集合fileCurrentHash里,WATCH_INTERVAL = 500毫秒查询校验一次。(为啥频率这么高???)

broker模块

broker也有与namesrv一样相应的BrokerStartup与BrokerController作用类似。

消息执行器:org.apache.rocketmq.broker.processor.SendMessageProcessor。

存储配置类:org.apache.rocketmq.store.config.MessageStoreConfig,部分配置字段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//设置Broker的存储根目录,默认为 $Broker_Home/store。
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
//设置commitlog的存储目录,默认为$Broker_Home/store/commitlog。
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
// CommitLog文件大小默认 1G
private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
// ConsumeQueue 存放的是定长的信息(20个字节,偏移量、size、tagscode) 默认是 30W * 20
private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
// 是否开启 consumeQueueExt,默认为 false,就是如果消费端消息消费速度跟不上,是否创建一个扩展的 ConsumeQueue文件,如果不开启,应该会阻塞从 commitlog 文件中获取消息。
private boolean enableConsumeQueueExt = false;
// 扩展consume文件的大小,默认为48M。
private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024;
// Bit count of filter bit map.
// this will be set by pipe of calculate filter bit map.
private int bitMapLengthConsumeQueueExt = 64;

//刷写 CommitLog 的间隔时间,RocketMQ 后台会启动一个线程,将消息刷写到磁盘,
// 这个也就是该线程每次运行后等待的时间,默认为500毫秒。flush 操作,调用文件通道的force()方法。
private int flushIntervalCommitLog = 500;
//提交消息到 CommitLog 对应的文件通道的间隔时间,原理与上面类似;
// 将消息写入到文件通道(调用FileChannel.write方法)得到最新的写指针,默认为200毫秒。
private int commitIntervalCommitLog = 200;

存储核心类:DefaultMessageStore,存储消息的入口方法为:putMessage,核心步骤如下:

  1. 检查状态:checkStoreStatus()

    1. 检查当前节点是否已shutdown。
    2. 是否是SLAVE节点(SLAVE只读)。
    3. 是否可写。
    4. 操作系统页写入是否繁忙:isOSPageCacheBusy()。
  2. 检查消息checkMessage(msg):检查topic与propertiesString长度。

  3. 将日志写入CommitLog 文件:this.commitLog.putMessage(msg)。

  4. 统计写入耗时信息。

    1
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
  5. 记录写commitlog 失败次数。

    1
    2
    3
    if (null == result || !result.isOk()) {
    this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

消息写入:org.apache.rocketmq.store.CommitLog#putMessage,核心步骤如下:

  1. 获取消息类型(事务消息,非事务消息,Commit消息。

  2. 获取一个 MappedFile 对象,内存映射的具体实现。

  3. 追加消息需要加锁PutMessageLock(两种实现PutMessageReentrantLock(内部就个ReentrantLock)和PutMessageSpinLock(默认)(内部含有AtomicBoolean,使用do-while,cas加锁)。

  4. 验证 MappedFile 对象,获取一个可用的 MappedFile (如果没有,则创建一个)。

  5. 通过MappedFile对象写入文件。result = mappedFile.appendMessage(msg, this.appendMessageCallback);

  6. 根据刷盘策略刷盘:handleDiskFlush(result, putMessageResult, msg);

  7. 主从同步:handleHA(result, putMessageResult, msg);

**消息长度计算: **org.apache.rocketmq.store.CommitLog#calMsgLength

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
//单位:字节
final int msgLen = 4 //TOTALSIZE 消息总长度
+ 4 //MAGICCODE 模数
+ 4 //BODYCRC 数据校验CR
+ 4 //QUEUEID 消息队列id
+ 4 //FLAG 标记位
+ 8 //QUEUEOFFSET 消息队列偏移量
+ 8 //PHYSICALOFFSET 物理偏移量
+ 4 //SYSFLAG 系统标记
+ 8 //BORNTIMESTAMP bornt存储时间戳
+ bornhostLength //BORNHOST Broken地址、端口
+ 8 //STORETIMESTAMP 时间戳
+ storehostAddressLength //STOREHOSTADDRESS 存储地址端口
+ 4 //RECONSUMETIMES 消息重试次数
+ 8 //Prepared Transaction Offset
+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY 4字节的body长度+具体消息长度
+ 1 + topicLength //TOPIC 一字节的topic长度与topic内容长度
+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength 消息属性长度
+ 0;
return msgLen;
}

全局唯一msgId生成代码位置:org.apache.rocketmq.common.message.MessageDecoder#createMessageId(java.nio.ByteBuffer, java.nio.ByteBuffer, long) 由ip+port+commitLog偏移地址组成

1
2
3
4
5
6
7
8
9
10
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
input.flip();
int msgIDLength = addr.limit() == 8 ? 16 : 28;
input.limit(msgIDLength);

input.put(addr);
input.putLong(offset);

return UtilAll.bytes2string(input.array());
}

**消息刷盘 **: org.apache.rocketmq.store.CommitLog#handleDiskFlush

具体刷盘org.apache.rocketmq.store.MappedFile#flush:就是调用 FileChannel 或 MappedByteBuffer 的force 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {

//同步刷写
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) { //是否一定要收到存储MSG信息,才返回,默认为true。如果要等待存储结果。
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
PutMessageStatus flushStatus = null;
try {
flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup(); //唤醒同步刷盘线程。
}
}
// Asynchronous flush //异步刷盘机制
else {

if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}

消息存储过程

client模块

发送消息

默认发送方法org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

核心步骤

  1. 消息(group、topic、body等)校验:Validators.checkMessage(msg, this.defaultMQProducer);

  2. 获取topic的路由信息:TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

  3. 根据topic负载均衡算法选择一个MessageQueue。可实现MessageQueueSelector自定义。

    默认org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue方法有ThreadLocalIndex sendWhichQueue保存上次发送队列下标,使用轮询获取发送队列下标,对开启失败延迟的话还会对队列进行判断验证。

  4. 向 MessageQueue 发送消息。

  5. 更新失败策略,主要用于规避发生故障的 broker。

  6. 如果是同步调用方式(SYNC),则执行失败重试策略,默认重试两次, 异步消息发送的重试是在回调时。

1
2
int timesTotal = communicationMode == CommunicationMode.SYNC ? 
1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

延迟类MQFaultStrategy

1
2
3
4
//latencyMax:最大延迟时间数值,在消息发送之前,先记录当前时间(start),然后消息发送成功或失败时记录当前时间(end),(end-start)代表一次消息延迟时间,发送错误时
//对之前失败的,按一定的时间做退避,下面通过索引相对应,如上次请求的latency超过550Lms,就退避3000Lms;
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

校验配置类

1
2
3
4
5
6
7
8
9
10
public class Validators {
public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
//分组与topic的命名符号限制
public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
//group长度限制
public static final int CHARACTER_MAX_LENGTH = 255;
//topic长度限制
public static final int TOPIC_MAX_LENGTH = 127;
//……
}

其他限制:默认超时时间3s(org.apache.rocketmq.client.producer.DefaultMQProducer#sendMsgTimeout),消息body最大4M(org.apache.rocketmq.client.producer.DefaultMQProducer#maxMessageSize)。

消费消息

主要类简介

DefaultMQPushConsumerImpl :消息消息者默认实现类,应用程序中直接用该类的实例完成消息的消费,并回调业务方法。

RebalanceImpl 消费端消费者与消息队列的重新分布。

MQClientInstance 消息客户端实例,负载与MQ服务器(Broker,Nameserver)交互的网络实现。

NettyRemotingClient :管理连接ChannelWrapper(channel的包装类)

PullAPIWrapper pull与Push在RocketMQ中,其实就只有Pull模式,所以Push其实就是用pull封装一下。

MessageListenerInner 消费消费回调类,当消息分配给消费者消费时,执行的业务代码入口。

OffsetStore 消息消费进度保存。

ConsumeMessageService 消息消费逻辑。

核心类关系梳理

一个客户端对应一个MQClientInstance,在org.apache.rocketmq.client.impl.MQClientManager#factoryTable维持关系,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>();


//客户端id生成规则
//org.apache.rocketmq.client.ClientConfig#buildMQClientId
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());

sb.append("@");
//private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}

return sb.toString();
}

一个应用程序(消费端)同一个IP:端口,一个消费组对应一个DefaultMQPushConsumerImpl,DefaultMQPushConsumerImpl持有有一个MQClientInstance(从MQClientManage获取),一个MQClientInstance中持有一个PullMessageServive实例。

结论:同一个应用程序中,如果存在多个消费组,那么就有多个DefaultMQPushConsumerImpl ,所有相同clientId的DefaultMQPushConsumerImpl 都需要依靠同一个PullMessageServive拉取消息。

启动核心

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

核心步骤如下:

  1. checkConfig,检查配置信息,主要检查消费者组(consumeGroup)、消息消费方式(messageModel)、消息消费开始偏移量(consumeFromWhere)、消息队列分配算法(AllocateMessageQueueStrategy)、订阅消息主题(Map<String /* topic */, String /* sub expression */> subscription), 消息回调监听器(MessageListener)等。

  2. copySubscription 加工订阅信息,将 Map<String /* topic*/, String/*subextends*/>转换为Map<String,SubscriptionData>,如果messageListenerInner为空,则设置为defaultMQPushConsumer的MessageListener

    同时,如果消息消费模式为集群模式,还需要为该消费组对应一个重试主题。

  3. 如果消息消费模式为集群模式,并且当前的实例名为 DEFAULT,替换为当前客户端进程的PID + “#” + System.nanoTime()。

  4. 负载均衡相关实现。rebalanceImpl

  5. 创建pullAPIWrapper,消息拉取API封装类。

  6. 消费进度存储,如果是集群模式,使用远程存储 RemoteBrokerOffsetStore,如果是广播模式,则使用本地存储LocalFileOffsetStore,后文重点关注。

  7. 加载消息消费进度。

  8. 消息消费服务并启动。this.consumeMessageService.start();

  9. 向远程 Broker 服务器注册消费者。

  10. 更新订阅信息。this.updateTopicSubscribeInfoWhenSubscriptionChanged();

  11. 检测broker状态。this.mQClientFactory.checkClientInBroker();

  12. 发送心跳包。this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

  13. 重新负载。this.mQClientFactory.rebalanceImmediately();

    最终调用org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance

客户端定时任务

org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
//每2分钟获取NameServer地址,内部使用http的get请求获取默认3秒超时
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
//每30秒更新topic信息
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
//每30S 进行Broker心跳检测
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
//默认每隔5秒持久化ConsumeOffset
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
//每分钟调整一次线程
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}

消息拉取服务 org.apache.rocketmq.client.impl.consumer.PullMessageService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void run() {
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
//从LinkedBlockingQueue<PullRequest>中获取request拉取消息
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}

log.info(this.getServiceName() + " service end");
}

private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
//调用的是DefaultMQPushConsumerImpl的pullMessage
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}

真正拉取消息:

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}

pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

try {
//检查当前状态是否running
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
//延后3秒再执行,其实就是延后3秒后再把pullRequest放入org.apache.rocketmq.client.impl.consumer.PullMessageService.pullRequestQueue队列里
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
//暂停状态延后1秒
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}

//流量控制
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
//缓存消息数量大于1000
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
//延后50毫秒
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
//缓存消息大小超过100MB
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
//延后50毫秒
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

if (!this.consumeOrderly) { //非顺序消息
//消费队列所存范围lastqueueOffset-firstqueueOffset > 2000
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
//延后50毫秒
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isPreviouslyLocked()) {
long offset = -1L;
try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
} catch (MQClientException e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return;
}
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}

pullRequest.setPreviouslyLocked(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
//通过topic获取订阅信息
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}

final long beginTimestamp = System.currentTimeMillis();

//拉取消息回调
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//对消息体解码成并执行消息过滤
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);

long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
//将拉取到的消息放入消费队列中:就是将拉取的消息,放入到ProcessQueue的msgTreeMap容器中。
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//消费消息服务提交,提交消息会被ConsumeRequest包装提交到消费端消费线程池消费
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}

if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}

break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);

DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}

@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
//如果是集群消费模式,从内存中获取MessageQueue的commitlog偏移量。
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}

String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}

classFilter = sd.isClassFilterMode();
}
//构建拉取消息系统Flag: 是否支持comitOffset,suspend,subExpression,classFilter
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
//消息拉取核心api执行
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(), //消息消费队列
subExpression, //tag过滤表达式
subscriptionData.getExpressionType(), //tag
subscriptionData.getSubVersion(), //版本其实就是时间戳
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(), //批量拉取大小,32
sysFlag, // 系统标记
commitOffsetValue,//当前消息队列的MessageQueue的commitlog偏移量
BROKER_SUSPEND_MAX_TIME_MILLIS, //允许broker暂停时间,15s
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, //超时时间,30s
CommunicationMode.ASYNC, // 异步
pullCallback //pull消息回调
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}

消费端消费线程池

非顺序:org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#consumeExecutor

顺序:org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#consumeExecutor

消费任务类:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest

ConsumeRequest的run方法会调用我们注册的自定义consumeMessage方法。

顺序消费实现原理

ConsumeMessageOrderlyService.ConsumeRequest的run方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
//以消息队列为粒度获取锁对象,也就是说同一时间每个消费队列只能由一个线程执行,保证同一消费队列消息的有序性
//如果要使用全局顺序消费,那么对应主题只能允许存在一个队列(不建议使用!!!无法发挥多线程优势,并发处理能力降低)
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
//……
}
}
//MessageQueueLock内部就个map存储消息队列与锁对象关系。
public class MessageQueueLock {
private ConcurrentMap<MessageQueue, Object> mqLockTable =
new ConcurrentHashMap<MessageQueue, Object>();

public Object fetchLockObject(final MessageQueue mq) {
Object objLock = this.mqLockTable.get(mq);
if (null == objLock) {
objLock = new Object();
Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
if (prevLock != null) {
objLock = prevLock;
}
}

return objLock;
}
}

消费重试核心

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult

根据消费结果,设置ackIndex 的值,成功:ackIndex = consumeRequest.getMsgs().size() - 1; 失败:ackIndex = -1
如果是消费失败,根据消费模式(集群消费还是广播消费),广播模式,就遍历打印日志,集群模式调用 sendMessageBack。
发送成功,重试机制由 broker 处理。发送发失败的,客户端会进行重试:调用如下

1
2
3
4
5
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
//重新提交发送消费失败的延后5s消费
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}

sendMessageBack:发送的RequestCode为CONSUMER_SEND_MSG_BACK=36

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
int delayLevel = context.getDelayLevelWhenNextConsume();

// Wrap topic with namespace before sending back message.
msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}

return false;
}

broker处理核心代码位置:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncConsumerSendMsgBack

延迟级别设置:如果消息的延迟级别为0,则 3 + 消息重试的次数。即第一次重试的延时时间为10s,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes (=16
|| delayLevel < 0) {
//如果消息次数超过最大限制或延迟级别小于0,设置消息的主题为 DLQ(死信队列) + 消费组名称

newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
} else {
if (0 == delayLevel) {
//如果消息的延迟级别为0,则 3 + 消息重试的次数。即第一次重试的延时时间为10s
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}

注:对于需要延迟执行的消息,在存入 commitlog 之前,会将主题修改为SCHEDULE_TOPIC_XXXX,会被延迟任务 ScheduleMessageService 延迟拉取。ScheduleMessageService 在执行过程中,会再次存入 commitlog 文件中放入之前,会清空延迟等级,并恢复主题与队列,这样,就能被消费者所消费,因为消费者在启动时就订阅了该消费组的重试主题(org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription)

延迟等级与时间关系加载:org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel

集群模式消费者offset偏移量维护

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//treemap保存当前还在处理队列中的消息
//移除已经处理的批处理消息:org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage
public long removeMessage(final List<MessageExt> msgs) {
long result = -1;
final long now = System.currentTimeMillis();
try {
this.treeMapLock.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!msgTreeMap.isEmpty()) {
//该处理队列最大的偏移量+1
result = this.queueOffsetMax + 1;
int removedCnt = 0;
for (MessageExt msg : msgs) {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
msgSize.addAndGet(0 - msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);

if (!msgTreeMap.isEmpty()) {
//还有消息在处理队列中,返回处理队列中最小的偏移量,优点:防止消息丢失(也就是没有消费到)缺点:会造成消息重复消费。
result = msgTreeMap.firstKey();
}
}
} finally {
this.treeMapLock.writeLock().unlock();
}
} catch (Throwable t) {
log.error("removeMessage exception", t);
}

return result;
}

参考链接

源码分析RocketMQ

END