消息队列知识点 & 面试题总结
“RabbitMQ?”“Kafka?”“RocketMQ?”…在日常学习与开发过程中,我们常常听到消息队列这个关键词。我也在我的多篇文章中提到了这个概念。可能你是熟练使用消息队列的老手,又或者你是不懂消息队列的新手,不论你了不了解消息队列,本文都将带你搞懂消息队列的一些基本理论。如果你是老手,你可能从本文学到你之前不曾注意的一些关于消息队列的重要概念,如果你是新手,相信本文将是你打开消息队列大门的一板砖。
一 什么是消息队列
我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。
消息队列是分布式系统中重要的组件之一。使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。
我们知道队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。
二 为什么要用消息队列
通常来说,使用消息队列能为我们的系统带来下面三点好处:
- 通过异步处理提高系统性能(减少响应所需时间)。
- 削峰/限流
- 降低系统耦合性。
如果在面试的时候你被面试官问到这个问题的话,一般情况是你在你的简历上涉及到消息队列这方面的内容,这个时候推荐你结合你自己的项目来回答。
《大型网站技术架构》第四章和第七章均有提到消息队列对应用性能及扩展性的提升。
2.1 通过异步处理提高系统性能(减少响应所需时间)
将用户的请求数据存储到消息队列之后就立即返回结果。随后,系统再对消息进行消费。
因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此,使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。
2.2 削峰/限流
先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。
举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。如下图所示:
2.3 降低系统耦合性
使用消息队列还可以降低系统耦合性。我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。还是直接上图吧:
生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合,这显然也提高了系统的扩展性。
消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。 从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。
消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。
另外,为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。
备注: 不要认为消息队列只能利用发布-订阅模式工作,只不过在解耦这个特定业务环境下是使用发布-订阅模式的。除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者),我们比较常用的是发布-订阅模式。另外,这两种消息模型是 JMS 提供的,AMQP 协议还提供了 5 种消息模型。
三 使用消息队列带来的一些问题
- 系统可用性降低: 系统可用性在某种程度上降低,为什么这样说呢?在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等等的情况,但是,引入 MQ 之后你就需要去考虑了!
- 系统复杂性提高: 加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
- 一致性问题: 我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!
四 JMS VS AMQP
4.1 JMS
4.1.1 JMS 简介
JMS(JAVA Message Service,java 消息服务)是 java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。JMS(JAVA Message Service,Java 消息服务)API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
ActiveMQ 就是基于 JMS 规范实现的。
4.1.2 JMS 两种消息模型
① 点到点(P2P)模型
使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)
② 发布/订阅(Pub/Sub)模型
发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。
4.1.3 JMS 五种不同的消息正文格式
JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
- StreamMessage – Java 原始值的数据流
- MapMessage–一套名称-值对
- TextMessage–一个字符串对象
- ObjectMessage–一个序列化的 Java 对象
- BytesMessage–一个字节的数据流
4.2 AMQP
AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
RabbitMQ 就是基于 AMQP 协议实现的。
4.3 JMS vs AMQP
| 对比方向 | JMS | AMQP |
|---|---|---|
| 定义 | Java API | 协议 |
| 跨语言 | 否 | 是 |
| 跨平台 | 否 | 是 |
| 支持消息类型 | 提供两种消息模型:①Peer-2-Peer;②Pub/sub | 提供了五种消息模型:①direct exchange;②fanout exchange;③topic change;④headers exchange;⑤system exchange。本质来讲,后四种和 JMS 的 pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分; |
| 支持消息类型 | 支持多种消息类型 ,我们在上面提到过 | byte[](二进制) |
总结:
- AMQP 为消息定义了线路层(wire-level protocol)的协议,而 JMS 所定义的是 API 规范。在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而 AMQP 天然具有跨平台、跨语言特性。
- JMS 支持 TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。
- 由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。
五 常见的消息队列对比
| 对比方向 | 概要 |
|---|---|
| 吞吐量 | 万级的 ActiveMQ 和 RabbitMQ 的吞吐量(ActiveMQ 的性能最差)要比 十万级甚至是百万级的 RocketMQ 和 Kafka 低一个数量级。 |
| 可用性 | 都可以实现高可用。ActiveMQ 和 RabbitMQ 都是基于主从架构实现高可用性。RocketMQ 基于分布式架构。 kafka 也是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
| 时效性 | RabbitMQ 基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。其他三个都是 ms 级。 |
| 功能支持 | 除了 Kafka,其他三个功能都较为完备。 Kafka 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 |
| 消息丢失 | ActiveMQ 和 RabbitMQ 丢失的可能性非常低, RocketMQ 和 Kafka 理论上不会丢失。 |
总结:
- ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
- RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做 erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
- RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用 RocketMQ 挺好的
- Kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。
参考:《Java 工程师面试突击第 1 季-中华石杉老师》
并入整理
以下内容由同主题重复文章合并而来,便于集中复习。
RocketMQ 是什么?
RocketMQ 是阿里巴巴在 2012 年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于 2017 年 9 月 25 日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。
如下是 RocketMQ 产生的原因:
淘宝内部的交易系统使用了淘宝自主研发的 Notify 消息中间件,使用 MySQL 作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步优化,2011 年初,Linkin开源了 Kafka 这个优秀的消息中间件,淘宝中间件团队在对 Kafka 做过充分 Review 之后, Kafka 无限消息堆积,高效的持久化速度吸引了我们,但是同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用 Java 语言编写了 RocketMQ ,定位于非日志的可靠消息传输(日志场景也OK),目前 RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binglog 分发等场景。
RocketMQ 是否会弄丢数据?
艿艿:注意,RocketMQ 是否会丢数据,主要取决于我们如何使用。这点,非常重要噢。
🦅 消费端弄丢了数据?
对于消费端,如果我们在使用 Push 模式的情况下,只有我们消费返回成功,才会异步定期更新消费进度到 Broker 上。
如果消费端异常崩溃,可能导致消费进度未更新到 Broker 上,那么无非是 Consumer 可能重复拉取到已经消费过的消息。关于这个,就需要消费端做好消费的幂等性。
🦅 Broker 弄丢了数据?
在上面的问题中,我们已经看到了 Broker 提供了两个特性:
- 刷盘方式:同步刷盘、异步刷盘。
- 复制方式:同步复制、异步复制。
如果要保证 Broker 数据最大化的不丢,需要在搭建 Broker 集群时,设置为同步刷盘、同步复制。当然,带来了可靠性,也会一定程度降低性能。
如果想要在可靠性和性能之间做一个平衡,可以选择同步复制,加主从 Broker 都是和异步刷盘。因为,刷盘比较消耗性能。
🦅 生产者会不会弄丢数据?
Producer 可以设置三次发送消息重试。
RocketMQ 由哪些角色组成?
如下图所示:
RocketMQ 角色
- 生产者(Producer):负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。
- 消费者(Consumer):负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。
- 消息服务器(Broker):是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。
- 名称服务器(NameServer):用来保存 Broker 相关 Topic 等元信息并给 Producer ,提供 Consumer 查找 Broker 信息。
RocketMQ常见问题
本文来自读者 PR。
1 单机版消息中心
一个消息中心,最基本的需要支持多生产者、多消费者,例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class Scratch {
public static void main(String[] args) {
// 实际中会有 nameserver 服务来找到 broker 具体位置以及 broker 主从信息
Broker broker = new Broker();
Producer producer1 = new Producer();
producer1.connectBroker(broker);
Producer producer2 = new Producer();
producer2.connectBroker(broker);
Consumer consumer1 = new Consumer();
consumer1.connectBroker(broker);
Consumer consumer2 = new Consumer();
consumer2.connectBroker(broker);
for (int i = 0; i < 2; i++) {
producer1.asyncSendMsg("producer1 send msg" + i);
producer2.asyncSendMsg("producer2 send msg" + i);
}
System.out.println("broker has msg:" + broker.getAllMagByDisk());
for (int i = 0; i < 1; i++) {
System.out.println("consumer1 consume msg:" + consumer1.syncPullMsg());
}
for (int i = 0; i < 3; i++) {
System.out.println("consumer2 consume msg:" + consumer2.syncPullMsg());
}
}
}
class Producer {
private Broker broker;
public void connectBroker(Broker broker) {
this.broker = broker;
}
public void asyncSendMsg(String msg) {
if (broker == null) {
throw new RuntimeException("please connect broker first");
}
new Thread(() -> {
broker.sendMsg(msg);
}).start();
}
}
class Consumer {
private Broker broker;
public void connectBroker(Broker broker) {
this.broker = broker;
}
public String syncPullMsg() {
return broker.getMsg();
}
}
class Broker {
// 对应 RocketMQ 中 MessageQueue,默认情况下 1 个 Topic 包含 4 个 MessageQueue
private LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue(Integer.MAX_VALUE);
// 实际发送消息到 broker 服务器使用 Netty 发送
public void sendMsg(String msg) {
try {
messageQueue.put(msg);
// 实际会同步或异步落盘,异步落盘使用的定时任务定时扫描落盘
} catch (InterruptedException e) {
}
}
public String getMsg() {
try {
return messageQueue.take();
} catch (InterruptedException e) {
}
return null;
}
public String getAllMagByDisk() {
StringBuilder sb = new StringBuilder("\n");
messageQueue.iterator().forEachRemaining((msg) -> {
sb.append(msg + "\n");
});
return sb.toString();
}
}
问题:
- 没有实现真正执行消息存储落盘
- 没有实现 NameServer 去作为注册中心,定位服务
- 使用 LinkedBlockingQueue 作为消息队列,注意,参数是无限大,在真正 RocketMQ 也是如此是无限大,理论上不会出现对进来的数据进行抛弃,但是会有内存泄漏问题(阿里巴巴开发手册也因为这个问题,建议我们使用自制线程池)
- 没有使用多个队列(即多个 LinkedBlockingQueue),RocketMQ 的顺序消息是通过生产者和消费者同时使用同一个 MessageQueue 来实现,但是如果我们只有一个 MessageQueue,那我们天然就支持顺序消息
- 没有使用 MappedByteBuffer 来实现文件映射从而使消息数据落盘非常的快(实际 RocketMQ 使用的是 FileChannel+DirectBuffer)
2 分布式消息中心
2.1 问题与解决
2.1.1 消息丢失的问题
- 当你系统需要保证百分百消息不丢失,你可以使用生产者每发送一个消息,Broker 同步返回一个消息发送成功的反馈消息
- 即每发送一个消息,同步落盘后才返回生产者消息发送成功,这样只要生产者得到了消息发送生成的返回,事后除了硬盘损坏,都可以保证不会消息丢失
- 但是这同时引入了一个问题,同步落盘怎么才能快?
2.1.2 同步落盘怎么才能快
- 使用 FileChannel + DirectBuffer 池,使用堆外内存,加快内存拷贝
- 使用数据和索引分离,当消息需要写入时,使用 commitlog 文件顺序写,当需要定位某个消息时,查询index 文件来定位,从而减少文件IO随机读写的性能损耗
2.1.3 消息堆积的问题
- 后台定时任务每隔72小时,删除旧的没有使用过的消息信息
- 根据不同的业务实现不同的丢弃任务,具体参考线程池的 AbortPolicy,例如FIFO/LRU等(RocketMQ没有此策略)
- 消息定时转移,或者对某些重要的 TAG 型(支付型)消息真正落库
2.1.4 定时消息的实现
- 实际 RocketMQ 没有实现任意精度的定时消息,它只支持某些特定的时间精度的定时消息
- 实现定时消息的原理是:创建特定时间精度的 MessageQueue,例如生产者需要定时1s之后被消费者消费,你只需要将此消息发送到特定的 Topic,例如:MessageQueue-1 表示这个 MessageQueue 里面的消息都会延迟一秒被消费,然后 Broker 会在 1s 后发送到消费者消费此消息,使用 newSingleThreadScheduledExecutor 实现
2.1.5 顺序消息的实现
- 与定时消息同原理,生产者生产消息时指定特定的 MessageQueue ,消费者消费消息时,消费特定的 MessageQueue,其实单机版的消息中心在一个 MessageQueue 就天然支持了顺序消息
- 注意:同一个 MessageQueue 保证里面的消息是顺序消费的前提是:消费者是串行的消费该 MessageQueue,因为就算 MessageQueue 是顺序的,但是当并行消费时,还是会有顺序问题,但是串行消费也同时引入了两个问题:
- 引入锁来实现串行
- 前一个消费阻塞时后面都会被阻塞
2.1.6 分布式消息的实现
- 需要前置知识:2PC
- RocketMQ4.3 起支持,原理为2PC,即两阶段提交,prepared->commit/rollback
- 生产者发送事务消息,假设该事务消息 Topic 为 Topic1-Trans,Broker 得到后首先更改该消息的 Topic 为 Topic1-Prepared,该 Topic1-Prepared 对消费者不可见。然后定时回调生产者的本地事务A执行状态,根据本地事务A执行状态,来是否将该消息修改为 Topic1-Commit 或 Topic1-Rollback,消费者就可以正常找到该事务消息或者不执行等
注意,就算是事务消息最后回滚了也不会物理删除,只会逻辑删除该消息
2.1.7 消息的 push 实现
- 注意,RocketMQ 已经说了自己会有低延迟问题,其中就包括这个消息的 push 延迟问题
- 因为这并不是真正的将消息主动的推送到消费者,而是 Broker 定时任务每5s将消息推送到消费者
- pull模式需要我们手动调用consumer拉消息,而push模式则只需要我们提供一个listener即可实现对消息的监听,而实际上,RocketMQ的push模式是基于pull模式实现的,它没有实现真正的push。
- push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
2.1.8 消息重复发送的避免
- RocketMQ 会出现消息重复发送的问题,因为在网络延迟的情况下,这种问题不可避免的发生,如果非要实现消息不可重复发送,那基本太难,因为网络环境无法预知,还会使程序复杂度加大,因此默认允许消息重复发送
- RocketMQ 让使用者在消费者端去解决该问题,即需要消费者端在消费消息时支持幂等性的去消费消息
- 最简单的解决方案是每条消费记录有个消费状态字段,根据这个消费状态字段来判断是否消费或者使用一个集中式的表,来存储所有消息的消费状态,从而避免重复消费
- 具体实现可以查询关于消息幂等消费的解决方案
2.1.9 广播消费与集群消费
- 消息消费区别:广播消费,订阅该 Topic 的消息者们都会消费每个某个 消息。集群消费,订阅该 Topic 的消息者们只会有一个去消费 消息
- 消息落盘区别:具体表现在消息消费进度的保存上。广播消费,由于每个消费者都独立的去消费每个消息,因此每个消费者各自保存自己的消息消费进度。而集群消费下,订阅了某个 Topic,而旗下又有多个 MessageQueue,每个消费者都可能会去消费不同的 MessageQueue,因此总体的消费进度保存在 Broker 上集中的管理
2.1.10 RocketMQ 不使用 ZooKeeper 作为注册中心的原因,以及自制的 NameServer 优缺点?
- ZooKeeper 作为支持顺序一致性的中间件,在某些情况下,它为了满足一致性,会丢失一定时间内的可用性,RocketMQ 需要注册中心只是为了发现组件地址,在某些情况下,RocketMQ 的注册中心可以出现数据不一致性,这同时也是 NameServer 的缺点,因为 NameServer 集群间互不通信,它们之间的注册信息可能会不一致
- 另外,当有新的服务器加入时,NameServer 并不会立马通知到 Producer,而是由 Producer 定时去请求 NameServer 获取最新的 Broker/Consumer 信息(这种情况是通过 Producer 发送消息时,负载均衡解决)
2.1.11 其它
加分项咯
- 包括组件通信间使用 Netty 的自定义协议
- 消息重试负载均衡策略(具体参考 Dubbo 负载均衡策略)
- 消息过滤器(Producer 发送消息到 Broker,Broker 存储消息信息,Consumer 消费时请求 Broker 端从磁盘文件查询消息文件时,在 Broker 端就使用过滤服务器进行过滤)
- Broker 同步双写和异步双写中 Master 和 Slave 的交互
- Broker 在 4.5.0 版本更新中引入了基于 Raft 协议的多副本选举,之前这是商业版才有的特性 ISSUE-1046
3 参考
- 《RocketMQ技术内幕》:https://blog.csdn.net/prestigeding/article/details/85233529
- 关于 RocketMQ 对 MappedByteBuffer 的一点优化:https://lishoubo.github.io/2017/09/27/MappedByteBuffer%E7%9A%84%E4%B8%80%E7%82%B9%E4%BC%98%E5%8C%96/
- 十分钟入门RocketMQ:https://developer.aliyun.com/article/66101
- 分布式事务的种类以及 RocketMQ 支持的分布式消息:https://www.infoq.cn/article/2018/08/rocketmq-4.3-release
- 滴滴出行基于RocketMQ构建企业级消息队列服务的实践:https://yq.aliyun.com/articles/664608
- 基于《RocketMQ技术内幕》源码注释:https://github.com/LiWenGu/awesome-rocketmq
如何保证 RocketMQ 消费者的消费消息的幂等性?
在 《精尽【消息队列 】面试题》 中,已经解析过该问题。当然,我们有几点要补充下:
- Producer 在发送消息时,默认会生成消息编号( msgId ),可见 org.apache.rocketmq.common.message.MessageClientExt 类。
- Broker 在存储消息时,会生成结合 offset 的消息编号( offsetMsgId ) 。
- Consumer 在消费消息失败后,将该消息发回 Broker 后,会产生新的 offsetMsgId 编号,但是 msgId 不变。
如何保证消息的顺序性?
如何保证消息的顺序性?
不同的消息队列,其架构不同,所以实现消息的顺序性的方案不同。所以参见如下文章:
- RocketMQ 《精尽 RocketMQ 面试题》 的 「什么是顺序消息?如何实现?」 的面试题。
- RabbitMQ 《精尽 RabbitMQ 面试题》 的 「RabbitMQ 如何保证消息的顺序性?」 面试题。
- Kafka 《精尽 Kafka 面试题》 的 「Kafka 如何保证消息的顺序性?」 的面试题。
如何实现 RocketMQ 高可用?
如何实现 RocketMQ 高可用?
如何实现 RocketMQ 高可用?
在 「RocketMQ 由哪些角色组成?」 中,我们看到 RocketMQ 有四个角色,需要考虑每个角色的高可用。
RocketMQ 集群
🦅 1. Producer
- Producer 自身在应用中,所以无需考虑高可用。
- Producer 配置多个 Namesrv 列表,从而保证 Producer 和 Namesrv 的连接高可用。并且,会从 Namesrv 定时拉取最新的 Topic 信息。
- Producer 会和所有 Broker 直连,在发送消息时,会选择一个 Broker 进行发送。如果发送失败,则会使用另外一个 Broker 。
- Producer 会定时向 Broker 心跳,证明其存活。而 Broker 会定时检测,判断是否有 Producer 异常下线。
🦅 2. Consumer
- Consumer 需要部署多个节点,以保证 Consumer 自身的高可用。当相同消费者分组中有新的 Consumer 上线,或者老的 Consumer 下线,会重新分配 Topic 的 Queue 到目前消费分组的 Consumer 们。
- Consumer 配置多个 Namesrv 列表,从而保证 Consumer 和 Namesrv 的连接高可用。并且,会从 Consumer 定时拉取最新的 Topic 信息。
- Consumer 会和所有 Broker 直连,消费相应分配到的 Queue 的消息。如果消费失败,则会发回消息到 Broker 中。
- Consumer 会定时向 Broker 心跳,证明其存活。而 Broker 会定时检测,判断是否有 Consumer 异常下线。
🦅 3. Namesrv
- Namesrv 需要部署多个节点,以保证 Namesrv 的高可用。
- Namesrv 本身是无状态,不产生数据的存储,是通过 Broker 心跳将 Topic 信息同步到 Namesrv 中。
- 多个 Namesrv 之间不会有数据的同步,是通过 Broker 向多个 Namesrv 多写。
🦅 4. Broker
- 多个 Broker 可以形成一个 Broker 分组。每个 Broker 分组存在一个 Master 和多个 Slave 节点。 Master 节点,可提供读和写功能。Slave 节点,可提供读功能。 Master 节点会不断发送新的 CommitLog 给 Slave节点。Slave 节点不断上报本地的 CommitLog 已经同步到的位置给 Master 节点。 Slave 节点会从 Master 节点拉取消费进度、Topic 配置等等。
- 多个 Broker 分组,形成 Broker 集群。 Broker 集群和集群之间,不存在通信与数据同步。
- Broker 可以配置同步刷盘或异步刷盘,根据消息的持久化的可靠性来配置。
🦅 总结
目前官方提供三套配置:
2m-2s-async
| brokerClusterName | brokerName | brokerRole | brokerId |
|---|---|---|---|
| DefaultCluster | broker-a | ASYNC_MASTER | 0 |
| DefaultCluster | broker-a | SLAVE | 1 |
| DefaultCluster | broker-b | ASYNC_MASTER | 0 |
| DefaultCluster | broker-b | SLAVE | 1 |
2m-2s-sync
| brokerClusterName | brokerName | brokerRole | brokerId |
|---|---|---|---|
| DefaultCluster | broker-a | SYNC_MASTER | 0 |
| DefaultCluster | broker-a | SLAVE | 1 |
| DefaultCluster | broker-b | SYNC_MASTER | 0 |
| DefaultCluster | broker-b | SLAVE | 1 |
2m-noslave
| brokerClusterName | brokerName | brokerRole | brokerId |
|---|---|---|---|
| DefaultCluster | broker-a | ASYNC_MASTER | 0 |
| DefaultCluster | broker-b | ASYNC_MASTER | 0 |
相关的源码解析,胖友可以看看 《RocketMQ 源码分析 —— 高可用》 。
如何配置 Namesrv 地址到生产者和消费者?
如何配置 Namesrv 地址到生产者和消费者?
将 Namesrv 地址列表提供给客户端( 生产者和消费者 ),有四种方法:
- 编程方式,就像 producer.setNamesrvAddr(“ip:port”) 。
- Java 启动参数设置,使用 rocketmq.namesrv.addr 。
- 环境变量,使用 NAMESRV_ADDR 。
- HTTP 端点,例如说:http://namesrv.rocketmq.xxx.com 地址,通过 DNS 解析获得 Namesrv 真正的地址。
请描述下 RocketMQ 的整体流程?
请描述下 RocketMQ 的整体流程?
整体流程
- 启动 Namesrv ,Namesrv起 来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。
- Broker 启动,跟所有的 Namesrv 保持长连接,定时发送心跳包。
心跳包中,包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。注册成功后,Namesrv 集群中就有 Topic 跟 Broker 的映射关系。
- 收发消息前,先创建 Topic 。创建 Topic 时,需要指定该 Topic 要存储在 哪些 Broker上。也可以在发送消息时自动创建Topic。
- Producer 发送消息。
启动时,先跟 Namesrv 集群中的其中一台建立长连接,并从Namesrv 中获取当前发送的 Topic 存在哪些 Broker 上,然后跟对应的 Broker 建立长连接,直接向 Broker 发消息。
- Consumer 消费消息。
Consumer 跟 Producer 类似。跟其中一台 Namesrv 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。
请说说你对 Namesrv 的了解?
请说说你对 Namesrv 的了解?
请说说你对 Namesrv 的了解?
- Namesrv 用于存储 Topic、Broker 关系信息,功能简单,稳定性高。
多个 Namesrv 之间相互没有通信,单台 Namesrv 宕机不影响其它 Namesrv 与集群。
多个 Namesrv 之间的信息共享,通过 Broker 主动向多个 Namesrv 都发起心跳。正如上文所说,Broker 需要跟所有 Namesrv 连接。
即使整个 Namesrv 集群宕机,已经正常工作的 Producer、Consumer、Broker 仍然能正常工作,但新起的 Producer、Consumer、Broker 就无法工作。
这点和 Dubbo 有些不同,不会缓存 Topic 等元信息到本地文件。
- Namesrv 压力不会太大,平时主要开销是在维持心跳和提供 Topic-Broker 的关系数据。但有一点需要注意,Broker 向 Namesr 发心跳时,会带上当前自己所负责的所有 Topic 信息,如果 Topic 个数太多(万级别),会导致一次心跳中,就 Topic 的数据就几十 M,网络情况差的话,网络传输失败,心跳失败,导致 Namesrv 误认为 Broker 心跳失败。
当然,一般公司,很难达到过万级的 Topic ,因为一方面体量达不到,另一方面 RocketMQ 提供了 Tag 属性。
另外,内网环境网络相对是比较稳定的,传输几十 M 问题不大。同时,如果真的要优化,Broker 可以把心跳包做压缩,再发送给 Namesrv 。不过,这样也会带来 CPU 的占用率的提升。
MQ如何保证消息不丢失
MQ如何保证消息不丢失
消息不可靠的情况可能是消息丢失,劫持等原因:
哪些环节可能会丢失消息?
生产者丢失消息消息列表丢失消息消费者丢失消息
怎么防止消息丢失
发送消息不丢失
- Kafka:
- 消息发送+回调
- RocketMQ:
- 消息发送+回调
- 事务消息机制
- RabbitMQ:
- 消息发送+回调
- 手动事务:这种方式对channel是会产生阻塞的,造成吞吐量下降
- channel.txSelect() 开启事务
- channel.txCommit() 提交事务
- channel.txRollBack() 回滚事务
- Publisher Confirm:整个处理流程跟RocketMQ的事务消息,基本是一样的。
事务消息机制
MQ主从消息同步不丢失
- RocketMQ:
- 普通集群中
- 同步同步:会阻塞,效率相对会低,但不会丢失消息
- 异步同步:效率高,但可能会丢消息
- Dledger集群:两阶段提交
- 普通集群中
- RabbitMQ:
- 普通集群:消息是分散存储的。节点之间不会主动进行消息同步,是有可能丢失消息的
- 镜像集群:会在节点之间进行数据同步,这样数据安全性得到了提高。
- Kafka:通常都是用在允许消息少量丢失的场景。acks:0、1、all
消息存盘不丢失
RocketMQ:同步刷盘:消息安全性更高,但是效率会降低异步刷盘:效率更高,但是消息可能会丢失
RabbitMQ:将队列配置成持久化队列。新增的Quorum类型队列,会采用Raft协议来进行消息同步。
消费者消费消息不丢失
RocketMQ:在消费端使用默认的方式(同步消费,先提交本地事务,在响应MQ改变偏移量)消费,不要采用异步房补。RabbitMQ:autoCommit -> 手动提交offsetKafka:手动提交offset
MQ选型
MQ选型
常见的MQ
kafka
优点:吞吐量非常大、性能非常好、集群高可用缺点:会丢失数据,功能比较单一使用场景:日志分析、大数据采集
RabbitMQ
优点:消息可靠性高,功能全面缺点:吞吐量比较低,消息积累会严重影响性能。erlang语言不好定制使用场景:小规模场景
RocketMQ
优点:高吞吐、高可用、高性能,功能非常全面缺点:开源版本功能不如商业版。官方文档和周边生态还不好。客户端只支持java
多中MQ的优缺点对比
| — | ActiveMQ | RabbitMQ | RocketMQ | Kafka | ZeroMQ | |
|---|---|---|---|---|---|---|
| 单机吞吐量 | 比RabbitMQ低 | 2.6W/s | 11.6w/s | 17.3w/s | 29w/s | |
| 开发语言 | Java | Erlang | Java | Scala/Java | C | |
| 主要维护者 | Apache | — | Apache | Apache | — | |
| 成熟度 | 成熟 | 成熟 | 开源版本不够成熟 | 比较成熟 | — | |
| 订阅形式 | 点对点(p2p)、广播(发布-订阅) | — | — | — | — | |
| 持久化 | 支持少量堆积 | 支持少量堆积 | 支持大量堆积 | 支持大量堆积 | 不支持 | |
| 顺序消息 | 不支持 | 不支持 | 支持 | 支持 | 不支持 | |
| 性能稳定性 | 好 | 好 | 一般 | 较差 | 很好 | |
| 集群方式 | — | — | — | — | — | |
| 管理页面 | 一般 | 较好 | 一般 | 无 | 无 |
不同的消息队列,其架构不同,所以实现发送消息的可靠性的方案不同。所以参见如下文章:
不同的消息队列,其架构不同,所以实现发送消息的可靠性的方案不同。所以参见如下文章:
不同的消息队列,其架构不同,所以实现发送消息的可靠性的方案不同。所以参见如下文章:
- RocketMQ 《精尽 RocketMQ 面试题》 的 「RocketMQ 是否会弄丢数据?」 的面试题。
- RabbitMQ 《精尽 RabbitMQ 面试题》 的 「RabbitMQ 是否会弄丢数据?」 的面试题。
- Kafka 《精尽 Kafka 面试题》 的 「Kafka 是否会弄丢数据?」 的面试题。
什么是事务消息?如何实现?
什么是事务消息?如何实现?
什么是事务消息?如何实现?
关于事务消息的概念和原理,胖友可以看看官方对这块的解答,即 《RocketMQ 4.3 正式发布,支持分布式事务》 的 「四 事务消息」 小节。
艿艿 16 年的时候,基于 RocketMQ 早期的版本,写了 《RocketMQ 源码分析 —— 事务消息》 文章,虽然 RocketMQ 版本不太一样,但是大体的思路是差不多的,可以帮助胖友更容易的读懂事务消息相关的源码。
- 简单看了下最新版本的 RocketMQ 的事务代码,新增了 RMQ_SYS_TRANS_HALF_TOPIC 和 RMQ_SYS_TRANS_OP_HALF_TOPIC 两个队列。
- Producer 发送 PREPARED Message 到 Broker 后,先存储到 RMQ_SYS_TRANS_HALF_TOPIC 队列中。
- Producer 提交或回滚 PREPARED Message 时,会添加一条消息到 RMQ_SYS_TRANS_OP_HALF_TOPIC 队列中,标记这个消息已经处理。
- Producer 提交 PREPARED Message 时,会将当前消息存储到原 Topic 的队列中,从而该消息能够被 Consumer 拉取消费。
具体的代码实现,可以看看 《芋道 Spring Boot 消息队列 RocketMQ 入门》的「9. 事务消息」 小节。
什么是消息重试?如何实现?
什么是消息重试?如何实现?
消息重试,Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。
- Consumer 会将消费失败的消息发回 Broker,进入延迟消息队列。即,消费失败的消息,不会立即消费。
- 也就是说,消息重试是构建在定时消息之上的功能。
🦅 消息重试的主要流程
- Consumer 消费失败,将消息发送回 Broker 。
- Broker 收到重试消息之后置换 Topic ,存储消息。
- Consumer 会拉取该 Topic 对应的 retryTopic 的消息。
- Consumer 拉取到 retryTopic 消息之后,置换到原始的 Topic ,把消息交给 Listener 消费。
这里,可能有几个点,胖友会比较懵逼,艿艿简单解释下:
- Consumer 消息失败后,会将消息的 Topic 修改为 %RETRY% + Topic 进行,添加 “RETRY_TOPIC” 属性为原始 Topic ,然后再返回给 Broker 中。
- Broker 收到重试消息之后,会有两次修改消息的 Topic 。
- 首先,会将消息的 Topic 修改为 %RETRY% + ConsumerGroup ,因为这个消息是当前消费这分组消费失败,只能被这个消费组所重新消费。 😈 注意噢,消费者会默认订阅 Topic 为 %RETRY% + ConsumerGroup 的消息。
- 然后,会将消息的 Topic 修改为 SCHEDULE_TOPIC_XXXX ,添加 “REAL_TOPIC” 属性为 %RETRY% + ConsumerGroup ,因为重试消息需要延迟消费。
- Consumer 会拉取该 Topic 对应的 retryTopic 的消息,此处的 retryTopic 为 %RETRY% + ConsumerGroup 。
- Consumer 拉取到 retryTopic 消息之后,置换到原始的 Topic ,因为有消息的 “RETRY_TOPIC” 属性是原始 Topic ,然后把消息交给 Listener 消费。
😈 有一丢丢复杂,胖友可以在思考思考~详细的,胖友可以看看 《RocketMQ 源码分析 —— 定时消息与消息重试》 。
具体的代码实现,可以看看 《芋道 Spring Boot 消息队列 RocketMQ 入门》的「6. 消费重试」 小节。
%23%20%E4%BB%80%E4%B9%88%E6%98%AF%E6%B6%88%E6%81%AF%E9%87%8D%E8%AF%95%EF%BC%9F%E5%A6%82%E4%BD%95%E5%AE%9E%E7%8E%B0%EF%BC%9F%0A%E6%B6%88%E6%81%AF%E9%87%8D%E8%AF%95%EF%BC%8CConsumer%20%E6%B6%88%E8%B4%B9%E6%B6%88%E6%81%AF%E5%A4%B1%E8%B4%A5%E5%90%8E%EF%BC%8C%E8%A6%81%E6%8F%90%E4%BE%9B%E4%B8%80%E7%A7%8D%E9%87%8D%E8%AF%95%E6%9C%BA%E5%88%B6%EF%BC%8C%E4%BB%A4%E6%B6%88%E6%81%AF%E5%86%8D%E6%B6%88%E8%B4%B9%E4%B8%80%E6%AC%A1%E3%80%82%0A%0A%20Consumer%20%E4%BC%9A%E5%B0%86%E6%B6%88%E8%B4%B9%E5%A4%B1%E8%B4%A5%E7%9A%84%E6%B6%88%E6%81%AF%E5%8F%91%E5%9B%9E%20Broker%EF%BC%8C%E8%BF%9B%E5%85%A5%E5%BB%B6%E8%BF%9F%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E3%80%82%E5%8D%B3%EF%BC%8C%E6%B6%88%E8%B4%B9%E5%A4%B1%E8%B4%A5%E7%9A%84%E6%B6%88%E6%81%AF%EF%BC%8C%E4%B8%8D%E4%BC%9A%E7%AB%8B%E5%8D%B3%E6%B6%88%E8%B4%B9%E3%80%82%0A%20%E4%B9%9F%E5%B0%B1%E6%98%AF%E8%AF%B4%EF%BC%8C%E6%B6%88%E6%81%AF%E9%87%8D%E8%AF%95%E6%98%AF%E6%9E%84%E5%BB%BA%E5%9C%A8%E5%AE%9A%E6%97%B6%E6%B6%88%E6%81%AF%E4%B9%8B%E4%B8%8A%E7%9A%84%E5%8A%9F%E8%83%BD%E3%80%82%0A%0A%F0%9F%A6%85%20%E6%B6%88%E6%81%AF%E9%87%8D%E8%AF%95%E7%9A%84%E4%B8%BB%E8%A6%81%E6%B5%81%E7%A8%8B%0A%0A1.%20Consumer%20%E6%B6%88%E8%B4%B9%E5%A4%B1%E8%B4%A5%EF%BC%8C%E5%B0%86%E6%B6%88%E6%81%AF%E5%8F%91%E9%80%81%E5%9B%9E%20Broker%20%E3%80%82%0A2.%20Broker%20%E6%94%B6%E5%88%B0%E9%87%8D%E8%AF%95%E6%B6%88%E6%81%AF%E4%B9%8B%E5%90%8E%E7%BD%AE%E6%8D%A2%20Topic%20%EF%BC%8C%E5%AD%98%E5%82%A8%E6%B6%88%E6%81%AF%E3%80%82%0A3.%20Consumer%20%E4%BC%9A%E6%8B%89%E5%8F%96%E8%AF%A5%20Topic%20%E5%AF%B9%E5%BA%94%E7%9A%84%20retryTopic%20%E7%9A%84%E6%B6%88%E6%81%AF%E3%80%82%0A4.%20Consumer%20%E6%8B%89%E5%8F%96%E5%88%B0%20retryTopic%20%E6%B6%88%E6%81%AF%E4%B9%8B%E5%90%8E%EF%BC%8C%E7%BD%AE%E6%8D%A2%E5%88%B0%E5%8E%9F%E5%A7%8B%E7%9A%84%20Topic%20%EF%BC%8C%E6%8A%8A%E6%B6%88%E6%81%AF%E4%BA%A4%E7%BB%99%20Listener%20%E6%B6%88%E8%B4%B9%E3%80%82%0A%0A%E8%BF%99%E9%87%8C%EF%BC%8C%E5%8F%AF%E8%83%BD%E6%9C%89%E5%87%A0%E4%B8%AA%E7%82%B9%EF%BC%8C%E8%83%96%E5%8F%8B%E4%BC%9A%E6%AF%94%E8%BE%83%E6%87%B5%E9%80%BC%EF%BC%8C%E8%89%BF%E8%89%BF%E7%AE%80%E5%8D%95%E8%A7%A3%E9%87%8A%E4%B8%8B%EF%BC%9A%0A%0A1.%20Consumer%20%E6%B6%88%E6%81%AF%E5%A4%B1%E8%B4%A5%E5%90%8E%EF%BC%8C%E4%BC%9A%E5%B0%86%E6%B6%88%E6%81%AF%E7%9A%84%20Topic%20%E4%BF%AE%E6%94%B9%E4%B8%BA%20%25RETRY%25%20%2B%20Topic%20%E8%BF%9B%E8%A1%8C%EF%BC%8C%E6%B7%BB%E5%8A%A0%20%22RETRY_TOPIC%22%20%E5%B1%9E%E6%80%A7%E4%B8%BA%E5%8E%9F%E5%A7%8B%20Topic%20%EF%BC%8C%E7%84%B6%E5%90%8E%E5%86%8D%E8%BF%94%E5%9B%9E%E7%BB%99%20Broker%20%E4%B8%AD%E3%80%82%0A2.%20Broker%20%E6%94%B6%E5%88%B0%E9%87%8D%E8%AF%95%E6%B6%88%E6%81%AF%E4%B9%8B%E5%90%8E%EF%BC%8C%E4%BC%9A%E6%9C%89%E4%B8%A4%E6%AC%A1%E4%BF%AE%E6%94%B9%E6%B6%88%E6%81%AF%E7%9A%84%20Topic%20%E3%80%82%0A3.%20%E9%A6%96%E5%85%88%EF%BC%8C%E4%BC%9A%E5%B0%86%E6%B6%88%E6%81%AF%E7%9A%84%20Topic%20%E4%BF%AE%E6%94%B9%E4%B8%BA%20%25RETRY%25%20%2B%20ConsumerGroup%20%EF%BC%8C%E5%9B%A0%E4%B8%BA%E8%BF%99%E4%B8%AA%E6%B6%88%E6%81%AF%E6%98%AF%E5%BD%93%E5%89%8D%E6%B6%88%E8%B4%B9%E8%BF%99%E5%88%86%E7%BB%84%E6%B6%88%E8%B4%B9%E5%A4%B1%E8%B4%A5%EF%BC%8C%E5%8F%AA%E8%83%BD%E8%A2%AB%E8%BF%99%E4%B8%AA%E6%B6%88%E8%B4%B9%E7%BB%84%E6%89%80%E9%87%8D%E6%96%B0%E6%B6%88%E8%B4%B9%E3%80%82%F0%9F%98%88%20%E6%B3%A8%E6%84%8F%E5%99%A2%EF%BC%8C%E6%B6%88%E8%B4%B9%E8%80%85%E4%BC%9A%E9%BB%98%E8%AE%A4%E8%AE%A2%E9%98%85%20Topic%20%E4%B8%BA%20%25RETRY%25%20%2B%20ConsumerGroup%20%E7%9A%84%E6%B6%88%E6%81%AF%E3%80%82%0A4.%20%E7%84%B6%E5%90%8E%EF%BC%8C%E4%BC%9A%E5%B0%86%E6%B6%88%E6%81%AF%E7%9A%84%20Topic%20%E4%BF%AE%E6%94%B9%E4%B8%BA%20SCHEDULE_TOPIC_XXXX%20%EF%BC%8C%E6%B7%BB%E5%8A%A0%20%22REAL_TOPIC%22%20%E5%B1%9E%E6%80%A7%E4%B8%BA%20%25RETRY%25%20%2B%20ConsumerGroup%20%EF%BC%8C%E5%9B%A0%E4%B8%BA%E9%87%8D%E8%AF%95%E6%B6%88%E6%81%AF%E9%9C%80%E8%A6%81%E5%BB%B6%E8%BF%9F%E6%B6%88%E8%B4%B9%E3%80%82%0A5.%20Consumer%20%E4%BC%9A%E6%8B%89%E5%8F%96%E8%AF%A5%20Topic%20%E5%AF%B9%E5%BA%94%E7%9A%84%20retryTopic%20%E7%9A%84%E6%B6%88%E6%81%AF%EF%BC%8C%E6%AD%A4%E5%A4%84%E7%9A%84%20retryTopic%20%E4%B8%BA%20%25RETRY%25%20%2B%20ConsumerGroup%20%E3%80%82%0A6.%20Consumer%20%E6%8B%89%E5%8F%96%E5%88%B0%20retryTopic%20%E6%B6%88%E6%81%AF%E4%B9%8B%E5%90%8E%EF%BC%8C%E7%BD%AE%E6%8D%A2%E5%88%B0%E5%8E%9F%E5%A7%8B%E7%9A%84%20Topic%20%EF%BC%8C%E5%9B%A0%E4%B8%BA%E6%9C%89%E6%B6%88%E6%81%AF%E7%9A%84%20%22RETRY_TOPIC%22%20%E5%B1%9E%E6%80%A7%E6%98%AF%E5%8E%9F%E5%A7%8B%20Topic%20%EF%BC%8C%E7%84%B6%E5%90%8E%E6%8A%8A%E6%B6%88%E6%81%AF%E4%BA%A4%E7%BB%99%20Listener%20%E6%B6%88%E8%B4%B9%E3%80%82%0A%0A%F0%9F%98%88%20%E6%9C%89%E4%B8%80%E4%B8%A2%E4%B8%A2%E5%A4%8D%E6%9D%82%EF%BC%8C%E8%83%96%E5%8F%8B%E5%8F%AF%E4%BB%A5%E5%9C%A8%E6%80%9D%E8%80%83%E6%80%9D%E8%80%83~%E8%AF%A6%E7%BB%86%E7%9A%84%EF%BC%8C%E8%83%96%E5%8F%8B%E5%8F%AF%E4%BB%A5%E7%9C%8B%E7%9C%8B%20%5B%E3%80%8ARocketMQ%20%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%20%E2%80%94%E2%80%94%20%E5%AE%9A%E6%97%B6%E6%B6%88%E6%81%AF%E4%B8%8E%E6%B6%88%E6%81%AF%E9%87%8D%E8%AF%95%E3%80%8B%5D(http%3A%2F%2Fwww.iocoder.cn%2FRocketMQ%2Fmessage-schedule-and-retry%2F)%20%E3%80%82%0A%0A%E5%85%B7%E4%BD%93%E7%9A%84%E4%BB%A3%E7%A0%81%E5%AE%9E%E7%8E%B0%EF%BC%8C%E5%8F%AF%E4%BB%A5%E7%9C%8B%E7%9C%8B%20%E3%80%8A%E8%8A%8B%E9%81%93%20Spring%20Boot%20%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%20RocketMQ%20%E5%85%A5%E9%97%A8%E3%80%8B%E7%9A%84%E3%80%8C6.%20%E6%B6%88%E8%B4%B9%E9%87%8D%E8%AF%95%E3%80%8D%20%E5%B0%8F%E8%8A%82%E3%80%82










