消息中间件(MQ)
简介
消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统。——百度百科
MQ全称为Message Queue,消息队列是程序和程序之间的通信方法。
使用场景
MQ相当于一个中介,消息生产方将消息发给MQ,消息消费方接收消息并进行相应逻辑处理,它将两应用程序进行解耦合。
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
如订单抢票系统,开始抢票瞬间高请求,高并发,若此时都操作数据库,需要大量IO操作,消耗系统性能,系统很可能崩溃,我们可以先将订单存消息队列里,然后系统就可以避开高峰期再按照自己的消费能力消费消息队列里的消息。
主流实现方式
AMQP
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。 ——百度百科
AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
JMS
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。 ——百度百科
AMQP 与 JMS 区别
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模式;而AMQP的消息模式更加丰富
常见的消息队列
RabbitMQ
官网
简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。 ——百度百科
6种模式
简单模式:一个生产者发送消息到队列,一个消费者接收,不需要设置交换机(使用默认的交换机)
work工作队列模式,一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者竞争同一个队列的消息
Publish/Subscribe发布与订阅模式:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者
Routing路由模式(direct模式):生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key
Topics通配符模式: 生产者发送消息到交换机,交换机类型设置topic,交换机根据绑定队列的routing key的值进行通配符匹配,
“#”:匹配零个或者多个词topic.# 可以匹配topic,topic.text,topic.test.queue
““:匹配l零个或一个词,topic. 可以匹配topic,topic.text或topic.queue
RPC远程调用模式:功能如名,调用远程项目的功能并等待结果。
入门使用
maven坐标
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
|
java代码示例
注:这里以direct模式示例,其他模式类似
消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("czm");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String message = "hello world";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .headers(headers) .build();
channel.basicPublish("test_direct_exchange", "test.direct", properties, message.getBytes());
channel.close(); connection.close();
|
消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("czm"); connectionFactory.setPassword("123456");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while(true){ Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); }
|
用xml配置方式与spring整合
生产端
xml配置
注:以下配置rabbitMQ可靠性投递用到,消息的延迟投递,做二次确认,回调检查
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:application.properties"/> <rabbit:connection-factory id="rabbitmqConnectionFactory" host="${rmq.ip}" port="${rmq.port}" virtual-host="${rmq.manager.virtual}" username="${rmq.manager.user}" password="${rmq.manager.password}" publisher-confirms="true" publisher-returns="true"/> <rabbit:admin connection-factory="rabbitmqConnectionFactory"/>
<bean id="RabbitmqProduct" class="生产者类路径">
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <bean id="confirmCallBackListener" class="回调类路径"/> <bean id="returnCallBackListener" class="回调类路径" /> <rabbit:template id="rabbitTemplate" connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter" confirm-callback="confirmCallBackListener" return-callback="returnCallBackListener" mandatory="true"/>
<rabbit:queue name="${rmq.manager.queue}" id="rabbitQueue" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue id="queueDelay" name="${rmq.manager.queue_delay}" durable="true" auto-delete="false" exclusive="false" > <rabbit:queue-arguments> <entry key="x-message-ttl" value="20000" value-type="java.lang.Long" /> <entry key="x-dead-letter-exchange" value="${rmq.manager.exchange_dead}" /> <entry key="x-dead-letter-routing-key" value="${rmq.manager.key_dead}" /> </rabbit:queue-arguments> </rabbit:queue>
<rabbit:direct-exchange id="exchange" name="${rmq.manager.exchange}" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="rabbitQueue" key="${rmq.manager.routingKey}"></rabbit:binding> <rabbit:binding queue="queueDelay" key="${rmq.manager.key_delay}"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange>
<rabbit:queue name="${rmq.manager.queue_dead}" id="queueDead" durable="true" auto-delete="false" exclusive="false" />
<rabbit:direct-exchange id="exchange_dead" name="${rmq.manager.exchange_dead}" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queueDead" key="${rmq.manager.key_dead}"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange>
<bean name="Handler" class="消息类路径"></bean>
<rabbit:listener-container connection-factory="rabbitmqConnectionFactory" acknowledge="manual" > <rabbit:listener queues="${rmq.manager.queue_dead}" ref="Handler"/> </rabbit:listener-container>
</beans>
|
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
public class RabbitmqProduct {
private RabbitTemplate rabbitTemplate; private String exchange; private String routingKey;
private String delayKey;
@Resource SocketMsgService socketMsgService;
public void sendMessage(SocketMsg socketMsg, boolean isFirst){ if(isFirst){ socketMsgService.insert(socketMsg); }
try { Object jsonSocketMsg = JSON.toJSON(socketMsg); rabbitTemplate.convertAndSend(exchange,routingKey, jsonSocketMsg, new CorrelationData(socketMsg.getId().toString())); rabbitTemplate.convertAndSend(exchange,delayKey, jsonSocketMsg, new CorrelationData(socketMsg.getId().toString()+"delay")); } catch (Exception e) { e.printStackTrace(); } }
public void setExchange(String exchange) { this.exchange = exchange; }
public void setRoutingKey(String routingKey) { this.routingKey = routingKey; }
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; }
public void setDelayKey(String delayKey) { this.delayKey = delayKey; } }
|
延时队列消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @Slf4j public class Handler implements ChannelAwareMessageListener {
@Resource SocketMsgService socketMsgService;
@Resource RabbitmqProduct rabbitmqProduct;
@Override public void onMessage(Message message, Channel channel) {
log.info(new String(message.getBody())); Object parse = JSONObject.parse(message.getBody()); log.info(parse.toString()); SocketMsg socketMsg = JSONObject.parseObject(message.getBody(), SocketMsg.class); log.info(socketMsg.toString()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); }
if(socketMsgService.findById(socketMsg.getId()).getStatus() == 0){ if (socketMsgService.getTryCountById(socketMsg.getId()) < 3){ socketMsgService.addTryCountById(socketMsg.getId()); rabbitmqProduct.sendMessage(socketMsg, false); } } } }
|
投递成功回调类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
@Slf4j @Service("confirmCallBackListener") public class ConfirmCallBackListener implements ConfirmCallback { @Resource SocketMsgService socketMsgService;
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ log.info("################发送到exchange成功:"+correlationData); if (!correlationData.getId().endsWith("delay")){ socketMsgService.succeedSendById(Long.parseLong(correlationData.getId())); } }else { log.error("#########发送失败:"+correlationData); } } }
|
投递失败回调类
1 2 3 4 5 6 7 8 9 10 11 12 13
|
@Slf4j public class ReturnCallBackListener implements ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("无对应key队列,发送失败了"+message); log.info(new String(message.getBody())); } }
|
消费端
配置与上面延时队列的消息接收方类似,此处不再累赘
常问问题1
常问问题2
END