简介
RocketMQ是一款阿里巴巴开源的消息中间件,在2017年9月份成为Apache的顶级项目,是国内首个互联网中间件在 Apache 上的顶级项目。RocketMQ经过了阿里多年双十一大促的考验,性能和稳定性得到了充分的考验。目前在业界被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binlog分发等场景。
官网地址
http://rocketmq.apache.org/
开源项目地址
https://github.com/apache/rocketmq
常用中间件对比

架构设计

官方文档链接
下载安装
1. 执行解压命令
| 1
 | unzip rocketmq-all-4.9.0-bin-release.zip 
 | 
注意启动前修改bin目录下的 runbroker.sh 和 runserver.sh 中JVM内存参数,否者可能启动不了。
2. 进入bin目录
3. 启动NameServer
| 1
 | nohup sh bin/mqnamesrv &
 | 
4. 启动Broker
| 12
 
 | nohup sh mqbroker -n localhost:9876 &
 
 | 
5.  测试
| 12
 3
 4
 
 | export NAMESRV_ADDR=localhost:9876
 
 sh tools.sh org.apache.rocketmq.example.quickstart.Producer
 
 | 
| 12
 3
 4
 
 | export NAMESRV_ADDR=localhost:9876
 
 sh tools.sh org.apache.rocketmq.example.quickstart.C
 
 | 
6. 关闭
| 12
 3
 4
 
 | sh mqshutdown namesrv
 
 sh mqshutdown broker
 
 | 
配置详情
在config目录下有相应配置示例

- 2m-2s-async:双主双从异步更新
- 2m-2s-sync:双主双从同步更新
- 2m-noslave:双主
参数详情
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 
 | brokerClusterName=DefaultCluster
 
 brokerName=broker-a
 
 brokerId=0
 
 namesrvAddr=localhost:9876;localhost:9877
 
 autoCreateTopicEnable=true
 
 defaultTopicQueueNums=4
 
 autoCreateSubscriptionGroup=true
 
 listenPort=10911
 
 deleteWhen=04
 
 fileReservedTime=48
 
 mapedFileSizeCommitLog=1073741824
 
 mapedFileSizeConsumeQueue=300000
 
 
 
 diskMaxUsedSpaceRatio=88
 
 storePathRootDir=/usr/local/RoccketMQ/rocketmq1/store
 
 storePathCommitLog=/usr/local/RoccketMQ/rocketmq1/store/commitlog
 
 storePathConsumeQueue=/usr/local/RoccketMQ/rocketmq1/store/consumequeue
 
 storePathIndex=/usr/local/RoccketMQ/rocketmq1/store/index
 
 storeCheckpoint=/usr/local/RoccketMQ/rocketmq1/store/checkpoint
 
 abortFile=/usr/local/RoccketMQ/rocketmq1/store/abort
 
 maxMessageSize=65536
 
 
 
 
 
 
 
 
 brokerRole=SYNC_MASTER
 
 
 
 flushDiskType=SYNC_FLUSH
 
 
 
 
 
 
 
 | 
注:集群配置时每个broker的storePath要不同否则会报java.lang.RuntimeException: Lock failed,MQ already started错误
管理工具
mqadmin
在bin目录下直接执行 ./mqadmin {command} {args}。
图形化管理界面
- 到github上下载RocketMQ的扩展类开源项目

- 解压后需修改rocketmq-console的配置文件application.properties(添加nameServer的地址)

- 再执行maven打包为可执行jar包运行:- mvn clean package -Dmaven.test.skip=true
 
- 直接运行jar包访问8080端口即可:java - jar ×××.jar 

代码示例
maven依赖
| 12
 3
 4
 5
 
 |  <dependency><groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-client</artifactId>
 <version>4.4.0</version>
 </dependency>
 
 | 
生产者
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 
 | public class SyncProducer {
 public static void main(String[] args) throws Exception {
 
 DefaultMQProducer producer = new DefaultMQProducer("group1");
 
 
 producer.setNamesrvAddr(Common.NAME_SERVER);
 
 producer.start();
 
 
 for (int i = 0; i < 10; i++) {
 
 
 
 
 
 
 Message msg = new Message("base", "Tag1", ("Hello World" + i).getBytes());
 
 
 
 
 
 
 SendResult result = producer.send(msg);
 
 SendStatus status = result.getSendStatus();
 System.out.println("发送结果:" + result);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 }
 
 producer.shutdown();
 }
 }
 
 | 
消费者
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 
 | public class Consumer {
 public static void main(String[] args) throws Exception {
 
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
 
 consumer.setNamesrvAddr(Common.NAME_SERVER);
 
 consumer.subscribe("base", "*");
 
 
 consumer.setMessageModel(MessageModel.BROADCASTING);
 
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
 
 consumer.registerMessageListener(new MessageListenerConcurrently() {
 
 
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 for (MessageExt msg : msgs) {
 System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 });
 
 consumer.start();
 }
 }
 
 | 
消息延迟
// 设置延时等级,延迟消息时使用
//在源码的org.apache.rocketmq.store.config.MessageStoreConfig类中有相应级别声明
//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
//在消息发送前设置 0表示不延迟,3表示延时10秒
//msg.setDelayTimeLevel(3);
消息过滤
使用tag是实现
| 12
 
 | DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");consumer.subscribe("TOPIC", "TAG1 || TAG2 || TAG3");
 
 | 
使用MessageSelector实现
生产者通过putUserProperty设置消息的属性
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | DefaultMQProducer producer = new DefaultMQProducer("group1");producer.start();
 Message msg = new Message("TopicTest",
 tag,
 "Hello World ".getBytes(RemotingHelper.DEFAULT_CHARSET)
 );
 
 msg.putUserProperty("t", String.valueOf(i));
 SendResult sendResult = producer.send(msg);
 producer.shutdown();
 
 | 
消费者通过MessageSelector获取符合相应sql条件的消息
| 12
 3
 4
 5
 6
 7
 8
 9
 
 | DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");consumer.subscribe("tag", MessageSelector.bySql("t between 0 and 5");
 consumer.registerMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 });
 consumer.start();
 
 | 
RocketMQ顺序消费
参考链接
参考链接
其他链接
RocketMQ事务
Rocket博客
RocketMQ基础
源码分析
END