RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的Broker。MessageQueue用于存储消息的物理地址,每个Topic中的消息地址存储于多个MessageQueue中。ConsumerGroup由多个Consumer实例构成。
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的BrokerIP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
普通顺序消费模式下,消费者通过同一个消息队列(Topic分区,称作Message Queue)收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。
RocketMQ支持消息的高可靠,影响消息可靠性的几种情况:
1、2、3、4 四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。5、6属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。注:RocketMQ从3.0版本开始支持同步双写。
至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似X/Open XA的分布事务功能,通过事务消息能达到分布式事务的最终一致。
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。如下方法可以设置消息重试策略:
生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。
生产者流控:
注意,生产者流控,不会尝试消息重投。
消费者流控:
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
消息存储
消息存储是RocketMQ中最为复杂和最为重要的一部分,接下来分别从RocketMQ的消息存储整体架构、RocketMQ中两种不同的刷盘方式来展开叙述。
消息存储整体架构
消息存储架构图中最重要的三个跟消息存储相关的文件构成
在RocketMQ的消息存储整体架构图中可以看出,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。
RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。
当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。
消息刷盘
RocketMQ架构上主要分为四部分,如上图所示:
RocketMQ 网络部署特点
结合部署架构图,描述集群工作流程:
单Master模式
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
多Master模式
一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
多Master多Slave模式(异步)
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
多Master多Slave模式(同步)
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
DLedger 集群模式
RocketMQ-on-DLedger Group 是指一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。
RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。RocketMQ-on-DLedger Group 是可以水平扩展的,也即可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。
环境准备
安装步骤
1、解压 rocketmq-all-4.5.1-bin-release.zip 到指定目录,如下:
benchmark:基础测试脚本目录 / lib:运行依赖包
bin:命令运维脚本目录 / conf:配置目录
2、进入bin 目录下编辑 runserver.sh 和 runborker.sh 两个文件,调整一下namesrv和broker的启动的jvm内存参数。具体参数大小根据系统环境配置情况而定:
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
vim runroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
ps:broker端本地环境不建议开启堆外内存
3、根据情况配置broker的一些参数
brokerClusterName = DefaultCluster
brokerName = broker-a
# 0表示master >0表示slave
brokerId = 0
# 几点开始删除文件
deleteWhen = 04
# 文件保留时间,默认48小时
fileReservedTime = 48
# Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole = ASYNC_MASTER
#刷盘方式
# ASYNC_FLUSH 异步刷盘
# SYNC_FLUSH 同步刷盘
flushDiskType = ASYNC_FLUSH
备注:
# 存储路径
storePathRootDir=xxx/store
# commitLog存储路径
storePathCommitLog=xxx/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=xx/store/pathconsumequeue
# 消息索引存储路径
storePathIndex=xxx/store/pathindex
# checkpoint 文件存储
storeCheckpoint=xxx/store/checkpoint
# abort 文件存储
abortFile=xxx/store/abort
备注:
4、启动rocketmq, 先启动 namesrv 再启动 broker
--后台启动namesrv
nohup sh mqnamesrv &
--后台启动broker
export NAMESRV_ADDR=localhost:9876
--如果你在brocker.conf文件中配置了namesrvAddr = localhost:9876就直接用下面的命令
nohup sh mqbroker -c /xxx/rocketmq-all-4.5.1-bin-release/conf/broker.conf &
--否则用这个命令
nohup sh mqbroker -n localhost:9876 -c /xxx/rocketmq-all-4.5.1-bin-release/conf/
5、查看进程和日志看是否启动成功
jps -l or 查看 namesrv.log 和 broker.log 日志
源码获取
https://github.com/apache/rocketmq/releases
该网址可下载source包,也可以fork仓库。
启动 RocketMQ Namesrv
参考NameServerInstanceTest的startup方法,编写main方法,demo如下:
public class NameServerInstanceTest {
.....
public static void main(String[] args) throws Exception {
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876); //设置端口
// 创建 NamesrvController,启动
NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
namesrvController.initialize();
namesrvController.start();
Thread.sleep(Long.MAX_VALUE); //挂起
}
}
备注:
运行main方法, 显示以下关键字说明启动成功
启动 RocketMQ Broker
参考BrokerControllerTest的testBrokerRestart方法,编写main方法,demo如下:
public class BrokerControllerTest {
.....
public static void main(String[] args) throws Exception {
NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(10911);
BrokerConfig brokerConfig = new BrokerConfig(); // BrokerConfig 配置
brokerConfig.setNamesrvAddr("127.0.0.1:9876"); // 配置nameServer地址
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
// 创建 BrokerController 对象,并启动
BrokerController brokerController = new BrokerController(//
brokerConfig,
nettyServerConfig,
new NettyClientConfig(),
messageStoreConfig);
brokerController.initialize();
brokerController.start();
Thread.sleep(Long.MAX_VALUE); //挂起
}
}
备注:
运行main方法,不报错即可,但是在:NameServerInstanceTest(nameServer)中,发现以下关键字日志,表明broker注册nameServer成功:new topic registered...等
启动 RocketMQ Producer
参考:org.apache.rocketmq.example.quickstart.Producer
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException, IOException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
/*
* Launch the instance.
*/
producer.setNamesrvAddr("127.0.0.1:9876"); // 配置nameServer地址
producer.start();
for (int i = 0; i < 1; i++) { // i 可以随便配置
try {
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("abc" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// msg.setDelayTimeLevel(10);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
}
}
备注:
运行main方法, 发现日志中包含:
SendResult [sendStatus=SEND_OK...即发送成功
启动 RocketMQ Consumer
参考:
org.apache.rocketmq.example.quickstart.Consumer
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
/*
* Specify name server addresses.
*
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
*
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
*
*/
/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
consumer.setNamesrvAddr("127.0.0.1:9876"); // 配置nameServer地址
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Subscribe one more more topics to consume.
*/
consumer.subscribe("abc", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
备注:
运行main方法, 发现日志中包含:Receive New Messages...即消费成功
rocketMq作为低延迟、高并发、高可用、高可靠的分布式消息中间件,其详细知识点非常多,如需深入,建议源码入坑。
程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。
作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。
留言与评论(共有 0 条评论) “” |