本章节,我们主要分析 RocketMQ Producer 心跳处理的原理!RocketMQ 心跳处理包括Client主动上报、Broker 扫描不活跃的 channel,达到节约网络端口,提升性能的效果。
RocketMQ 心跳处理
RocketMQ 心跳协议是 Producer、Consumer 公用的!
public class HeartbeatData extends RemotingSerializable {
// 客户端ID
private String clientID;
// 生产者数据集
private Set producerDataSet = new HashSet();
// 消费者数据集
private Set consumerDataSet = new HashSet();
}
// Producer 心跳数据
public class ProducerData {
// 组名
private String groupName;
}
// consumer 心跳数据
public class ConsumerData {
// 组名
private String groupName;
// 消费消息类型:主动(pull)、被动(push)
private ConsumeType consumeType;
// 消息模式:集群(默认)、广播
private MessageModel messageModel;
// consumer 从哪里开始消费
private ConsumeFromWhere consumeFromWhere;
// 订阅的 topic 的基本
private Set subscriptionDataSet = new HashSet();
private boolean unitMode;
}
ProducerTable 数据来源于:org.apache.rocketmq.client.impl.factory.MQClientInstance#registerProducer ConsumerTable
数据来源于:org.apache.rocketmq.client.impl.factory.MQClientInstance#registerConsumer
MQClientInstance 这个类是 Consumer、Producer 统一的抽象底层类!很重要的!
/**
* 准备心跳数据包,这个方法是公用的
*
* @return
*/
private HeartbeatData prepareHeartbeatData() {
// 初始化心跳数据包
HeartbeatData heartbeatData = new HeartbeatData();
// clientID
heartbeatData.setClientID(this.clientId);
// Consumer 心跳数据包
for (Map.Entry entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
ConsumerData consumerData = new ConsumerData();
consumerData.setGroupName(impl.groupName());
consumerData.setConsumeType(impl.consumeType());
consumerData.setMessageModel(impl.messageModel());
consumerData.setConsumeFromWhere(impl.consumeFromWhere());
consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
consumerData.setUnitMode(impl.isUnitMode());
heartbeatData.getConsumerDataSet().add(consumerData);
}
}
// Producer 心跳数据包
for (Map.Entry entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}
}
return heartbeatData;
}
代码入口:org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
// 30 秒执行一次,间隔 1 秒钟,发送心跳信息
// 向所有 Broker 发送心跳信息
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
RocketMQ Consumer 端和 Producer 端共用一套逻辑。
具体代码如下: org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBroker 。
下面只摘抄核心代码片段:
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
// Broker Client向Client发送心跳,并注册自身
public static final int HEART_BEAT = 34;
// broker 处理心跳,包含了 Consumer、Producer
org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat
Producer 处理比较简单。
HashMap channelTable = this.groupChannelTable.get(group);
Consumer 端处理有点复杂:
1、看看是否是有新的 consumer instance 上线,如果有,则进行负载均衡处理操作。
2、查看订阅关系是否改变。RocketMQ 消息的过滤是2次过滤方式,一次是 broker 通过 hashcode 过滤,一次是在 Consumer client 通过 topic+tag 过滤。因为hashcode 存在冲突。
// 注册 重试Topic
String newTopic = MixAll.getRetryTopic(data.getGroupName());
// TODO 这里会将 broker 的配置重新推送到 name server 端
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
// 注册 consumer
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set subList, boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 更新 channel
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新订阅关系
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 若 r1 ==true,则说明有消费者上线了
// 若 r2 ==true,则说明订阅关系发生了变化
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
public class ClientChannelInfo {
// 进行通信的 channel, Netty 中的 channel
private final Channel channel;
// 客户端ID
private final String clientId;
private final LanguageCode language;
// 客户端版本号
private final int version;
// 最后更新时间设定为当前时间,用于 channel 是否处于活跃来判断
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
}
代码入口:org.apache.rocketmq.broker.client.ClientHousekeepingService#start
/**
* 扫描非活跃的 channel
* 1、生产者
* 2、消费者
* 3、过滤服务器
*/
private void scanExceptionChannel() {
// 生产者
this.brokerController.getProducerManager().scanNotActiveChannel();
// 消费者
this.brokerController.getConsumerManager().scanNotActiveChannel();
// 专门的过滤服务器
this.brokerController.getFilterServerManager().scanNotActiveChannel();
}
// channnel 超时过期的时间是 120s,client 是30s 发送一次。
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
// 最终处理效果是,直接关闭 channel。
RemotingUtil.closeChannel(clientChannelInfo.getChannel());
// consumerTable remove 指定channel。
Consumer 端定时 Rebalance,保证了消费的可靠性。
代码入口: org.apache.rocketmq.client.impl.consumer.RebalanceService。
private static long waitInterval =Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
关于我
前 去哪儿网 技术专家!混迹中间件职场8+年!分享各种Java中间件知识!
留言与评论(共有 0 条评论) “” |