消息队列之RabbitMQ

前言

今天继续整理下跟RabbitMQ有关的内容,加深一下理解,还是避免被面试官锤。

什么是消息队列,为什么要用消息队列

这里的内容在上一篇(谈谈消息队列)内容中已经讲过了,这里就不再赘述了

消息队列之RabbitMQ

RabbitMQ是一款消息队列软件,也被成为消息代理(Message Broker)或队列管理器(Queue manager),他是一个由ErLang语言开发的AMQP的开源实现

AMQP(Adavanced message queue),高级消息队列协议,它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可以传递消息,并且不受产品、开发语言等条件的限制。

这里引用了RabbitMQ官网的图片,AMQP的消息模型

消息代理(messaging broker)从publisher(消息生产者)接收消息,并且将消息路由给消费者,由于AMQP是个网络协议,生产者、消息代理和消费者可以位于不同的机器上

RabbitMQ中的基本概念

  • Exchange 从生产者接收消息,并且根据exchange的类型将消息推送到对应的队列中
  • Binding queue和exchange的绑定,通过这个绑定rabbitmq可以知道如何正确地将消息路由到指定的queue上了
  • Routing key 生产者将消息发送给exchange的时候,rabbitmq 需要指定一个routing key来确定这个消息的路由规则,这个路由规则将决定消息如何路由到queue中
  • Users
  • Vhost,virtual host 是一种可以在同一个rabbitMQ实例中隔离多个不同的应用,不同的用户可以拥有不同virtual host的权限,每一个virtual host都可以拥有不同的queues和exchanges
  • Producer 应用中消息的生产者
  • Consumer 应用中消息的消费者
  • Queue 一个存储消息的buffer
  • Message 生产者通过rabbitMQ投递给消费者的消息
  • Connection 客户端应用程序和RabbitMQ之间的TCP连接,和生产者与RabbitMQ之间都是通过TCP连接建立的。
  • Channel 在Connection中的一个虚拟连接,队列中的消息的发布和消费

由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路

exchange和exchange类型

Exchanges在AMQP模型中是消息被发送到的实体。Exchanges将消息路由到0个或更多个queue上面去,路由算法取决于exchange的类型和绑定规则,AMQP提供以下exchange类型

类型 默认预定义的名字
Direct Exchange 空串或者 amqp.direct
Fanout Exchange amq.fanout
Topic Exchange amq.topic
Headers Exchange amq.match

此外 AMQP中还定义了几个很重要的属性

  • name
  • durability(broker重启后exchange是否还存在)
  • Auto-delete(当exchange上面的最后一个queue解绑后是否自动删除)
  • Arguments(可选参数,插件或者broker的特定功能会用到)

下面的内容全部引用自rabbitmq官网,提供了AMQP协议的概述

Default Exchanges

默认的exchange是一个broker预定义的没有名字的direct exchange,它有一个特殊的性质:所有新建的queue都会绑定到默认的exchange上面,并且routing-key和queue名相同

Direct Exchange

direct exchange根据消息所带的routing-key将消息投递给对应的队列。direct exchange倾向于用作消息的单播路由(unicast routing),尽管它也可以用作多播路由(multicast routing),工作原理:

  • 一个queue通过一个routing-key K绑定到一个exchange上
  • 当一条routing-key为R的消息到达direct-exchange时,direct exchange会把消息投递给routing-key跟R相等的queue上面去

direct exchange通常用于循环分配任务给多个工作者,需要注意的是,消息的负载均衡是发生在consumer之间的,并不是发生在queue之间,举个例子(一个direct exchange 通过一个routing-key绑定了多个queue,消息会发到多个queue中,一个queue中消息的消费并不影响另一个queue的消费,也就是不会发生抢消息的行为)


图片引用自rabbitmq官网

Fanout Exchange 扇形交换机

扇形交换机会把他收到的消息发送到所有绑定到它的queue中,并且这里的routing-key是被忽略的。如果有n个queue绑定到了扇形交换机上面,当一条新消息发布到交换机中时,消息会复制n份投递到所有的n个queue中,扇形交换机特别适合做广播消息

图片引用自rabbitmq官网

因为扇形交换机会把消息广播到所有的queue中,他的使用场景非常相似:

  • 很多大型多人在线游戏会用它处理排行榜更新或其他全局事件
  • 体育新闻网站可以准实时地将比分发送给移动客户端
  • 分布式系统可以将不同的状态和配置更新进行广播
  • 可以通过扇形交换机来实现分发消息给群聊中 的用户

topic exchange

topic exchange通过routing-key通配的方式将消息路由到一个或多个队列上面,正因如此,topic exchange一般用于实现各种发布/订阅的变种。topic exchange也常用来做消息的广播,topic exchange有很广泛的用途,当涉及到多个消费者可以选择性的接收它们所关注类型的消息的时候,就可以考虑topic exchange的使用了

使用场景

  • 将数据按照地理位置的相关性做分发,比如销售点
  • 多个执行者处理一个后台任务,每一个执行者处理特定的一块任务
  • 股票价格更新(或者其他金融数据的更新)
  • 新闻更新(包括分类、标签的更新)
  • 云上不同类型服务的服务编排
  • 分布式(架构/系统)软件构建或者打包时,每个builder只能处理一种架构或者系统

spring rabbitmq启动流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
通过SpringFactoriesLoader机制,扫描META-INF/Spring.factories
发现需要加载org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
加载RabbitAutoConfiguration后发现需要按条件进行装配,RabbitTemplate类加载后,则进行自动装配
加载配置文件RabbitProperties(application.yml或application.properties等配置文件中spring.rabbitmq.下的内容),装配CachingConnectionFactory

在Spring容器初始化完毕,并初始化对应listener bean时

RabbitListenerAnnotationBeanPostProcessor#postProcessAfterInitialization()
//
processAmqpListener()
processListener()
RabbitListenerEndpointRegistrar#registerEndpoint()
//用AmqpListenerEndpointDescriptor包装endPoint和containerfactory
//如果是立刻启动,则调用
RabbitListenerEndpointRegistry#registerListenerContainer()

通过RabbitListenerAnnotationBeanPostProcessor注册被rabbitListener注解的方法


未完待续

参考文章

for RabbitMQ beginners- what is RabbitMQ?