RabbitMQ如何成为分布式系统的“神经中枢“?——从安装部署到C++调用实战的完整流程,带你体验它的奥妙所在!​

文章目录

  • 本篇摘要
  • ①·RabbitMq(轻量级消息队列中间件) 介绍
    • RabbitMQ 是什么?
    • 核心功能与特点
      • 1. **核心功能**
      • 2. **核心优势**
    • RabbitMQ 的核心概念
      • 1. **生产者(Producer)**
      • 2. **消费者(Consumer)**
      • 3. **队列(Queue)**
      • 4. **交换机(Exchange)**
      • 5. **绑定(Binding)**
    • 工作流程(以 Direct 交换机为例)
    • 常见应用场景
    • RabbitMQ 与相关技术对比
    • 图像理解
    • 总结一句话
  • ②·RabbitMq 安装教程
    • RabbitMq安装
        • **1. 安装 RabbitMQ**
        • **2. 启动 & 检查状态**
        • **3. 创建管理员用户(默认 `guest` 权限低)**
        • **4. 开启 Web 管理界面(默认端口 15672)**
    • **RabbitMQ 客户端及 C++ 库安装步骤**
        • **1. 安装 RabbitMQ 官方客户端库(C 语言)**
        • **2. 安装 RabbitMQ 的 C++ 客户端库(AMQP-CPP)**
        • 默认端口介绍
  • ③·AMQP-CPP 库介绍
    • 介绍
        • **1. 是什么?**
        • **2. 怎么用?两种模式**
          • **模式 1:TCP 模式(自己管网络)**
          • **模式 2:扩展模式(用现成库省事)**
      • 模式2常见接口介绍
  • ④·AMQP-CPP 库简单使用
    • 测试效果
    • 测试代码
  • ⑤·RabbitMQ 客户端API二次封装
    • 封装思想
    • 测试效果
    • 代码汇总(详细见代码注释)
  • 本篇小结

在这里插入图片描述

本篇摘要

本文档围绕RabbitMQ展开,涵盖其概念、安装、AMQP – CPP库使用及客户端API封装。阐述消息队列原理、特性,详述Linux下安装步骤、库安装,展示简单使用案例与API封装思路。

①·RabbitMq(轻量级消息队列中间件) 介绍

RabbitMQ 是什么?

RabbitMQ 是一个开源的、跨平台的消息队列(Message Queue)中间件,基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 标准实现,由 Erlang 语言编写(以高并发和分布式特性著称),广泛应用于分布式系统中实现异步通信、解耦、流量削峰、任务分发等功能。

简单来说:

RabbitMQ 是一个“邮局”,负责在不同程序(或服务)之间可靠地传递消息,让发送方(生产者)和接收方(消费者)无需直接交互,只需通过 RabbitMQ 进行中转。

核心功能与特点

1. 核心功能

  • 消息存储与转发:生产者将消息发送到 RabbitMQ,RabbitMQ 暂存消息并根据规则将消息路由到对应的消费者。
  • 多协议支持:虽然基于 AMQP,但还支持 MQTT、STOMP 等协议,适配不同场景。
  • 灵活的路由规则:通过 交换机(Exchange)队列(Queue) 的绑定关系,实现复杂的消息路由逻辑(如按主题、路由键分发)。
  • 消息确认机制:确保消息不丢失(生产者确认、消费者确认)。
  • 高可用与集群:支持镜像队列、集群部署,保障消息服务的可靠性。

2. 核心优势

  • 解耦生产者和消费者无需知道彼此的存在,只需与 RabbitMQ 交互,降低系统模块间的依赖。
  • 异步通信:生产者发送消息后无需等待消费者立即处理,提高系统响应速度(生产者消费者谁先谁后都无所谓)。
  • 流量削峰:突发流量时,RabbitMQ 可以暂存消息,避免下游服务被压垮。
  • 可靠性:支持消息持久化、重试、死信队列等机制,确保消息不丢失、不重复处理。
  • 跨语言/平台:提供多种语言客户端(如 Java、Python、C++、Go 等),适配不同技术栈。

RabbitMQ 的核心概念

1. 生产者(Producer)

发送消息的应用程序,将消息发布到 RabbitMQ 的 交换机(Exchange) 中。

2. 消费者(Consumer)

接收消息的应用程序,从 RabbitMQ 的 队列(Queue) 中获取消息并处理。

3. 队列(Queue)

消息的存储容器,实际存放消息的地方。消费者从队列中拉取消息进行处理。

  • 一个队列可以有多个消费者(竞争消费),也可以绑定多个交换机。

4. 交换机(Exchange)

接收生产者发送的消息,并根据路由规则将消息分发到不同的队列。

  • 常见的交换机类型:
    • Direct(直连):根据精确的路由键(Routing Key)匹配队列。
    • Topic(主题):支持通配符(如 *.newsorder.*),按规则模糊匹配路由键。
    • Fanout(广播):将消息广播到所有绑定的队列(无视路由键)。
    • Headers(头部):根据消息的头部属性(而非路由键)匹配队列。

5. 绑定(Binding)

连接交换机和队列的规则,定义了“哪些消息应该被路由到哪个队列”。

  • 通过 路由键(Routing Key)头部属性 指定匹配条件。

工作流程(以 Direct 交换机为例)

  1. 生产者:将消息发送到指定的 交换机(Exchange),并指定一个 路由键(Routing Key)(例如 order.create)。
  2. 交换机:根据绑定规则(比如绑定键 order.create 匹配队列 order_queue),将消息路由到对应的 队列
  3. 队列:存储消息,等待消费者拉取。
  4. 消费者:从队列中获取消息并处理(例如订单服务处理 order.create 消息)。

常见应用场景

  1. 异步任务处理
    例如:用户注册后,发送邮件/短信的通知任务可以异步丢给 RabbitMQ,由专门的消费者服务处理,避免阻塞主流程。

  2. 服务间解耦
    例如:订单服务和库存服务原本直接调用,通过 RabbitMQ 解耦后,订单服务只需发送“创建订单”消息,库存服务自行消费并扣减库存。

  3. 流量削峰
    例如:电商秒杀活动中,突发的大量下单请求可以先进入 RabbitMQ 队列,后端服务按处理能力逐步消费,避免系统崩溃。

  4. 日志收集与监控
    例如:多个服务将日志消息发送到 RabbitMQ,由日志服务统一收集并存储到 Elasticsearch 或数据库。

  5. 分布式事务补偿
    例如:跨服务的分布式事务中,通过 RabbitMQ 协调各服务的最终一致性(如订单支付成功后通知物流服务发货)。

RabbitMQ 与相关技术对比

对比项 RabbitMQ Kafka Redis(Stream)
设计目标 通用消息队列(可靠、灵活路由) 高吞吐、分布式日志流处理 轻量级消息队列(简单场景)
协议 AMQP(及其他) 自研协议 Redis 协议
消息模型 交换机+队列(灵活路由) 主题分区(Pub/Sub 或 Stream) List/PubSub/Stream
可靠性 高(支持持久化、确认机制) 高(副本机制) 较低(依赖配置)
适用场景 业务解耦、异步任务、削峰 大数据流处理、实时日志 简单队列、缓存通知

图像理解

大致流程:

在这里插入图片描述

如果把对应过程再细化一下(也就是客户端与Rabbitmq服务端交互的过程)就如图所示(也就是之前的仿Rabbitmq项目的流程,主要功能大差不多):

在这里插入图片描述

总结一句话

RabbitMQ 是一个轻量级、高可靠的消息队列中间件,通过灵活的路由机制和多种交换机类型,帮助分布式系统实现异步通信、解耦和流量控制,是构建高可用、可扩展服务的核心工具之一。(也就是为了防止有一方挂了,另一方消息不断在自身那里堆积造成问题因此引入了它

②·RabbitMq 安装教程

RabbitMq安装

1. 安装 RabbitMQ
sudo apt install rabbitmq-server
2. 启动 & 检查状态
# 启动服务
sudo systemctl start rabbitmq-server  
# 查看状态(确保 `active (running)`)
sudo systemctl status rabbitmq-server  

之后可以看到这样的画面:

在这里插入图片描述

  • 也就也是说明安装成功了。
3. 创建管理员用户(默认 guest 权限低)
# 添加用户(示例:用户名=root,密码=123456)
sudo rabbitmqctl add_user root 123456  
# 设为管理员
sudo rabbitmqctl set_user_tags root administrator  
# 赋予权限(对所有虚拟主机)
sudo rabbitmqctl set_permissions -p / root ".*" ".*" ".*"  
4. 开启 Web 管理界面(默认端口 15672)
sudo rabbitmq-plugins enable rabbitmq_management
  • 访问:http://你的服务器IP:15672
  • 登录:用户名=root密码=123456

但是如果这里对应服务器的端口没向外开放,因此就无法访问,此时可以改一下对于的配置文件,然后设置好对应web的监听的ip以及部署的port即可。

添加对应字段到对应配置文件(没有就创建):

 vim  /etc/rabbitmq/rabbitmq.conf

如下:

在这里插入图片描述

然后试着浏览器访问对应ip+port,然后试着按照之前建立的root用户登录:

在这里插入图片描述

在这里插入图片描述

  • 看到的都是熟悉的字段(在之前的仿rabbitmq实现的消息队列就已经眼熟过了)。

遇到的问题:

  • 连接web-UI时候的错误:

在这里插入图片描述

  • 当我们进行web界面访问的时候登录正确账号和密码的时候,可能出现上面类似的错误,此时不是账号+密码的问题。
  • 我们只需要重新设置,创建下对应允许登录使用的新用户即可。

解决如下:

rabbitmqctl add_user 你的用户名 你的密码
rabbitmqctl set_user_tags 你的用户名 administrator
rabbitmqctl set_permissions -p / 你的用户名 ".*" ".*" ".*"

最后重启下即可: systemctl restart rabbitmq-server

登录如下:

在这里插入图片描述

但是,这里这种网页的不是很常用,一般都是用对应语言的api来调用直接访问对应服务端,而不是通过web这种交互模式来访问服务端的(这里类似于之前es,比如这里的rabbitmq的服务端对应的就是es的es服务端;而它的web控制台就对应的es的kibana)。

RabbitMQ 客户端及 C++ 库安装步骤

1. 安装 RabbitMQ 官方客户端库(C 语言)
  • 作用:提供基础的 RabbitMQ C 接口,用于消息通信。
  • 命令

    sudo apt-get install librabbitmq-dev
    
  • 安装后会包含 头文件静态库文件,可直接在 C 项目中使用。
2. 安装 RabbitMQ 的 C++ 客户端库(AMQP-CPP)
  • 作用:基于 C 接口的 C++ 封装库,更适合用 C++ 编写客户端程序。

  • 步骤
    (1) 安装依赖库 libev(网络库组件)

    sudo apt install libev-dev
    

    (2) 下载 AMQP-CPP 源码

    git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
    cd AMQP-CPP/
    

    (3) 编译并安装

    make
    make install
    

    注意:如果编译时出现 SSL 版本错误(如 unsigned short *port_ptr 相关报错),需按以下步骤修复:

    1. 卸载冲突的 SSL 库(强制卸载):

      sudo dpkg -P --force-all libevent-openssl-2.1-7
      sudo dpkg -P --force-all openssl
      sudo dpkg -P --force-all libssl-dev
      
    2. 修复系统依赖

      sudo apt --fix-broken install
      
    3. 重新执行 make
默认端口介绍

对于rabbitmq的web控制台默认部署的是15672端口,但是我们给改成了8072端口了:

在这里插入图片描述

  • 这里可以看到对应监听,以及刚才使用浏览器访问这个web建立的连接,注意这个本地显示的是服务器的内网ip(对内)。

对于rabbitmq的服务端的默认端口就是5672这里也没有更改还是默认的:

在这里插入图片描述

  • 然后还可以看到对应的集群模式使用的端口就是25672,还可以看到对应的rabbitmq程序连接服务端对应的连接。

③·AMQP-CPP 库介绍

介绍

1. 是什么?

AMQP-CPP 是一个 C++ 库,专门用于与 RabbitMQ(消息队列中间件)通信。它的核心功能是:

  • 解析/生成数据:能处理 RabbitMQ 发来的消息,也能构造要发送的消息。
  • 不直接连网络:库本身不负责建立网络连接(比如 TCP 连接),网络 IO 需要用户自己实现(但提供了可选的简化方案)。
  • 高性能:完全异步(无阻塞调用,不用多线程),适合对性能要求高的场景。
  • 依赖 C++17:代码需要用 C++17 或更高版本编译。
2. 怎么用?两种模式
模式 1:TCP 模式(自己管网络)
  • 核心操作:用户需自己实现一个类,继承自 AMQP::TcpHandler(负责 TCP 连接)。
  • 关键步骤

    1. 重写函数:必须重写 monitor 函数(用于监控网络状态)。
    2. 手动管理 IO:在 monitor 函数里,把网络套接字(fd)交给自己的事件循环(比如 select/epoll),当 fd 可读/可写时,调用 connection->process(fd, flags) 处理消息。
  • 适用场景:想完全自定义网络层(比如用自己熟悉的网络库)。
模式 2:扩展模式(用现成库省事)
  • 以 libev 为例:如果用 libev 库(一个异步事件库),不用自己写 monitor 函数
  • 直接继承:直接使用 AMQP-CPP 提供的 AMQP::LibEvHandler 类(已经封装好了网络监控逻辑),库会自动处理 fd 监控和消息处理,更省事。
  • 其他支持:除了 libev,还支持 libevent、libuv、asio 等异步通信组件(需额外安装)。
  • 适用场景:想快速集成,不想手动写网络 IO 代码(推荐大多数用户用)。

而后面我们就以扩展模式来实现对rabbitmq的访问。

模式2常见接口介绍

1. AMQP::Channel: 信道类

  • Channel(Connection *connection); – 构造函数
  • bool connected() – 检查连接状态
  • Deferred &declareExchange() – 声明交换机
  • DeferredQueue &declareQueue() – 声明队列
  • Deferred &bindQueue() – 将交换机与队列进行绑定
  • bool publish() – 发布消息
  • DeferredConsumer &consume() – 订阅队列消息
  • bool ack() – 消费者客户端对收到的消息进行确认应答

2. class Message: 消息类

  • const char *body() – 获取消息正文
  • uint64_t bodySize() – 获取消息正文大小

3. libev 使用到的结构体与接口

  • struct ev_loop *ev_default_loop (); – 实例化并获取 I/O 事件监控结构句柄。
  • #define EV_DEFAULT ev_default_loop (0);
  • int ev_run (struct ev_loop *loop); – 开始运行 I/O 事件监控,这是一个阻塞接口。
  • void ev_break (struct ev_loop *loop, int32_t break_type); – 结束 I/O 监控。

    • 说明:如果在当前线程进行 ev_run 则可以直接调用,如果在其他线程中进行 ev_run 需要通过异步通知进行。
  • void ev_async_init(ev_async *w, callback cb); – 初始化异步事件结构,并设置回调函数。
  • void ev_async_start(struct ev_loop *loop, ev_async *w); – 启动事件监控循环中的异步任务处理。
  • void ev_async_send(struct ev_loop *loop, ev_async *w); – 发送当前异步事件到异步线程中执行。

④·AMQP-CPP 库简单使用

实现思路:

分别实现对应的发布客户端与订阅客户端,然后分别进行底层事件监控,关联amqp框架,建立连接 信道 声明交换机 队列 绑定消息等,然后直接进行消息发布,消息订阅即可(详细步骤见代码注释)。

测试效果

在这里插入图片描述

  • 首先启动发布客户端,进行注册对应信息向mq服务端。

在这里插入图片描述


在这里插入图片描述

  • 可以看到我们publish客户端连对应的连接与信道。

在这里插入图片描述


在这里插入图片描述

在这里插入图片描述

  • 对应的交换机,队列,以及发送进来的消息也都正确。

在这里插入图片描述

  • 这里可以看到web控制台支持的服务不仅比api客户端更全,界面还更加可视化方便,但是对于大批数据存储等还是不方便的,因为更推荐对应语言的api接口进行访问。

在这里插入图片描述

  • 最后启动订阅客户端就能收到对应消息。

在这里插入图片描述

  • 对应mq服务端的消息也都没了。

注意(这里和之前实现的仿Rabbitmq消息队列逻辑还有待呢不同):

这个和之前实现的仿Rabbitmq实现的消息队列不同,本身的Rabbitmq消息队列无论是先启动订阅还是发布端,最后订阅端都能收到发进去的消息;但是之前实现的那个仿照版本的必须先启动订阅端然后再是发布端,因为发布端发布进去消息就去找对应订阅队列的消费者的回调函数给它推送过去,反之则不可能进行推送了。

测试代码

publish.cc 与consume.cc对应代码(点击即跳转)

⑤·RabbitMQ 客户端API二次封装

封装思想

封装思想:

  1. 将事件监控结构 loop、通信连接 connection 以及信道 channel 封装起来。
  2. 提供创建交换机、队列并进行直接绑定的接口。
  3. 提供向指定交换机发布消息的接口。
  4. 提供订阅指定队列消息的接口。

需要注意的:

测试效果

在这里插入图片描述

  • 客户端进行订阅队列,然后对于底层的事件监控交给对应异步线程,主线程阻塞住等待异步线程完成处理后进行异步事件通知来break它后进行回收。

在这里插入图片描述

  • 这里发现打印的顺序和调用顺序不一样,因为这里声明完对应交换机 队列后才回去调用对应回调处理(可能这里比较慢),也就是进行发布的过程速度比进行处理声明完回调速度快。

在这里插入图片描述

  • 成功收到对应消息(这里publish与consume顺序谁先谁后也是无关的)。

代码汇总(详细见代码注释)

Rabbitmq.hpp(点击即跳转)

publish.cc/consume.cc(点击即跳转)

本篇小结

学习本篇可以知道RabbitMQ作为强大消息队列中间件,通过灵活交换机和丰富路由规则实现异步通信、解耦等功能。Linux下安装便捷,AMQP – CPP库提供C++交互方式,客户端API封装提升开发效率,适用于多种分布式场景。

© 版权声明

相关文章