RabbitMQ相关介绍

最好的学习文档: RabbitMQ 教程 - “Hello World!” |RabbitMQ 函数 — RabbitMQ tutorial - “Hello World!” | RabbitMQ

什么是消息队列 & 为什么使用?

Message Queue

顾名思义,存储消息的队列。

先来说说一个场景,以前送外卖的话或者送快递之类的,都是点到点,也就是快递小哥会直接送到你家,要是敲门发现不在,给你打个电话,此时你好像不得不回去?

其实先不说菜鸟驿站或者外卖柜。你们家家门口那块地就勉强算得上消息队列了,只不过不能保证安全,其他人也可以消费(bushi)

快递小哥只需要把快递放到你家门口,然后通过软件或者发短信,提醒你,就可以去干自己的事情了。而且你也不用马上去处理这个快递而打断当下做的事情。

什么是消息队列
MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

各种知名消息队列优劣

RabbitMQ 工作原理

  • Broker: 消息队列服务进程
  • Exchange: 交换机,通过管道接受生产者信息,再通过一定规则将信息转发给某个队列(过滤消息)
  • Queue:消息队列,存储信息的队列,消息到达队列则会转发给指定的消费者
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

RabbitMQ 快速入门

六种消息模型

Simple

简单队列: 简单的点对点服务

生产者

package com.moying.demo;

import com.moying.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;


public class Send {

private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
Channel channel = null;
try {
channel = connection.createChannel();
/**
* 参数1:队列名称 参数2:是否持久化 参数3:是否独占 参数4:是否自动删除 参数5:其他参数 比如 TTL 等
* 创建队列
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 发送消息
String message = "Hello RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
assert channel != null;
channel.close();
connection.close();
}
}
}

消费者

package com.moying.demo;

import com.moying.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv {
private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 当接收到消息后此方法将被调用
// 参数1:消费者标签 参数2:消息队列信息 参数3:消息属性 参数4:消息内容
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
// 模拟异常
// int i = 1/0;
// 交换机
String exchange = envelope.getExchange();
// 消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 消息内容
String msg = new String(body);
System.out.println(" [x] Received '" + msg + "'");

// 手动ack
// 参数1:消息id,参数2:是否批量确认
// multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
// 监听队列,第二个参数为是否自动确认,true表示自动确认 callback为回调函数
// channel.basicConsume(QUEUE_NAME, true, consumer);
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

work

工作队列:一对多,分工合作消费

The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.

package com.moying.demo2;

import com.moying.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;


public class WorkSend {
private final static String QUEUE_NAME = "test_work_queue";

public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 循环发布任务
for (int i = 0; i < 50; i++) {
String message = "task...." + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i*2);
}
channel.close();
connection.close();

}
}

package com.moying.demo2;

import com.moying.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;

import java.util.concurrent.TimeUnit;

public class WorkRecv1 {
private final static String QUEUE_NAME = "test_work_queue";

public static void main(String[] args)throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 只接收一条未确认的消息
// channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) {
String msg = new String(body);
System.out.println(" [Consumer 1] Received '" + msg + "'");
try {
TimeUnit.SECONDS.sleep(1);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
// channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

默认情况下这里会是C1和C2均摊消息进行消费,比如P发送50条,C1和C2各处理25条,但显然不太行(没有考虑C1和C2之间的设备性能差距等等)

所以可以通过设置

让消费者处理完一条消息之后再去接收消息进行消费而不是囤积

int prefetchCount = 1;
channel.basicQos(prefetchCount);

Note about queue size

If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.

ACK 机制

消息一旦被消费者接收,队列中的消息就被删除了。RabbitMQ是通过何种机制呢?

如果有了解TCP的话,应该就会恍然大悟(并且本身Producer,Broker,Consumer也是通过TCP连接的)

当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

  • 自动ACK:消息一旦被接收,消费者自动发送ACK
  • 手动ACK:消息接收后,不会发送ACK,需要手动调用

这里需要考虑场景去判断选择哪种更好

  • 如果是牵扯到钱之类的,肯定是得手动消费,得等到相关业务完成之后再去发送ACK,不然中途要是崩了,数据库方面进行回滚,但是你自动Ack后这条消息就没了啊。
  • 如果是不太重要的消息,无脑自动吧(额,阅历不够,不清楚那些算不重要)
// 监听队列,第二个参数false,手动进行ACK
channel.basicConsume(QUEUE_NAME, false, consumer);

// 手动进行ACK
/*
* void basicAck(long deliveryTag, boolean multiple) throws IOException;
* deliveryTag:用来标识消息的id
* multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
*/
channel.basicAck(envelope.getDeliveryTag(), false);

这里的multiple 不就跟TCP 的S-ACK很像嘛,我觉得底层应该就是这个

Publish/SubScirbe

fanout

direct

Topic

RPC

交换机类型

routingkey ,routing pattern

Fanout: 广播,

Direct: 定向,

Topic: 通配符,

Header: (https://blog.csdn.net/zhu_tianwei/article/details/40923131)

整合Springboot