上一章中。我们分析了 RocketMQ 创建 Topic 的命令过程。本章,我们开始分析 获取 Topic 路由信息的源码过程。
用法:sh mqadmin topicRoute -n 192.168.1.100:9876 -t shg
指令:topicRoute
代码入口:org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand
参数 | 是否必填 | 说明 |
-h | 否 | 打印帮助 |
-n | 是 | nameserve 服务地址列表,格式ip:port;ip:port;… |
-t | 是 | Topic 名字 |
// RocketMQ 配置了 命令行的执行 shell 脚本入口。就是下面的 mqadmin.sh 这个文件
mqadmin.sh
// 解析命令行入口
org.apache.rocketmq.tools.command.MQAdminStartup#main0
// 设置 namesrvAddr 为全局变量。
if (commandLine.hasOption('n')) {
String namesrvAddr = commandLine.getOptionValue('n');
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
RocketMQ 获取 Topic 的路由信息过程
// Namesrv 根据Topic获取Broker Name、队列数(包含读队列与写队列)
public static final int GET_ROUTEINTO_BY_TOPIC = 105;
这个是理解 RocketMQ 的一个关键地方了。我们先重点记住这个路由表。因为在 Producer和 Consumer 端向 Name Server 获取心跳的时候的,会返回 Topic 路由表。
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// Topic 路由表
private final HashMap> topicQueueTable;
//说明 master 与 slave 是通过 brokerName 进行配对
private final HashMap brokerAddrTable;
// 将 broker 按照 clusterName 分组
private final HashMap > clusterAddrTable;
// 代表一个活的 broker 链接由最后更新时间,一个链接 channel,数据版本和 Ha 地址组成
// Broker 定时向 namesrv 注册并更新 BrokerLiveInfo 的时间戳
private final HashMap brokerLiveTable;
private final HashMap/* Filter Server */> filterServerTable;
}
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
// 队列的数据
private List queueDatas;
// broker 的数据
private List brokerDatas;
// Filter Server
private HashMap/* Filter Server */> filterServerTable;
}
RocketMQ 的 Topic 是一个逻辑概念,而 queue 是实打实的,体现在 QueueData这里。具体的物理的结构就是 Consume queuer 中的 MappedFile 了
public class QueueData implements Comparable {
// broker 的名称
private String brokerName;
// 可写队列
private int readQueueNums;
// 可写队列
private int writeQueueNums;
// 权限
private int perm;
// flag
private int topicSynFlag;
}
Broker 的数据结构
public class BrokerData implements Comparable {
private String cluster;
// broker 名字
private String brokerName;
/**
* brokerId 为 0,表示该 broker 为 master
* broker address 这里究竟是什么?是:mq1101.2dfire-inc.com:10911
*/
private HashMap brokerAddrs;
private final Random random = new Random();
}
关于我
前 去哪儿网 技术专家!混迹中间件职场8+年!分享各种Java中间件知识!
留言与评论(共有 0 条评论) “” |