SpringBoot 集成 Spring AMQP + RabbitMQ 一文搞定多种工作模式(点对点、工作队列、发布订阅、路由、主题、RPC、死信队列)与各种自定义配置详解
SpringBoot 集成 Spring AMQP + RabbitMQ 一文搞定多种工作模式(点对点、工作队列、发布订阅、路由、主题、RPC、死信队列)与各种自定义配置详解
文中代码见 Don212
一、快速开始
1、在 Spring Boot 项目中引入一个依赖
<!-- Starter for using Spring AMQP and Rabbit MQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
引入依赖以后,Spring Boot 会自动创建 RabbitMQ 的连接工厂 ConnectionFactory,并创建 RabbitTemplate 对象,并注册到
Spring 容器中。
2、在 Spring Boot 项目配置 RabbitMQ 信息
spring:
##### RabbitMQ 配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: test
password: test
# 连接到代理时使用的虚拟主机
virtual-host: /
经过上述 2 步骤配置以后,Spring Boot 项目就可以使用 RabbitMQ 了。
3、实现发收消息
创建消费者:com.don.learn.springboot.rabbitmq.consumer.SimpleConsumer 、
com.don.learn.springboot.rabbitmq.consumer.SimpleConsumer2
- 注解
@RabbitListener:声明消息消费者、支持多种配置 - 注解
@Queue:定义队列的注解- name / value :指定队列的名称
- durable :队列是否持久化(服务重启后队列是否还存在),默认持久化
- exclusive :是否为排他队列(是否只能被一个连接使用(只有声明该队列的连接才能使用它,连接断开时会自动删除这个队列)),默认非排他
- autoDelete :当最后一个消费者断开连接后,队列是否自动删除,默认不自动删除
- ignoreDeclarationExceptions :是否忽略队列声明时的异常,默认抛出异常
- arguments :设置队列的额外参数
- declare :是否在应用启动时声明队列,默认自动声明队列
- admins :指定管理队列的 RabbitAdmin Bean 名称
创建生产者:com.don.learn.springboot.rabbitmq.producer.SimpleProducer
- 使用
RabbitTemplate:给 RabbitMQ 发送消息
注解 @RabbitListener 既可以标注在方法上,也可以标注在类上。标记在类上时,方法上的 @RabbitHandler 注解起作用。
当接收到消息以后,会调用 @RabbitHandler 注解标注的方法,具体哪个方法被调用,需要根据消息类型与方法参数的类型进行匹配。
4、生产者启动声明队列
@Bean
public Queue queue() {
return new Queue("simple_queue", true, false, false);
}
5、自动配置
org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
org.springframework.boot.autoconfigure.amqp.RabbitProperties
-
ConnectionFactory:自动配置连接工厂 -
RabbitProperties:封装了RabbitMQ的配置 -
RabbitTemplate:给RabbitMQ发送和接受消息 -
AmqpAdmin:RabbitMQ系统管理功能组件
二、RabbitMQ 多种工作模式
RabbitMQ 支持多种消息传递模式(也称为交换器类型),每种模式都有不同的路由逻辑和适用场景。以下是 RabbitMQ 的 6 种主要工作模式:
点对点模式(简单模式)
- 结构特点:一个生产者、一个队列、一个消费者
- 路由规则:直接发送到指定队列
- 适用场景:简单的点对点通信
示例代码:
配置类 RabbitMQSimpleConfig,生产者 SimpleMessageProducer,消费者 SimpleMessageConsumer,API
SimpleMessageController
工作队列模式
- 结构特点:一个生产者、一个队列、多个消费者(竞争关系)
- 路由规则:轮询(默认)或公平分发
- 适用场景:任务分发、负载均衡
示例代码:
和简单模式配置一样,只是多个消费者。一次性发送多个消息到队列后,消费者会轮询接收该队列的消息。
发布订阅模式(Publish/Subscribe)
- 结构特点:一个生产者、一个 Fanout 交换器、多个队列、多个消费者(竞争关系)
- 路由规则:广播到所有绑定的队列
- 适用场景:广播通知、日志收集、事件广播、监控告警
- 广播机制:一个事件被所有绑定的队列接收
示例代码:
配置类 RabbitMQFanoutConfig,生产者 FanoutMessageProducer,消费者 FanoutMessageConsumer,API
FanoutMessageController
路由模式(Routing)
- 结构特点:一个生产者、一个 Direct 交换器、多个队列(按路由键精确绑定)、多个消费者(竞争关系)
- 路由规则:精确匹配路由键
- 适用场景:按消息类型分类处理、系统日志分级
示例代码:
配置类 RabbitMQDirectConfig,生产者 DirectMessageProducer,消费者 DirectMessageConsumer,API
DirectMessageController
主题模式(Topics)
- 结构特点:一个生产者、一个 Topic 交换器、多个队列(使用通配符模式匹配绑定)、多个消费者(竞争关系)
- 通配符路由规则:
*:匹配一个单词(用.分隔的段)、#:匹配零个或多个单词 - 适用场景:消息分类、多维度筛选、复杂事件路由
示例代码:
配置类 RabbitMQTopicConfig,生产者 TopicMessageProducer,消费者 TopicMessageConsumer,API TopicMessageController
RPC 模式(Remote Procedure Call)
RPC 是远程过程调用(Remote Procedure Call)的缩写形式,简单说就是一个节点去请求另一个节点上面的服务并获得响应结果。
RabbitMQ 文档
- 结构特点:客户端发送请求到请求队列,服务端处理并返回响应到回调队列
- 适用场景:远程调用、分布式服务调用
RPC 的工作流程:
- Client 先发送一条消息,和普通的消息相比,消息多了两个关键内容:一个是 correlation_id,表示这条消息的唯一 id,一个是
reply_to,表示回复队列的名字 - Server 从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到 reply_to 指定的回调队列中,并回传
correlation_id - Client 从回调队列中读取消息,从消息头中获取 correlation_id 并比较可确认是否一致,消息内容为执行结果
手写 RPC 回调与 Spring AMQP 内置 RPC 实现的代码示例:
配置类 RabbitMQRpcConfig,RPC 客户端 RpcMessageClient,RPC 服务端 RpcMessageServer,API RpcMessageController
消息头路由 Headers
Headers 类型的 Exchange 使用的较少,是忽略 routingKey 的一种路由方式,是使用 Headers 参数来匹配的,Headers 参数是一个键值对。
消费者在绑定时需要关联额外 Headers 键值对参数,生产者在发消息时传入键值对,两者匹配的话,则对应的队列就可以收到消息。
匹配有两种方式 all 和 any,在消费者端必须要用键值 x-mactch 定义。all 代表定义的多个键值对都满足,any 则只要满足一个即可。
示例代码:
配置类 RabbitMQHeadersConfig,生产者 HeadersMessageProducer,消费者 HeadersMessageConsumer,API
HeadersMessageController
死信队列模式(Dead Letter Queue,DLQ)
消息进入死信队列的 4 种核心场景:
- 消息被消费者拒绝(Reject/Nack)且不重新入队,则消息会被直接丢弃,进入死信队列。
- 消息在队列中存活时间超过 TTL(Time-To-Live)。消息不会因处理时间超过TTL而进入死信队列,TTL机制在消息投递后就已"失效"。
- 队列达到最大长度限制,队列已满新消息会进入死信队列 ❌ (需要额外配置(备用交换器或手动处理)才能路由到死信队列)
- 消息无法路由到任何队列,消息发送到 Exchange 但没有匹配的队列时,如果配置备用交换器(Alternate
Exchange),消息会通过备用交换器进入死信队列。(备用交换器使用 FanoutExchange 或 TopicExchange+#通配路由) - 达到最大重试次数后(企业级最常用)通过 Spring AMQP 的重试机制和 MessageRecoverer 实现
代码示例:
配置类 RabbitMQDLXConfig,生产者 DLXMessageProducer,消费者 DLXMessageConsumer,API DLXMessageController
延迟队列模式
- 延迟队列:消息发送到队列以后,延迟指定时间后,消息被消费
三、RabbitMQ 配置
消息转换器
在 org.springframework.amqp.rabbit.core.RabbitTemplate 中消息转换器:
private MessageConverter messageConverter = new SimpleMessageConverter();
默认使用:org.springframework.amqp.support.converter.SimpleMessageConverter
自定义消息转换器:
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
消息监听
- 启动类上开启
@EnableRabbit基于注解的RabbitMQ模式,启动时会自动创建队列、交换器,并绑定队列到交换器。 - 使用
@RabbitListener监听队列,并指定队列名称、交换器名称、路由键名称。
四、AmqpAdmin 管理组件
- AmqpAdmin 是一个管理组件,用于创建、修改、删除交换器、队列、绑定规则。
@Test
public void testAmqpAdmin() {
System.out.println("创建交换器 DirectExchange");
amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
System.out.println("创建队列 Queue");
amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true));
System.out.println("创建绑定规则 Binding");
amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "amqp.test", null));
}
五、参考文档
文档资料 RabbitMQ
文档资料 Spring AMQP
文档资料 Spring Boot AMQP