RocketMQ源码分析七之创建ConsumerGroup

在本章中,我们将分析 RocketMQ 是如何 创建订阅组的。看看 RocketMQ 是如何实现的。


如何开启&关闭

在部署 RocketMQ 的 Broker 的时候,我们通常都会把 autoCreateSubscriptionGroup 设置为 false。一方面防止胡乱订阅,一方面也是为了后面的运维、统计。

命令参数

用法:sh mqadmin updateSubGroup -n 192.168.1.100:9876 -t shg
指令:updateSubGroup
代码入口:org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand

参数

是否必填

说明

-h

打印帮助

-n

nameserve 服务地址列表,格式ip:port;ip:port;…

-b

如果 -c 为 空,则必填 broker 地址,表示订阅组建立在该 broker 上

-c

如果 -b 为空,则必填 cluster 名称,表示 topic 建在该集群上。(集群可通过 clusterList 命令来查询)

-d

是否容许广播方式消费

-g

订阅组名称

-i

从哪个broker 开始消费

-m

是否容许从队列的最小位置开始消费,默认会设置为 false。

-q

消费失败的消息放到一个重试队列,每个订阅组配置几个重试队列

-r

重试消费最大次数,超过则投递到死信队列,不再投递,并报警

-s

消费功能是否开启

-w

发现消息堆积后,将Consumer的消费请求重定向到另外一台Slave机器

-q

重试队列的数量。默认是 1个队列

-r

最大重试次数

-a

是否通知有消费者实例变化

解析命令行参数入口

// RocketMQ 配置了 命令行的执行 shell 脚本入口。就是下面的 mqadmin.sh 这个文件
mqadmin.sh

// 解析命令行入口
org.apache.rocketmq.tools.command.MQAdminStartup#main0

// 设置 namesrvAddr 为全局变量。
if (commandLine.hasOption('n')) {
    String namesrvAddr = commandLine.getOptionValue('n');
    System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}

RequstCode

// 更新或者创建订阅住
public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;

// Namesrv 获取注册到Name Server的所有Broker集群信息
public static final int GET_BROKER_CLUSTER_INFO = 106;

核心代码流程

只讨论集群下的。指定 broker 跟这个差不多

// 从 Name Server 获取 此集群下的 master 节点
Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);

// broker 端缓存配置
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config);

// 需要立马持久化,防止断电等意外情况发生
this.persist();

核心数据结构

// 订阅组
private final ConcurrentMap subscriptionGroupTable =
        new ConcurrentHashMap(1024);

// 集群信息
public class ClusterInfo extends RemotingSerializable {
    private HashMap brokerAddrTable;
    private HashMap> clusterAddrTable;
}

// RocketMQ 的broker中的主从关系是通过 brokerName 来绑定的。
// broker 信息
public class BrokerData implements Comparable {
    private String cluster;

    // broker 名字
    private String brokerName;

    /**
     * brokerId 为 0,表示该 broker 为 master
     * broker address 这里究竟是什么?是:mq1101.jiandan.com:10911
     */
    private HashMap brokerAddrs;

    private final Random random = new Random();
}

哪里使用到订阅关系了呢

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)


// 确保订阅组存在
SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
    response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
    response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
    return response;
}


总结

  1. RocketMQ 的订阅关系就是存在 broker 并作持久化,等 Consumer 端消费消息的时候,校验一下。
  2. 好几篇 RocketMQ 的源码,我都在一直强调 数据结构,这个在 RocketMQ 很重要,把这些数据结构记住了,能方便我们理解 RocketMQ 更快。
  3. RocketMQ 通过从 运维端来阅读代码。会更加简单。也让大家了解到 RocketMQ 是多么的简单的。

关于我
前 去哪儿网 技术专家!混迹中间件职场8+年!分享各种Java中间件知识!

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章