Dubbo注册中心

1 Dubbo注册中心概述

Dubbo的注册中心承担着Dubbo服务的注册与发现的功能。

Dubbo支持的注册中心主要包括:

  • ZookeeperRegistry

  • MulticastRegistry

  • RedisRegistry

  • DubboRegistry

其中dubbo官方推荐用Zookeeper作为注册中心,下面介绍ZookeeperRegistry

2 Dubbo服务注册

2.1 Dubbo服务注册组件

Dubbo在Registry层实现服务的注册于发现,主要包括如下几个类:

  • ZookeeperRegistry:负责与zookeeper进行交互;

  • RegistryProtocol:从注册中心获取可用服务,或者将服务注册到zookeeper,然后提供服务或者提供调用代理;

  • RegistryDirectory:维护着所有可用的远程Invoker或者本地的Invoker,这个类实现了NotifyListner

  • NotifyListener :负责RegistryDirectoryZookeeperRegistry的通信;

  • FailbackRegistry:继承自Registry,实现了失败重试机制;

2.2 Zookeeper的服务注册模型

流程说明

  • 服务提供者启动时

    • /dubbo/com.foo.BarService/providers目录下写入自己的URL地址。
  • 服务消费者启动时

    • 订阅/dubbo/com.foo.BarService/providers目录下的提供者URL地址。

    • 并向/dubbo/com.foo.BarService/consumers目录下写入自己的URL地址。

  • 监控中心启动时

    • 订阅/dubbo/com.foo.BarService目录下的所有提供者和消费者URL地址。

3 服务发布

RegistryProtocol是对需要暴露服务到注册中心的一层封装,通过RegistryProtocol实现将暴露的服务信息注册到注册中心。

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 暴露服务,然后包装暴露者
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        // 获取注册中心地址
        // 通过服务的registry配置,
        //<dubbo:registry id="asRegistry" client="zkclient" protocol="zookeeper" register="true" address="zookeeper:2181" file="D:/dubbo/user/dubbo.registry"/>
        //<dubbo:protocol name="dubbo" port="20880" dispatcher="direct" threadpool="fixed" threads="2"/>
        //<dubbo:service connections="10" interface="com.wuhulala.dubbo.user.service.UserService" ref="demoService" registry="asRegistry"/>

        URL registryUrl = getRegistryUrl(originInvoker);

        //根据注册URL 获取注册中心
        final Registry registry = getRegistry(originInvoker);

        // 处理需要注册到注册中心的地址
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

        //判断是否需要注册,为什么不放到最上面
        boolean register = registedProviderUrl.getParameter("register", true);

        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

        if (register) {
            // 注册逻辑
            register(registryUrl, registedProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
       }
       // 省略 ...

    }

    public void register(URL registryUrl, URL registedProviderUrl) {
        // registry在初始化的时候被注入一个自适应的扩展点ExtensionLoader.getExtensionLoader(type).getAdaptiveExtension()
        //所以发布的时候也是自适应寻找对应扩展点进行注册到注册中心
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }

3 ZookeeperRegistry

ZookeeperRegistry是通过ZookeeperRegistryFactory创建。

public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        String key = url.toServiceString();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            //从缓存中
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }

说明:

上面是AbstractRegistryFactory#getRegistry方法根据URL获取对应注册中心,真正的创建在子类的createRegistry方法中实现。

3.1 ZookeeperRegistryFactory
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

注意

ZookeeperTransporter才是真正与Zookeeper通信的对象。默认的Zookeeper客户端是curator。

3.2 ZookeeperRegistry的创建
    /**
     * 默认zookeeper的跟节点 dubbo
     */
    private final static String DEFAULT_ROOT = "dubbo";

    /**
     * 根节点
     */
    private final String root;

    /**
     * Service 接口全名集合。
     * 该属性适可用于监控中心,订阅整个Service层。因为Service层是动态的,可以有不断有新的Service服务发布(注意,不是服务实例)。
     * 在 #doSubscribe(url, notifyListener) 方法中,
     */
    private final Set<String> anyServices = new ConcurrentHashSet<>();

    /**
     * 监听器集合
     */
    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();

    /**
     * zookeeper客户端
     */
    private final ZookeeperClient zkClient;

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        //获取根节点,不设置则使用/dubbo
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        //获取zk客户端调用 ZookeeperTransporter#connect(url) 方法,
        // 基于 Dubbo SPIAdaptive 机制,根据url参数,加载对应的 ZookeeperTransporter 实现类,创建对应的 ZookeeperClient 实现类的对应。
        zkClient = zookeeperTransporter.connect(url);
        //设置监听器
        zkClient.addStateListener((state) -> {
            //重连,重新获取最新的服务提供方地址信息
            if (state == StateListener.RECONNECTED) {
                logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                        " Since ephemeral ZNode will not get deleted for a connection lose, " +
                        "there's no need to re-register url of this instance.");
                ZookeeperRegistry.this.fetchLatestAddresses();
            } else if (state == StateListener.NEW_SESSION_CREATED) {
                //重新创建session时,将本服务进行recover恢复,重新发起注册和订阅
                logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
                try {
                    ZookeeperRegistry.this.recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            } else if (state == StateListener.SESSION_LOST) {
                logger.warn("Url of this instance will be deleted from registry soon. " +
                        "Dubbo client will try to re-register once a new session is created.");
            } else if (state == StateListener.SUSPENDED) {

            } else if (state == StateListener.CONNECTED) {

            }
        });
    }

ZookeeperRegistry在创建时便和Zookeeper创建了连接。

4 服务注册

Zookeeper的服务注册就是创建Zookeeper的节点。

    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

根据url对象构建一个URL的Path,然后在Zookeeper上创建一个节点,节点是否是持久化的根据dynamic参数决定,true表示创建一个持久化节点,当注册方下线时,持久化节点仍然在Zookeeper上。false表示创建临时节点,当注册方下线时,会删除节点,同时通知监听节点的消费方。

4.1 节点路径

Zookeeper中的服务节点路径如Group/ServiceInterface/category/URL

  • Group:表示节点的根路径,默认是dubbo;

  • ServiceInterface:接口的全限定类名;

  • category:接口的类别,providers、consumers、configurators、routers等这些;

  • URL:URL的编码;

4.2 服务下线

服务下线就是把注册的节点进行删除。

    public void doUnregister(URL url) {
        try {
            zkClient.delete(toUrlPath(url));
        } catch (Throwable e) {
            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

5 服务订阅

订阅有两种方式pull、push ,pull是客户端定时轮询注册中心拉取配置,push是注册中心主动推送数据给客户端。dubbo是两者结合的方式进行配置的拉取,当第一次启动的时候通过客户端pull拉取所有数据方式,在订阅的节点上进行注册watcher,然后客户端与注册中心保持长连接,而后 每个节点有任何数据变化,注册中心就会更新watcher情况进行回调通知到客户端,客户端就接收到这个通知了。

服务在进行注册的时候,服务端会订阅configurators用来监听动态配置的变更。

在消费者启动的时候,消费者会监听providers、routers、configurators来监听服务提供者、路由规则、配置变更的通知。

5.1 doSubscribe
 public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            //处理所有service的订阅,
            //客户端第一次连接上注册中心的时候,会把这个service设置成*  也就是 ANY_VALUE,进行获取全量数据
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                //获取url的监听器
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    //没有listener则创建一
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                //获取ChildListener
                ChildListener zkListener = listeners.get(listener);
                //第一次连接时为空,新建一个childListener
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
                        //遍历所有子节点
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            //有新增服务,则发起这个服务的订阅
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                //创建持久化节点,订阅持久化节点的子节点
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                //获取到全量service,对每一个service进行订阅
                if (CollectionUtils.isNotEmpty(services)) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {//客户端非第一次连接上注册中心,对于具体的服务进行订阅
                List<URL> urls = new ArrayList<>();
                //根据URL类别,获取一组需要订阅的路径
                //类别:providers、routers、consumers、configurators
                for (String path : toCategoriesPath(url)) {
                    //获取url对应的监听器,没有则创建
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    //获取ChildListener,没有则创建
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        //子节点数据发生变更时则调用notify方法进行通知这个listener
                        listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    //创建type的持久化节点,且对于这个节点进行订阅
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                //数据获取完成时,调用 `#notify(...)` 方法,回调 NotifyListener,urls为所有自节点的url

                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
5.2 doUnsubscribe
    public void doUnsubscribe(URL url, NotifyListener listener) {
        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
        if (listeners != null) {
            ChildListener zkListener = listeners.get(listener);
            if (zkListener != null) {
                if (ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    zkClient.removeChildListener(root, zkListener);
                } else {
                    for (String path : toCategoriesPath(url)) {
                        zkClient.removeChildListener(path, zkListener);
                    }
                }
            }
        }
    }

取消订阅就是删除监听器。

5.3 fetchLatestAddresses

当与zookeeper重新连接时,需要刷新本机生产者列表,这个方法就是干这个的,它把的已订阅的列表都加入订阅失败的容器。

//刷新生产者列表
    private void fetchLatestAddresses() {
        // subscribe
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Fetching the latest urls of " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                //将当前订阅的列表全部推送至订阅失败的容器
                for (NotifyListener listener : entry.getValue()) {
                    addFailedSubscribed(url, listener);
                }
            }
        }
    }

注意

addFailedSubscribed方法是FailbackRegistry将订阅失败的连接放入失败容器,FailbackRegistry有定时器会定时的处理这些失败的订阅。

6 服务监听

通知事件是调用的AbstractRegistry#notify方法完成。

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
       //......
        //将url分类放置
        Map<String, List<URL>> result = new HashMap<>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        //获取url在notified的全部url数据,若notified无则创建一个且放入
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            //对于所有url进行通知
            listener.notify(categoryList);
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            //当我们的注册表由于网络抖动而出现订阅失败时,我们至少可以返回现有的缓存URL。
            saveProperties(url);
        }
    }

流程说明

  1. 对需要通知的url进行分类;
  2. 分类调用notify方法进行通知,这个也就是org.apache.dubbo.registry.NotifyListener#notify这个方法,也就是在服务订阅doSubscribe传入的参数NotifyListener
  3. 将url进行保存缓存;
6.1 NotifyListener通知
public interface NotifyListener {

    /**
     * 当收到服务变更通知时触发。
     * <p>
     * 通知需处理契约:<br>
     * 1\. 总是以服务接口和数据类型为维度全量通知,即不会通知一个服务的同类型的部分数据,用户不需要对比上一次通知结果。<br>
     * 2\. 订阅时的第一次通知,必须是一个服务的所有类型数据的全量通知。<br>
     * 3\. 中途变更时,允许不同类型的数据分开通知,比如:providers, consumers, routers, overrides,允许只通知其中一种类型,但该类型的数据必须是全量的,不是增量的。<br>
     * 4\. 如果一种类型的数据为空,需通知一个empty协议并带category参数的标识性URL数据。<br>
     * 5\. 通知者(即注册中心实现)需保证通知的顺序,比如:单线程推送,队列串行化,带版本对比。<br>
     *
     * @param urls 已注册信息列表,总不为空,含义同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。
     */
    void notify(List<URL> urls);

}

RegistryDirectory#notify就是更新本地缓存。

6.2 saveProperties
private void saveProperties(URL url) {
        if (file == null) {
            return;
        }

        try {
            StringBuilder buf = new StringBuilder();
            /**
             * notifies是 被通知的 URL 集合
             *
             * key1:消费者的 URL ,例如消费者的 URL ,和 {@link #subscribed} 的键一致
             * key2:分类,例如:providers、consumers、routes、configurators。【实际无 consumers ,因为消费者不会去订阅另外的消费者的列表】
             * 在 {@link Constants} 中,以 "_CATEGORY" 结尾
             */
            Map<String, List<URL>> categoryNotified = notified.get(url);
            if (categoryNotified != null) {
                for (List<URL> us : categoryNotified.values()) {
                    for (URL u : us) {
                        if (buf.length() > 0) {
                            buf.append(URL_SEPARATOR);
                        }
                        buf.append(u.toFullString());
                    }
                }
            }
            properties.setProperty(url.getServiceKey(), buf.toString());
            long version = lastCacheChanged.incrementAndGet();
            //同步写入文件还是异步写入文件
            if (syncSaveFile) {
                doSaveProperties(version);
            } else {
                registryCacheExecutor.execute(new SaveProperties(version));
            }
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }

注意

file在AbstractRegistry对象创建的时候初始化。

6.3 doSaveProperties
public void doSaveProperties(long version) {
        if (version < lastCacheChanged.get()) {
            return;
        }
        if (file == null) {
            return;
        }
        // Save
        try {
            File lockfile = new File(file.getAbsolutePath() + ".lock");
            if (!lockfile.exists()) {
                lockfile.createNewFile();
            }
            try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
                 FileChannel channel = raf.getChannel()) {
                FileLock lock = channel.tryLock();
                if (lock == null) {
                    throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
                }
                // Save
                try {
                    if (!file.exists()) {
                        file.createNewFile();
                    }
                    try (FileOutputStream outputFile = new FileOutputStream(file)) {
                        properties.store(outputFile, "Dubbo Registry Cache");
                    }
                } finally {
                    lock.release();
                }
            }
        } catch (Throwable e) {
            savePropertiesRetryTimes.incrementAndGet();
            if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) {
                logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e);
                savePropertiesRetryTimes.set(0);
                return;
            }
            if (version < lastCacheChanged.get()) {
                savePropertiesRetryTimes.set(0);
                return;
            } else {
                registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
            }
            logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e);
        }
    }

这使用了文件锁的方式保证只有一个在读写文件。

消费者从注册中心获取注册信息后会做本地缓存。内存中也有一份,保存在Properties对象里,磁盘上也持久化一份,通过file对象进行引用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容