简介 (Introduction)
RocketMQ is a distributed, queue-model message middleware. It is based on the publish-subscribe pattern, offers both Push and Pull consumption modes, supports strictly ordered messages, can accumulate messages at the scale of hundreds of millions, and supports message rewind (replay) as well as message queries across multiple dimensions.
RocketMQ official documentation
Architecture
Main components
NameServer: a simple registry used for the dynamic registration and discovery of Brokers. It holds the registration (routing) information of the Broker cluster and runs a heartbeat check (every 10s it scans whether Brokers are still alive). NameServer instances do not communicate with each other; each node is almost stateless, and every instance keeps the complete routing information.
Broker: the MQ server, responsible for the core message-queue functionality. A Broker is either a master or a slave, specified via brokerId=0 (master) or a non-zero brokerId (slave) in its configuration file. Every Broker keeps connections to all nodes of the NameServer cluster and registers its Topic information to every NameServer node every 30s. Note: a Master can have multiple Slaves (one-to-many), but only the slave with brokerId=1 takes part in the message read load.
Remoting Module: the entry point of the whole Broker; handles requests coming from clients.
Client Manager: manages clients (Producers/Consumers) and maintains Consumers' topic subscription information.
Store Service: provides simple APIs for storing messages on physical disk and querying them.
HA Service: the high-availability service; synchronizes data between the Master Broker and Slave Brokers.
Index Service: indexes messages delivered to the Broker by their Message Keys so that they can be queried quickly.
Other important concepts
Topic: a topic denotes a collection of messages of one kind; each message belongs to exactly one topic. It is the basic unit of message subscription in RocketMQ. Producers and consumers each have a one-to-many relationship with topics. A topic is a logical concept; messages are not stored separately per topic.
Group: identifies a class of applications, split into producer groups and consumer groups. Producers/consumers in the same group produce/consume the same kind of messages with identical processing logic. Grouping makes consumer load balancing, failover, and broadcasting straightforward. When sending distributed transactional messages, if the Producer crashes midway, the Broker will call back any Producer instance in the same group to confirm the transaction status.
Message: the message carrier and the smallest unit of data produced and consumed. A message can be looked up by its unique MessageId or by a business key. The MessageId is 16 bytes long and encodes the address of the host that stored the message (IP and port) plus the message's CommitLog offset.
Tag: a label used to distinguish messages within the same topic, so that consumers can apply different logic to different sub-topics; it can also be used for message filtering, e.g. consumer.subscribe(topic, "tag1 || tag2") subscribes to messages of that topic carrying tag1 or tag2 (see the subscription sketch after this list). Passing null, "" or "*" subscribes to all tags.
Queue: a topic maps to multiple queues (one-to-many); queues are mainly used for load balancing.
Offset: when messages are stored, an index file is generated for every queue of every topic, and each queue keeps an offset that records how many messages it currently holds. The log files holding the message data are written sequentially; data can be located by binary search, bringing the time complexity down to O(log N).
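A minimal push-consumer sketch of tag-based subscription (the group name, NameServer address, and topic below are made-up placeholders):

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

public class TagSubscribeExample {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");        // hypothetical NameServer address
        consumer.subscribe("DemoTopic", "tag1 || tag2");  // only messages tagged tag1 or tag2
        // consumer.subscribe("DemoTopic", "*");          // null, "" or "*" would subscribe to all tags
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) -> {
            msgs.forEach(m -> System.out.printf("tag=%s, key=%s%n", m.getTags(), m.getKeys()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}
```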
Workflow
Start the NameServer; it listens on its port and waits for Brokers, Producers, and Consumers to connect.
Start the Brokers; each keeps long-lived connections to every NameServer and sends heartbeats periodically. The heartbeat contains the Broker's information (IP, port, etc.) and all the Topic information it stores. After registration, the NameServer cluster holds the Topic-to-Broker mapping. (See the RouteInfoManager source analysis below.)
Before sending or consuming, create the Topic and specify which Brokers it should be stored on; a Topic can also be created automatically when a message is sent.
The Producer sends messages: at startup it establishes a long-lived connection to one of the NameServers, learns which Brokers host the Topic being sent to, picks a queue from the queue list by round-robin, then connects to the Broker that owns the queue and sends the message to it (see the producer sketch after this list).
The Consumer likewise connects to one NameServer, learns which Brokers host the subscribed Topic, then connects to those Brokers directly and starts consuming.
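To make the send path concrete, here is a minimal producer sketch (group name, NameServer address, topic, tag, and key are made-up placeholders):

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerQuickStart {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");   // hypothetical NameServer address
        producer.start();                            // topic routes are fetched from the NameServer on demand
        Message msg = new Message("DemoTopic", "tag1", "order-001",
            "hello rocketmq".getBytes());            // topic / tag / business key / body
        SendResult result = producer.send(msg);      // round-robins over the topic's queues
        System.out.println(result.getSendStatus());
        producer.shutdown();
    }
}
```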
Features
Ordered consumption (see the implementation analysis in the source-code section below).
Message filtering
Server-side filtering
Tag: covers most business scenarios.
bySql: used by some business lines; supports SQL92 syntax, written via MessageSelector.bySql("<sql expression>").
Pros: avoids transferring messages over the network that the Consumer does not need.
Cons: adds load on the Broker and is more complex to implement.
Scheduled (delayed) messages
Delay levels (starting from 1; a level below 0 means no retry and the message goes straight to the dead-letter queue; there are 18 levels, set via msg.setDelayTimeLevel(level)): "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".
How it works: a delayed message is first staged in the topic named SCHEDULE_TOPIC_XXXX and placed into a queue chosen by its delayTimeLevel, with queueId = delayTimeLevel - 1, i.e. one queue only holds messages with the same delay, which guarantees that messages with the same delay are consumed in order. The broker consumes SCHEDULE_TOPIC_XXXX on schedule and writes the messages into their real topics.
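A minimal sketch of sending a delayed message (producer group, address, and topic are placeholders; the level-to-delay mapping is the default table above):

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class DelayedMessageExample {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("demo_delay_group");
        producer.setNamesrvAddr("127.0.0.1:9876");   // hypothetical address
        producer.start();
        Message msg = new Message("DemoTopic", "delayed payload".getBytes());
        // Level 3 maps to a 10s delay in the default table "1s 5s 10s 30s 1m ... 2h".
        // The broker first stores the message under SCHEDULE_TOPIC_XXXX (queueId = level - 1)
        // and later re-writes it to the real topic when the delay expires.
        msg.setDelayTimeLevel(3);
        producer.send(msg);
        producer.shutdown();
    }
}
```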
Message storage
Storage structure
1. CommitLog: the main body of message storage, holding message content and metadata; the default file size is 1 GB. File names consist of 20 digits representing the starting offset, left-padded with zeros: 00000000000000000000 is the first file, with start offset 0 and size 1 GB = 1,073,741,824 bytes; when it is full, the next file is 00000000001073741824 with start offset 1073741824, and so on. Messages are appended to the log files sequentially; when a file is full, writing moves on to the next file.
2. ConsumeQueue: the (logical) consumer queue, acting as the index for consumption. For each queue of a topic it records the start physical offset of the message in the CommitLog, the message size, and the hash code of the message Tag. Its purpose is to improve consumption performance. ConsumeQueue entries are fixed length, 20 bytes each: an 8-byte CommitLog physical offset, a 4-byte message length, and an 8-byte tag hashcode. A single file holds 300,000 entries and can be accessed randomly like an array; each ConsumeQueue file is therefore about 5.72 MB.
3. IndexFile: the index file used to query messages by key or msgId. A single IndexFile is about 400 MB and can hold 20 million index entries; its on-disk layout is essentially a HashMap implemented on the file system, i.e. a hash index.
RocketMQ uses a hybrid storage structure: the messages of all topics on one broker share a single CommitLog. When a consumer pulls and finds no message, it can wait for the next pull; the server also supports long polling: if a pull request finds nothing, the Broker may hold it for up to 30s, and as soon as a new message arrives within that window it is returned to the consumer directly. Concretely, RocketMQ uses a Broker-side background service thread, ReputMessageService, to continuously dispatch requests and asynchronously build the ConsumeQueue (logical consume queue) and IndexFile (index file) data.
Data access optimizations
Memory mapping: RocketMQ reads and writes files through MappedByteBuffer, using NIO's FileChannel to map the physical file on disk directly into user-space memory (eliminating the copy between kernel and user buffers). File operations become plain memory operations, which greatly improves read/write performance. Because of this memory-mapping mechanism, RocketMQ stores its files with fixed lengths so that a whole file can be mapped into memory at once.
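As a rough illustration of the same mechanism in plain Java NIO (not RocketMQ code; file name and size are arbitrary):

```java
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class MmapSketch {
    public static void main(String[] args) throws Exception {
        int fileSize = 1024 * 1024;                    // fixed-length file, analogous to a MappedFile
        try (RandomAccessFile raf = new RandomAccessFile("00000000000000000000", "rw");
             FileChannel channel = raf.getChannel()) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            buffer.put("hello commitlog".getBytes(StandardCharsets.UTF_8)); // write through memory, no read/write syscall
            buffer.force();                            // flush the dirty pages to disk, like MappedFile#flush
        }
    }
}
```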
Page cache: the page cache (PageCache) is the OS's cache of file data, used to speed up reads and writes. When the OS reads a file from disk it prefetches adjacent blocks sequentially (locality of reference). Sequential writes go into the cache first and are later flushed to disk asynchronously by the pdflush kernel threads. The ConsumeQueue stores little data and is read sequentially, so its read performance is close to that of memory; the CommitLog sees more random access, which is why the IndexFile exists to speed up lookups.
Message filtering
Tag-based filtering: when subscribing, the Consumer specifies the Topic and optionally TAGs; multiple TAGs are separated by ||. The Consumer builds this subscription into a SubscriptionData and sends a Pull request to the Broker. Before the Broker reads data from the storage layer (Store), it builds a MessageFilter from the subscription data and passes it to the Store. When the Store reads an entry from the ConsumeQueue, it filters using the tag hashcode recorded in that entry (the last 8 bytes). Since the server only compares hashcodes, it cannot filter precisely on the original tag string; after pulling the messages, the consumer therefore compares the original tag strings again and discards (does not consume) any message whose tag does not match.
Thought: exact filtering on the server would add server-side load, while no filtering at all wastes network bandwidth; hashcode filtering is the compromise between the two.
SQL92 filtering: the overall flow is the same as tag filtering; only the filtering step inside the Store differs. Building and evaluating the actual SQL expression is handled by the rocketmq-filter module. Evaluating the SQL expression for every message would hurt performance, so RocketMQ uses a BloomFilter to avoid executing it each time. The SQL92 expression is evaluated against the message's properties.
Note: the filter-related interface is org.apache.rocketmq.store.MessageFilter.
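An end-to-end sketch of SQL92 filtering on message properties (names and addresses are placeholders; SQL filtering also requires the broker to be started with enablePropertyFilter=true):

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Sql92FilterExample {
    public static void main(String[] args) throws Exception {
        // Producer side: SQL92 expressions are evaluated against user properties set on the message.
        DefaultMQProducer producer = new DefaultMQProducer("demo_sql_producer");
        producer.setNamesrvAddr("127.0.0.1:9876");        // hypothetical address
        producer.start();
        Message msg = new Message("DemoTopic", "sql filtered".getBytes());
        msg.putUserProperty("region", "cn-shanghai");
        msg.putUserProperty("level", "5");
        producer.send(msg);
        producer.shutdown();

        // Consumer side: only messages whose properties satisfy the expression are delivered.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_sql_consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("DemoTopic",
            MessageSelector.bySql("region = 'cn-shanghai' AND level > 3"));
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) ->
            ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
        consumer.start();
    }
}
```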
Load balancing
Producer: the sender first looks up the routing information for the topic (TopicPublishInfo), then calls selectOneMessageQueue() to pick a concrete MessageQueue to send to.
Consumer: the core of consumer-side load balancing is the rebalanceByTopic method of RebalanceImpl.
Both consumption modes on the Consumer side (Push/Pull) fetch messages by pulling; Push is just a wrapper around Pull: after the pull thread fetches a batch from the server and submits it to the consumption thread pool, it immediately goes back to the server and tries to pull again. (org.apache.rocketmq.client.impl.consumer.RebalanceImpl#dispatchPullRequest)
```java
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }

            if (null == cidAll) {
                log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                Collections.sort(mqAll);
                Collections.sort(cidAll);

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
                        strategy.getName(), e);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }

                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info(
                        "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                        strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(),
                        cidAll.size(), allocateResultSet.size(), allocateResultSet);
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}
```
Default allocation strategy: org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely#allocate
If a group has more consumers than queues, some consumers will be assigned no queue and will not consume any data (as the class's comments point out).
```java
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
    if (currentCID == null || currentCID.length() < 1) {
        throw new IllegalArgumentException("currentCID is empty");
    }
    if (mqAll == null || mqAll.isEmpty()) {
        throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    if (cidAll == null || cidAll.isEmpty()) {
        throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }

    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!cidAll.contains(currentCID)) {
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
            consumerGroup,
            currentCID,
            cidAll);
        return result;
    }

    int index = cidAll.indexOf(currentCID);
    int mod = mqAll.size() % cidAll.size();
    int averageSize =
        mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
            + 1 : mqAll.size() / cidAll.size());
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}
```
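A small usage sketch of this averaging strategy (topic, broker name, and client ids are hypothetical) showing how 8 queues are split among 3 consumers:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageQueue;

public class AllocateDemo {
    public static void main(String[] args) {
        List<MessageQueue> mqAll = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            mqAll.add(new MessageQueue("DemoTopic", "broker-a", i));   // 8 queues on one broker
        }
        List<String> cidAll = Arrays.asList("cid-0", "cid-1", "cid-2"); // 3 consumers in the group
        AllocateMessageQueueAveragely strategy = new AllocateMessageQueueAveragely();
        for (String cid : cidAll) {
            // With mod = 8 % 3 = 2: cid-0 gets queues 0-2, cid-1 gets 3-5, cid-2 gets 6-7.
            System.out.println(cid + " -> " + strategy.allocate("demo_group", cid, mqAll, cidAll));
        }
    }
}
```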
Load balancing, briefly: the RebalanceService thread's run method (every 20 seconds by default) eventually calls org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic. In updateProcessQueueTableInRebalance a PullRequest object is created for each newly allocated MessageQueue; dispatchPullRequest (an empty implementation in pull mode) then calls DefaultMQPushConsumerImpl's executePullRequestImmediately, which puts the PullRequest into PullMessageService's LinkedBlockingQueue and thereby wakes up queue.take(). DefaultMQPushConsumerImpl's pullMessage then pulls messages from the broker over the network (at most 32 per pull by default, configurable), filters them, wraps them into a ConsumeRequest task, and submits that task to the consumer's thread pool. After each pull the PullRequest is put back into PullMessageService; the "push" feel of DefaultMQPushConsumerImpl comes from pullInterval defaulting to 0, i.e. the next pull is triggered immediately.
Transactional messages
Source code: https://github.com/apache/rocketmq
Main directories
acl — access-control (permissions) module
broker — the Broker module; its core job is persisting messages
client — the message client, including producers and consumers
common — shared dependencies: common constants/enums, base classes, and data structures
dev — developer information (just a single .py file)
distribution — deployment scripts and configuration
docs — developer documentation
example — examples
filter — message filter module
namesrv — the NameServer module
openmessaging — the OpenMessaging open standard
remoting — the remote-communication module
srvutil — server utility classes
store — the message-storage module
Analysis of the main modules and classes (version: 4.9.1-SNAPSHOT)
namesrv module
NamesrvStartup: the startup class; it mainly calls NamesrvController's initialize and start methods.
NamesrvController: the core controller class.
Its core initialization method:
```java
public boolean initialize() {

    this.kvConfigManager.load();

    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;

                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }

    return true;
}
```
NamesrvConfig: NameServer property configuration, e.g. configuration file paths.
NettyServerConfig, RemotingServer, ExecutorService: the networking classes, closely tied to Netty; the relevant configuration is as follows:
```java
private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;
```
KVConfigManager: reads and updates NameServer configuration properties; its load method loads the configuration file referenced by NamesrvConfig into memory.
Core request-handling logic: org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
RouteInfoManager: keeps and manages Broker, Topic, and related information in HashMaps, protected by a ReadWriteLock for thread safety.
```java
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String /* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String /* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String /* clusterName */, Set<String /* brokerName */>> clusterAddrTable;
private final HashMap<String /* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String /* brokerAddr */, List<String> /* filter servers */> filterServerTable;
```
BrokerHousekeepingService: implements the ChannelEventListener interface and handles the cases where the connection channel to a Broker is closed, throws an exception, or goes idle.
```java
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
    this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

@Override
public void onChannelException(String remoteAddr, Channel channel) {
    this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
    this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
```
FileWatchService: watches the TLS files for changes. It uses MessageDigest with the MD5 algorithm to compute digests that are kept in the fileCurrentHash ArrayList, re-checking them every WATCH_INTERVAL = 500 ms. (Why is the frequency this high?)
broker module
The broker has BrokerStartup and BrokerController classes that play the same roles as their namesrv counterparts.
Message-send processor: org.apache.rocketmq.broker.processor.SendMessageProcessor.
Storage configuration class: org.apache.rocketmq.store.config.MessageStoreConfig; some of its fields:
```java
// Root directory of the store
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
// Directory of the commitlog
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
    + File.separator + "commitlog";
// CommitLog file size, 1 GB by default
private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
// ConsumeQueue file size: 300,000 fixed-length entries
private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
private boolean enableConsumeQueueExt = false;
// ConsumeQueue extension file size, 48 MB
private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024;
private int bitMapLengthConsumeQueueExt = 64;
// CommitLog flush interval (ms)
private int flushIntervalCommitLog = 500;
// CommitLog commit interval (ms), used with the transient store pool
private int commitIntervalCommitLog = 200;
```
Storage core class: DefaultMessageStore. The entry point for storing a message is putMessage; the core steps are:
Check the store status: checkStoreStatus()
Is the current node already shut down?
Is it a SLAVE node (SLAVEs are read-only)?
Is the store writable?
Is the OS page cache busy: isOSPageCacheBusy()?
Validate the message with checkMessage(msg): checks the lengths of the topic and propertiesString.
Write the message into the CommitLog: this.commitLog.putMessage(msg).
Record the write-latency statistics:
```java
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
```
Record the number of failed CommitLog writes:
```java
if (null == result || !result.isOk()) {
    this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
```
Message write: org.apache.rocketmq.store.CommitLog#putMessage; core steps:
Determine the message type (transactional message, non-transactional message, or commit message).
Obtain a MappedFile object, the concrete implementation of memory mapping.
Appending a message requires taking a PutMessageLock; there are two implementations: PutMessageReentrantLock (a plain ReentrantLock inside) and PutMessageSpinLock (the default), which holds an AtomicBoolean and loops with CAS in a do-while to acquire the lock (see the sketch after this list).
Validate the MappedFile and obtain a usable one (creating it if necessary).
Write the message through the MappedFile: result = mappedFile.appendMessage(msg, this.appendMessageCallback);
Flush to disk according to the flush policy: handleDiskFlush(result, putMessageResult, msg);
Master-slave replication: handleHA(result, putMessageResult, msg);
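As a rough illustration of the PutMessageSpinLock idea (not the actual RocketMQ class, just a minimal sketch of a CAS spin lock built on AtomicBoolean):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of a CAS-based spin lock, similar in spirit to PutMessageSpinLock.
public class SpinLockSketch {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    public void lock() {
        boolean acquired;
        do {
            acquired = locked.compareAndSet(false, true); // spin until the flag flips from false to true
        } while (!acquired);
    }

    public void unlock() {
        locked.compareAndSet(true, false);
    }
}
```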
Message length calculation: org.apache.rocketmq.store.CommitLog#calMsgLength
```java
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
    int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
    int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
    final int msgLen = 4 // TOTALSIZE
        + 4 // MAGICCODE
        + 4 // BODYCRC
        + 4 // QUEUEID
        + 4 // FLAG
        + 8 // QUEUEOFFSET
        + 8 // PHYSICALOFFSET
        + 4 // SYSFLAG
        + 8 // BORNTIMESTAMP
        + bornhostLength // BORNHOST
        + 8 // STORETIMESTAMP
        + storehostAddressLength // STOREHOSTADDRESS
        + 4 // RECONSUMETIMES
        + 8 // Prepared Transaction Offset
        + 4 + (bodyLength > 0 ? bodyLength : 0) // BODY
        + 1 + topicLength // TOPIC
        + 2 + (propertiesLength > 0 ? propertiesLength : 0) // PROPERTIES
        + 0;
    return msgLen;
}
```
Globally unique msgId generation: org.apache.rocketmq.common.message.MessageDecoder#createMessageId(java.nio.ByteBuffer, java.nio.ByteBuffer, long). The id is composed of IP + port + the CommitLog offset.
```java
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
    input.flip();
    int msgIDLength = addr.limit() == 8 ? 16 : 28;
    input.limit(msgIDLength);

    input.put(addr);        // store host address (IP + port)
    input.putLong(offset);  // commitlog offset

    return UtilAll.bytes2string(input.array());
}
```
Message flushing: org.apache.rocketmq.store.CommitLog#handleDiskFlush
The actual flush happens in org.apache.rocketmq.store.MappedFile#flush, which simply calls force on the FileChannel or the MappedByteBuffer.
```java
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronous flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
            PutMessageStatus flushStatus = null;
            try {
                flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                    TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
            }
            if (flushStatus != PutMessageStatus.PUT_OK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}
```
Message storage flow
client module
Sending messages: the default send path is org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl.
Core steps
Validate the message (group, topic, body, etc.): Validators.checkMessage(msg, this.defaultMQProducer);
Get the topic's routing information: TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
Pick a MessageQueue with the topic load-balancing algorithm; this can be customized by implementing MessageQueueSelector.
By default, org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue keeps the index of the last used queue in a ThreadLocalIndex (sendWhichQueue) and round-robins over the queues; if latency fault tolerance is enabled it also checks whether the candidate queue's broker is currently usable.
Send the message to the chosen MessageQueue.
Update the failure strategy, which is mainly used to avoid brokers that are failing.
For synchronous sends (SYNC) the failure-retry policy applies, retrying twice by default; for asynchronous sends the retry happens in the callback.
```java
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
```
Latency fault-tolerance class MQFaultStrategy:

```java
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
```
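The mapping from observed send latency to broker isolation time works roughly like the following sketch (modeled on MQFaultStrategy's computeNotAvailableDuration; treat it as an illustration, not the verbatim source):

```java
public class LatencyIsolationSketch {
    private static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Walk the latency table from the largest threshold down; the first threshold the
    // observed latency reaches decides how long the broker is treated as unavailable.
    static long computeNotAvailableDuration(final long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(computeNotAvailableDuration(600)); // 550 <= 600 < 1000 -> isolated for 30000 ms
    }
}
```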
Message validation constants (Validators):

```java
public class Validators {
    public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
    public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
    public static final int CHARACTER_MAX_LENGTH = 255;
    public static final int TOPIC_MAX_LENGTH = 127;
}
```
Other limits: the default send timeout is 3s (org.apache.rocketmq.client.producer.DefaultMQProducer#sendMsgTimeout) and the maximum message body is 4 MB (org.apache.rocketmq.client.producer.DefaultMQProducer#maxMessageSize).
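A producer-side sketch of where these knobs live (group name and address are placeholders; the values shown are the defaults):

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class ProducerLimitsExample {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");     // hypothetical address
        producer.setSendMsgTimeout(3000);              // default 3s; raise for slow networks
        producer.setMaxMessageSize(1024 * 1024 * 4);   // default 4 MB body limit
        producer.setRetryTimesWhenSendFailed(2);       // sync-send retries on top of the first attempt
        producer.start();
        // ... send messages ...
        producer.shutdown();
    }
}
```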
Consuming messages
Main classes:
DefaultMQPushConsumerImpl: the default push-consumer implementation; applications consume messages through an instance of this class, which calls back into the business code.
RebalanceImpl: redistributes the message queues among the consumers on the consumer side.
MQClientInstance: the message client instance; implements the network interaction with the MQ servers (Broker, NameServer).
NettyRemotingClient: manages connections via ChannelWrapper (a wrapper around a Netty channel).
PullAPIWrapper: pull vs. push — RocketMQ really only has the Pull mode; Push is just a wrapper around pull.
MessageListenerInner: the message-consumption callback; the entry point into business code when messages are dispatched to a consumer.
OffsetStore: persists the consumption progress (offsets).
ConsumeMessageService: the message-consumption logic.
How the core classes relate: each client corresponds to one MQClientInstance; the mapping is kept in org.apache.rocketmq.client.impl.MQClientManager#factoryTable, as follows:
```java
private ConcurrentMap<String, MQClientInstance> factoryTable =
    new ConcurrentHashMap<String, MQClientInstance>();

public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());

    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }

    return sb.toString();
}
```
On the consumer side, within one application (the same IP:port), each consumer group corresponds to one DefaultMQPushConsumerImpl. A DefaultMQPushConsumerImpl holds an MQClientInstance (obtained from MQClientManager), and each MQClientInstance holds one PullMessageService instance.
Conclusion: if one application has several consumer groups, it has several DefaultMQPushConsumerImpl instances, and all DefaultMQPushConsumerImpl instances sharing the same clientId rely on the same PullMessageService to pull messages.
Startup core: org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
Core steps:
checkConfig: validates the configuration — the consumer group (consumerGroup), consumption mode (messageModel), starting consume offset (consumeFromWhere), queue-allocation strategy (AllocateMessageQueueStrategy), subscriptions (Map<String /* topic */, String /* sub expression */> subscription), the message listener (MessageListener), and so on.
copySubscription: converts the subscription Map<String /* topic */, String /* sub expression */> into Map<String, SubscriptionData>; if messageListenerInner is empty, it is set to defaultMQPushConsumer's MessageListener.
In cluster mode, a retry topic is additionally registered for the consumer group.
In cluster mode, if the current instance name is DEFAULT, it is replaced with the client process's PID + "#" + System.nanoTime().
Load-balancing setup: rebalanceImpl.
Create pullAPIWrapper, the wrapper around the pull API.
Consumption-progress storage: cluster mode uses remote storage (RemoteBrokerOffsetStore); broadcast mode uses local storage (LocalFileOffsetStore) — more on this later.
Load the consumption progress.
Start the message-consumption service: this.consumeMessageService.start();
Register the consumer with the remote Broker.
Update subscription information: this.updateTopicSubscribeInfoWhenSubscriptionChanged();
Check the broker state: this.mQClientFactory.checkClientInBroker();
Send heartbeats: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
Trigger a rebalance: this.mQClientFactory.rebalanceImmediately();
This eventually calls org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance.
Client scheduled tasks: org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
```java
private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
```
Message pull service: org.apache.rocketmq.client.impl.consumer.PullMessageService

```java
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
```
The actual message pull:
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
```java
public void pullMessage(final PullRequest pullRequest) {
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }

    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

    try {
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }

    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}",
            this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }

    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(),
                processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(),
                processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

    if (!this.consumeOrderly) {
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                    pullRequest, queueMaxSpanFlowControlTimes);
            }
            return;
        }
    } else {
        if (processQueue.isLocked()) {
            if (!pullRequest.isPreviouslyLocked()) {
                long offset = -1L;
                try {
                    offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
                } catch (MQClientException e) {
                    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                    log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
                    return;
                }
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                    pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                        pullRequest, offset);
                }

                pullRequest.setPreviouslyLocked(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }

    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }

    final long beginTimestamp = System.currentTimeMillis();

    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(),
                    pullResult, subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);

                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }

                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }

                        break;
                    case NO_NEW_MSG:
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);

                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }

            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    };

    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }

    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }

        classFilter = sd.isClassFilterMode();
    }

    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable,
        true,
        subExpression != null,
        classFilter
    );
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}
```
Consumer-side consumption thread pools
Concurrent (unordered): org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#consumeExecutor
Ordered: org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#consumeExecutor
Consumption task classes: org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest
ConsumeRequest's run method invokes the consumeMessage method of the listener we registered.
How ordered consumption works: the run method of ConsumeMessageOrderlyService.ConsumeRequest.
```java
@Override
public void run() {
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    }

    // Consumption of the same MessageQueue is serialized by locking a per-queue object.
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        // ... consume loop omitted in the original excerpt ...
    }
}

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        Object objLock = this.mqLockTable.get(mq);
        if (null == objLock) {
            objLock = new Object();
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }
        return objLock;
    }
}
```
Consumption retry core: org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
Based on the consumption result, ackIndex is set: on success, ackIndex = consumeRequest.getMsgs().size() - 1; on failure, ackIndex = -1. On failure, the handling depends on the consumption mode: in broadcast mode the failed messages are only logged; in cluster mode sendMessageBack is called. If that send succeeds, the broker takes over the retry; if it fails, the client retries locally as follows:
```java
if (!msgBackFailed.isEmpty()) {
    consumeRequest.getMsgs().removeAll(msgBackFailed);

    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
```
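From the application's point of view, retry is triggered simply by the listener's return value; a minimal sketch (the business handler is hypothetical):

```java
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class RetryingListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt msg : msgs) {
                process(msg);                                   // hypothetical business handler
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;   // ackIndex = msgs.size() - 1
        } catch (Exception e) {
            // Returning RECONSUME_LATER sets ackIndex = -1; in cluster mode the batch is sent
            // back to the broker (CONSUMER_SEND_MSG_BACK) and redelivered after a delay.
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    private void process(MessageExt msg) {
        // business logic placeholder
    }
}
```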
sendMessageBack: sends a request with RequestCode CONSUMER_SEND_MSG_BACK = 36.
```java
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    int delayLevel = context.getDelayLevelWhenNextConsume();

    msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
    try {
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
    }

    return false;
}
```
The broker-side handling is in org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncConsumerSendMsgBack.
Delay-level selection: if the message's delay level is 0, it becomes 3 + the number of retries so far, so the first retry is delayed by 10s:
```java
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes /* 16 by default */ || delayLevel < 0) {
    // Too many retries (or delayLevel < 0): route the message to the dead-letter queue (DLQ).
    newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
    queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
        DLQ_NUMS_PER_GROUP,
        PermName.PERM_WRITE, 0);
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return CompletableFuture.completedFuture(response);
    }
} else {
    if (0 == delayLevel) {
        delayLevel = 3 + msgExt.getReconsumeTimes();
    }
    msgExt.setDelayTimeLevel(delayLevel);
}
```
Note: before a message that needs to be delayed is written into the commitlog, its topic is changed to SCHEDULE_TOPIC_XXXX, and it is later picked up by the ScheduleMessageService delay task. When ScheduleMessageService re-writes the message into the commitlog, it clears the delay level and restores the original topic and queue, so the message can then be consumed — the consumer subscribed to its group's retry topic at startup (org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription).
The mapping from delay level to delay time is loaded in org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel.
Cluster-mode consumer offset maintenance: after a batch is consumed, the committable offset is computed in removeMessage (on the ProcessQueue):

```java
public long removeMessage(final List<MessageExt> msgs) {
    long result = -1;
    final long now = System.currentTimeMillis();
    try {
        this.treeMapLock.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!msgTreeMap.isEmpty()) {
                result = this.queueOffsetMax + 1;
                int removedCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                    if (prev != null) {
                        removedCnt--;
                        msgSize.addAndGet(0 - msg.getBody().length);
                    }
                }
                msgCount.addAndGet(removedCnt);

                if (!msgTreeMap.isEmpty()) {
                    result = msgTreeMap.firstKey();
                }
            }
        } finally {
            this.treeMapLock.writeLock().unlock();
        }
    } catch (Throwable t) {
        log.error("removeMessage exception", t);
    }

    return result;
}
```
References: 源码分析RocketMQ
END