初始RabbitMQ

基础篇

同步调用

同步调用是一种线性执行模式。当你调用一个函数后,程序会暂停在当前位置,直到这个函数执行完毕并返回结果后,才会继续执行下一行代码。这就像你在餐厅点餐后,站在柜台前一直等到厨师做好餐品拿到手后才离开.

缺点:

  • 拓展性差: 拓展服务需要更改通知代码
  • 性能下降: 串行执行, 效果慢
  • 级联失败: 前面服务失败, 后面服务也失败

使用场景: 下一步操作需要上一步操作的结果才使用同步调用, 否则可优化为异步调用

异步调用

异步调用是一种非阻塞的执行模式。发出调用后,程序不会傻等,而是立即继续执行后续代码。被调用的函数(或任务)会在后台(例如在另一个线程中)执行,当它完成时,会通过一种通知机制(如回调函数、事件或消息)来告知调用方结果已就绪 。这就好比你在餐厅点餐后,拿到一个取餐号,然后可以回座位玩手机,当餐准备好时,服务员会叫号通知你取餐 .

对于消息队列的异步调用, 一般包含三个角色:

  • 消息发送者: 消息生产者
  • 消息代理: 管理, 暂存, 转发消息
  • 消息接收者: 消息消费者

优点:

  • 解除耦合, 拓展性强
  • 无需等待, 性能好
  • 故障隔离
  • 缓存消息, 流量削峰填谷

缺点:

  • 时效性差
  • 无法确认下游服务对消息的处理情况
  • 业务安全依赖于Broker(消息代理/消息队列) 的可靠性
特性维度 同步调用 异步调用
核心机制 调用后必须等待返回结果才继续执行 调用后无需等待,可立即执行后续操作
执行时序 强时序性,顺序执行,上下文一致 非线性,完成顺序不确定,可能先调用的后完成
线程状态 调用线程可能被阻塞(挂起) 调用线程非阻塞,可自由执行其他任务
结果获取 直接通过函数返回值获取 通过回调函数事件通知Future/Promise等方式获取
资源利用率 较低,等待期间线程资源可能闲置 较高,线程资源可被充分利用
代码复杂度 逻辑简单直观,易于理解和调试 相对复杂,需要处理回调地狱、线程安全等问题
典型应用 简单的顺序任务、短时间操作 高并发服务、I/O密集型任务、GUI应用

MQ技术选型

MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。

特性维度 Kafka RabbitMQ RocketMQ ActiveMQ
核心协议 自定义协议 AMQP, MQTT, STOMP 自研协议 JMS, AMQP, MQTT
吞吐量 极高 (百万级TPS) 中等 (万级TPS) (十万级TPS) 低 (万级TPS)
延迟 较高 (毫秒-秒级) 极低 (毫秒级) 低 (毫秒级) 毫秒级
可靠性 高 (多副本机制) 高 (ACK机制) 极高 (金融级) 中 (依赖配置)
事务消息 不支持 插件支持 原生支持 支持
顺序消息 分区内有序 单队列有序 分区内严格有序 单队列有序
扩展性 水平扩展极佳 集群扩展复杂 水平扩展良好 垂直扩展为主
学习成本 中 (文档详细, 社区支持全面)

RabbitMQ

介绍和安装

  • 基本介绍

    • RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmg.com/
  • 核心概念

    • publisher: 消息发送者
    • consumer: 消息消费者
    • queue: 队列, 存储消息
    • exchange: 交换机, 负责消息的路由
    • binding: 交换机绑定队列
    • routing key: 路由条件

快速入门

使用docker快速安装一个rabbitmq:

  • 不挂载数据卷
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 ----restart=always rabbitmq:management
  • 挂载数据卷
docker run -e RABBITMQ_DEFAULT_USER=fei \
-e RABBITMQ_DEFAULT_PASS=fei \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
--restart=always \
rabbitmq:management

使用数据卷挂载:mq-plugins是数据卷名,那么已有的插件会复制到这个数据卷中。

这里不存在该数据卷,会自动创建该数据卷。存放的主机目录可通过 docker inspect mq-plugins进行查看

docker核心命令:

操作类型 核心命令 关键参数/说明
创建数据卷 docker volume create [卷名] 创建命名的数据卷,便于管理
挂载数据卷 docker run -v [卷名]:[容器路径] ... 将数据卷挂载到容器内指定路径
查看数据卷 docker volume ls, docker volume inspect [卷名] 列出所有卷或查看详细信息
删除数据卷 docker volume rm [卷名] 删除指定的数据卷
清理无用卷 docker volume prune 清理所有未被容器引用的数据卷
  1. 新建队列hello.queue1和hello.queue2
  2. 向默认的amp.fanout交换机发送一条消息
  3. 查看消息是否到达hello.queue1和hello.queue2
  4. 总结规律

数据隔离

需求:在RabbitMQ的控制台完成下列操作

  • 新建一个用户hmall
  • 为hmall用户创建一个virtual host
  • 测试不同virtualhost之间的数据隔离现象

只有本用户创建的virtual host 才能被本用户访问

Java客户端

快速入门

需求如下

  • 利用控制台创建队列simple.queue
  • 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
  • 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列
  • 引入依赖
<!--引入RabbitMQ : AMQP(高级消息队列协议) Advanced-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 配置
# 在生产者或者消费者引入MQ服务端信息
spring:
  rabbitmq:
    host: 192.168.56.2
    port: 5672 
    virtual-host: "/" # 虚拟主机
    username: guest
    password: guest
  • 发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
 
@Test
void contextLoads() {
 
    rabbitTemplate.convertAndSend("hello.queue1", "hello from java");
}
  • 消息接收

SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:

@Component
@Slf4j
public class SpringRabbitMQListener {
 
    @RabbitListener(queues = {"hello.queue1"})
    public void receive(String message) {
        log.info("receive message: {}", message);
    }
 
}

WorkQueue

案例: 一个队列绑定多个消费者

基本思路如下:

  1. 在RabbitMQ的控制台创建一个队列,名为work.queue
  2. 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
  3. 在consumer服务中定义两个消息监听者,都监听work.queue队列
  4. 消费者1每秒处理50条消息,消费者2每秒处理5条消息
  • 生产者
@Test
void testWQ(){
    for (int i = 0; i < 50; i++) {
        rabbitTemplate.convertAndSend("work.queue", "hello from java = " + i);
        try {
            TimeUnit.MILLISECONDS.sleep(20);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
  • 消费者
@RabbitListener(queues = {"work.queue"})
public void receiveWQ1(String message) {
    System.out.println("消费者1 接收work.queue = " + message);
}

@RabbitListener(queues = {"work.queue"})
public void receiveWQ2(String message) throws InterruptedException {
    System.err.println("消费者2 接收work.queue = " + message);
    TimeUnit.MILLISECONDS.sleep(200);// 1s / 5 == 200 ms
}

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息 # 能者多劳

Fanout交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,常用的交换机类型有以下三种:

  • Fanout: 广播
  • Direct: 定向
  • Topic: 话题
  • headers
  • x-local-random

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

案例

  1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
  2. 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
  4. 在publisher中编写测试方法,向hmall.fanout发送消息
  • 总结: 交换机的作用
    • 接收publisher发送的消息
    • 将消息按照规则路由到与之绑定的队列
    • FanoutExchange的会将消息路由到每个绑定的队列

Direct交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例

  1. 在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
  2. 在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  4. 在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息
@Test
void testDirect(){
    rabbitTemplate.convertAndSend("hmall.direct", "dq1", "direct test"); // 只有按dq1绑定的queue可以接收
}

Topic交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 .分割。

Queue与Exchange指定BindingKey时可以使用通配符

  • # : 代指0个或多个单词
  • * : 代指一个单词

例子:

  • china.# : 以china开头的routingkey, 比如 china.sc.bz, china.sc 都可以路由到
  • china.* : 以china 开头的routingkey, 只有 china.sc可以接收到, china.sc.bz接收不到

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey可以是多个单词,以 .分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • # : 代表0个或多个词
  • * : 代表1个词

队列交换机的声明

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
  • 配置声明方式
package com.fei.mq.publisher.config;
 
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
 
    /**
     * 声明FanoutExchange交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("mall.fanout");
    }
 
    /**
     * 声明队列
     * @return
     */
    @Bean
    public Queue Queue1() {
        return new Queue("mall.queue1");
    }
 
    /**
     * 声明绑定关系
     * @param Queue1 已声明的队列
     * @param fanoutExchange 已声明的交换机
     * @return
     */
    @Bean
    public Binding bindingQueue(Queue Queue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(Queue1).to(fanoutExchange);
    }
}
  • 注解声明方式
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(name = "inject.queue1"),
                exchange = @Exchange(name = "inject.direct",type = ExchangeTypes.DIRECT),
                key = {"inject.d","d.i"}
        )
)
public void receiveDirect(String message) {
    System.out.println("消费者接收到Direct消息:" + message);
}
  • 声明队列、交换机、绑定关系的Bean是什么?
    • Queue
    • FanoutExchange、DirectExchange、TopicExchange
    • Binding
  • 基于@RabbitListener注解声明队列和交换机有哪些常见注解?
    • @Queue
    • @Exchange

消息转换器

需求: 测试利用SpringAMQP发送对象类型的消息

  1. 声明一个队列,名为object.queue
  2. 编写单元测试,向队列中直接发送一条消息,消息类型为Map
  3. 在控制台查看消息,总结你能发现的问题

Spring的对消息对象的处理是出 org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是 SimpleMessageConverter,基于JDK的 ObjectOutputStream完成序列化。存在下列问题:

  • JDK的序列化有安全风险
  • JDK序列化的消息太大
  • JDK序列化的消息可读性差

建议采用JSON序列化代替默认的JDK序列化,要做两件事情:在publisher和consumer中都要引入jackson依赖:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}
  • 发送者
@Test
void testMap(){
    HashMap<String, String> map = new HashMap<>();
    map.put("name","jack");
    rabbitTemplate.convertAndSend(  "hmall.topic","china", map);
}
  • 消费者
@RabbitListener(queues = "topic.queue1")
public void receiveTopic(Map<String,String > message) {
    System.out.println("消费者 map 绑定  = " + message);
}

消息发送者和消息消费者处理消息的类型需要保持一致