作者:石臻臻, CSDN博客之星Top5、Kafka Contributor 、nacos Contributor、华为云 MVP ,腾讯云TVP, 滴滴Kafka技术专家 、 KnowStreaming。
KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源! 。
还记得我们在 服务端增删改配置数据之后如何通知集群中的其他机器 中分析了服务端之间的相互通知修改数据, 当时结尾的时候,留下了以下问题,我们将在这篇文章中来给解析一下;
在【Nacos源码之配置管理 九】客户端获取配置数据的流程 中我们知道了客户端获取配置数据的整个流程;也知道了客户端的Nacos配置服务类NacosConfigService ; 这个配置服务类在初始化的时候初始化了一些基本的属性,还有一些重要的实例 比如
在【Nacos源码之配置管理 九】客户端获取配置数据的流程 中只是说明了主动发起获取数据的流程; 但是客户端最重要的功能还是轮询和订阅功能; 所以下面主要讲一下 客户端如何订阅服务端数据;
代码中addListener表示对指定的配置监听,如果消息有变更了,则执行方法receiveConfigInfo ; 当然可以选择是否用异步执行的; getExecutor 方法是可以自定义线程池来执行方法 receiveConfigInfo ; 如果是返回null的话,那么就是用主线程同步执行的;
addTenantListeners
ConfigService.addListener调用之后最终是执行了这里的方法,但是看这里的方法似乎只是将listeners放入到了配置数据的缓存CacheData 中; 那什么时候会通知到我们的监听器呢?那我们得看看哪里调用了我们这个listeners了;
ClientWork 客户端
看看clientwork初始化的代码
主要看看里面的checkConfigInfo() 最终执行的是
LongPollingRunnable是一个长轮询的任务类,他负责不断的去对比服务端的配置数据的MD5与自身是否一致,如果不一致,则会发起请求去服务端获取最新数据来更新客户端的缓存数据;
我们看下主要代码:
public void run() {
//公众号: 进击的老码农 个人微信: jjdlmn_
List cacheDatas = new ArrayList();
List inInitializingCacheList = new ArrayList();
try {
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// check server config
List changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(content));
} catch (NacosException ioe) {
String message = String.format(
"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
我们先看下面的做了什么
checkUpdateDataIds获取有变化的配置数据列表
这个方法只是将所有 不是使用本地配置的的 配置数据group、dataid 拼接起来可能更新的字符串 probeUpdateString;真正做处理的还是checkUpdateConfigStr()方法;
//公众号: 进击的老码农
/**
* 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
*/
List checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
List params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
List headers = new ArrayList(2);
headers.add("Long-Pulling-Timeout");
headers.add("" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.add("Long-Pulling-Timeout-No-Hangup");
headers.add("true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
agent.getEncode(), timeout);
if (HttpURLConnection.HTTP_OK == result.code) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.content);
} else {
setHealthServer(false);
LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);
}
} catch (IOException e) {
setHealthServer(false);
LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
throw e;
}
return Collections.emptyList();
}
这个方法主要就是发起http请求,从而拿到哪些的group+dataid的配置项是发生了变化的; 注意这里不是拿具体内容content; 上面拼接的 probeUpdateString是所有客户端监听的所有配置项(除了使用了本地配置); 主要是拿到变更的group+dataid才好去发起获取content的请求
第4步骤看起来很简单,就是发起一个http请求获取有变更的配置项;但是这里其实很有文章,下面会介绍到
上面的步骤让我们拿到了改变过的配置项 changedGroupKeys ;这个时候是没有拿到具体的数据内容content的; 代码不贴了,概述一下流程
可以看到重新把这个任务放到线程池中取执行了; 那么我们就郁闷了,这不就是不停的去发情请求吗?这样不会有性能问题吗? 话是这么说,这个就是长轮询的方式发起请求; 不停的去请求;但是也不是想象中 发起之后里面发起; 因为发起一个请求,如果配置没有变更的话,服务端会将请求挂起,直到快到超时时间或者直到配置数据有变更才会返回数据并结束连接;
.
留言与评论(共有 0 条评论) “” |