RocketMQ

参考

参考2

参考3

消息队列

有两种模型:队列模型(RabbitMQ)和发布/订阅模型(RocketMQ、Kafka)

异步、削峰、解耦

解耦:

你下单了,你就把你支付成功的消息告诉别的系统,他们收到了去处理就好了,你只用走完自己的流程,把自己的消息发出去,那后面要接入什么系统简单,直接订阅你发送的支付成功消息,你支付成功了我监听就好了。

问题:

数据一致性(使用分布式事务)、高可用、消息重复消费(幂等(强校验、弱校验))、消息丢失、消息的顺序消费(一个topic下有多个队列,为了保证发送有序,RocketMQ提供了MessageQueueSelector队列选择机制)

分布式事务:

  • 2pc(两段式提交)
  • 3pc(三段式提交)
  • TCC(Try、Confirm、Cancel)
  • 最大努力通知
  • XA
  • 本地消息表(ebay研发出的)
  • 半消息/最终一致性(RocketMQ)
    • 业务主动方本地事务提交失败,业务被动方不会收到消息的投递。
    • 只要业务主动方本地事务执行成功,那么消息服务一定会投递消息给下游的业务被动方,并最终保证业务被动方一定能成功消费该消息(消费成功或失败,即最终一定会有一个最终态)。

为了提高并发度,往往发布/订阅模型还会引入队列或者分区的概念。即消息是发往一个主题下的某个队列或者某个分区中。RocketMQ中叫队列,Kafka叫分区,本质一样,例如某个主题下有 5 个队列,那么这个主题的并发度就提高为 5 ,同时可以有 5 个消费者并行消费该主题的消息。一般可以采用轮询或者 key hash 取余等策略来将同一个主题的消息分配到不同的队列中。

与之对应的消费者一般都有组的概念 Consumer Group, 即消费者都是属于某个消费组的。一条消息会发往多个订阅了这个主题的消费组。

假设现在有两个消费组分别是Group 1 和 Group 2,它们都订阅了Topic-a。此时有一条消息发往Topic-a,那么这两个消费组都能接收到这条消息。

然后这条消息实际是写入Topic某个队列中,消费组中的某个消费者对应消费一个队列的消息。

在物理上除了副本拷贝之外,一条消息在Broker中只会有一份,每个消费组会有自己的offset即消费点位来标识消费到的位置。在消费点位之前的消息表明已经消费过了。当然这个offset是队列级别的。每个消费组都会维护订阅的Topic下的每个队列的offset

正文开始

发布/订阅

基本组件:Producer 消息发送者、Broker 消息服务器(存储消息)、Consumer 消息消费、NameServer 路由发现

底层的通信和连接都是基于 Netty 实现 的

路由中心 NameServer

Producer 发送某一主题的消息到 Broker,Broker 负责该消息的持久化存储,Consumer 订阅感兴趣的主题,Broker 根据订阅信息(路由信息)将消息推送到 Consumer(PUSH)或者 Consumer 主动向 Broker 拉取消息(PULL),从而实现 Producer 与 Consumer 解耦。

为了避免 Broker 的单点故障导致的整个系统瘫痪,通常会部署多台 Broker 共同承担消息的存储。那 Producer 如何知道消息要发往哪台 Broker 呢?如果某一台 Broker 宕机了,那么 Producer 如何在不重启服务的情况下感知呢?

这就是 NameServer 的作用了

image-20201221155507719

由图可知,多个 NameServer 之间互不通信,即某一时刻的数据并不会完全一样,但不会对消息发送造成影响,简单高效

Broker 在启动时向所有 NameServer 注册,Producer 在发送消息之前先从 NameServer 获取 Broker 地址列表,然后根据负载算法从列表中选择一台 Broker 进行消息发送。NameServer 与每台 Broker 服务器保持长连接,并间隔 30 s 检测 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中将其移除但是路由变化不会马上通知 Producer,这是为了降低 NameServer 的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性。

如果代码中使用了线程池,一种优雅停机的方式就是注册一个 JVM 钩子函数,在 JVM 进程关闭之前,先将线程池关闭,及时释放资源。

路由元消息

NameServer 由 RouteInfoManager 实现,通过 HashMap 存储元消息

  • topicQueueTable:Topic 消息队列路由信息,消息发送时根据路由表进行负载均衡
    • HashMap<String /* topic */, List<QueueData>>
      • topic:List<QueueData>
  • brokerAddrTable:Broker 基础信息,包含 brokerName、所属集群名称、主备 Broker 地址
    • HashMap<String /* brokerName */, BrokerData>
      • brokerName:BrokerData
  • clusterAddrTable:Broker 集群信息,存储集群中所有 Broker 名称
    • HashMap<String /* clusterName */, Set<String /* brokerName */>>
      • clusterName:Set<String /* brokerName */>
  • brokerLiveTable:Broker 状态信息,NameServer 每次收到心跳包时会替换该信息
    • HashMap<String /* brokerAddr */, BrokerLiveinfo>
      • brokerAddr:BrokerLiveinfo
  • filterServerTable:Broker上的 FilterServer 列表,用于类模式消息过滤
    • HashMap<String /* brokerAddr */, List<String> /* Filter Server */>
      • brokerAddr:List<String> /* Filter Server */
image-20201221215150117

image-20201221220307047

image-20201221221810371

RocketMQ 基于订阅发布机制

一个 Topic 拥有多个消息队列

一个 Broker 为每一个 Topic 默认创建 4 个读队列 4 个写队列

多个 Broker 组成一个集群,BrokerName 由相同的多台 Broker 组成 Master-Slave 架构

brokerId 为 0 代表 Master,大于 0 表示 Slave

BrokerLiveInfo 中的 lastUpdateTimestamp 存储上次收到 Broker 心跳包的时间

路由注册

通过 Broker 与 NameServer 的心跳功能实现注册

  • Broker 启动时向集群中所有的 NameServer 发送心跳语句,每隔 30 s 向集群中所有 NameServer 发送心跳包
  • NameServer 收到 Broker 心跳包时会更新 brokerLiveTable 缓存中 BrokerLiveInfo 的 lastUpdateTimestamp
  • 然后 NameServer 每隔 10 s 扫描 brokerLiveTable,如果连续 120 s 没有收到心跳包,NameServer 将移除该 Broker 的路由信息,同时关闭 Socket 连接

注册需要加读写锁(ReentrantReadWriteLock),防止并发修改 RouteInfoManager 中的路由表

NameServe 与 Broker 保持长连接,Broker 状态存储在 brokerLiveTable 中,NameServer 每收到一个心跳包,将更新 brokerLiveTable 中关于 Broker 的状态信息以及路由表(topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable)更新上述路由表使用了 ReentrantReadWriteLock,允许多个消息发送者(Producer)并发读,保证消息发送时的高并发。但同一时刻 NameServer 只处理一个 Broker 心跳包,多个心跳包请求串行执行

路由删除

如果 Broker 宕机,NameServer 无法收到心跳包

NameServer 会每隔 10 s 扫描 brokerLiveTable 状态表,如果 BrokerlLive 的 lastUpdateTimestamp 的时间戳距当前时间超过 120s,则认为 Broker 失效,移除该 Broker,关闭与 Broker 连接,并同时更新 topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable

RocktMQ 有两个触发点来触发路由删除

  • NameServer 定时扫描 brokerLiveTable 检测上次心跳包与当前系统时间的时间差,如果时间戳大于 120 s,则需要移除该 Broker 信息
  • Broker 在正常被关闭的情况下,会执行 unregisterBroker 指令

两种方式触发的路由删除,都是从 topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable 删除与该 Broker 相关的信息,这两种方式维护路由信息时会抽取公共代码

路由发现

RocketMQ 路由发现是非实时的,当 Topic 路由出现变化后,NameServer 不主动推送给客户端,而是由客户端根据主题名定时拉取主题最新的路由

总结

image-20201221232130324

消息

Message 类

Message 的基础属性主要包括消息所属主题 topic、消息 Flag(不做处理)、扩展属性、消息体

Message 扩展属性主要包含下面几个

  • tag:消息 TAG,用于消息过滤(可以看成低级的 topic)
  • keys:Message 索引键,多个用空格隔开,RocketMQ 可以根据这些 key 快速检索到消息
  • waitStoreMsgOK:消息发送时是否等消息存储完成后再返回
  • delayTimeLeve:消息延迟级别,用于定时消息或消息重试

这些扩展属性存储在 Message properties

消息发送

RocketMQ 发送普通消息有 3 种实现方式:可靠同步发送、可靠异步发送、单向 (Oneway)发送

RocketMQ 支持 3 种消息发送方式 :同步 sync、异步 async、单向 oneway

  • 同步:发送者向 MQ 执行发送消息 API 时,同步等待,直到到消息服务器返回发送结果
  • 异步:发送者向 MQ 执行发送消息 API 时,指定消息发送成功后的回调函数,然后调用消息发送 API 后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在另一个新的线程中执行
  • 单向:发送者向 MQ 执行发送消息 API 时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上

生产者

消息生产者的代码都在 client 模块中

DefaultMQProducer 默认的消息生产者实现类,它实现 MQAdmin 接口

启动

一个 JVM 实例中只存在一个 MQCientManager 实例,维护一个 MQClientlnstance 缓存表(ConcurrentHashMap),也就是一个 clientld 只会创建一个 MQClientinstance

clientld 为客户端 IP + instance +(unitname 可选)

如果在一台物理服务器部署两个应用程序,应用程序 clientld 相同,可能造成混乱,所以

如果 instance 为默认值 DEFAULT 的话, RocketMQ 会自动将 instance 设置为进程 ID ,这样避免了不同进程的相互影响,但同 JVM 的不同消费者和不同生产者在启动时获取到的 MQClientlnstane 实例都是同一个。

MQClientlnstance 封装了 RocketMQ 网络处理 API ,是 Producer、Consumer 与 NameServer、Broker 打交道的网络通道

主要流程

消息发送流程:验证消息、查找路由、消息发送(包括异常处理)(sendMessage 方法)

默认是同步,默认超时时间为 3s

验证

主要是判空、判大小:maxMessageSize = 1024 * 1024 * 4(4M)

查找路由

。。。

根据路由消息选择消息队列,返回的消息队列按照 broker、序号排序

首先在一次消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName 就是上一次选择的执行发送消息失败的 Broker。第一次执行消息队列选择时,lastBrokerName 为 null 此时直接用 sendWhichQueue 自增再获取值,与当前路由表中消息队列个数取模,返回该位置的 MessageQueue(selectOneMessage Queue() 方法),如果消息发送再失败的话,下次进行消息队列选择时规避上次 MessageQueue 所在的 Broker,否则还是很有可能再次失败。

该算法在一次消息发送过程中能成功规避故障的 Broker,但如果 Broker 宕机,由于路由算法中的消息队列是按 Broker排序的,如果上一次根据路由算法选择的是宕机的 Broker 的第一个队列,那么随后的下次选择的是宕机 Broker 的第二个队列,消息发送很有可能会失败,再次引发重试,带来不必要的性能损耗,那么有什么方法在一次消息发送失败后,暂时将该 Broker 排除在消息队列选择范围外呢?或许有朋友会问,Broker不可用后,路由信息中为什么还会包含该 Broker 的路由信息呢?其实这不难解释:首先,Name Server 检测 Broker 是否可用是有延迟的,最短为一次心跳检测间隔(10s),其次,NameServer 不会检测到 Broker 宕机后马上推送消息给消息生产者,而是消息生产者每隔 30s 更新一次路由信息,所以消息生产者最快感知 Broker 最新的路由信息也需要 30s。如果能引入一种机制,在 Broker 宕机期间,如果一次消息发送失败后,可以将该 Broker 暂时排除在消息队列的选择范围中。-》Borker 故障延迟机制

Borker 故障延迟机制

。。。

消息发送

DefaultMQProducerImpl#sendKernelImpl

消息消费

Consumer

支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制

  • Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。

  • Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。


问题

消费模式有几种

  • 集群消费(CLUSTERING)
    • 一个 ConsumerGroup 中的 Consumer 实例平均分摊消费消息。例如某个 Topic有 9 条消息,其中一个 ConsumerGroup 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中部分,消费完的消息不能被其他实例消费
    • 存在消息重复消费
  • 广播消费(BROADCASTING)
    • 一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个ConsumerGroup,消息也会被 ConsumerGroup 中的每个 Consumer 都消费一次,广播消费中 ConsumerGroup 概念可以认为在消息划分方面无意义

消息重复消费

  • 发送时消息重复
    • 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且Message ID也相同的消息
  • 投递时消息重复
    • 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息
  • 负载均衡时消息重复(包括但不限于网络抖动、Broker重启以及消费者应用重启)
    • 当消息队列RocketMQ版的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息
  • 通过幂等性来保证不重复
    • 因为Message ID有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置
      • 以支付场景为例,可以将消息的Key设置为订单号,作为幂等处理的依据
      • 消费者收到消息时可以根据消息的Key,即订单号来实现消息幂等
      • 即生产者 setkey,在消费者 getkey

消息顺序消费

首先多个 queue 只能保证单个 queue 里的顺序,queue 是典型的 FIFO,天然顺序。多个queue 同时消费是无法绝对保证消息的有序性的

可以使用同一 topic,同一个 queue,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个 queue 里的消息

即:

如果是使用 MessageListenerOrderly 则自带此实现

如果是使用 MessageListenerConcurrently,则需要把线程池改为单线程模式

rocketmq怎么保证队列完全顺序消费? - Jaskey Lam的回答 - 知乎 https://www.zhihu.com/question/30195969/answer/142416274

保证消息不丢失

https://segmentfault.com/a/1190000023661463

Producer

采取 send() 同步发消息,发送结果是同步感知的。发送失败后可以重试,设置重试次数。默认3次

Broker

修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。集群部署

Consumer

完全消费正常后在进行手动ack确认

实现分布式事务

https://www.jianshu.com/p/878fed4e3165

half message

  • 发送prepared消息(half消息,一个意思)

  • 消息成功后执行本地事务

  • 本地事务成功,发送confirm消息

以上是正常成功流程

  • 本地事务失败处理:在发送prepared消息时,会在MQ Server注册监听回调,MQ Server会启定时任务,查询MQ服务器上所有的prepared状态消息,根据消息id,回查接入方producor,看本地事务是否成功,根据本地事务成功与否,确认是发送confirm消息还是callback消息。
  • 最后,mq订阅方都是通过拉的方式,去消费。往MQ Server发送confirm消息就是,根据消息id查找到对应log,把消息状态置为commit,而MQ订阅方就是拉取commit的消息。

消息堆积

1、如果可以添加消费者解决,就添加消费者的数据量

2、如果出现了queue,但是消费者多的情况。可以使用准备一个临时的topic,同时创建一些queue,在临时创建一个消费者来把这些消息转移到topic中,让消费者消费