MQ消息队列(2)RabbitMQ-AMQP

1.RabbitMQ概述

消息队列(Message Queue)起源于一位来自MIT的硬件设计工作者Vivek Randive,其设想一种通用的软件总线,类似电路板上的总线一样,可以供其他软件接入。Vivek在1983年成立了Teknekron公司,高盛等公司作为第一批在金融中使用Teknekorn的软件,同时还诞生了第一代消息队列软件:Teknekron的TIB(THE Information Bus).

  Teknekron的TIB可以让开发者建立一系列的规则去描述消息内容,只需要将消息按照这个规则发送出去,任何消费者只要订阅了当前的内容,就可以接受到这些发送的消息,从而让生产者和消费者完全解耦,同时可以在消息的传递过程中灵活的结合。这个软件后来后来就受到了电信行业特别是新闻机构的注意。1995年路透社收购了Teknekron公司。

  由于消息队列在金融交易中取得的不错的反响。1990年开始IBM公司也开始研制自己的消息队列软件(IBM MQ),并且逐步发展成为WebSphereMQ同时占领着广阔的商业消息队列平台市场。于此同时,微软也开始了消息队列的研制MSMS(Microsoft Message Queue)应运而生,随着众多厂商参与MQ的研制,同时带来了一个问题,各个厂商的MQ无法进行互通。为了解决这个问题,JMS(java message service)在2001年诞生了,其试图通过提供公共java API的方式来隐藏各个供应商提供的MQ实际的接口,从而解决各个MQ之间互通的问题,但是由于使用单独的标准化接口来胶合不同的接口应用程序反而变的更加脆弱,最终失败了。

  但在2004年JPMorgan Chase和IMatix公司一起合作开发了Advanced Message Queueing Protocl(AMQP,高级消息队列协议),从一开始就设计成一个开放标准协议,任何公司都可以使用这个协议来设计自己的MQ产品,这样任何使用这种协议的MQ就可以进行互通使用。RabbitMQ就是基于AMQP协议进行的一个MQ实现。

2.AMQP(Advanced Message Queuing Protocol)高级消息队列协议

2.1 AMQP协议简介

  AMQP(advanced message queue protocol)高级消息队列协议,是一个公开的应用层协议标准,基于此协议可以实现客户端和中间件传递消息并不受客户端和消息中间件的不同产品,不同语言的制约。以下简单介绍一些其中的重要组成:

20220118155221
  由于amqp是一种网络协议,所以这个过程中的生产者,消费者,消费代理可以存在于不同的设备之上。AMQP 0-9-1的工作流程如上,消息和发布者发送给交换机,交换机通常被比喻成邮局,然后交换机将收到的消息根据路由规则发给绑定的队列,最后AMQP代理会将消息投递给订阅了当前队列的消费或者消费者按照自己的需求来使用。

2.2 Exchange(交换机)

  • 2.2.1 默认交换机(default exchange)
      默认交换机是一个由消息队列预先声明好的没有名字的直连交换机,对于默认交换机而言,新建的每一个队列都会自动的绑定到这个默认交换机上面,绑定的路由键(routing key)名称和队列名称相同。
      例如:当声明一个“queueA”的队列,AMQP会自动地将其绑定到默认交换机上面,同时绑定的路由名称键名也是“queueA”,因此当携带有一样的routing key的消息来时就会被路由到queueA的队列中。

  • 2.2.2 直连交换机播路由(unicast routing)。
      1)将一个队列A绑定到某个交换机上面,赋予该绑定一个绑定键(binding key),为R
      2)当一个携带路由键R的消息来到交换机,就会把该消息发送到队列A中。
    20220118150124
      直连交换机的队列通常是循环发送任务给多个消费者,轮询的方式。

  • 2.2.3 扇形交换机
    20220118151156
      扇形交换机(funout exchange)将消息路由绑定给到他身上所有的队列中,而不会去管路由键(routing key),如果N个队列绑定到一个扇形路由器上面,当有消息发送到这个路由器上时,交换机会将消息发送到所有的N个队列中。扇形交换机用来处理消息的广播路由(broadcast routing)。
      由于扇形路由器的这种特性,和一些的使用场景非常像:
      1)大规模用户在线游戏使用它来处理排行榜更新等全局事件;
      2)体育新闻网站用来近乎实时地将比分更新到移动客户端;
      3)分发系统用它来广播各种状态和配置更新;

  • 2.2.4 主题交换机
      主题交换机(topic exchage)与之前的之前交换机较复杂,但是可以处理的场景也更加灵活。本质还是一个直连交换机,同样需要将传来的消息中的routing key和队列绑定的binding key进行对应,不过可以对其中的key进行如正则匹配的处理。将原有的一个key按照’.’进行切分成多个单词,每个单词可以用“”,“#”这两个关键字进行匹配。
      其中的
    表示一个单词,#表示匹配零个或者多个单词。
    如:主题的定义是根据动物不同特点来分发 速度.颜色.名称.
      队列A根据binding key:“*.red.”绑定到交换机上
      队列B根据binding key:“low.#”和“
    .yellow.*”绑定到主题交换机上面
    使用上面的方法,可以将所有的红色动物的消息发送到队列A中,将所有的跑到慢的动物和黄色的动物消息发送到队列B中来消费。
    20220118152332

2.3 Queue 队列

  AMQP中的队列跟其他消息队列中的队列是相似的,其中存储着需要消费的消息。

  • 2.3.1 队列属性
    队列属性和交换机共享一些属性,但也有一些是其特有的
    1)Name
    2)Durable 队列持久化,消息队列重启后队列是否依旧存在,注意只可以保证其队列的持久化,不能保证队列中原还没有消费的消息也持久化
    3)Exclusive 只被一个连接,当连接关闭时队列即被销毁
    4)Auto-delete 当最后一个消费者退订后即被销毁
    5)Arauments 一些消息代理用它来完成类似与TTL的某些额外功能

  • 2.3.2 队列的创建
      队列在声明后才可以被使用,如果一个队列开始不存在在声明时会对它进行创建,如果队列已经存在了而且和声明的属性相同,那么这个声明就不会对原有的队列有任何影响,如果和原有的队列属性有差异,那么就会抛出406的通道异常。

  • 2.3.3 队列持久化
      持久化话队列会将队列的信息存储在硬盘上,当消息代理broker重启的时候,会依旧存在,没有被持久化队列被称之为暂存队列Transient queues,并不是所有的场景和案例都需要持久化队列。
      注意:持久化队列并不会将路由到当前都列的消息也持久化。

2.4 消费者Consumer

  消息如果只是被存储在队列中不被消费是没有意义的,在amqp 0-9-1模型中,有两种途径可以让消费者来消费队列中的消息:
1)将消息投递给消费者应用(push API)
2) 消费者应用主动地获取消息(pull API)
  使用push的消费者应用需要明确表达它对某个特定的队列感兴趣,可以规定为该队列注册了一个消费者,或者叫该消费者订阅了一个队列,注意一个队列可以注册多个消费者,也可以注册一个独享的消费者,当独享消费者存在时,其他的消费者即被排除在外。
  每个消费者都有一个消费者标签的标识符,可以用来退订消息,消费者标签实际上就是一个字符串。

2.5 消息确认

  • 2.5.1 消息确认
      由于AMQP是一个网络协议,在处理消息的时候会碰见各种因为网络问题消费丢掉的问题,所以需要我对消息进行确认。在AMQP中有两种方法来对消息进行确认:
      1)自动确认模式:当消息代理broker将消息发送后立即删除消息,basic.deliver或者basic.get-ok。
      2)显示确认模式:当broker发送一个消息之后,需要等待一个确认回执acknowledgement后再删除消息,basic.ack。
      如果是第二种方法,当一个消费者当在未发送回执的情况下挂掉了,那么broker代理会将消息重新投递给另外的一个消费者,如果没有其他的消费者,消息代理会等待下一个订阅当前队列的消费者,然后再次尝试投递。

  • 2.5.2 拒绝消息
      当一个消费者拒绝消息之后,消费者可以告诉消息代理如何处理这条消息-销毁或者重新返回队列中。
      注意:当只有一个消费者订阅当前队列,确实不要因为消费者拒绝了消息放回在队列中又重新返回给这个消费者,造成不断循环拒绝放回。
      在AMQP中,basic.reject方法用来拒绝消息的操作,但是有个限制不能用来拒绝多个带有确认回执的消息,如果使用的是RabbitMQ可以使用nacks来解决这个问题。

  • 2.5.3 预读消息
      在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的,可以在视图批量发布消息的时候起到简单的负载均衡和提供吞吐量的作用。RabbitMQ只支持通道级的预取计数,而不是连接级的预取。

  • 2.5.4 消息属性
      AMQP模型中的消息是带有属性的,常见的有以下这些:

    类型 字段 名称
    Content type 内容类型
    Content encoding 内容编码
    Routing key 路由键
    Delivery mode persistent or not
    Message priority 消息优先权
    Message publishing timestamp
    Expiration period 消息有效期
    Publisher application id 发布应用的id
  • 2.5.5 消息主体
      AMQP的消息除了属性之外还有一个有效载荷Payload消息实际携带的数据,被AMQP代理当成一个不透明的字节数组来对待。消息代理不会检查或者修改有效载荷,消息可以只包含属性而不携带任何有效载荷,通常会使用类似JSON这种序列化的格式数据,为了节省传输的数据大小,协议为缓冲器会和MessagePack将结构化的数据序列化,以便以消息的有效载荷方式来传递。通常会用content-type和content-encoding这两个字段来与消息沟通有效载荷的辨识工作。

  • 2.5.6 消息持久化
      消息也能够被持久化,AMQP会将消息保存到磁盘上,如果代理重启,系统会确认保存的消息未丢失。
      需要注意的是:简单的对队列持久化并不会让其中的消息也持久化,凶啊系的持久化需要设置消息自己的模式persistence mode,将消息持久化之后自然会影响性能,所以需要在根据自己的情景来设置是否需要对消息进行持久化,健壮性的增加自然需要牺牲一下性能。

  • 2.5.7 连接
      AMQP的连接通常是长连接,AMQP是使用TCP提供可靠的应用层协议。AMQP使用认证机制并且提供TLS保护,当一个应用不需要连接到AMQP代理的时候,只需要释放到AMQP的连接,而不是直接将TCP连接关闭。

  • 2.5.8 通道
      在有些系统中需要与AMQP建立多个连接,但是同时开启多个TCP连接会消耗很多资源,因此AMQP提供了通道Channels来处理多连接,可以将通道看成一个TCP连接的多个轻量化连接。
      在涉及多线程或者多进程的系统中,为每个线程开启一个通道是常见的,并且这些通道之间不会被共享资源。

  • 2.5.9 虚拟主机vhost