简介
RocketMQ是一款阿里巴巴开源的消息中间件,在2017年9月份成为Apache的顶级项目,是国内首个互联网中间件在 Apache 上的顶级项目。RocketMQ经过了阿里多年双十一大促的考验,性能和稳定性得到了充分的考验。目前在业界被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binlog分发等场景。
官网地址
http://rocketmq.apache.org/
开源项目地址
https://github.com/apache/rocketmq
常用中间件对比
架构设计
官方文档链接
下载安装
1. 执行解压命令
1
| unzip rocketmq-all-4.9.0-bin-release.zip
|
注意启动前修改bin目录下的 runbroker.sh 和 runserver.sh 中JVM内存参数,否者可能启动不了。
2. 进入bin目录
3. 启动NameServer
1
| nohup sh bin/mqnamesrv &
|
4. 启动Broker
1 2
| nohup sh mqbroker -n localhost:9876 &
|
5. 测试
1 2 3 4
| export NAMESRV_ADDR=localhost:9876
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
|
1 2 3 4
| export NAMESRV_ADDR=localhost:9876
sh tools.sh org.apache.rocketmq.example.quickstart.C
|
6. 关闭
1 2 3 4
| sh mqshutdown namesrv
sh mqshutdown broker
|
配置详情
在config目录下有相应配置示例
- 2m-2s-async:双主双从异步更新
- 2m-2s-sync:双主双从同步更新
- 2m-noslave:双主
参数详情
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
namesrvAddr=localhost:9876;localhost:9877
autoCreateTopicEnable=true
defaultTopicQueueNums=4
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=48
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/RoccketMQ/rocketmq1/store
storePathCommitLog=/usr/local/RoccketMQ/rocketmq1/store/commitlog
storePathConsumeQueue=/usr/local/RoccketMQ/rocketmq1/store/consumequeue
storePathIndex=/usr/local/RoccketMQ/rocketmq1/store/index
storeCheckpoint=/usr/local/RoccketMQ/rocketmq1/store/checkpoint
abortFile=/usr/local/RoccketMQ/rocketmq1/store/abort
maxMessageSize=65536
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
|
注:集群配置时每个broker的storePath要不同否则会报java.lang.RuntimeException: Lock failed,MQ already started错误
管理工具
mqadmin
在bin目录下直接执行 ./mqadmin {command} {args}。
图形化管理界面
- 到
github
上下载RocketMQ的扩展类开源项目
- 解压后需修改rocketmq-console的配置文件application.properties(添加nameServer的地址)
再执行maven打包为可执行jar包运行:mvn clean package -Dmaven.test.skip=true
直接运行jar包访问8080端口即可:java - jar ×××.jar
代码示例
maven依赖
1 2 3 4 5
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| public class SyncProducer {
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr(Common.NAME_SERVER); producer.start(); for (int i = 0; i < 10; i++) {
Message msg = new Message("base", "Tag1", ("Hello World" + i).getBytes());
SendResult result = producer.send(msg); SendStatus status = result.getSendStatus(); System.out.println("发送结果:" + result);
} producer.shutdown(); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class Consumer {
public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr(Common.NAME_SERVER); consumer.subscribe("base", "*");
consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
|
消息延迟
// 设置延时等级,延迟消息时使用
//在源码的org.apache.rocketmq.store.config.MessageStoreConfig类中有相应级别声明
//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
//在消息发送前设置 0表示不延迟,3表示延时10秒
//msg.setDelayTimeLevel(3);
消息过滤
使用tag是实现
1 2
| DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.subscribe("TOPIC", "TAG1 || TAG2 || TAG3");
|
使用MessageSelector实现
生产者通过putUserProperty
设置消息的属性
1 2 3 4 5 6 7 8 9 10
| DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.start(); Message msg = new Message("TopicTest", tag, "Hello World ".getBytes(RemotingHelper.DEFAULT_CHARSET) );
msg.putUserProperty("t", String.valueOf(i)); SendResult sendResult = producer.send(msg); producer.shutdown();
|
消费者通过MessageSelector获取符合相应sql条件的消息
1 2 3 4 5 6 7 8 9
| DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.subscribe("tag", MessageSelector.bySql("t between 0 and 5"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
|
RocketMQ顺序消费
参考链接
参考链接
其他链接
RocketMQ事务
Rocket博客
RocketMQ基础
源码分析
END