  1. Dubbo的服务上下线流程以及动态感知
  2. Dubbo的动态配置监听
  3. Dubbo的路由监听

第1章 服务上线下线监听

1.1 监听注册

1.1.1 说明



1.1.2 源码分析


protected  ClusterInvoker doCreateInvoker(DynamicDirectory directory, Cluster                                                cluster, Registry registry, Class type) {    directory.setRegistry(registry);    directory.setProtocol(protocol);    // all attributes of REFER_KEY    Map parameters = new HashMap        (directory.getConsumerUrl().getParameters());    URL urlToRegistry = new ServiceConfigURL(        parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),        parameters.remove(REGISTER_IP_KEY), 0, getPath(parameters, type), parameters);    if (directory.isShouldRegister()) {        directory.setRegisteredConsumerUrl(urlToRegistry);        //把协议注册到 /dubbo/cn.enjoy.userService/consumers节点下面        registry.register(directory.getRegisteredConsumerUrl());    }    //创建路由链    directory.buildRouterChain(urlToRegistry);    //订阅事件,对 configurations,providers,routes节点建立监听    directory.subscribe(toSubscribeUrl(urlToRegistry));    //返回默认的 FailoverClusterInvoker对象    return (ClusterInvoker) cluster.join(directory);}


@Overridepublic void subscribe(URL url) {    setSubscribeUrl(url);    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);    //订阅事件 对 config中的 xxx.xx.xx.xx::.configurations    referenceConfigurationListener = new ReferenceConfigurationListener(this, url);    //订阅其他事件,configurations routes,providers    registry.subscribe(url, this);}

`registry.subscribe(url, this);` 这行代码就是注册监听的代码。其实这里注册的监听目录有三个:configurators、routers、providers。代码最终来到了 ZookeeperRegistry 中的 doSubscribe 方法。

@Overridepublic void doSubscribe(final URL url, final NotifyListener listener) {    try {        if (ANY_VALUE.equals(url.getServiceInterface())) {            String root = toRootPath();            ConcurrentMap listeners =                zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());            ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {                for (String child : currentChilds) {                    child = URL.decode(child);                    if (!anyServices.contains(child)) {                        anyServices.add(child);                        subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,                                                                   Constants.CHECK_KEY, String.valueOf(false)), k);                    }                }            });            zkClient.create(root, false);            List services = zkClient.addChildListener(root, zkListener);            if (CollectionUtils.isNotEmpty(services)) {                for (String service : services) {                    service = URL.decode(service);                    anyServices.add(service);                    subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,                                                                 Constants.CHECK_KEY, String.valueOf(false)), listener);                }            }        } else {            CountDownLatch latch = new CountDownLatch(1);            List urls = new ArrayList<>();            //这里对应 configurators,providers,routes目录            for (String path : toCategoriesPath(url)) {                ConcurrentMap listeners =                    zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());                //RegistryChildListenerImpl 事件回调类,zookeeper事件回调到它                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));                if 
(zkListener instanceof RegistryChildListenerImpl) {                    ((RegistryChildListenerImpl) zkListener).setLatch(latch);                }                zkClient.create(path, false);                //这里会注册zookeeper事件,并且把zookeeper事件和RegistryChildListenerImpl做映射                List children = zkClient.addChildListener(path, zkListener);                if (children != null) {                    //弄一个empty协议,做初始化工作,比如清空集合容器                    urls.addAll(toUrlsWithEmpty(url, path, children));                }            }            //启动后做初始化式的触发监听            notify(url, listener, urls);            // tells the listener to run only after the sync notification of main thread finishes.            latch.countDown();        }    } catch (Throwable e) {        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() +                               ", cause: " + e.getMessage(), e);    }}



// Attach zkListener to `path` through the ZooKeeper client; the call's return value is kept as `children`.
List children = zkClient.addChildListener(path, zkListener);


@Overridepublic List addChildListener(String path, final ChildListener listener) {    ConcurrentMap listeners =        childListeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());    //ChildListener和zookeeper事件做了映射    TargetChildListener targetListener = listeners.computeIfAbsent(listener, k ->                                                                   createTargetChildListener(path, k));    //添加zookeeper事件监听    return addTargetChildListener(path, targetListener);}


/**
 * Wraps a Dubbo {@code ChildListener} in a Curator watcher bound to {@code path}.
 *
 * @param path     node path the watcher is created for
 * @param listener listener the watcher delegates to
 * @return a new {@code CuratorWatcherImpl} built from the client, listener and path
 */
@Override
public CuratorZookeeperClient.CuratorWatcherImpl createTargetChildListener(String path,
                                                                           ChildListener listener) {
    return new CuratorZookeeperClient.CuratorWatcherImpl(client, listener, path);
}
//注册zookeeper的监听,@Overridepublic List addTargetChildListener(String path, CuratorWatcherImpl listener) {    try {        return client.getChildren().usingWatcher(listener).forPath(path);    } catch (NoNodeException e) {        return null;    } catch (Exception e) {        throw new IllegalStateException(e.getMessage(), e);    }}


1.2 监听触发

1.2.1 说明


1.2.2 源码分析

前面我们分析了监听的注册,下面我们来分析一下监听触发的源码。比如我们往 providers 节点写入一条数据,那么监听就应该被触发。这里我们采用 dubbo api 的方式,把一条 dubbo 协议的数据写到 providers 节点下,如下:

/**
 * Registers an encoded dubbo:// provider URL with the registry, which writes it under the
 * service's providers node so that the subscribed listeners fire.
 */
@Test
public void providerReg() {
    // URL-encoded provider URL; it is decoded just before registration.
    String url = "dubbo%3A%2F%2F192.168.67.3%3A20990%2Fcn.enjoy.service.UserService%3Fanyhost%3Dtrue%26application%3Ddubbo_provider%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dcn.enjoy.service.UserService%26metadata-type%3Dremote%26methods%3DdoKill%2CqueryUser%26pid%3D14092%26release%3D3.0.2.1%26retries%3D7%26revision%3D1.0-SNAPSHOT%26service-name-mapping%3Dtrue%26side%3Dprovider%26threadpool%3Dfixed%26threads%3D100%26timeout%3D5000%26timestamp%3D1635058443480";
    RegistryFactory registryFactory =
        ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
    // NOTE(review): the registry address carries no host:port here - presumably elided;
    // supply the real ZooKeeper address before running.
    Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://"));
    registry.register(URL.valueOf(URL.decode(url)));
}



static class CuratorWatcherImpl implements CuratorWatcher {    private CuratorFramework client;    private volatile ChildListener childListener;    private String path;    public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path)    {        this.client = client;        this.childListener = listener;        this.path = path;    }    protected CuratorWatcherImpl() {    }    public void unwatch() {        this.childListener = null;    }    @Override    public void process(WatchedEvent event) throws Exception {        // if client connect or disconnect to server, zookeeper will queue        // watched event(Watcher.Event.EventType.None, .., path = null).        if (event.getType() == Watcher.Event.EventType.None) {            return;        }        if (childListener != null) {            childListener.childChanged(path,client.getChildren().usingWatcher(this).forPath(path));        }    }}


private class RegistryChildListenerImpl implements ChildListener {    private RegistryNotifier notifier;    private long lastExecuteTime;    private volatile CountDownLatch latch;    public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener,                                     CountDownLatch latch) {        this.latch = latch;        notifier = new RegistryNotifier(ZookeeperRegistry.this.getDelay()) {            @Override            public void notify(Object rawAddresses) {                long delayTime = getDelayTime();                if (delayTime <= 0) {                    this.doNotify(rawAddresses);                } else {                    long interval = delayTime - (System.currentTimeMillis() -                                                 lastExecuteTime);                    if (interval > 0) {                        try {                            Thread.sleep(interval);                        } catch (InterruptedException e) {                            // ignore                        }                    }                    lastExecuteTime = System.currentTimeMillis();                    this.doNotify(rawAddresses);                }            }            @Override            protected void doNotify(Object rawAddresses) {                ZookeeperRegistry.this.notify(consumerUrl, listener,                                              ZookeeperRegistry.this.toUrlsWithEmpty(consumerUrl, path, (List) rawAddresses));            }        };    }    public void setLatch(CountDownLatch latch) {        this.latch = latch;    }    @Override    public void childChanged(String path, List children) {        try {            latch.await();        } catch (InterruptedException e) {            logger.warn("Zookeeper children listener thread was interrupted unexpectedly,may cause race condition with the main thread.");        }        notifier.notify(children);    }}


/**
 * Converts the raw child names into URLs via {@code toUrlsWithEmpty} and passes them to the
 * registry's {@code notify} for the consumer URL and listener.
 *
 * @param rawAddresses child node names, cast to {@code List<String>}
 */
protected void doNotify(Object rawAddresses) {
    ZookeeperRegistry.this.notify(consumerUrl, listener,
        ZookeeperRegistry.this.toUrlsWithEmpty(consumerUrl, path, (List<String>) rawAddresses));
}


// Notify the listener once per category and remember what was delivered for each one.
// NOTE(review): the generic type arguments were garbled in this excerpt and have been restored.
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
    String category = entry.getKey();
    List<URL> categoryList = entry.getValue();
    categoryNotified.put(category, categoryList);
    listener.notify(categoryList);
    // We will update our cache file after each notification.
    // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
    if (localCacheEnabled) {
        saveProperties(url);
    }
}


//事件监听回调@Overridepublic synchronized void notify(List urls) {    if (isDestroyed()) {        return;    }    //对回调的协议分组    // routes://    // override://    //dubbo://    Map> categoryUrls = urls.stream()        .filter(Objects::nonNull)        .filter(this::isValidCategory)        .filter(this::isNotCompatibleFor26x)        .collect(Collectors.groupingBy(this::judgeCategory));    List configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY,                                                           Collections.emptyList());    this.configurators =        Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);    List routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY,                                                     Collections.emptyList());    //生成路由规则,加入到规则链中    toRouters(routerURLs).ifPresent(this::addRouters);    // providers    List providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY,                                                       Collections.emptyList());    /**  * 3.x added for extend URL address  */    ExtensionLoader addressListenerExtensionLoader =        ExtensionLoader.getExtensionLoader(AddressListener.class);    List supportedListeners =        addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);    if (supportedListeners != null && !supportedListeners.isEmpty()) {        for (AddressListener addressListener : supportedListeners) {            providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);        }    }    //刷新本地服务列表    refreshOverrideAndInvoker(providerURLs);}


private synchronized void refreshOverrideAndInvoker(List urls) {    // mock zookeeper://xxx?mock=return null    overrideDirectoryUrl();    //刷新本地列表    refreshInvoker(urls);}


//刷新本地服务列表private void refreshInvoker(List invokerUrls) {    Assert.notNull(invokerUrls, "invokerUrls should not be null");    if (invokerUrls.size() == 1        && invokerUrls.get(0) != null        && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {        this.forbidden = true; // Forbid to access        this.invokers = Collections.emptyList();        routerChain.setInvokers(this.invokers);        destroyAllInvokers(); // Close all invokers    } else {        this.forbidden = false; // Allow to access        Map> oldUrlInvokerMap = this.urlInvokerMap; // local reference        if (invokerUrls == Collections.emptyList()) {            invokerUrls = new ArrayList<>();        }        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {            invokerUrls.addAll(this.cachedInvokerUrls);        } else {            this.cachedInvokerUrls = new HashSet<>();            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison        }        if (invokerUrls.isEmpty()) {            return;        }        //创建url和invoker对象的映射关系        Map> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map        /**    * If the calculation is wrong, it is not processed.    *    * 1. The protocol configured by the client is inconsistent with the protocol of the server.    *  eg: consumer protocol = dubbo, provider only has other protocol services(rest).    * 2. The registration center is not robust and pushes illegal specification data.    *    */        if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. 
urls :" + invokerUrls                                                   .toString()));            return;        }        //所有的invoker对象        List> newInvokers = Collections.unmodifiableList(new ArrayList<>                                                                    (newUrlInvokerMap.values()));        // pre-route and build cache, notice that route cache should build on original Invoker list.        // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.        routerChain.setInvokers(newInvokers);        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;        this.urlInvokerMap = newUrlInvokerMap;        try {            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker        } catch (Exception e) {            logger.warn("destroyUnusedInvokers error. ", e);        }        // notify invokers refreshed        this.invokersChanged();    }}

refreshInvoker 会在下面这行代码中创建 url 和 invoker 对象的映射关系:`Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);`


private Map> toInvokers(List urls) {    Map> newUrlInvokerMap = new ConcurrentHashMap<>();    if (urls == null || urls.isEmpty()) {        return newUrlInvokerMap;    }    String queryProtocols = this.queryMap.get(PROTOCOL_KEY);    for (URL providerUrl : urls) {        // If protocol is configured at the reference side, only the matching protocol is selected        if (queryProtocols != null && queryProtocols.length() > 0) {            boolean accept = false;            String[] acceptProtocols = queryProtocols.split(",");            for (String acceptProtocol : acceptProtocols) {                if (providerUrl.getProtocol().equals(acceptProtocol)) {                    accept = true;                    break;                }            }            if (!accept) {                continue;            }        }        if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {            continue;        }        if            (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol()                                                                             )) {            logger.error(new IllegalStateException("Unsupported protocol " +                                                   providerUrl.getProtocol() +                                                   " in notified url: " + providerUrl + " from registry " +                                                   getUrl().getAddress() +                                                   " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +                                                    ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));            continue;        }        URL url = mergeUrl(providerUrl);        // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again        Map> localUrlInvokerMap = this.urlInvokerMap; // 
local reference        Invoker invoker = localUrlInvokerMap == null ? null :        localUrlInvokerMap.remove(url);        if (invoker == null) { // Not in the cache, refer again            try {                boolean enabled = true;                if (url.hasParameter(DISABLED_KEY)) {                    enabled = !url.getParameter(DISABLED_KEY, false);                } else {                    enabled = url.getParameter(ENABLED_KEY, true);                }                if (enabled) {                    //生成invoker对象                    invoker = protocol.refer(serviceType, url);                }            } catch (Throwable t) {                logger.error("Failed to refer invoker for interface:" + serviceType +                             ",url:(" + url + ")" + t.getMessage(), t);            }            if (invoker != null) { // Put new invoker in cache                newUrlInvokerMap.put(url, invoker);            }        } else {            newUrlInvokerMap.put(url, invoker);        }    }    return newUrlInvokerMap;}


// Publish the new invoker list: with multiGroup set it goes through toMergeInvokerList first, otherwise it is used as-is.
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;



