简介

RocketMQ是一款阿里巴巴开源的消息中间件,在2017年9月份成为Apache的顶级项目,是国内首个互联网中间件在 Apache 上的顶级项目。RocketMQ经过了阿里多年双十一大促的考验,性能和稳定性得到了充分的考验。目前在业界被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binlog分发等场景。

官网地址

http://rocketmq.apache.org/

开源项目地址

https://github.com/apache/rocketmq

常用中间件对比

20200904180457430

架构设计

架构设计

官方文档链接

下载安装

1. 执行解压命令

1
unzip rocketmq-all-4.9.0-bin-release.zip 

注意启动前修改bin目录下的 runbroker.sh 和 runserver.sh 中JVM内存参数,否者可能启动不了。

2. 进入bin目录

3. 启动NameServer

1
nohup sh bin/mqnamesrv &

4. 启动Broker

1
2
nohup sh mqbroker -n localhost:9876 &
# nohup sh mqbroker -c 自定义配置文件路径 &

5. 测试

  • 发送消息
1
2
3
4
# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
  • 接收消息
1
2
3
4
# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh tools.sh org.apache.rocketmq.example.quickstart.C

6. 关闭

1
2
3
4
# 1.关闭NameServer
sh mqshutdown namesrv
# 2.关闭Broker
sh mqshutdown broker

配置详情

在config目录下有相应配置示例

image-20210626150015562

  • 2m-2s-async:双主双从异步更新
  • 2m-2s-sync:双主双从同步更新
  • 2m-noslave:双主

参数详情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#所属集群名字
brokerClusterName=DefaultCluster
#broker名字
brokerName=broker-a
#身份Id 0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,多个用
namesrvAddr=localhost:9876;localhost:9877
#是否允许 Broker 自动创建Topic
autoCreateTopicEnable=true
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建订阅组
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G ;1G = 1073741824 字节
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/RoccketMQ/rocketmq1/store
#commitLog 存储路径
storePathCommitLog=/usr/local/RoccketMQ/rocketmq1/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/RoccketMQ/rocketmq1/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/RoccketMQ/rocketmq1/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/RoccketMQ/rocketmq1/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/RoccketMQ/rocketmq1/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

注:集群配置时每个broker的storePath要不同否则会报java.lang.RuntimeException: Lock failed,MQ already started错误

管理工具

mqadmin

在bin目录下直接执行 ./mqadmin {command} {args}。

图形化管理界面

  1. github上下载RocketMQ的扩展类开源项目

image-20210626160220910

  1. 解压后需修改rocketmq-console的配置文件application.properties(添加nameServer的地址)

image-20210626160428573

  1. 再执行maven打包为可执行jar包运行:mvn clean package -Dmaven.test.skip=true

  2. 直接运行jar包访问8080端口即可:java - jar ×××.jar

image-20210626161251007

代码示例

maven依赖

1
2
3
4
5
 <dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class SyncProducer {

public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址
// String NAME_SERVER = "czm.com:9876;czm.com:9877";
producer.setNamesrvAddr(Common.NAME_SERVER);
//3.启动producer
producer.start();
//生产者设置监听器,可用于事务
//producer.setTransactionListener(transactionListener);
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 消息主题Topic:base
* 消息Tag: Tag1
* 消息内容:byte数组
*/
Message msg = new Message("base", "Tag1", ("Hello World" + i).getBytes());



//发送单向消息只管发送,无返回值无需等待,producer.sendOneway(msg);

//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println("发送结果:" + result);

//发送异步消息
// producer.send(msg, new SendCallback() {
// /**
// * 发送成功回调函数
// * @param sendResult
// */
// @Override
// public void onSuccess(SendResult sendResult) {
// System.out.println("发送结果:" + sendResult);
// }
//
// /**
// * 发送失败回调函数
// * @param e
// */
// @Override
// public void onException(Throwable e) {
// System.out.println("发送异常:" + e);
// }
// });

}
//6.关闭生产者producer
producer.shutdown();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Consumer {

public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
consumer.setNamesrvAddr(Common.NAME_SERVER);
//3.订阅主题Topic和Tag=*匹配所有,可用tag或MessageSelector实现消息过滤
consumer.subscribe("base", "*");

//设定消费模式:负载均衡CLUSTERING(默认)| 广播模式BROADCASTING
consumer.setMessageModel(MessageModel.BROADCASTING);
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {

//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
}
}

消息延迟

// 设置延时等级,延迟消息时使用
//在源码的org.apache.rocketmq.store.config.MessageStoreConfig类中有相应级别声明
//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
//在消息发送前设置 0表示不延迟,3表示延时10秒
//msg.setDelayTimeLevel(3);

消息过滤

使用tag是实现

1
2
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.subscribe("TOPIC", "TAG1 || TAG2 || TAG3"); //接收TAG1|TAG2|TAG3的消息

使用MessageSelector实现

生产者通过putUserProperty设置消息的属性

1
2
3
4
5
6
7
8
9
10
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.start();
Message msg = new Message("TopicTest",
tag,
"Hello World ".getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("t", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();

消费者通过MessageSelector获取符合相应sql条件的消息

1
2
3
4
5
6
7
8
9
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.subscribe("tag", MessageSelector.bySql("t between 0 and 5");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

RocketMQ顺序消费

参考链接

参考链接

其他链接

RocketMQ事务

Rocket博客

RocketMQ基础

源码分析

END