本章节,我们主要介绍 RokcetMQ 一个核心组件,它的职责是提供了 Topic 路由管理、Broker 服务注册与存活检测服务。它就是 Name Sever。维护了整个集群的元数据。让我们一起学习一下 Name Server 启动过程吧!
我个人搞中间件的原则:简单粗暴!
功能点 | Zookeeper | Name Server |
角色 | 协调者 | 元数据管理中心 |
配置保存 | 磁盘 | 内存 |
是否支持选举 | 是 | 否 |
数据一致性 | 强一致性 | 最终一致性,通过 broker 定时上报心跳 |
是否高可用 | 是 | 是 |
协议设计 | ZAB | 无 |
网络分区 | 存在 | 不存在 |
更深层次的原因!
RocketMQ 常见的部署架构
RocketMQ Name Server 几大模块
Name Server 核心数据结构,因为 RocketMQ 的 Name Server 是内存型服务。基本不做持久化操作。
// Topic 路由信息
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap> topicQueueTable;
//说明 master 与 slave 是通过 brokerName 进行配对
private final HashMap brokerAddrTable;
// 将 broker 按照 clusterName 分组
private final HashMap > clusterAddrTable;
// 代表一个活的 broker 链接由最后更新时间,一个链接 channel,数据版本和 Ha 地址组成
// Broker 定时向 namesrv 注册并更新 BrokerLiveInfo 的时间戳
private final HashMap brokerLiveTable;
private final HashMap/* Filter Server */> filterServerTable;
}
// Topic queue 的数据结构
public class QueueData implements Comparable {
// broker 的名称
private String brokerName;
// 可写队列
private int readQueueNums;
// 可写队列
private int writeQueueNums;
// 权限
private int perm;
// flag
private int topicSynFlag;
}
// Broker 数据结构
public class BrokerData implements Comparable {
private String cluster;
// broker 名字
private String brokerName;
}
// Broker 存活数据结构
class BrokerLiveInfo {
private long lastUpdateTimestamp;
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;
}
Name Server 启动类的入口:org.apache.rocketmq.namesrv.NamesrvStartup
下面是加载&转换配置核心代码片段
// 初始化配置文件
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// NameSrv 中 netty 的配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 监听的端口,默认是 9876 。这里是设置 Netty 监听的端口,可以设置任何值,只要保证不冲突就可以啦
nettyServerConfig.setListenPort(9876);
// 加载配置
String file = commandLine.getOptionValue('c');
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
// 将配置转换为 namesrv config
MixAll.properties2Object(properties, namesrvConfig);
// 将 配置转换为 netty server config
MixAll.properties2Object(properties, nettyServerConfig);
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
// 运行过程中的路由信息,数据只在内存,宕机后数据消失,但是Broker会 定期 推送最新数据
this.routeInfoManager = new RouteInfoManager();
// broker 长链接检测
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
代码入口:org.apache.rocketmq.namesrv.NamesrvController#initialize
非核心代码我都忽略掉了
// 注册 RequestCOde,RocketMQ 所有的请求,都先经过 Netty 层 decode 后,拿到具体的 RequestCode,再做具体的
// 请求转发
this.registerProcessor();
// 定时扫描不活跃的 Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 其他的都不太重要了,我就不写出来了
// 网络层启动,后续会单独几篇来说 RocketMQ 网络层
this.remotingServer.start();
// 这个是为了开启 SSL,具体的代码片段:org.apache.rocketmq.namesrv.NamesrvController#initialize
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
RocketMQ 通过 RequestCode 来辨别需要调用的方法。
// 保存 KV 配置
RequestCode.PUT_KV_CONFIG
// 获取 KV 配置
RequestCode.GET_KV_CONFIG
// 删除 KV 配置
RequestCode.DELETE_KV_CONFIG
// 查询数据版本号。Broker 注册的是,带有 version
RequestCode.QUERY_DATA_VERSION
// 注册 Borker
RequestCode.REGISTER_BROKER
// 取消注册 broker
RequestCode.UNREGISTER_BROKER
// 通过 Topic 查询路由信息
RequestCode.GET_ROUTEINTO_BY_TOPIC
// 获取集群 Broker 信息
RequestCode.GET_BROKER_CLUSTER_INFO
// 擦除 Broker 权限
RequestCode.WIPE_WRITE_PERM_OF_BROKER
// 获取所有的 Topic
RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER
// 删除 Topic
RequestCode.DELETE_TOPIC_IN_NAMESRV
// 获取 KV 列表
RequestCode.GET_KVLIST_BY_NAMESPACE
// 获取集群Topic
RequestCode.GET_TOPICS_BY_CLUSTER
// 获取系统保留的 Broker
RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS
//
RequestCode.GET_UNIT_TOPIC_LIST
//
RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST
//
RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST
// 更新 name srv的配置
RequestCode.UPDATE_NAMESRV_CONFIG
// 获取 name server 配置
RequestCode.GET_NAMESRV_CONFIG
Broker 端定时注册心跳,并且,携带比如 Topic 等等信息。
// 定时将 broker 的信息注册到 namesrv 上面,broker 启动的时候,先将信息加载到 broker 中
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
Name Servuer 端,处理 Broker 端上报的心跳数据。
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
Broker 注册心跳信息,是 one way 的方式,存在可能心跳数据丢失等等问题。所以 Name Server 端主动检测 Broker 存活问题。前面一节,我们提到过,RocketMQ Name Server 启动的时候,注册一个 定时任务,定时扫描 Broker 端网络。
// 扫描不活跃的 Broker
public void scanNotActiveBroker() {
// Broker 定时上报
Iterator> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// 120s
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
// channel 关闭
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
// channel 关闭,移除内存重的数据
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
public void onChannelDestroy(String remoteAddr, Channel channel) {
// brokerLiveTable 数据移除
this.brokerLiveTable.remove(brokerAddrFound);
// 移除 Filter ,RocketMQ 的 Broker 也有可能是 Filter
this.filterServerTable.remove(brokerAddrFound);
}
前面的 扫描 Broker 属于 主动发现的,是有一定的时间间隔的。而这个属于 Netty 层,主动发现。
// 接收Broker连接事件,这个接口会监听网络层的请求,如果有close等关闭请求,直接从brokerLiveTable中移除该数据
private BrokerHousekeepingService brokerHousekeepingService;
public class BrokerHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}
关于我
前 去哪儿网 技术专家!混迹中间件职场8+年!分享各种Java中间件知识!
留言与评论(共有 0 条评论) “” |