首先来看一个RcoketMQ发送消息的例子:
@Service
public class MQService {
@Autowired
DefaultMQProducer defaultMQProducer;
public void sendMsg() {
String msg = "我是一条消息";
// 创建消息,指定TOPIC、TAG和消息内容
Message sendMsg = new Message("TestTopic", "TestTag", msg.getBytes());
SendResult sendResult = null;
try {
// 同步发送消息
sendResult = defaultMQProducer.send(sendMsg);
System.out.println("消息发送响应:" + sendResult.toString());
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
RocketMQ是通过DefaultMQProducer进行消息发送的,它实现了MQProducer接口,MQProducer接口中定义了消息发送的方法,方法主要分为三大类:
public interface MQProducer extends MQAdmin {
// 同步发送消息
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
// 异步发送消息,SendCallback为回调函数
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException;
// 异步发送消息,没有回调函数
void sendOneway(final Message msg) throws MQClientException, RemotingException,
InterruptedException;
// 省略其他方法
}
接下来以将以同步消息发送为例来分析消息发送的流程。
DefaultMQProducer里面有一个DefaultMQProducerImpl类型的成员变量defaultMQProducerImpl,从默认的无参构造函数中可以看出在构造函数中对defaultMQProducerImpl进行了实例化,在send方法中就是调用defaultMQProducerImpl的方法进行消息发送的:
public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* 默认消息生产者实现类
*/
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
/**
* 默认的构造函数
*/
public DefaultMQProducer() {
this(null, MixAll.DEFAULT_PRODUCER_GROUP, null);
}
/**
* 构造函数
*/
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
// 实例化
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
/**
* 同步发送消息
*/
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 设置主题
msg.setTopic(withNamespace(msg.getTopic()));
// 发送消息
return this.defaultMQProducerImpl.send(msg);
}
}
DefaultMQProducerImpl中消息的发送在sendDefaultImpl方法中实现,处理逻辑如下:
public class DefaultMQProducerImpl implements MQProducerInner {
/**
* DEFAULT SYNC -------------------------------------------------------
*/
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 发送消息
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 发送消息
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
/**
* 发送消息
* @param msg 发送的消息
* @param communicationMode
* @param sendCallback 回调函数
* @param timeout 超时时间
*/
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
// 开始时间
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 查找主题路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
// 消息队列
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 获取失败重试次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
// 获取BrokerName
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 根据BrokerName选择一个消息队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
// 记录本次选择的消息队列
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
// 记录时间
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
// 计算选择消息队列的耗时时间
long costTime = beginTimestampPrev - beginTimestampFirst;
// 如果已经超时,终止发送
if (timeout < costTime) {
callTimeout = true;
break;
}
// 发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
// 结束时间
endTimestamp = System.currentTimeMillis();
// 记录向Broker发送消息的请求耗时,消息发送结束时间 - 开始时间
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
// 如果发送失败
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// 是否重试
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
// 返回结果
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
// 如果抛出异常,记录请求耗时
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
}
// ... 省略其他异常处理
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
// ...
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
}
折叠
DefaultMQProducerImpl中有一个路由信息表topicPublishInfoTable,记录了主题对应的路由信息,其中KEY为topic, value为对应的路由信息对象TopicPublishInfo:
public class DefaultMQProducerImpl implements MQProducerInner {
// 路由信息表,KEY为topic, value为对应的路由信息对象TopicPublishInfo
private final ConcurrentMap topicPublishInfoTable =
new ConcurrentHashMap();
}
TopicPublishInfo中记录了主题所在的消息队列信息、所在Broker等信息:
messageQueueList:一个MessageQueue类型的消息队列列表,MessageQueue中记录了主题名称、主题所属的Broker名称和队列ID
sendWhichQueue:计数器,选择消息队列的时候增1,以此达到轮询的目的
topicRouteData:从NameServer查询到的主题对应的路由数据,包含了队列和Broker的相关数据
public class TopicPublishInfo {
// 消息队列列表
private List messageQueueList = new ArrayList();
// 一个计数器,每次选择消息队列的时候增1,以此达到轮询的目的
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
// 主题路由数据
private TopicRouteData topicRouteData;
// ...
}
// 消息队列
public class MessageQueue implements Comparable, Serializable {
private static final long serialVersionUID = 6191200464116433425L;
private String topic; // 主题
private String brokerName; // 所属Broker名称
private int queueId; // 队列ID
// ...
}
// 主题路由数据
public class TopicRouteData extends RemotingSerializable {
private List queueDatas; // 队列数据列表
private List brokerDatas; // Broker信息列表
// ...
}
// 队列数据
public class QueueData implements Comparable {
private String brokerName; // Broker名称
private int readQueueNums; // 可读队列数量
private int writeQueueNums; // 可写队列数量
private int perm;
private int topicSysFlag;
}
// Broker数据
public class BrokerData implements Comparable {
private String cluster; // 集群名称
private String brokerName; // Broker名称
private HashMap brokerAddrs; // Broker地址集合,KEY为Broker ID, value为Broker 地址
// ...
}
折叠
在查找主题路由信息的时候首先从DefaultMQProducerImpl缓存的路由表topicPublishInfoTable中根据主题查找路由信息,如果查询成功返回即可,如果未查询到,需要从NameServer中获取路由信息,如果获取失败,则使用默认的主题路由信息:
public class DefaultMQProducerImpl implements MQProducerInner {
// 路由信息表,KEY为topic, value为对应的路由信息对象TopicPublishInfo
private final ConcurrentMap topicPublishInfoTable =
new ConcurrentHashMap();
/**
* 根据主题查找路由信息
* @param topic 主题
* @return
*/
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 根据主题获取对应的主题路由信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 如果未获取到
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 从NameServer中查询路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 如果路由信息获取成功
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
// 返回路由信息
return topicPublishInfo;
} else {
// 如果路由信息未获取成功,使用默认主题查询路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 返回路由信息
return topicPublishInfo;
}
}
}
从NameServer获取主题路由信息数据是在MQClientInstance中的updateTopicRouteInfoFromNameServer方法中实现的:
public class MQClientInstance {
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
// 从NameServer更新路由信息
return updateTopicRouteInfoFromNameServer(topic, false, null);
}
/**
* 从NameServer更新路由信息
* @param topic 主题
* @param isDefault 是否使用默认的主题
* @param defaultMQProducer 默认消息生产者
* @return
*/
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
// 是否使用默认的路由信息
if (isDefault && defaultMQProducer != null) {
// 使用默认的主题路由信息
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
clientConfig.getMqClientApiTimeout());
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums); // 设置可读队列数量
data.setWriteQueueNums(queueNums); // 设置可写队列数量
}
}
} else {
// 从NameServer获取路由信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
// 如果路由信息不为空
if (topicRouteData != null) {
// 从路由表中获取旧的路由信息
TopicRouteData old = this.topicRouteTable.get(topic);
// 判断路由信息是否发生变化
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
// 是否需要更新路由信息
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
// 如果数据发生变化
if (changed) {
// 克隆一份新的路由信息
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
// 处理brokerAddrTable中的数据
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
// 更新brokerAddrTable中的数据
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// ...
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
// 将新的路由信息加入到路由表
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}
} catch (MQClientException e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
}
折叠
向NameServer发送请求的代码实现在MQClientAPIImpl的getTopicRouteInfoFromNameServer方法中,可以看到构建了请求命令RemotingCommand并设置请求类型为RequestCode.GET_ROUTEINFO_BY_TOPIC,表示从NameServer获取路由信息,之后通过Netty向NameServer发送请求,并解析返回结果:
public class MQClientAPIImpl {
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
throws RemotingException, MQClientException, InterruptedException {
// 从NameServer获取路由信息
return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
}
/**
* 从NameServer获取路由信息
* @param topic
* @param timeoutMillis
* @param allowTopicNotExist
* @return
* @throws MQClientException
* @throws InterruptedException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
* @throws RemotingConnectException
*/
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
// 创建请求命令,请求类型为获取主题路由信息GET_ROUTEINFO_BY_TOPIC
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
// 发送请求
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
// 如果主题不存在
case ResponseCode.TOPIC_NOT_EXIST: {
if (allowTopicNotExist) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
break;
}
// 如果请求发送成功
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
// 返回获取的路由信息
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
}
折叠
主题路由信息数据TopicPublishInfo获取到之后,需要从中选取一个消息队列,是通过调用MQFaultStrategy的selectOneMessageQueue方法触发的,之后会进入MQFaultStrategy的selectOneMessageQueue方法从主题路由信息中选择消息队列:
public class DefaultMQProducerImpl implements MQProducerInner {
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 选择消息队列
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
}
MQFaultStrategy的selectOneMessageQueue方法主要是通过调用TopicPublishInfo中的相关方法进行消息队列选择的。
启用故障延迟机制
如果启用了故障延迟机制,会遍历TopicPublishInfo中存储的消息队列列表,对计数器增1,轮询选择一个消息队列,接着会判断消息队列所属的Broker是否可用,如果Broker可用返回消息队列即可。
如果选出的队列所属Broker不可用,会调用latencyFaultTolerance的pickOneAtLeast方法(下面会讲到)选择一个Broker,从tpInfo中获取此Broker可写的队列数量,如果数量大于0,调用selectOneMessageQueue()方法选择一个队列。
如果故障延迟机制未选出消息队列,依旧会调用selectOneMessageQueue()选择出一个消息队列。
未启用故障延迟机制
直接调用的selectOneMessageQueue(String lastBrokerName)方法并传入上一次使用的Broker名称进行选择。
public class MQFaultStrategy {
/**
* 选择消息队列
* @param tpInfo 主题路由信息
* @param lastBrokerName 上一次使用的Broker名称
* @return
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 如果启用故障延迟机制
if (this.sendLatencyFaultEnable) {
try {
// 计数器增1
int index = tpInfo.getSendWhichQueue().incrementAndGet();
// 遍历TopicPublishInfo中存储的消息队列列表
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// 轮询选择一个消息队列
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
// 如果下标小于0,则使用0
if (pos < 0)
pos = 0;
// 根据下标获取消息队列
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 判断消息队列所属的Broker是否可用,如果可用返回当前选择的消息队列
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// 如果未获取到可用的Broker
// 调用pickOneAtLeast选择一个
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
// 从tpInfo中获取Broker可写的队列数量
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 如果可写的队列数量大于0
if (writeQueueNums > 0) {
// 选择一个消息队列
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
// 设置消息队列所属的Broker
mq.setBrokerName(notBestBroker);
// 设置队列ID
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
// 返回消息队列
return mq;
} else {
// 移除Broker
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
// 如果故障延迟机制未选出消息队列,调用selectOneMessageQueue选择消息队列
return tpInfo.selectOneMessageQueue();
}
// 根据上一次使用的BrokerName获取消息队列
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
}
折叠
selectOneMessageQueue方法的实现
selectOneMessageQueue方法中,如果上一次选择的BrokerName为空,则调用无参的selectOneMessageQueue方法选择消息队列,也是默认的选择方式,首先对计数器增一,然后用计数器的值对messageQueueList列表的长度取余得到下标值pos,再从messageQueueList中获取pos位置的元素,以此达到轮询从messageQueueList列表中选择消息队列的目的。
如果传入的BrokerName不为空,遍历messageQueueList列表,同样对计数器增一,并对messageQueueList列表的长度取余,选取一个消息队列,不同的地方是选择消息队列之后,会判断消息队列所属的Broker是否与上一次选择的Broker名称一致,如果一致则继续循环,轮询选择下一个消息队列,也就是说,如果上一次选择了某个Broker发送消息,本次将不会再选择这个Broker,当然如果最后仍未找到满足要求的消息队列,则仍旧使用默认的选择方式,也就是调用无参的selectOneMessageQueue方法进行选择。
public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List messageQueueList = new ArrayList(); // 消息队列列表
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); // 一个计数器,每次选择消息队列的时候增1,以此达到轮询的目的
private TopicRouteData topicRouteData;
// ...
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
// 如果上一次选择的BrokerName为空
if (lastBrokerName == null) {
// 选择消息队列
return selectOneMessageQueue();
} else {
// 遍历消息队列列表
for (int i = 0; i < this.messageQueueList.size(); i++) {
// 计数器增1
int index = this.sendWhichQueue.incrementAndGet();
// 对长度取余
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
// 获取消息队列,也就是使用使用轮询的方式选择消息队列
MessageQueue mq = this.messageQueueList.get(pos);
// 如果队列所属的Broker与上一次选择的不同,返回消息队列
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
// 使用默认方式选择
return selectOneMessageQueue();
}
}
// 选择消息队列
public MessageQueue selectOneMessageQueue() {
// 自增
int index = this.sendWhichQueue.incrementAndGet();
// 对长度取余
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
// 选择消息队列
return this.messageQueueList.get(pos);
}
}
折叠
回到发送消息的代码中,可以看到消息发送无论成功与否都会调用updateFaultItem方法更新失败条目:
public class DefaultMQProducerImpl implements MQProducerInner {
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
// 发送消息
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// ...
for (; times < timesTotal; times++) {
try {
// 开始时间
beginTimestampPrev = System.currentTimeMillis();
// ...
// 发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
// 结束时间
endTimestamp = System.currentTimeMillis();
// 更新失败条目
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
// ...
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
// 更新失败条目
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
}
// 省略其他catch
// ...
catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
// ...
}
// 更新FaultItem
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
// 调用MQFaultStrategy的updateFaultItem方法
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
}
折叠
MQFaultStrategy中有一个类型的成员变量,最终是通过调用latencyFaultTolerance的updateFaultItem方法进行更新的,并传入了三个参数:
brokerName:Broker名称
currentLatency:当前延迟时间,由上面的调用可知传入的值为发送消息的耗时时间,即消息发送结束时间 - 开始时间
duration:持续时间,根据isolation的值决定,如果为true,duration的值为30000ms也就是30s,否则与currentLatency的值一致
public class MQFaultStrategy {
// 故障延迟机制
private final LatencyFaultTolerance latencyFaultTolerance = new LatencyFaultToleranceImpl();
/**
* 更新失败条目
* @param brokerName Broker名称
* @param currentLatency 发送消息耗时:请求结束时间 - 开始时间
* @param isolation
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
// 计算duration,isolation为true时使用30000,否则使用发送消息的耗时时间currentLatency
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
// 更新到latencyFaultTolerance中
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
}
LatencyFaultToleranceImpl
LatencyFaultToleranceImpl中有一个faultItemTable,记录了每个Broker对应的FaultItem,在updateFaultItem方法中首先根据Broker名称从faultItemTable获取FaultItem:
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {
// FaultItem集合,Key为BrokerName,value为对应的FaultItem对象
private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap(16);
/**
* 更新FaultItem
* @param name Broker名称
* @param currentLatency 延迟时间,也就是发送消息耗时:请求结束时间 - 开始时间
* @param notAvailableDuration 不可用的持续时间,也就是上一步中的duration
*/
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
// 获取FaultItem
FaultItem old = this.faultItemTable.get(name);
// 如果不存在
if (null == old) {
// 新建FaultItem
final FaultItem faultItem = new FaultItem(name);
// 设置currentLatency延迟时间
faultItem.setCurrentLatency(currentLatency);
// 设置规避故障开始时间,当前时间 + 不可用的持续时间,不可用的持续时间有两种情况:值为30000或者与currentLatency一致
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
// 添加到faultItemTable
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
// 更新时间
old.setCurrentLatency(currentLatency);
// 更新开始时间
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
}
FaultItem是LatencyFaultToleranceImpl的一个内部类,里面有三个变量:
isAvailable方法
isAvailable方法用于开启故障延迟机制时判断Broker是否可用,可用判断方式为:当前时间 - startTimestamp的值大于等于 0,如果小于0则认为不可用。
上面分析可知startTimestamp的值为新建/更新FaultItem的时间 + 不可用的时间,如果当前时间减去规避故障开始时间的值大于等于0,说明此Broker已经超过了设置的规避时间,可以重新被选择用于发送消息。
compareTo方法
FaultItem还实现了Comparable,重写了compareTo方法,在排序的时候使用,对比大小的规则如下:
总结
isAvailable方法返回true的时候表示FaultItem对象的值越小,因为true代表Broker已经过了规避故障的时间,可以重新被选择。
currentLatency的值越小表示FaultItem的值越小。currentLatency的值与Broker发送消息的耗时有关,耗时越低,值就越小。
startTimestamp值越小同样表示整个FaultItem的值也越小。startTimestamp的值与currentLatency有关(值不为默认的30000毫秒情况下),currentLatency值越小,startTimestamp的值也越小。
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {
class FaultItem implements Comparable {
private final String name; // Broker名称
private volatile long currentLatency; // 发送消息耗时时间:请求结束时间 - 开始时间
private volatile long startTimestamp; // 规避开始时间:新建/更新FaultItem的时间 + 不可用的时间notAvailableDuration
@Override
public int compareTo(final FaultItem other) {
// 如果isAvailable不相等,说明一个为true一个为false
if (this.isAvailable() != other.isAvailable()) {
if (this.isAvailable()) // 如果当前对象为true
return -1; // 当前对象小
if (other.isAvailable())// 如果other对象为true
return 1; // other对象大
}
// 对比发送消息耗时时间
if (this.currentLatency < other.currentLatency)
return -1;// 当前对象小
else if (this.currentLatency > other.currentLatency) {
return 1; // other对象大
}
// 对比故障规避开始时间
if (this.startTimestamp < other.startTimestamp)
return -1;
else if (this.startTimestamp > other.startTimestamp) {
return 1;
}
return 0;
}
// 用于判断Broker是否可用
public boolean isAvailable() {
// 当前时间减去startTimestamp的值是否大于等于0,大于等于0表示可用
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
}
}
折叠
在选择消息队列时,如果开启故障延迟机制并且未找到合适的消息队列,会调用pickOneAtLeast方法选择一个Broker,那么是如何选择Broker的呢?
由FaultItem的compareTo方法可知,currentLatency和startTimestamp的值越小,整个FaultItem的值也就越小,正序排序时越靠前,靠前表示向Broker发送消息的延迟越低,在选择Broker时优先级越高,所以如果half值小于等于0的时候,取链表中的第一个元素,half值大于0的时候,处于链表前half个的Brokerddd,延迟都是相对较低的,此时轮询从前haft个Broker中选择一个Broker。
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {
// FaultItem集合,Key为BrokerName,value为对应的FaultItem对象
private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap(16);
@Override
public String pickOneAtLeast() {
final Enumeration elements = this.faultItemTable.elements();
List tmpList = new LinkedList();
// 遍历faultItemTable
while (elements.hasMoreElements()) {
final FaultItem faultItem = elements.nextElement();
// 将FaultItem添加到列表中
tmpList.add(faultItem);
}
if (!tmpList.isEmpty()) {
Collections.shuffle(tmpList);
// 排序
Collections.sort(tmpList);
// 计算中间数
final int half = tmpList.size() / 2;
// 如果中位数小于等于0
if (half <= 0) {
// 获取第一个元素
return tmpList.get(0).getName();
} else {
// 对中间数取余
final int i = this.whichItemWorst.incrementAndGet() % half;
return tmpList.get(i).getName();
}
}
return null;
}
}
再回到MQFaultStrategy中选择消息队列的地方,在开启故障延迟机制的时候,选择队列后会调用LatencyFaultToleranceImpl的isAvailable方法来判断Broker是否可用,而LatencyFaultToleranceImpl的isAvailable方法又是调用Broker对应 FaultItem的isAvailable方法来判断的。
由上面的分析可知,isAvailable返回true表示Broker已经过了规避时间可以用于发送消息,返回false表示还在规避时间内,需要避免选择此Broker,所以故障延迟机制指的是在发送消息时记录每个Broker的耗时时间,如果某个Broker发生故障,但是生产者还未感知(NameServer 30s检测一次心跳,有可能Broker已经发生故障但未到检测时间,所以会有一定的延迟),用耗时时间做为一个故障规避时间(也可以是30000ms),此时消息会发送失败,在重试或者下次选择消息队列的时候,如果在规避时间内,可以在短时间内避免再次选择到此Broker,以此达到故障规避的目的。
如果某个主题所在的所有Broker都处于不可用状态,此时调用pickOneAtLeast方法尽量选择延迟时间最短、规避时间最短(排序后的失败条目中靠前的元素)的Broker作为此次发生消息的Broker。
public class MQFaultStrategy {
private final LatencyFaultTolerance latencyFaultTolerance = new LatencyFaultToleranceImpl();
/**
* 选择消息队列
* @param tpInfo 主题路由信息
* @param lastBrokerName 上一次使用的Broker名称
* @return
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 如果启用故障延迟机制
if (this.sendLatencyFaultEnable) {
try {
// 计数器增1
int index = tpInfo.getSendWhichQueue().incrementAndGet();
// 遍历TopicPublishInfo中存储的消息队列列表
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// 轮询选择一个消息队列
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
// 如果下标小于0,则使用0
if (pos < 0)
pos = 0;
// 根据下标获取消息队列
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 判断消息队列所属的Broker是否可用,如果可用返回当前选择的消息队列
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// 如果未获取到可用的Broker
// 调用pickOneAtLeast选择一个
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
// 从tpInfo中获取Broker可写的队列数量
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 如果可写的队列数量大于0
if (writeQueueNums > 0) {
// 选择一个消息队列
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
// 设置消息队列所属的Broker
mq.setBrokerName(notBestBroker);
// 设置队列ID
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
// 返回消息队列
return mq;
} else {
// 移除Broker
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
// 如果故障延迟机制未选出消息队列,调用selectOneMessageQueue选择消息队列
return tpInfo.selectOneMessageQueue();
}
// 根据上一次使用的BrokerName获取消息队列
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
}
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {
private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap(16);
@Override
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
// 调用FaultItem的isAvailable方法判断是否可用
return faultItem.isAvailable();
}
return true;
}
}
留言与评论(共有 0 条评论) “” |