RabbitMQ详解(完结)
目录
一、MQ基础
1.MQ介绍
1.1同步调用
1.2.异步调用
1.3.技术选型
2.RabbitMQ
2.1安装
2.2.收发消息
2.2.1.交换机
2.2.2.队列
2.2.3.绑定关系
2.2.4.发送消息
2.3.数据隔离
3.SpringAMQP
3.1快速入门
3.2.WorkQueues模型
3.3.交换机类型编辑
3.4.Fanout交换机
3.5 Direct交换机
3.7.Topic交换机
3.7.java代码声明队列和交换机
3.8.消息格式转换器
二、MQ高级
1.发送者的可靠性
1.1.生产者重试机制
1.2.生产者确认机制
1.3.实现生产者确认
2.MQ的可靠性
2.1.数据持久化
2.2.LazyQueue
3.消费者的可靠性
3.1.消费者确认机制
3.2.失败重试机制
3.3.失败处理策略
3.4.业务幂等性
4.延迟消息
4.1.死信交换机和延迟消息
4.2.DelayExchange插件
4.3超时订单问题
一、MQ基础
1.MQ介绍
1.1同步调用

目前我们采用的是基于OpenFeign的同步调用,也就是说业务执行流程是这样的:
- 支付服务需要先调用用户服务完成余额扣减
- 然后支付服务自己要更新支付流水单的状态
- 然后支付服务调用交易服务,更新业务订单状态为已支付
三个步骤依次执行。
这其中就存在3个问题:
第一,拓展性差
我们目前的业务相对简单,但是随着业务规模扩大,产品的功能也在不断完善。
某些电商项目中,还会有积分或金币的概念。假如产品经理提出需求,用户支付成功后,给用户以积分奖励或者返还金币,你怎么办?是不是要在上述业务中再加入积分业务、返还金币业务?
最终你的支付业务会越来越臃肿:
也就是说每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则,拓展性不好
第二,性能下降
由于我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和:
假如每个微服务的执行时长都是50ms,则最终整个业务的耗时可能高达300ms,性能太差了
第三,级联失败
由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。
这其实就是同步调用的级联失败问题。
而要解决这些问题,我们就必须用异步调用的方式来代替同步调用。
1.2.异步调用
步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者:投递消息的人,就是原来的调用方
- 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
- 消息接收者:接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。
这样,发送消息的人和接收消息的人就完全解耦了。
还是以余额支付业务为例:

除了扣减余额、更新支付流水单状态以外,其它调用逻辑全部取消。而是改为发送一条消息到Broker。而相关的微服务都可以订阅消息通知,一旦消息到达Broker,则会分发给每一个订阅了的微服务,处理各自的业务。
假如产品经理提出了新的需求,比如要在支付成功后更新用户积分。支付代码完全不用变更,而仅仅是让积分服务也订阅消息即可:

不管后期增加了多少消息订阅者,作为支付服务来讲,执行问扣减余额、更新支付流水状态后,发送消息即可。业务耗时仅仅是这三部分业务耗时,仅仅100ms,大大提高了业务性能。
1.3.技术选型
消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.
目比较常见的MQ实现:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
几种常见MQ的对比:
| RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
| 公司/社区 | Rabbit | Apache | 阿里 | Apache |
| 开发语言 | Erlang | Java | Java | Scala&Java |
| 协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
| 可用性 | 高 | 一般 | 高 | 高 |
| 单机吞吐量 | 一般 | 差 | 高 | 非常高 |
| 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
| 消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
2.RabbitMQ
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
https://www.rabbitmq.com/
接下来,我们就学习它的基本概念和基础用法。
2.1安装
我们同样基于Docker来安装RabbitMQ,使用下面的命令即可:
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hm-net\
-d \
rabbitmq:3.8-management
如果拉取镜像困难的话,就使用压缩包就可以
![]()
1.开放防火墙端口
2.用自己的IP端口
3.docker ps看看mq容器在不在运行,不在得打开
可以看到在安装命令中有两个映射的端口:
-
15672:RabbitMQ提供的管理控制台的端口
-
5672:RabbitMQ的消息发送处理接口
安装完成后,我们访问 http://192.168.150.101:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。

RabbitMQ对应的架构如图:

其中包含几个概念:
-
publisher:生产者,也就是发送消息的一方 -
consumer:消费者,也就是消费消息的一方 -
queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理 -
exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。 -
virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
2.2.收发消息
2.2.1.交换机
我们打开Exchanges选项卡,可以看到已经存在很多交换机
我们点击任意交换机,即可进入交换机详情页面。仍然会利用控制台中的publish message 发送一条消息:
这里是由控制台模拟了生产者发送的消息。由于没有消费者存在,最终消息丢失了,这样说明交换机没有存储消息的能力。
2.2.2.队列
我们打开Queues选项卡,可以新建队列:
此时,我们再次向amq.fanout交换机发送一条消息。会发现消息依然没有到达队列!!
怎么回事呢?
发送到交换机的消息,只会路由到与其绑定的队列,因此仅仅创建队列是不够的,我们还需要将其与交换机绑定。
2.2.3.绑定关系
点击Exchanges选项卡,点击amq.fanout交换机,进入交换机详情页,然后点击Bindings菜单,在表单中填写要绑定的队列名称:


2.2.4.发送消息
这时候再发送消息
就收到了


2.3.数据隔离
2.3.1.用户管理
点击Admin选项卡,首先会看到RabbitMQ控制台的用户管理界面:

这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima这个用户。仔细观察用户表格中的字段,如下:
-
Name:itheima,也就是用户名 -
Tags:administrator,说明itheima用户是超级管理员,拥有所有权限 -
Can access virtual host:/,可以访问的virtual host,这里的/是默认的virtual host
比如,我们给黑马商城创建一个新的用户,命名为hmall:
2.3.2.virtual host
我们先退出登录:
切换到刚刚创建的hmall用户登录,然后点击Virtual Hosts菜单,进入virtual host管理页

可以看到目前只有一个默认的virtual host,名字为 /。
我们可以给黑马商城项目创建一个单独的virtual host,而不是使用默认的/。
创建完成后如图:

由于我们是登录hmall账户后创建的virtual host,因此回到users菜单,你会发现当前用户已经具备了对/hmall这个virtual host的访问权限了
此时,点击页面右上角的virtual host下拉菜单,切换virtual host为 /hmall
然后再次查看queues选项卡,会发现之前的队列已经看不到了:
这就是基于virtual host 的隔离效果。
用户与 Virtual Host 的关系
默认情况
新创建的用户默认没有访问任何 Virtual Host 的权限需要手动为用户分配 Virtual Host 权限
✅ 一个用户可以访问多个 Virtual Host
✅ 一个 Virtual Host 可以被多个用户访问
✅ 多对多关系提供灵活的权限管理
✅ 可以为不同用户在同一 Virtual Host 分配不同权限
3.SpringAMQP
将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
3.1快速入门
我们都是经过交换机发送消息到队列,不过有时候为了测试方便,我们也可以直接向队列发送消息,跳过交换机。
在入门案例中,我们就演示这样的简单模型,如图
也就是:
-
publisher直接发送消息到队列
-
消费者监听并处理队列中的消息

3.2.1.消息发送
首先配置MQ地址,在publisher服务的application.yml中添加配置:
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
package com.itheima.publisher.amqp;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
打开控制台,可以看到消息已经发送到队列中:
3.2.2.消息接收
首先配置MQ地址,在consumer服务的application.yml中添加配置:
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:
package com.itheima.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
结果

3.2.WorkQueues模型
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
接下来,我们就来模拟这样的场景。
首先,我们在控制台创建一个新的队列,命名为work.queue:
3.2.1.消息发送
这次我们循环发送,模拟大量消息堆积现象。
在publisher服务中的SpringAmqpTest类中添加一个测试方法:
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
3.2.2.消息接收
要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:
-
消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
-
消费者2 sleep了200毫秒,相当于每秒处理5个消息
3.2.3.测试
启动ConsumerApplication后,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。
最终结果如下:
消费者1接收到消息:【hello, message_0】21:06:00.869555300
消费者2........接收到消息:【hello, message_1】21:06:00.884518
消费者1接收到消息:【hello, message_2】21:06:00.907454400
消费者1接收到消息:【hello, message_4】21:06:00.953332100
消费者1接收到消息:【hello, message_6】21:06:00.997867300
消费者1接收到消息:【hello, message_8】21:06:01.042178700
消费者2........接收到消息:【hello, message_3】21:06:01.086478800
消费者1接收到消息:【hello, message_10】21:06:01.087476600
消费者1接收到消息:【hello, message_12】21:06:01.132578300
消费者1接收到消息:【hello, message_14】21:06:01.175851200
消费者1接收到消息:【hello, message_16】21:06:01.218533400
消费者1接收到消息:【hello, message_18】21:06:01.261322900
消费者2........接收到消息:【hello, message_5】21:06:01.287003700
消费者1接收到消息:【hello, message_20】21:06:01.304412400
消费者1接收到消息:【hello, message_22】21:06:01.349950100
消费者1接收到消息:【hello, message_24】21:06:01.394533900
消费者1接收到消息:【hello, message_26】21:06:01.439876500
消费者1接收到消息:【hello, message_28】21:06:01.482937800
消费者2........接收到消息:【hello, message_7】21:06:01.488977100
消费者1接收到消息:【hello, message_30】21:06:01.526409300
消费者1接收到消息:【hello, message_32】21:06:01.572148
消费者1接收到消息:【hello, message_34】21:06:01.618264800
消费者1接收到消息:【hello, message_36】21:06:01.660780600
消费者2........接收到消息:【hello, message_9】21:06:01.689189300
消费者1接收到消息:【hello, message_38】21:06:01.705261
消费者1接收到消息:【hello, message_40】21:06:01.746927300
消费者1接收到消息:【hello, message_42】21:06:01.789835
消费者1接收到消息:【hello, message_44】21:06:01.834393100
消费者1接收到消息:【hello, message_46】21:06:01.875312100
消费者2........接收到消息:【hello, message_11】21:06:01.889969500
消费者1接收到消息:【hello, message_48】21:06:01.920702500
消费者2........接收到消息:【hello, message_13】21:06:02.090725900
消费者2........接收到消息:【hello, message_15】21:06:02.293060600
消费者2........接收到消息:【hello, message_17】21:06:02.493748
消费者2........接收到消息:【hello, message_19】21:06:02.696635100
消费者2........接收到消息:【hello, message_21】21:06:02.896809700
消费者2........接收到消息:【hello, message_23】21:06:03.099533400
消费者2........接收到消息:【hello, message_25】21:06:03.301446400
消费者2........接收到消息:【hello, message_27】21:06:03.504999100
消费者2........接收到消息:【hello, message_29】21:06:03.705702500
消费者2........接收到消息:【hello, message_31】21:06:03.906601200
消费者2........接收到消息:【hello, message_33】21:06:04.108118500
消费者2........接收到消息:【hello, message_35】21:06:04.308945400
消费者2........接收到消息:【hello, message_37】21:06:04.511547700
消费者2........接收到消息:【hello, message_39】21:06:04.714038400
消费者2........接收到消息:【hello, message_41】21:06:04.916192700
消费者2........接收到消息:【hello, message_43】21:06:05.116286400
消费者2........接收到消息:【hello, message_45】21:06:05.318055100
消费者2........接收到消息:【hello, message_47】21:06:05.520656400
消费者2........接收到消息:【hello, message_49】21:06:05.723106700
可以看到消费者1和消费者2竟然每人消费了25条消息:
-
消费者1很快完成了自己的25条消息
-
消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
3.2.4.能者多劳
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
再次测试,发现结果如下:
消费者1接收到消息:【hello, message_0】21:12:51.659664200
消费者2........接收到消息:【hello, message_1】21:12:51.680610
消费者1接收到消息:【hello, message_2】21:12:51.703625
消费者1接收到消息:【hello, message_3】21:12:51.724330100
消费者1接收到消息:【hello, message_4】21:12:51.746651100
消费者1接收到消息:【hello, message_5】21:12:51.768401400
消费者1接收到消息:【hello, message_6】21:12:51.790511400
消费者1接收到消息:【hello, message_7】21:12:51.812559800
消费者1接收到消息:【hello, message_8】21:12:51.834500600
消费者1接收到消息:【hello, message_9】21:12:51.857438800
消费者1接收到消息:【hello, message_10】21:12:51.880379600
消费者2........接收到消息:【hello, message_11】21:12:51.899327100
消费者1接收到消息:【hello, message_12】21:12:51.922828400
消费者1接收到消息:【hello, message_13】21:12:51.945617400
消费者1接收到消息:【hello, message_14】21:12:51.968942500
消费者1接收到消息:【hello, message_15】21:12:51.992215400
消费者1接收到消息:【hello, message_16】21:12:52.013325600
消费者1接收到消息:【hello, message_17】21:12:52.035687100
消费者1接收到消息:【hello, message_18】21:12:52.058188
消费者1接收到消息:【hello, message_19】21:12:52.081208400
消费者2........接收到消息:【hello, message_20】21:12:52.103406200
消费者1接收到消息:【hello, message_21】21:12:52.123827300
消费者1接收到消息:【hello, message_22】21:12:52.146165100
消费者1接收到消息:【hello, message_23】21:12:52.168828300
消费者1接收到消息:【hello, message_24】21:12:52.191769500
消费者1接收到消息:【hello, message_25】21:12:52.214839100
消费者1接收到消息:【hello, message_26】21:12:52.238998700
消费者1接收到消息:【hello, message_27】21:12:52.259772600
消费者1接收到消息:【hello, message_28】21:12:52.284131800
消费者2........接收到消息:【hello, message_29】21:12:52.306190600
消费者1接收到消息:【hello, message_30】21:12:52.325315800
消费者1接收到消息:【hello, message_31】21:12:52.347012500
消费者1接收到消息:【hello, message_32】21:12:52.368508600
消费者1接收到消息:【hello, message_33】21:12:52.391785100
消费者1接收到消息:【hello, message_34】21:12:52.416383800
消费者1接收到消息:【hello, message_35】21:12:52.439019
消费者1接收到消息:【hello, message_36】21:12:52.461733900
消费者1接收到消息:【hello, message_37】21:12:52.485990
消费者1接收到消息:【hello, message_38】21:12:52.509219900
消费者2........接收到消息:【hello, message_39】21:12:52.523683400
消费者1接收到消息:【hello, message_40】21:12:52.547412100
消费者1接收到消息:【hello, message_41】21:12:52.571191800
消费者1接收到消息:【hello, message_42】21:12:52.593024600
消费者1接收到消息:【hello, message_43】21:12:52.616731800
消费者1接收到消息:【hello, message_44】21:12:52.640317
消费者1接收到消息:【hello, message_45】21:12:52.663111100
消费者1接收到消息:【hello, message_46】21:12:52.686727
消费者1接收到消息:【hello, message_47】21:12:52.709266500
消费者2........接收到消息:【hello, message_48】21:12:52.725884900
消费者1接收到消息:【hello, message_49】21:12:52.746299900
可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
3.3.交换机类型
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
3.4.Fanout交换机
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
案例:

- 创建一个名为
hmall.fanout的交换机,类型是Fanout - 创建两个队列
fanout.queue1和fanout.queue2,绑定到交换机hmall.fanout
3.5.1.声明队列和交换机
在控制台创建队列fanout.queue1:
在创建一个队列fanout.queue2:

然后再创建一个交换机:

然后绑定两个队列到交换机:
3.5.2.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
3.5.3.消息接收
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
总结
交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
3.5 Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息
案例:

- 声明一个名为
hmall.direct的交换机 - 声明队列
direct.queue1,绑定hmall.direct,bindingKey为blud和red - 声明队列
direct.queue2,绑定hmall.direct,bindingKey为yellow和red - 在
consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2 - 在publisher中编写测试方法,向
hmall.direct发送消息
3.6.1.声明队列和交换机
首先在控制台声明两个队列direct.queue1和direct.queue2
略
然后声明一个direct类型的交换机,命名为hmall.direct:
然后使用red和blue作为key,绑定direct.queue1到hmall.direct:
同理,使用red和yellow作为key,绑定direct.queue2到hmall.direct,步骤略,
3.6.2.消息接收
在consumer服务的SpringRabbitListener中添加方法:
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
3.6.3.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "hmall.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
由于使用的red这个key,所以两个消费者都收到了消息:
我们再切换为blue这个key:
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "hmall.direct";
// 消息
String message = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
只有消费者1收到了消息:

总结
描述下Direct交换机与Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
3.7.Topic交换机
3.7.1.说明
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!
BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
通配符规则:
-
#:匹配一个或多个词 -
*:匹配不多不少恰好1个词
举例:
-
item.#:能够匹配item.spu.insert或者item.spu -
item.*:只能匹配item.spu
图示:
案例:
假如此时publisher发送的消息使用的RoutingKey共有四种:
-
china.news代表有中国的新闻消息; -
china.weather代表中国的天气消息; -
japan.news则代表日本新闻 -
japan.weather代表日本的天气消息;
解释:
-
topic.queue1:绑定的是china.#,凡是以china.开头的routing key都会被匹配到,包括:-
china.news -
china.weather
-
-
topic.queue2:绑定的是#.news,凡是以.news结尾的routing key都会被匹配。包括:-
china.news -
japan.news
-
首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。此处步骤略。最终结果如下:

3.7.2.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "hmall.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
3.7.3.消息接收
在consumer服务的SpringRabbitListener中添加方法:
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
总结
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey必须是多个单词,以
.分割 - Topic交换机与队列绑定时的bindingKey可以指定通配符
-
#:代表0个或多个词 -
*:代表1个词
3.7.java代码声明队列和交换机
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。
因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。
3.8.1.基本API
SpringAMQP提供了一个Queue类,用来创建队列:
SpringAMQP还提供了一个Exchange接口,来表示所有不同类型的交换机:
我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:
而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:
3.8.2.fanout示例/direct示例
在consumer中创建一个类,声明队列和交换机:
package com.itheima.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hmall.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:
package com.itheima.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("hmall.direct").build();
}
/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}
这里不会导致 AOP 失效的原因:
1. @Configuration 类的特殊代理
Spring 会对
@Configuration类进行 CGLIB 代理:java
@Configuration public class FanoutConfig { // 这个类会被 Spring 代理,所有 @Bean 方法调用都会被拦截 }2. Spring 的处理流程
java
@Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ // 这里的 fanoutQueue2 和 fanoutExchange 参数是 Spring 注入的 // 不是直接调用 fanoutQueue2() 方法! return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); }实际发生的情况:
Spring 先创建
fanoutQueue2Bean 和fanoutExchangeBean然后调用
bindingQueue2()方法,并自动注入这两个参数不是通过调用
fanoutQueue2()方法来获取实例3. 错误的使用方式(会导致 AOP 失效)
java
@Configuration public class WrongConfig { @Bean public Queue queue1() { return new Queue("queue1"); } @Bean public Binding wrongBinding() { // 错误:直接调用方法,会绕过 Spring 代理 Queue queue = queue1(); // 这会导致新实例,AOP 失效 return BindingBuilder.bind(queue).to(exchange()); } @Bean public FanoutExchange exchange() { return new FanoutExchange("test"); } }
3.8.3.基于注解声明
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
例如,我们同样声明Direct模式的交换机和队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
再试试Topic模式:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
3.8.消息格式转换器
Spring的消息发送代码接收的消息体是一个Object:
而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
3.9.1.测试默认转换器
发送消息
@Test
public void testSendMap() throws InterruptedException {
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "柳岩");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend("object.queue", msg);
}
发送消息后查看控制台:
可以看到消息格式非常不友好。
3.9.2.配置JSON转换器
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在publisher和consumer两个服务中都引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
配置消息转换器,在publisher和consumer两个服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
此时,我们到MQ控制台删除object.queue中的旧的消息。然后再次执行刚才的消息发送的代码,到MQ的控制台查看消息结构:

3.9.3.消费者接收Object
我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:
@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}
二、MQ高级
消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:

消息从生产者到消费者的每一步都可能导致消息丢失:
-
发送消息时丢失:
-
生产者发送消息时连接MQ失败
-
生产者发送消息到达MQ后未找到
Exchange -
生产者发送消息到达MQ的
Exchange后,未找到合适的Queue -
消息到达MQ后,处理消息的进程发生异常
-
-
MQ导致消息丢失:
-
消息到达MQ,保存到队列后,尚未消费就突然宕机
-
-
消费者处理消息时:
-
消息接收后尚未处理突然宕机
-
消息接收后处理过程中抛出异常
-
综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:
-
确保生产者一定把消息发送到MQ
-
确保MQ不会将消息弄丢
-
确保消费者一定要处理消息
1.发送者的可靠性
1.1.生产者重试机制
首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。
修改publisher模块的application.yaml文件,添加下面的内容:
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
当利用命令停掉RabbitMQ服务时:
docker stop mq
然后测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。消息发送的超时重试机制配置成功了!
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
1.2.生产者确认机制
一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:
- MQ内部处理消息的进程发生了异常
- 生产者发送消息到达MQ后未找到
Exchange - 生产者发送消息到达MQ的
Exchange后,未找到合适的Queue,因此无法路由
针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
具体如图所示:
总结如下:
- 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
- 其它情况都会返回NACK,告知投递失败
其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。
1.3.实现生产者确认
1)开启生产者确认
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type有三种模式可选:
-
none:关闭confirm机制 -
simple:同步阻塞等待MQ的回执 -
correlated:MQ异步回调返回回执
一般我们推荐使用correlated,回调机制。
2)定义ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:
package com.itheima.publisher.config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
private final RabbitTemplate rabbitTemplate;这确实是注入的! 原因:
@AllArgsConstructor注解会生成包含所有字段的构造器Spring 会自动通过构造器注入
RabbitTemplate这是设置 Return 回调,用于处理消息路由失败的情况。
触发时机:
消息成功到达 Exchange
但 Exchange 无法将消息路由到任何 Queue
mandatory=true 时才会触发(默认false,需要手动开启)
3)定义ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
-
id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆 -
SettableListenableFuture:回执结果的Future对象
将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:
我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback:
@Test
void testPublisherConfirm() {
// 1.创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
}else{ // result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
CorrelationData的作用:这是消息关联数据,用于:
关联消息和确认回执
携带业务标识(如订单ID)
获取异步确认结果
cd.getFuture().addCallback()的含义:这是添加异步回调,监听 Confirm 确认结果。
Confirm结果 Return结果 含义 ACK 无Return 消息完全成功(到达Broker且路由成功) ACK 有Return 消息到达Broker,但路由失败 NACK – 消息根本没能到达Broker
什么是"回调"(Callback)?
回调就像是"留下电话号码,等有事时通知你"的编程模式。
现实生活比喻:
text
你去餐厅吃饭: 1. 你:点餐后留下手机号(注册回调函数) 2. 餐厅:准备餐食(执行主流程) 3. 餐厅:餐好了打电话通知你(触发回调) 4. 你:收到电话去取餐(执行回调函数)1.
setReturnsCallback– 设置Return回调java
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { // 当消息路由失败时,RabbitMQ会"回调"这个函数 log.error("消息路由失败: {}", returned.getReplyText()); } });2.
cd.getFuture().addCallback()– 设置Confirm回调java
CorrelationData cd = new CorrelationData(); cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onSuccess(CorrelationData.Confirm result) { // 当RabbitMQ确认收到消息时,Spring会"回调"这个函数 if(result.isAck()) {// 1. 设置回调(预先登记) rabbitTemplate.setReturnsCallback(returned -> { System.out.println("Return回调被触发: " + returned.getReplyText()); }); CorrelationData cd = new CorrelationData(); cd.getFuture().addCallback(result -> { System.out.println("Confirm回调被触发: " + result.isAck()); }); // 2. 发送消息 rabbitTemplate.convertAndSend("exchange", "routingKey", "message", cd); // 3. 底层发生的事件序列: // [网络层面] // a) 应用程序 → RabbitMQ: 发送消息 // b) RabbitMQ处理消息... // c) RabbitMQ → 应用程序: 发送Confirm确认(ack/nack) // d) (如果路由失败) RabbitMQ → 应用程序: 发送Return消息 // // [Spring AMQP层面] // a) Spring接收到Confirm确认 → 调用 cd.getFuture().setResult() // b) Spring接收到Return消息 → 调用 returnsCallback.returnedMessage() // // [你的代码层面] // a) Confirm回调函数自动执行 // b) Return回调函数自动执行log.debug("Broker确认收到消息"); } } });
RabbitTemplate中还有其他的内部接口:java
public class RabbitTemplate { // Confirm回调接口 @FunctionalInterface public interface ConfirmCallback { void confirm(CorrelationData correlationData, boolean ack, String cause); } // Return回调接口 @FunctionalInterface public interface ReturnsCallback { void returnedMessage(ReturnedMessage returned); } }
注意:
开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
路由失败:一般是因为RoutingKey错误导致,往往是编程导致
交换机名称错误:同样是编程错误导致
MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了
2.MQ的可靠性
消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。
2.1.数据持久化
为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:
- 交换机持久化
- 队列持久化
- 消息持久化
1)交换机持久化
在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

设置为Durable就是持久化模式,Transient就是临时模式。
2)队列持久化
在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

除了持久化以外,你可以看到队列还有很多其它参数,有一些我们会在后期学习。
3)消息持久化
在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties:

如果交换机未设置为持久化,在 RabbitMQ 重启后,该交换机及其绑定关系将丢失,所有依赖该交换机的消息传递逻辑将无法正常工作。
队列持久化,就是单纯能保留队列的壳子,想保存信息,必须消息持久化
2.2.LazyQueue
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
-
消费者宕机或出现网络故障
-
消息发送量激增,超过了消费者处理速度
-
消费者处理业务发生阻塞
(LazyQueue已经成为所有队列的默认格式)
1)在添加队列的时候,添加x-queue-mod=lazy参数即可设置队列为Lazy模式

2)代码配置Lazy模式
在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}
这里是通过QueueBuilder的lazy()函数配置Lazy模式,底层源码如下:

当然,我们也可以基于注解来声明队列并设置为Lazy模式:
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}
3)更新已有队列为lazy模式
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
-
rabbitmqctl:RabbitMQ的命令行工具 -
set_policy:添加一个策略 -
Lazy:策略名称,可以自定义 -
"^lazy-queue$":用正则表达式匹配队列的名字 -
'{"queue-mode":"lazy"}':设置队列模式为lazy模式 -
--apply-to queues:策略的作用对象,是所有的队列
mq在内存满的情况下,会持久化一部分到磁盘,然而这个过程较为耗时,这个过程中的消息发过来则就相当于丢失了。这个过程是阻塞式的,mq就不能处理消息了。每一次把消息从内存写出到磁盘,做page out的过程中,mq处于阻塞状态,消息处理会直接降为0。这就是基于内存带来的影响。所以需要将消息设置为持久化的。
1. 普通持久化消息 (Persistent Messages)
存储行为:
消息同时写入内存 + 磁盘
内存中保留消息副本供快速消费
磁盘上立即持久化到消息存储文件
消费者确认后从磁盘删除
2. 非持久化消息 (Non-Persistent Messages)
存储行为:(pageout时会阻塞mq)
消息只写入内存
正常情况下不写入磁盘
内存压力大时触发
pageout到磁盘重启后消息丢失
3. Lazy Queue (惰性队列)
存储行为:
消息立即写入磁盘
内存中不保留消息副本
消费时才从磁盘加载到内存
专门为高吞吐、消息积压设计
模式 内存存储 磁盘存储 存储时机 非持久化 ✅ 主要存储 ❌ 仅pageout时 内存满时被动写入 普通持久化 ✅ 活跃消息 ✅ 所有消息 消息到达时立即写入 Lazy Queue ❌ 不存储 ✅ 所有消息 消息到达时立即写入
public class PerformanceComparison { // 写入性能(从高到低) // 1. 非持久化消息 - 无磁盘IO // 2. Lazy Queue - 异步刷盘 // 3. 普通持久化 - 同步/半同步刷盘 // 读取性能(从高到低) // 1. 非持久化消息 - 纯内存访问 // 2. 普通持久化 - 大部分在内存 // 3. Lazy Queue - 需要磁盘IO // 内存使用(从低到高) // 1. Lazy Queue - 极低内存占用 // 2. 普通持久化 - 中等内存占用 // 3. 非持久化消息 - 高内存占用 }Spring AMQP 配置
java
@Configuration public class RabbitMQConfig { // 1. 普通持久化队列 @Bean public Queue persistentQueue() { return new Queue("orders.queue", true); // durable = true } // 2. 非持久化队列 @Bean public Queue nonPersistentQueue() { return new Queue("metrics.queue", false); // durable = false } // 3. 惰性队列 @Bean public Queue lazyQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-queue-mode", "lazy"); return new Queue("backup.queue", true, false, false, args); } }
为什么非持久化的pageout会带来阻塞?
一是非持久化的时候会有大量的磁盘io,这是会造成cpu忙碌,并且mq会有保护机制,使得这时候阻塞无法处理消息
二是非持久化是大量的io随机刷盘,速度很慢,但是lazy和持久化的话mq的底层采取了优化的策略
// 1. 顺序写入优化 private void writeToJournal() { // 所有消息追加到同一个日志文件 // 顺序写入比随机写入快100倍 }
3.消费者的可靠性
当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:
- 消息投递的过程中出现了网络故障
- 消费者接收到消息后突然宕机
- 消费者接收到消息后,因处理不当导致异常
- …
一旦发生上述情况,消息也会丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。
但问题来了:RabbitMQ如何得知消费者的处理状态呢?
3.1.消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
-
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用 -
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活 -
auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:-
如果是业务异常,会自动返回
nack; -
如果是消息处理或校验异常,自动返回
reject;
-

通过下面的配置可以修改SpringAMQP的ACK处理方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理
修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消费者接收到消息:【" + msg + "】");
if (true) {
throw new MessageConversionException("故意的");
}
log.info("消息处理完成");
}
测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。
我们再次把确认机制修改为auto:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自动ack
由于抛出的是消息转换异常,因此Spring会自动返回reject,所以消息依然会被删除:
我们将异常改为RuntimeException类型:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消费者接收到消息:【" + msg + "】");
if (true) {
throw new RuntimeException("故意的");
}
log.info("消息处理完成");
}
由于抛出的是业务异常,所以Spring返回ack,最终消息恢复至Ready状态,并且没有被RabbitMQ删除
当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。
3.2.失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:
当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
-
消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
-
本地重试3次以后,抛出了
AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject
上述引入了auto之后,对返回的三种状态有了基本的管理的,当返回ack的时候,消息直接删除,当返回reject和nack的时候,重新requeue(默认配置defaultRequeuerejected = true),这是 Spring 对 AMQP 协议的高级封装,不是 RabbitMQ 原生的行为。不过每次失败重新requeue也不是个办法,有没有办法,当消息达到消费者端,占用一个消费者线程的时候,增加几次重试的机会呢??这个就是所谓的失败重试机制,准格尔是Spring提供的retry机制。同时需要注意的是,这里所谓的重试是本地重试的,也就是在jvm中,占用了消费者线程对消费方法进行了不断重试。下面的stateless表示的是每次重试都不保存上下文的。(每次重试都是独立的,不记录之前的重试状态重试之间不会保持事务状态)
3.3.失败处理策略
在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:
-
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式 -
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队 -
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
默认的是第一个,所以上文中重试三次后会删除,报reject
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
2)定义一个RepublishMessageRecoverer,关联队列和交换机
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
// 在consumer服务中定义处理失败消息的交换机和队列
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
// 定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
核心作用:条件化配置
这个注解的意思是:只有当配置文件中
spring.rabbitmq.listener.simple.retry.enabled=true时,才会创建这个配置类中的所有Bean。🔄 工作流程解析
java
@Configuration @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true") public class ErrorMessageConfig { // 只有当重试机制开启时,下面的Bean才会被创建 }场景1:重试开启时 (
enabled=true)yaml
spring: rabbitmq: listener: simple: retry: enabled: true # 重试机制开启 max-attempts: 3
✅
ErrorMessageConfig配置类生效✅ 创建
error.direct交换器✅ 创建
error.queue队列✅ 创建绑定关系
✅ 创建
RepublishMessageRecoverer结果:重试失败的消息会被转发到错误队列
场景2:重试关闭时 (
enabled=false或 未配置)yaml
spring: rabbitmq: listener: simple: retry: enabled: false # 重试机制关闭
❌
ErrorMessageConfig配置类不生效❌ 所有Bean都不会创建
❌ 没有错误交换器和队列
结果:重试失败的消息不会被特殊处理
3.4.业务幂等性
何为幂等性?
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:
- 根据id删除数据
- 查询数据
- 新增数据
但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
- 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
- 退款业务。重复退款对商家而言会有经济损失。
所以,我们要尽可能避免业务被重复执行。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:
- 页面卡顿时频繁刷新导致表单重复提交
- 服务间调用的重试
- MQ消息的重复投递
我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。
举例:
- 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
- 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
- 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
- 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。
1)唯一消息ID
这个思路非常简单:
- 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}

2)业务判断
业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。
相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。
UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。
首先仅依赖 MQ 的生产者确认和消费者确认机制,确保消息的可靠投递。
如果业务确实需要更高的可靠性,可以考虑引入幂等性设计,即允许重复处理同一条消息。
如果担心 MQ 故障,可以考虑引入消息补偿机制,例如将关键信息持久化到数据库,定期检查和补偿。
如果上述方案仍不满足需求,再考虑引入定时任务等机制作为补充。但要注意控制复杂度,减少对系统的干扰。
4.延迟消息
在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。
但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!
因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。
像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。
在RabbitMQ中实现延迟消息也有两种方案:
-
死信交换机+TTL
-
延迟消息插件
4.1.死信交换机和延迟消息
1)死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用
basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false - 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
死信交换机有什么作用呢?
- 收集那些因处理失败而被拒绝的消息
- 收集那些因队列满了而被拒绝的消息
- 收集因TTL(有效期)到期的消息
2)延迟消息
有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:
假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒:
注意:尽管这里的ttl.fanout不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct才能正确路由消息。
消息肯定会被投递到ttl.queue之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信:
死信被再次投递到死信交换机hmall.direct,并沿用之前的RoutingKey,也就是blue:
由于direct.queue1与hmall.direct绑定的key是blue,因此最终消息被成功路由到direct.queue1,如果此时有消费者与direct.queue1绑定, 也就能成功消费消息了。但此时已经是5秒钟以后了:
1. 定义死信交换机及死信队列
首先,定义死信交换机(DLX)和存储死信的消息队列(DLQ),并将它们绑定。
java
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DeadLetterConfig { // 1. 定义死信交换机 (DLX) @Bean public DirectExchange dlxExchange() { return new DirectExchange("dlx.exchange", true, false); // 持久化,不自动删除 } // 2. 定义死信队列 (DLQ) @Bean public Queue dlxQueue() { return QueueBuilder.durable("dlx.queue").build(); // 持久化队列 } // 3. 将死信队列与死信交换机绑定,并指定路由键 @Bean public Binding dlxBinding(Queue dlxQueue, DirectExchange dlxExchange) { return BindingBuilder.bind(dlxQueue) .to(dlxExchange) .with("dlx.routing.key"); // 定义死信路由规则 } }2. 为正常队列指定死信交换机
然后,创建一个正常业务队列,通过参数为其设置死信交换机。
java
@Configuration public class NormalQueueConfig { // 定义正常业务交换机 @Bean public DirectExchange normalExchange() { return new DirectExchange("normal.exchange"); } // 定义正常业务队列,并指定死信交换机和相关参数 @Bean public Queue normalQueue() { return QueueBuilder.durable("normal.queue") // 队列名 .withArgument("x-dead-letter-exchange", "dlx.exchange") // 指定DLX .withArgument("x-dead-letter-routing-key", "dlx.routing.key") // 指定发送到DLX的路由键 .withArgument("x-message-ttl", 10000) // (可选) 设置队列中消息的TTL为10秒 .withArgument("x-max-length", 10) // (可选) 设置队列最大长度为10条消息 .build(); } // 将正常队列与正常交换机绑定 @Bean public Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) { return BindingBuilder.bind(normalQueue) .to(normalExchange) .with("normal.routing.key"); } }关键参数说明:
x-dead-letter-exchange:指定死信交换机的名称。
x-dead-letter-routing-key:可选。指定消息成为死信后发送到死信交换机时使用的路由键。若不设置,则默认使用原消息的路由键。
x-message-ttl:可选。设置队列中消息的存活时间(毫秒),超时未消费则变为死信。
x-max-length:可选。设置队列的最大长度,超出后最早的消息可能成为死信。3. 监听处理死信
最后,定义一个消费者来监听处理死信队列中的消息。
java
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class DeadLetterConsumer { @RabbitListener(queues = "dlx.queue") public void handleDeadLetter(String message) { System.out.println("接收到死信消息: " + message); // 在这里实现你的死信处理逻辑,例如记录日志、发送警报、分析原因等 }
4.2.DelayExchange插件
基于死信队列虽然可以实现延迟消息,但是太麻烦了。
RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
1)下载
完整的步骤在这:https://developer.aliyun.com/article/1482289
2)安装
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。
docker volume inspect mq-plugins
结果:
插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.).声明延迟交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
package com.itheima.consumer.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}
@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
4)发送延迟消息
发送消息时,必须通过x-delay属性设定延迟时间:
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
用拉姆达也一样
@Test
void testSendTTLMessage() {
rabbitTemplate.convertAndSend("simple.direct", "hi", "hello", message -> {
message.getMessageProperties().setExpiration("10000");
return message;
});
}
@Test void testSendTTLMessage() { Message message = MessageBuilder .withBody("hello".getBytes(StandardCharsets.UTF_8)) .setExpiration("10000") .build(); rabbitTemplate.convertAndSend("simple.direct", "hi", message); } 我确实配置了消息转换器,但是这这样写就报错了我确实配置了消息转换器,但是这这样写就报错了
当配置了
Jackson2JsonMessageConverter后,RabbitTemplate期望发送的对象会被自动转换为 JSON。但当您直接使用Message对象时:
消息转换器仍然会尝试处理:RabbitTemplate 会调用消息转换器
类型不匹配:消息转换器期望处理普通对象,但收到的是
Message类型转换异常:导致序列化错误
- 使用MessageBuilder创建的Message对象是Spring AMQP的Message类型,它直接包含了字节数组 body 和消息属性。
- 当使用rabbitTemplate发送消息时,如果配置了消息转换器,默认情况下,rabbitTemplate会使用该消息转换器来转换消息。
- 但是,当我们直接使用Message对象时,rabbitTemplate不会对Message对象进行转换,而是直接发送。这可能导致与消息转换器的预期行为不一致。
4.3超时订单问题
接下来,我们就在交易服务中利用延迟消息实现订单超时取消功能。其大概思路如下:
假如订单超时支付时间为30分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为30分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。
1)定义好时间:
@Data
public class MultiDelayMessage<T>{
// 消息体
private T data;
// 记录延迟时间的集合
private List<Long> delayMillis;
public MultiDelayMessage(T data, List<Long> delayMillis){
this,data = data;
this.delayMillis = delayMillis;
}
public static <T> MultiDelayMessage<T> of(T data, Long... delaymilis){
return new MultiDelayMessage<>(data, Collutils.newArrayList(delayMillis)
}
// 获取并移除下一个延迟时间
// Returns:队列中的第一个延迟时间
public Long removeNextDelay(){ return delayMillis.remove( index: 0); }
//是否还有下一个延迟时间
public boolean hasNextDelay(){ return !delayMillis.isEmpty();}
}
2)定义常量
package com.hmall.trade.constants;
public interface MQConstants {
String DELAY_EXCHANGE_NAME = "trade.delay.direct";
String DELAY_ORDER_QUEUE_NAME = "trade.delay.order.queue";
String DELAY_ORDER_KEY = "delay.order.query";
}
3)配置mq
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.150.101
port: 5672
virtual-host: /hmall
username: hmall
password: 123
4)改造下单业务,发送延迟消息
trade-service模块的com.hmall.trade.service.impl.OrderServiceImpl类的createOrder方法,添加消息发送的代码:

上面直接写了,用拉姆达也可以
或者定义为类,就不用每次都写
5)监听消息,查询支付状态
接下来,我们在trader-service编写一个监听器,监听延迟消息,查询订单支付状态:
@Component
public class OrderDelayMessageListener {
private final IOrderService orderService;
private final PayClient payClient;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MQConstants.DELAY_ORDER_QUEUE_NAME),
exchange = @Exchange(name = MQConstants.DELAY_EXCHANGE_NAME, delayed = "true"),
key = MQConstants.DELAY_ORDER_KEY
))
public void listenOrderDelayMessage
//1.查询订单状态
Order order =orderservice.getById(msg.getData());
//2.判断是否已经支付
if(order ==nullorder.getstatus()== 2){
// 订单不存在或者已经被处理
return;
}
// TOD0 3.去支付服务查询真正的支付状态
boolean isPay=false;
//3.1.已支付,标记订单状态为已支付
if(isPay){
orderservice.markOrderPaySuccess(order.getId());
return;
}
// 4.判断是否存在延迟时间
if(msg.hasNextDelay()){
//4.1.存在,重发延迟消息
Long nextDelay=msg.removeNextDelay();
rabbitTemplate.convertAndSend(
MGConstantS.DELAY_EXCHANGE, MGCOnStantS.DELAY_ORDER_ROUTING_KEY,
msg,new DelayMessageProcessor(nextDelay.intValue()));
return;
//5不存在,取消订单
orderservice.lambdaUpdate()
.set(Order::getstatus,5)
.set(Order::getCloseTime,LocalDateTime.now())
.eq(Order::getId,order.getId())
.update();
//TODO 6 恢复库存



