生产者发送消息的主要流程图如上图所示。具体的代码由于比较多,我就不在这边贴出来的。 主要讲一下我认为比较重要的点
Producer会每隔30s从Namesrv获取最新的Topic路由信息,并缓存到本地
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 每个30s从NameServer更新路由表信息 MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } }}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
路由信息就是用来发送时选择具体的Broker和队列的
private SendResult sendSelectImpl( Message msg, MessageQueueSelector selector, Object arg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); // 获取Topic队列信息 // 此处的流程: // 先从本地缓存中获取,获取到返回 // 没有获取到.从NameSrv中获取,获取到返回 // 没有获取到.如果能够自动创建Topic,会把消息放到TBW102.后续会有自动创建Topic的逻辑 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; try { List messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); Message userMessage = MessageAccessor.cloneMessage(msg); String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); userMessage.setTopic(userTopic); mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); } catch (Throwable e) { throw new MQClientException("select message queue threw exception.", e); } // 省略代码.... } // 省略代码....}
此处流程可以总结为:
异步消息需要我们单独加一个回调方法,添加在发送消息成功/失败的一些处理。 因为异步消息没有对Broker回来的结果进行额外的处理,那么自然我们就不能像同步消息一样,对Broker返回回来的结果单独针对SendResult进行单独的重试操作。所以需要我们在失败的回调方法上进行额外的处理(例如重试消息发送) 具体的原因,我们可以看下消息发送的主干逻辑
另外我在这里贴一下消息发送的主干代码(具体的代码和它们的注释可以看下我的github 我是一个超链接)
// ... 省略部分代码// 查找主题路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) { // ... 省略部分代码 for (; times < timesTotal; times++) { // 上次发送的broker名称 String lastBrokerName = null == mq ? null : mq.getBrokerName(); // 选择一条messagequeue(轮询+失败规避) MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { // ... 省略部分代码 try { // ... 省略部分代码 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // 失败规避 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { // 发送失败重试 if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (/** 一系列异常处理 **/) { } } }}
流程:
上面提到了失败规避,到底什么是失败规避呢? 在我们一次消息发送过程中,消息有可能发送失败。在消息发送失败,重试时选择发送消息的队列时,就会规避上次MessageQueue所在的Broker。这样能够减少很多不必要的请求(因为Broker宕机后,很大情况下这个Broker短时间内依旧是无法使用的) 那么,为什么会有宕机的Broker在我们的内存中存在? 因为NameSrv是根据心跳检测来确定Broker是否可用的(有间隔 10s),且消息生产者更新路由信息也是有间隔的(30s)。且为了Namesrv设计的简单,Namesrv不会主动将Broker宕机的信息推给消息生产者,而是需要消息生产者定时更新的时候,才会感知到Broker宕机。 在这期间存在误差,所以我们是要一个机制(失败规避策略)来减少一些不必要的性能消耗 另外,失败规避默认是关闭的
private boolean sendLatencyFaultEnable = false;public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { // 默认以30s作为computeNotAvailableDuration的参数 long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); }}
发送端消息的可靠性主要是靠消息发送重试 RocketMQ的架构中,可能会存在多个Broker为某个topic提供服务,这个topic的消息被存放在多个Broker下。(有点类似于Redis的Cluster) 当生产者往某个Broker发送消息失败时,会进行失败规避,选择其他提供服务的Broker,进行发送消息。确保消息能够被稳定的送到Broker中
msgId,对于客户端来说msgId是由客户端producer实例端生成的,具体来说,调用方法MessageClientIDSetter.createUniqIDBuffer()生成唯一的Id;
//for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) { // 为消息分配唯一的全局id MessageClientIDSetter.setUniqID(msg);}
public static void setUniqID(final Message msg) { // 因为有了这个判读,所以我们重试的时候,消息id是不会发生变化的 if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) { msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID()); }}public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
但是,offsetMsgId是会发生变化的
发送消息的时候可以自定义一个MessageQueueSelector,就可以自己选推到那个队列,实现顺序消息的需求了
void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
private SendResult sendSelectImpl( Message msg, MessageQueueSelector selector, Object arg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; try { List messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); Message userMessage = MessageAccessor.cloneMessage(msg); String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); userMessage.setTopic(userTopic); mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); } catch (Throwable e) { throw new MQClientException("select message queue threw exception.", e); } long costTime = System.currentTimeMillis() - beginStartTime; if (timeout < costTime) { throw new RemotingTooMuchRequestException("sendSelectImpl call timeout"); } if (mq != null) { return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); } else { throw new MQClientException("select message queue return null.", null); } } validateNameServerSetting(); throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); }
有一个注意点:这种基于selector方式的,发送消息失败是不会重试的 原因:我的理解是我们自己自定义的selector,在没有规则策略的前提下,大概率还是选择到这个失败的队列里面。但是如果有规避策略的话,又和我们定义selector的本意违背了(例如我要实现顺序消息,结果失败重试把它丢到了其他队列,违背了顺序消息的本意)
留言与评论(共有 0 条评论) “” |