RabbitMQ相关介绍 最好的学习文档: RabbitMQ 教程 - “Hello World!” |RabbitMQ 函数 — RabbitMQ tutorial - “Hello World!” | RabbitMQ
什么是消息队列 & 为什么使用?
Message Queue
顾名思义,存储消息的队列。
先来说说一个场景,以前送外卖的话或者送快递之类的,都是点到点,也就是快递小哥会直接送到你家,要是敲门发现不在,给你打个电话,此时你好像不得不回去?
其实先不说菜鸟驿站或者外卖柜。你们家家门口那块地就勉强算得上消息队列了,只不过不能保证安全,其他人也可以消费(bushi)
快递小哥只需要把快递放到你家门口,然后通过软件或者发短信,提醒你,就可以去干自己的事情了。而且你也不用马上去处理这个快递而打断当下做的事情。
什么是消息队列 MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
各种知名消息队列优劣 RabbitMQ 工作原理
Broker: 消息队列服务进程
Exchange: 交换机,通过管道接受生产者信息,再通过一定规则将信息转发给某个队列(过滤消息)
Queue:消息队列,存储信息的队列,消息到达队列则会转发给指定的消费者
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
RabbitMQ 快速入门 六种消息模型 Simple 简单队列: 简单的点对点服务
生产者
package com.moying.demo;import com.moying.utils.ConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Send { private final static String QUEUE_NAME = "simple_queue" ; public static void main (String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = null ; try { channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); String message = "Hello RabbitMQ!" ; channel.basicPublish("" , QUEUE_NAME, null , message.getBytes()); System.out.println(" [x] Sent '" + message + "'" ); } catch (Exception e) { e.printStackTrace(); } finally { assert channel != null ; channel.close(); connection.close(); } } }
消费者
package com.moying.demo;import com.moying.utils.ConnectionUtil;import com.rabbitmq.client.*;import java.io.IOException;public class Recv { private final static String QUEUE_NAME = "simple_queue" ; public static void main (String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) { String exchange = envelope.getExchange(); long deliveryTag = envelope.getDeliveryTag(); String msg = new String (body); System.out.println(" [x] Received '" + msg + "'" ); try { channel.basicAck(deliveryTag, false ); } catch (IOException e) { throw new RuntimeException (e); } } }; channel.basicConsume(QUEUE_NAME, false , consumer); } }
work 工作队列:一对多,分工合作消费
The main idea behind Work Queues (aka: Task Queues ) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
package com.moying.demo2;import com.moying.utils.ConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class WorkSend { private final static String QUEUE_NAME = "test_work_queue" ; public static void main (String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); for (int i = 0 ; i < 50 ; i++) { String message = "task...." + i; channel.basicPublish("" , QUEUE_NAME, null , message.getBytes()); System.out.println(" [x] Sent '" + message + "'" ); Thread.sleep(i*2 ); } channel.close(); connection.close(); } }
package com.moying.demo2;import com.moying.utils.ConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import java.util.concurrent.TimeUnit;public class WorkRecv1 { private final static String QUEUE_NAME = "test_work_queue" ; public static void main (String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte [] body) { String msg = new String (body); System.out.println(" [Consumer 1] Received '" + msg + "'" ); try { TimeUnit.SECONDS.sleep(1 ); channel.basicAck(envelope.getDeliveryTag(), false ); } catch (Exception e) { e.printStackTrace(); } } }; channel.basicConsume(QUEUE_NAME, false , consumer); } }
默认情况下这里会是C1和C2均摊消息进行消费,比如P发送50条,C1和C2各处理25条,但显然不太行(没有考虑C1和C2之间的设备性能差距等等)
所以可以通过设置
让消费者处理完一条消息之后再去接收消息进行消费而不是囤积
int prefetchCount = 1 ;channel.basicQos(prefetchCount);
Note about queue size
If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.
ACK 机制 消息一旦被消费者接收,队列中的消息就被删除了。RabbitMQ是通过何种机制呢?
如果有了解TCP的话,应该就会恍然大悟(并且本身Producer,Broker,Consumer也是通过TCP连接的)
当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
自动ACK:消息一旦被接收,消费者自动发送ACK
手动ACK:消息接收后,不会发送ACK,需要手动调用
这里需要考虑场景去判断选择哪种更好
如果是牵扯到钱之类的,肯定是得手动消费,得等到相关业务完成之后再去发送ACK,不然中途要是崩了,数据库方面进行回滚,但是你自动Ack后这条消息就没了啊。
如果是不太重要的消息,无脑自动吧(额,阅历不够,不清楚那些算不重要)
channel.basicConsume(QUEUE_NAME, false , consumer);
channel.basicAck(envelope.getDeliveryTag(), false );
这里的multiple 不就跟TCP 的S-ACK很像嘛,我觉得底层应该就是这个
Publish/SubScirbe fanout direct Topic RPC 交换机类型
routingkey ,routing pattern
Fanout: 广播,
Direct: 定向,
Topic: 通配符,
Header: (https://blog.csdn.net/zhu_tianwei/article/details/40923131)
整合Springboot