后端-RabbitMQ

RabbitMQ

目录

  • RabbitMQ
    • 初识MQ
      • 协议
        • AMQP
        • XMPP
        • MQTT
      • 产品
        • ActiveMQ
        • RocketMQ
        • Kafka
        • RabbitMQ
      • 比较
    • 安装
    • 消息收发方式
      • 架构简介
      • 准备工作
      • 消息收发
        • Hello World
        • Work Queues
        • Publish/Subscribe
          • Fanout交换机
          • Direct交换机
          • Topic交换机

初识MQ

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker

协议

AMQP

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个应用层的开放式标准协议,用于在分布式系统中实现消息的可靠传递。它定义了消息的结构、交换方式、路由规则等规范,使不通过厂商的消息中间件能够互联互通
在AMQP协议中,消息收发涉及到如下一些概念:

  • Borker:接收和分发消息的应用
  • Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 中创建 exchange/queue
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接,断开连接的操作只会在 client 端进行,Broker 不会断开连接,除非出现网络故障或 broker 服务出现问题
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 Connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 Thread 创建单独的 Channel 进行通讯,AMQP method 包含了 Channel id 帮助客户端和 Message Broker 识别 Channel,所以 Channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP Connection 的开销
  • Exchange:Message 到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (点对点), topic(发布订阅) 以及 fanout (广播)
  • Queue:消息最终被送到这里等待 Consumer 取走,一个 Message 可以被同时拷贝到多个 queue 中
  • Binding:Exchange 和 Queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 Exchange 中的查询表中,作为 Message 的分发依据
XMPP

XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是一个基于 XML 的协议,多用于即时消息(IM)以及在线现场探测,适用于服务器之间的准即时操作。核心是基于 XML 流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。 它的优点是通用公开、兼容性强、可扩展、安全性高,缺点是 XML 编码格式占用带宽大)

MQTT

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是 IBM 开发的一个即时通讯协议,目前看来算是物联网开发中比较重要的协议之一了,该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和 Actuator(比如通过 Twitter 让房屋联网)的通信协议,它的优点是格式简洁、占用带宽小、支持移动端通信、支持 PUSH、适用于嵌入式系统

产品

ActiveMQ

ActiveMQ 是 Apache 下的一个子项目,使用完全支持 JMS1.1 和 J2EE1.4 规范的 JMS Provider 实现,少量代码就可以高效地实现高级应用场景,并且支持可插拔的传输协议,如:in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports
ActiveMQ 支持常用的多种语言客户端如 C++、Java、.Net,、Python、 Php、 Ruby 等
现在的 ActiveMQ 分为两个版本:

  • ActiveMQ Classic
  • ActiveMQ Artemis
    这里的 ActiveMQ Classic 就是原来的 ActiveMQ,而 ActiveMQ Artemis 是在 RedHat 捐赠的 HornetQ 服务器代码的基础上开发的,两者代码完全不同,后者支持 JMS2.0,使用基于 Netty 的异步 IO,大大提升了性能,更为神奇的是,后者不仅支持 JMS 协议,还支持 AMQP 协议、STOMP 以及 MQTT,可以说后者的玩法相当丰富
    因此大家在使用时,建议直接选择 ActiveMQ Artemis
RocketMQ

RocketMQ 是阿里开源的一款分布式消息中间件,原名 Metaq,从 3.0 版本开始改名为 RocketMQ,是阿里参照 Kafka 设计思想使用 Java 语言实现的一套 MQ。RocketMQ 将阿里内部多款 MQ 产品(Notify、Metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下 MQ 的架构,目前主要用于订单交易系统
RocketMQ 具有以下特点:

  • 保证严格的消息顺序
  • 提供针对消息的过滤功能
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力
Kafka

Kafka 是 Apache 下的一个开源流处理平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作(网页浏览,搜索和其他用户的行动)流数据。Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息
Kafka 具有以下特性:

  • 快速持久化:通过磁盘顺序读写与零拷贝机制,可以在 O(1) 的系统开销下进行消息持久化
  • 高吞吐:在一台普通的服务器上既可以达到 10W/s 的吞吐速率
  • 高堆积:支持 topic 下消费者较长时间离线,消息堆积量大
  • 完全的分布式系统:Broker、Producer、Consumer 都原生自动支持分布式,通过 Zookeeper 可以自动实现更加复杂的负载均衡
  • 支持 Hadoop 数据并行加载
RabbitMQ

RabbitMQ 支持 AMQP、XMPP、SMTP、STOMP 等多种协议,功能强大,适用于企业级开发

比较

RabbitMQ ActiveMQ RocketMQ Kafka
社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

安装

  1. 创建data文件夹
mkdir -p /opt/rabbitmq/data
  1. 拉取镜像
docker pull rabbitmq
  1. 查看镜像
docker images
  1. 启动rabbitmq
docker run -d -v /opt/rabbitmq/data:/var/lib/rabbitmq -p 5672:5672 -p 15672:15672 --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq
  1. 启动rabbitmq_management
docker exec -it rabbitmq
rabbitmq-plugins enable rabbitmq_management
  1. 访问rabbitmq管理页面
  • 浏览器访问http://192.168.159.128:15672
  • 初始账号密码admin admin

消息收发方式

架构简介

在这里插入图片描述


这张图中涉及到如下一些概念:

  • 生产者(Producer):发布消息到 RabbitMQ 中的交换机(Exchange)上
  • 交换机(Exchange):和生产者建立连接并接收生产者的消息
  • 消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息
  • 队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互
  • 路由(Routes):交换机转发消息到队列的规则

准备工作

RabbitMQ是AMQP的产品,Spring Boot为AMQP提供了自动化配置依赖spring-boot-starter-amqp,因此首先创建Spring Boot项目并添加该依赖,如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在application.yaml中配置RabbitMQ的基本连接信息,如下:

spring:
  rabbitmq:
    host: 192.168.159.128
    port: 5672
    username: freedom
    password: freedom
    virtual-host: /freedom

消费者和生产者的启动类都需要添加@EnableRabbit注解
在 RabbitMQ 中,所有的消息生产者提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的策略将消息分发到不同的 Queue 中
RabbitMQ 官网介绍了如下几种消息分发的形式:

在这里插入图片描述

消息收发

Hello World

一个生产者,一个队列,一个消费者。消息传播图如下:

在这里插入图片描述


常量:

public class Constants {
    public static final String HELLO_WORLD_QUEUE = "hello_world.queue";
}

队列的定义:

@Configuration
public class RabbitmqConfig {
    @Bean
    public Queue queueOne() {
        return new Queue(Constants.HELLO_WORLD_QUEUE);
    }
}

消息消费者的定义:

@Component
public class ConsumerTest {
    @RabbitListener(queues = Constants.HELLO_WORLD_QUEUE)
    public void receive(String msg) {
        System.out.println("receive msg: " + msg);
    }
}

消息发送:

@SpringBootTest
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testSendMessageQueue() {
        String message = "Hello World";
        rabbitTemplate.convertAndSend(Constants.HELLO_WORLD_QUEUE, message);
    }
}

使用的其实是默认的直连交换机(DirectExchange),DirectExchange的路由策略是将消息队列绑定到一个DirectExchange上,当一条消息到达DirectExchange时会被转发到与该条消息routing key相同的Queue上,例如消息队列名为hello_world.queue,则routingkey为hello_world.queue的消息会被该消息队列接收

Work Queues

一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者

在这里插入图片描述


一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息
并发能力的配置:

@Component
public class ConsumerTest {
    @RabbitListener(queues = Constants.HELLO_WORLD_QUEUE)
    public void receive(String msg) {
        System.out.println("receive msg: " + msg);
    }
    @RabbitListener(queues = Constants.HELLO_WORLD_QUEUE, concurrency = "10")
    public void receive2(String msg) {
        System.out.println("receive2 msg: " + msg + "--------" + Thread.currentThread().getName());
    }
}

由此可见,第二个消费者配置了concurrency为10,此时,对于第二个消费者,将会同时存在10个子线程去消费消息
如果生产者发送10条消息,就会一下都被消费掉
消息发送:

@SpringBootTest
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testSendMessageQueue() {
        String message = "Hello World";
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(Constants.HELLO_WORLD_QUEUE, message);
        }
    }
}

消息消费日志:

在这里插入图片描述


可以看到,消息不是都被第二个消费者都消费了,也有可能被第一个消费者消费(由于第二个消费者有十个线程一起开动,所以第二个消费者消费的消息占比更大)
消费者可以开启手动ack,这样可以自行决定是否消费RabbitMQ发来的消息,配置手动ack的方式:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

消费者的定义:

@Component
public class ConsumerTest {
    @RabbitListener(queues = Constants.HELLO_WORLD_QUEUE)
    public void receive(Message message, Channel channel) throws IOException {
        System.out.println("receive msg: " + message.getPayload());
        channel.basicAck((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG), true);
        System.out.println("receive");
    }
    @RabbitListener(queues = Constants.HELLO_WORLD_QUEUE, concurrency = "10")
    public void receive2(Message message, Channel channel) throws IOException {
        System.out.println("receive2 msg: " + message.getPayload() + "--------" + Thread.currentThread().getName());
        channel.basicReject((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG), true);
    }
}

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积
因此设置preFetch值为1,确保同一时刻最多投递给消费者1条消息

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1  # 每次只能获取一条消息,处理完成才能获取下一个消息
Publish/Subscribe

一个生产者,一个交换机,多个队列,多个消费者
每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange 上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力

在这里插入图片描述


真正生产环境会经过exchange来发送消息,而不是直接发送到队列
交换机类型:

  • Direct:定向
  • Fanout:广播
  • Topic:话题
Fanout交换机

Fanout Exchange会将接受到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
常量:

public class Constants {
    public static final String FREEDOM_FANOUT_EXCHANGE = "freedom.fanout";
    public static final String FANOUT_ONE_QUEUE = "fanout_one.queue";
    public static final String FANOUT_TWO_QUEUE = "fanout_two.queue";
}

队列和交换机的定义:

@Configuration
public class RabbitmqConfig {
    @Bean
    public Exchange fanoutExchange() {
        return new FanoutExchange(Constants.FREEDOM_FANOUT_EXCHANGE, true, false);
    }
    @Bean
    public Queue queueOne() {
        return new Queue(Constants.FANOUT_ONE_QUEUE);
    }
    @Bean
    public Queue queueTwo() {
        return new Queue(Constants.FANOUT_TWO_QUEUE);
    }
    @Bean
    public Binding bindingOne() {
        return BindingBuilder.bind(queueOne())
                .to(fanoutExchange())
                .with("")
                .noargs();
    }
    @Bean
    public Binding bindingTwo() {
        return BindingBuilder.bind(queueTwo())
                .to(fanoutExchange())
                .with("")
                .noargs();
    }
}

消费者的定义:

@Component
public class ConsumerTest {
    @RabbitListener(queues = Constants.FANOUT_ONE_QUEUE)
    public void receiveOne(String message) throws IOException {
        System.out.println("receiveOne msg: " + message);
    }
    @RabbitListener(queues = Constants.FANOUT_TWO_QUEUE)
    public void receiveTwo(String message) throws IOException {
        System.out.println("receiveTwo msg: " + message);
    }
}

生产者的定义:

@SpringBootTest
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testSendMessageQueue() {
        rabbitTemplate.convertAndSend(Constants.FREEDOM_FANOUT_EXCHANGE, null, "hello fanout");
    }
}

两个消费者分别消费两个消费队列中的消息,发送消息时不需要routingkey,指定exchange即可,routingkey可以直接传一个null,生产者发送一条消息,被两个消费者同时消费
消息消费日志:

在这里插入图片描述

Direct交换机

Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此成为定向路由

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

常量:

public class Constants {
    public static final String FREEDOM_DIRECT_EXCHANGE = "freedom.direct";
    public static final String DIRECT_QUEUE = "direct.queue";
}

队列和交换机的定义:

@Configuration
public class RabbitmqConfig {
    @Bean
    public Exchange directExchange() {
        return new DirectExchange(Constants.FREEDOM_DIRECT_EXCHANGE, true, false);
    }
    @Bean
    public Queue queueOne() {
        return new Queue(Constants.DIRECT_QUEUE);
    }
    @Bean
    public Binding bindingOne() {
        return BindingBuilder.bind(queueOne())
                .to(directExchange())
                .with("hello-direct")
                .noargs();
    }
}

消费者的定义:

@Component
public class ConsumerTest {
    @RabbitListener(queues = Constants.DIRECT_QUEUE)
    public void receiveOne(String message) throws IOException {
        System.out.println("direct msg: " + message);
    }
}

生产者的定义:

@SpringBootTest
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testSendMessageQueue() {
        rabbitTemplate.convertAndSend(Constants.FREEDOM_DIRECT_EXCHANGE, "hello-direct", "hello direct");
    }
}

消息消费日志:

在这里插入图片描述

Topic交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以.分割
Queue与Exchange指定BindingKey时可以使用通配符:

  • #:代指0个或多个单词
  • *:代指一个单词

常量:

public class Constants {
    public static final String FREEDOM_TOPIC_EXCHANGE = "freedom.topic";
    public static final String TOPIC_XIAOMI_QUEUE = "topic_xiaomi.queue";
    public static final String TOPIC_IPHONE_QUEUE = "topic_iphone.queue";
}

队列和交换机的定义:

@Configuration
public class RabbitmqConfig {
    @Bean
    public Exchange topicExchange() {
        return new TopicExchange(Constants.FREEDOM_TOPIC_EXCHANGE, true, false);
    }
    @Bean
    public Queue queueOne() {
        return new Queue(Constants.TOPIC_XIAOMI_QUEUE);
    }
    @Bean
    public Queue queueTwo() {
        return new Queue(Constants.TOPIC_IPHONE_QUEUE);
    }
    @Bean
    public Binding bindingOne() {
        return BindingBuilder.bind(queueOne())
                .to(topicExchange())
                .with("xiaomi.#")
                .noargs();
    }
    @Bean
    public Binding bindingTwo() {
        return BindingBuilder.bind(queueTwo())
                .to(topicExchange())
                .with("iphone.#")
                .noargs();
    }
}

消费者的定义:

@Component
public class ConsumerTest {
    @RabbitListener(queues = Constants.TOPIC_XIAOMI_QUEUE)
    public void receiveOne(String message) throws IOException {
        System.out.println("xiaomi msg: " + message);
    }
    @RabbitListener(queues = Constants.TOPIC_IPHONE_QUEUE)
    public void receiveTwo(String message) throws IOException {
        System.out.println("iphone msg: " + message);
    }
}

生产者的定义:

@SpringBootTest
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testSendMessageQueue() {
        rabbitTemplate.convertAndSend(Constants.FREEDOM_TOPIC_EXCHANGE, "xiaomi.queue", "小米手机");
        rabbitTemplate.convertAndSend(Constants.FREEDOM_TOPIC_EXCHANGE, "iphone.queue", "苹果手机");
    }
}

消息消费日志:

在这里插入图片描述

© 版权声明

相关文章