初始RabbitMQ
初始RabbitMQ
基础篇
同步调用
同步调用是一种线性执行模式。当你调用一个函数后,程序会暂停在当前位置,直到这个函数执行完毕并返回结果后,才会继续执行下一行代码。这就像你在餐厅点餐后,站在柜台前一直等到厨师做好餐品拿到手后才离开.
缺点:
- 拓展性差: 拓展服务需要更改通知代码
- 性能下降: 串行执行, 效果慢
- 级联失败: 前面服务失败, 后面服务也失败
使用场景: 下一步操作需要上一步操作的结果才使用同步调用, 否则可优化为异步调用
异步调用
异步调用是一种非阻塞的执行模式。发出调用后,程序不会傻等,而是立即继续执行后续代码。被调用的函数(或任务)会在后台(例如在另一个线程中)执行,当它完成时,会通过一种通知机制(如回调函数、事件或消息)来告知调用方结果已就绪 。这就好比你在餐厅点餐后,拿到一个取餐号,然后可以回座位玩手机,当餐准备好时,服务员会叫号通知你取餐 .
对于消息队列的异步调用, 一般包含三个角色:
- 消息发送者: 消息生产者
- 消息代理: 管理, 暂存, 转发消息
- 消息接收者: 消息消费者
优点:
- 解除耦合, 拓展性强
- 无需等待, 性能好
- 故障隔离
- 缓存消息, 流量削峰填谷
缺点:
- 时效性差
- 无法确认下游服务对消息的处理情况
- 业务安全依赖于Broker(消息代理/消息队列) 的可靠性
| 特性维度 | 同步调用 | 异步调用 |
|---|---|---|
| 核心机制 | 调用后必须等待返回结果才继续执行 | 调用后无需等待,可立即执行后续操作 |
| 执行时序 | 强时序性,顺序执行,上下文一致 | 非线性,完成顺序不确定,可能先调用的后完成 |
| 线程状态 | 调用线程可能被阻塞(挂起) | 调用线程非阻塞,可自由执行其他任务 |
| 结果获取 | 直接通过函数返回值获取 | 通过回调函数、事件通知、Future/Promise等方式获取 |
| 资源利用率 | 较低,等待期间线程资源可能闲置 | 较高,线程资源可被充分利用 |
| 代码复杂度 | 逻辑简单直观,易于理解和调试 | 相对复杂,需要处理回调地狱、线程安全等问题 |
| 典型应用 | 简单的顺序任务、短时间操作 | 高并发服务、I/O密集型任务、GUI应用 |
MQ技术选型
MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。
| 特性维度 | Kafka | RabbitMQ | RocketMQ | ActiveMQ |
|---|---|---|---|---|
| 核心协议 | 自定义协议 | AMQP, MQTT, STOMP | 自研协议 | JMS, AMQP, MQTT |
| 吞吐量 | 极高 (百万级TPS) | 中等 (万级TPS) | 高 (十万级TPS) | 低 (万级TPS) |
| 延迟 | 较高 (毫秒-秒级) | 极低 (毫秒级) | 低 (毫秒级) | 毫秒级 |
| 可靠性 | 高 (多副本机制) | 高 (ACK机制) | 极高 (金融级) | 中 (依赖配置) |
| 事务消息 | 不支持 | 插件支持 | 原生支持 | 支持 |
| 顺序消息 | 分区内有序 | 单队列有序 | 分区内严格有序 | 单队列有序 |
| 扩展性 | 水平扩展极佳 | 集群扩展复杂 | 水平扩展良好 | 垂直扩展为主 |
| 学习成本 | 高 | 中 (文档详细, 社区支持全面) | 中 | 低 |
RabbitMQ
介绍和安装
-
基本介绍
- RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmg.com/
-
核心概念
- publisher: 消息发送者
- consumer: 消息消费者
- queue: 队列, 存储消息
- exchange: 交换机, 负责消息的路由
- binding: 交换机绑定队列
- routing key: 路由条件
快速入门
使用docker快速安装一个rabbitmq:
- 不挂载数据卷
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 ----restart=always rabbitmq:management
- 挂载数据卷
docker run -e RABBITMQ_DEFAULT_USER=fei \
-e RABBITMQ_DEFAULT_PASS=fei \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
--restart=always \
rabbitmq:management
使用数据卷挂载:mq-plugins是数据卷名,那么已有的插件会复制到这个数据卷中。
这里不存在该数据卷,会自动创建该数据卷。存放的主机目录可通过
docker inspect mq-plugins进行查看
docker核心命令:
| 操作类型 | 核心命令 | 关键参数/说明 |
|---|---|---|
| 创建数据卷 | docker volume create [卷名] |
创建命名的数据卷,便于管理 |
| 挂载数据卷 | docker run -v [卷名]:[容器路径] ... |
将数据卷挂载到容器内指定路径 |
| 查看数据卷 | docker volume ls, docker volume inspect [卷名] |
列出所有卷或查看详细信息 |
| 删除数据卷 | docker volume rm [卷名] |
删除指定的数据卷 |
| 清理无用卷 | docker volume prune |
清理所有未被容器引用的数据卷 |
- 新建队列hello.queue1和hello.queue2
- 向默认的amp.fanout交换机发送一条消息
- 查看消息是否到达hello.queue1和hello.queue2
- 总结规律
数据隔离
需求:在RabbitMQ的控制台完成下列操作
- 新建一个用户hmall
- 为hmall用户创建一个virtual host
- 测试不同virtualhost之间的数据隔离现象
只有本用户创建的virtual host 才能被本用户访问
Java客户端
快速入门
需求如下
- 利用控制台创建队列simple.queue
- 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
- 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列
- 引入依赖
<!--引入RabbitMQ : AMQP(高级消息队列协议) Advanced-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置
# 在生产者或者消费者引入MQ服务端信息
spring:
rabbitmq:
host: 192.168.56.2
port: 5672
virtual-host: "/" # 虚拟主机
username: guest
password: guest
- 发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("hello.queue1", "hello from java");
}
- 消息接收
SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:
@Component
@Slf4j
public class SpringRabbitMQListener {
@RabbitListener(queues = {"hello.queue1"})
public void receive(String message) {
log.info("receive message: {}", message);
}
}
WorkQueue
案例: 一个队列绑定多个消费者
基本思路如下:
- 在RabbitMQ的控制台创建一个队列,名为work.queue
- 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
- 在consumer服务中定义两个消息监听者,都监听work.queue队列
- 消费者1每秒处理50条消息,消费者2每秒处理5条消息
- 生产者
@Test
void testWQ(){
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend("work.queue", "hello from java = " + i);
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
- 消费者
@RabbitListener(queues = {"work.queue"})
public void receiveWQ1(String message) {
System.out.println("消费者1 接收work.queue = " + message);
}
@RabbitListener(queues = {"work.queue"})
public void receiveWQ2(String message) throws InterruptedException {
System.err.println("消费者2 接收work.queue = " + message);
TimeUnit.MILLISECONDS.sleep(200);// 1s / 5 == 200 ms
}
默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息 # 能者多劳
Fanout交换机
真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,常用的交换机类型有以下三种:
- Fanout: 广播
- Direct: 定向
- Topic: 话题
- headers
- x-local-random
Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
案例
- 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
- 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
- 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
- 在publisher中编写测试方法,向hmall.fanout发送消息
- 总结: 交换机的作用
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- FanoutExchange的会将消息路由到每个绑定的队列
Direct交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
案例
- 在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
- 在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定
- 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
- 在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息
@Test
void testDirect(){
rabbitTemplate.convertAndSend("hmall.direct", "dq1", "direct test"); // 只有按dq1绑定的queue可以接收
}
Topic交换机
TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 .分割。
Queue与Exchange指定BindingKey时可以使用通配符
#: 代指0个或多个单词*: 代指一个单词
例子:
china.#: 以china开头的routingkey, 比如china.sc.bz,china.sc都可以路由到china.*: 以china 开头的routingkey, 只有china.sc可以接收到,china.sc.bz接收不到
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey可以是多个单词,以
.分割 - Topic交换机与队列绑定时的bindingKey可以指定通配符
#: 代表0个或多个词*: 代表1个词
队列交换机的声明
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
- 配置声明方式
package com.fei.mq.publisher.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
/**
* 声明FanoutExchange交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("mall.fanout");
}
/**
* 声明队列
* @return
*/
@Bean
public Queue Queue1() {
return new Queue("mall.queue1");
}
/**
* 声明绑定关系
* @param Queue1 已声明的队列
* @param fanoutExchange 已声明的交换机
* @return
*/
@Bean
public Binding bindingQueue(Queue Queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(Queue1).to(fanoutExchange);
}
}
- 注解声明方式
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "inject.queue1"),
exchange = @Exchange(name = "inject.direct",type = ExchangeTypes.DIRECT),
key = {"inject.d","d.i"}
)
)
public void receiveDirect(String message) {
System.out.println("消费者接收到Direct消息:" + message);
}
- 声明队列、交换机、绑定关系的Bean是什么?
- Queue
- FanoutExchange、DirectExchange、TopicExchange
- Binding
- 基于@RabbitListener注解声明队列和交换机有哪些常见注解?
- @Queue
- @Exchange
消息转换器
需求: 测试利用SpringAMQP发送对象类型的消息
- 声明一个队列,名为object.queue
- 编写单元测试,向队列中直接发送一条消息,消息类型为Map
- 在控制台查看消息,总结你能发现的问题
Spring的对消息对象的处理是出 org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是 SimpleMessageConverter,基于JDK的 ObjectOutputStream完成序列化。存在下列问题:
- JDK的序列化有安全风险
- JDK序列化的消息太大
- JDK序列化的消息可读性差
建议采用JSON序列化代替默认的JDK序列化,要做两件事情:在publisher和consumer中都要引入jackson依赖:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
- 发送者
@Test
void testMap(){
HashMap<String, String> map = new HashMap<>();
map.put("name","jack");
rabbitTemplate.convertAndSend( "hmall.topic","china", map);
}
- 消费者
@RabbitListener(queues = "topic.queue1")
public void receiveTopic(Map<String,String > message) {
System.out.println("消费者 map 绑定 = " + message);
}
消息发送者和消息消费者处理消息的类型需要保持一致