RocketMQ
消息队列
有两种模型:队列模型(RabbitMQ)和发布/订阅模型(RocketMQ、Kafka)
异步、削峰、解耦
解耦:
你下单了,你就把你支付成功的消息告诉别的系统,他们收到了去处理就好了,你只用走完自己的流程,把自己的消息发出去,那后面要接入什么系统简单,直接订阅你发送的支付成功消息,你支付成功了我监听就好了。
问题:
数据一致性(使用分布式事务)、高可用、消息重复消费(幂等(强校验、弱校验))、消息丢失、消息的顺序消费(一个topic下有多个队列,为了保证发送有序,RocketMQ提供了MessageQueueSelector队列选择机制)
分布式事务:
- 2pc(两段式提交)
- 3pc(三段式提交)
- TCC(Try、Confirm、Cancel)
- 最大努力通知
- XA
- 本地消息表(ebay研发出的)
- 半消息/最终一致性(RocketMQ)
- 业务主动方本地事务提交失败,业务被动方不会收到消息的投递。
- 只要业务主动方本地事务执行成功,那么消息服务一定会投递消息给下游的业务被动方,并最终保证业务被动方一定能成功消费该消息(消费成功或失败,即最终一定会有一个最终态)。
为了提高并发度,往往发布/订阅模型还会引入队列或者分区的概念。即消息是发往一个主题下的某个队列或者某个分区中。RocketMQ中叫队列,Kafka叫分区,本质一样,例如某个主题下有 5 个队列,那么这个主题的并发度就提高为 5 ,同时可以有 5 个消费者并行消费该主题的消息。一般可以采用轮询或者 key hash 取余等策略来将同一个主题的消息分配到不同的队列中。
与之对应的消费者一般都有组的概念 Consumer Group, 即消费者都是属于某个消费组的。一条消息会发往多个订阅了这个主题的消费组。
假设现在有两个消费组分别是Group 1 和 Group 2,它们都订阅了Topic-a。此时有一条消息发往Topic-a,那么这两个消费组都能接收到这条消息。
然后这条消息实际是写入Topic某个队列中,消费组中的某个消费者对应消费一个队列的消息。
在物理上除了副本拷贝之外,一条消息在Broker中只会有一份,每个消费组会有自己的offset即消费点位来标识消费到的位置。在消费点位之前的消息表明已经消费过了。当然这个offset是队列级别的。每个消费组都会维护订阅的Topic下的每个队列的offset
正文开始
发布/订阅
基本组件:Producer 消息发送者、Broker 消息服务器(存储消息)、Consumer 消息消费、NameServer 路由发现
底层的通信和连接都是基于 Netty 实现 的
路由中心 NameServer
Producer 发送某一主题的消息到 Broker,Broker 负责该消息的持久化存储,Consumer 订阅感兴趣的主题,Broker 根据订阅信息(路由信息)将消息推送到 Consumer(PUSH)或者 Consumer 主动向 Broker 拉取消息(PULL),从而实现 Producer 与 Consumer 解耦。
为了避免 Broker 的单点故障导致的整个系统瘫痪,通常会部署多台 Broker 共同承担消息的存储。那 Producer 如何知道消息要发往哪台 Broker 呢?如果某一台 Broker 宕机了,那么 Producer 如何在不重启服务的情况下感知呢?
这就是 NameServer 的作用了
由图可知,多个 NameServer 之间互不通信,即某一时刻的数据并不会完全一样,但不会对消息发送造成影响,简单高效
Broker 在启动时向所有 NameServer 注册,Producer 在发送消息之前先从 NameServer 获取 Broker 地址列表,然后根据负载算法从列表中选择一台 Broker 进行消息发送。NameServer 与每台 Broker 服务器保持长连接,并间隔 30 s 检测 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中将其移除。但是路由变化不会马上通知 Producer,这是为了降低 NameServer 的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性。
如果代码中使用了线程池,一种优雅停机的方式就是注册一个 JVM 钩子函数,在 JVM 进程关闭之前,先将线程池关闭,及时释放资源。
路由元消息
NameServer 由 RouteInfoManager 实现,通过 HashMap 存储元消息
- topicQueueTable:Topic 消息队列路由信息,消息发送时根据路由表进行负载均衡
- HashMap<String /* topic */, List<QueueData>>
- topic:List<QueueData>
- HashMap<String /* topic */, List<QueueData>>
- brokerAddrTable:Broker 基础信息,包含 brokerName、所属集群名称、主备 Broker 地址
- HashMap<String /* brokerName */, BrokerData>
- brokerName:BrokerData
- HashMap<String /* brokerName */, BrokerData>
- clusterAddrTable:Broker 集群信息,存储集群中所有 Broker 名称
- HashMap<String /* clusterName */, Set<String /* brokerName */>>
- clusterName:Set<String /* brokerName */>
- HashMap<String /* clusterName */, Set<String /* brokerName */>>
- brokerLiveTable:Broker 状态信息,NameServer 每次收到心跳包时会替换该信息
- HashMap<String /* brokerAddr */, BrokerLiveinfo>
- brokerAddr:BrokerLiveinfo
- HashMap<String /* brokerAddr */, BrokerLiveinfo>
- filterServerTable:Broker上的 FilterServer 列表,用于类模式消息过滤
- HashMap<String /* brokerAddr */, List<String> /* Filter Server */>
- brokerAddr:List<String> /* Filter Server */
- HashMap<String /* brokerAddr */, List<String> /* Filter Server */>
RocketMQ 基于订阅发布机制
一个 Topic 拥有多个消息队列
一个 Broker 为每一个 Topic 默认创建 4 个读队列 4 个写队列
多个 Broker 组成一个集群,BrokerName 由相同的多台 Broker 组成 Master-Slave 架构
brokerId 为 0 代表 Master,大于 0 表示 Slave
BrokerLiveInfo 中的 lastUpdateTimestamp 存储上次收到 Broker 心跳包的时间
路由注册
通过 Broker 与 NameServer 的心跳功能实现注册
- Broker 启动时向集群中所有的 NameServer 发送心跳语句,每隔 30 s 向集群中所有 NameServer 发送心跳包
- NameServer 收到 Broker 心跳包时会更新 brokerLiveTable 缓存中 BrokerLiveInfo 的 lastUpdateTimestamp
- 然后 NameServer 每隔 10 s 扫描 brokerLiveTable,如果连续 120 s 没有收到心跳包,NameServer 将移除该 Broker 的路由信息,同时关闭 Socket 连接
注册需要加读写锁(ReentrantReadWriteLock),防止并发修改 RouteInfoManager 中的路由表
NameServe 与 Broker 保持长连接,Broker 状态存储在 brokerLiveTable 中,NameServer 每收到一个心跳包,将更新 brokerLiveTable 中关于 Broker 的状态信息以及路由表(topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable)更新上述路由表使用了 ReentrantReadWriteLock,允许多个消息发送者(Producer)并发读,保证消息发送时的高并发。但同一时刻 NameServer 只处理一个 Broker 心跳包,多个心跳包请求串行执行
路由删除
如果 Broker 宕机,NameServer 无法收到心跳包
NameServer 会每隔 10 s 扫描 brokerLiveTable 状态表,如果 BrokerlLive 的 lastUpdateTimestamp 的时间戳距当前时间超过 120s,则认为 Broker 失效,移除该 Broker,关闭与 Broker 连接,并同时更新 topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable
RocktMQ 有两个触发点来触发路由删除
- NameServer 定时扫描 brokerLiveTable 检测上次心跳包与当前系统时间的时间差,如果时间戳大于 120 s,则需要移除该 Broker 信息
- Broker 在正常被关闭的情况下,会执行 unregisterBroker 指令
两种方式触发的路由删除,都是从 topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable 删除与该 Broker 相关的信息,这两种方式维护路由信息时会抽取公共代码
路由发现
RocketMQ 路由发现是非实时的,当 Topic 路由出现变化后,NameServer 不主动推送给客户端,而是由客户端根据主题名定时拉取主题最新的路由
总结
消息
Message 类
Message 的基础属性主要包括消息所属主题 topic、消息 Flag(不做处理)、扩展属性、消息体
Message 扩展属性主要包含下面几个
- tag:消息 TAG,用于消息过滤(可以看成低级的 topic)
- keys:Message 索引键,多个用空格隔开,RocketMQ 可以根据这些 key 快速检索到消息
- waitStoreMsgOK:消息发送时是否等消息存储完成后再返回
- delayTimeLeve:消息延迟级别,用于定时消息或消息重试
这些扩展属性存储在 Message properties
消息发送
RocketMQ 发送普通消息有 3 种实现方式:可靠同步发送、可靠异步发送、单向 (Oneway)发送
RocketMQ 支持 3 种消息发送方式 :同步 sync、异步 async、单向 oneway
- 同步:发送者向 MQ 执行发送消息 API 时,同步等待,直到到消息服务器返回发送结果
- 异步:发送者向 MQ 执行发送消息 API 时,指定消息发送成功后的回调函数,然后调用消息发送 API 后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在另一个新的线程中执行
- 单向:发送者向 MQ 执行发送消息 API 时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上
生产者
消息生产者的代码都在 client 模块中
DefaultMQProducer 默认的消息生产者实现类,它实现 MQAdmin 接口
启动
一个 JVM 实例中只存在一个 MQCientManager 实例,维护一个 MQClientlnstance 缓存表(ConcurrentHashMap),也就是一个 clientld 只会创建一个 MQClientinstance
clientld 为客户端 IP + instance +(unitname 可选)
如果在一台物理服务器部署两个应用程序,应用程序 clientld 相同,可能造成混乱,所以
如果 instance 为默认值 DEFAULT 的话, RocketMQ 会自动将 instance 设置为进程 ID ,这样避免了不同进程的相互影响,但同 JVM 的不同消费者和不同生产者在启动时获取到的 MQClientlnstane 实例都是同一个。
MQClientlnstance 封装了 RocketMQ 网络处理 API ,是 Producer、Consumer 与 NameServer、Broker 打交道的网络通道
主要流程
消息发送流程:验证消息、查找路由、消息发送(包括异常处理)(sendMessage 方法)
默认是同步,默认超时时间为 3s
验证
主要是判空、判大小:maxMessageSize = 1024 * 1024 * 4(4M)
查找路由
。。。
根据路由消息选择消息队列,返回的消息队列按照 broker、序号排序
首先在一次消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName 就是上一次选择的执行发送消息失败的 Broker。第一次执行消息队列选择时,lastBrokerName 为 null 此时直接用 sendWhichQueue 自增再获取值,与当前路由表中消息队列个数取模,返回该位置的 MessageQueue(selectOneMessage Queue() 方法),如果消息发送再失败的话,下次进行消息队列选择时规避上次 MessageQueue 所在的 Broker,否则还是很有可能再次失败。
该算法在一次消息发送过程中能成功规避故障的 Broker,但如果 Broker 宕机,由于路由算法中的消息队列是按 Broker排序的,如果上一次根据路由算法选择的是宕机的 Broker 的第一个队列,那么随后的下次选择的是宕机 Broker 的第二个队列,消息发送很有可能会失败,再次引发重试,带来不必要的性能损耗,那么有什么方法在一次消息发送失败后,暂时将该 Broker 排除在消息队列选择范围外呢?或许有朋友会问,Broker不可用后,路由信息中为什么还会包含该 Broker 的路由信息呢?其实这不难解释:首先,Name Server 检测 Broker 是否可用是有延迟的,最短为一次心跳检测间隔(10s),其次,NameServer 不会检测到 Broker 宕机后马上推送消息给消息生产者,而是消息生产者每隔 30s 更新一次路由信息,所以消息生产者最快感知 Broker 最新的路由信息也需要 30s。如果能引入一种机制,在 Broker 宕机期间,如果一次消息发送失败后,可以将该 Broker 暂时排除在消息队列的选择范围中。-》Borker 故障延迟机制
Borker 故障延迟机制
。。。
消息发送
DefaultMQProducerImpl#sendKernelImpl
消息消费
Consumer
支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制
Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。
Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。
问题
消费模式有几种
- 集群消费(CLUSTERING)
- 一个 ConsumerGroup 中的 Consumer 实例平均分摊消费消息。例如某个 Topic有 9 条消息,其中一个 ConsumerGroup 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中部分,消费完的消息不能被其他实例消费
- 存在消息重复消费
- 广播消费(BROADCASTING)
- 一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个ConsumerGroup,消息也会被 ConsumerGroup 中的每个 Consumer 都消费一次,广播消费中 ConsumerGroup 概念可以认为在消息划分方面无意义
消息重复消费
- 发送时消息重复
- 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且Message ID也相同的消息
- 投递时消息重复
- 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息
- 负载均衡时消息重复(包括但不限于网络抖动、Broker重启以及消费者应用重启)
- 当消息队列RocketMQ版的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息
- 通过幂等性来保证不重复
- 因为Message ID有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置
- 以支付场景为例,可以将消息的Key设置为订单号,作为幂等处理的依据
- 消费者收到消息时可以根据消息的Key,即订单号来实现消息幂等
- 即生产者 setkey,在消费者 getkey
- 因为Message ID有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置
消息顺序消费
首先多个 queue 只能保证单个 queue 里的顺序,queue 是典型的 FIFO,天然顺序。多个queue 同时消费是无法绝对保证消息的有序性的
可以使用同一 topic,同一个 queue,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个 queue 里的消息
即:
如果是使用 MessageListenerOrderly 则自带此实现
如果是使用 MessageListenerConcurrently,则需要把线程池改为单线程模式
rocketmq怎么保证队列完全顺序消费? - Jaskey Lam的回答 - 知乎 https://www.zhihu.com/question/30195969/answer/142416274
保证消息不丢失
Producer
采取 send() 同步发消息,发送结果是同步感知的。发送失败后可以重试,设置重试次数。默认3次
Broker
修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。集群部署
Consumer
完全消费正常后在进行手动ack确认
实现分布式事务
half message
发送prepared消息(half消息,一个意思)
消息成功后执行本地事务
本地事务成功,发送confirm消息
以上是正常成功流程
- 本地事务失败处理:在发送prepared消息时,会在MQ Server注册监听回调,MQ Server会启定时任务,查询MQ服务器上所有的prepared状态消息,根据消息id,回查接入方producor,看本地事务是否成功,根据本地事务成功与否,确认是发送confirm消息还是callback消息。
- 最后,mq订阅方都是通过拉的方式,去消费。往MQ Server发送confirm消息就是,根据消息id查找到对应log,把消息状态置为commit,而MQ订阅方就是拉取commit的消息。
消息堆积
1、如果可以添加消费者解决,就添加消费者的数据量
2、如果出现了queue,但是消费者多的情况。可以使用准备一个临时的topic,同时创建一些queue,在临时创建一个消费者来把这些消息转移到topic中,让消费者消费