dubbo服务引用初始化浅析

dubbo服务引用初始化过程就是创建服务动态代理的过程,与服务发布一样,同样借助bean初始化完成动态代理的创建。具体调用过程:
com.alibaba.dubbo.config.ReferenceConfig:
get()-->init()-->createProxy()
在createProxy()方法中,服务引用分为三种情况:
1.JVM内部引用的代理
2.用户指定URL 直连
3.通过注册中心,引用远程的代理
直接看第三种

private T createProxy(Map<String, String> map) {
         .....................................
                List<URL> us = loadRegistries(false);//导入注册中心url
                if (us != null && us.size() > 0) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);//导入monitor相关信息,加入URL中
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls == null || urls.size() == 0) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            if (urls.size() == 1) {//只有一个注册中心
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {//多注册中心
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {//每个注册中心初始化引用一遍
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // 用了最后一个registry url
                    }
                }
                if (registryURL != null) { // 有 注册中心协议的URL
                    // 对有注册中心的Cluster 只用 AvailableCluster
                    // 多个注册中心,封装成一个Invoker
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                }  else { // 不是 注册中心的URL
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
    .....................................
        // 创建服务代理
        return (T) proxyFactory.getProxy(invoker);
    }

主要服务引用过程在这个方法里refprotocol.refer(interfaceClass, url),调用到RegistryProtocol.refer(),根据url获取到注册中心,然后调用doRefer(),new 一个RegistryDirectory,主要缓存远程服务相关信息,如ip:port,然后向注册中心consumer端注册自己,然后在相应节点providers端订阅提供者的信息,具体过程如下图


clipboard.png
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {//注册自己
              registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
      //订阅provider注册中心节点信息,此处url需要携带providers参数
       directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                + "," + Constants.CONFIGURATORS_CATEGORY 
                + "," + Constants.ROUTERS_CATEGORY));
        return cluster.join(directory);
    }

下面进入directory.subscribe()方法,此方法非常重要,此处完成提供者信息的获取,通信客户端的初始化,此方法调用链为:FailbackRegistry.subscribe()-->ZookeeperRegistry.doSubscribe()-->RegistryDirectory.notify(),下面来看ZookeeperRegistry.doSubscribe()方法,此处主要完成提供端信息的获取,转换成url,订阅的节点有三类:
/dubbo/xx.xx/providers;
/dubbo/xx.xx/routes;
/dubbo/xx.xx/configurations;

protected void doSubscribe(final URL url, final NotifyListener listener) {
     List<URL> urls = new ArrayList<URL>();//缓存订阅到信息
      for (String path : toCategoriesPath(url)) {
              ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                  ...............................................
                    zkClient.create(path, false);
                   //从zk订阅到的信息,并添加子节点监听
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
            notify(url, listener, urls);
  }

然后调用到AbstractRegistry.notify()方法

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        ..........................................
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            saveProperties(url);//url信息保存到本地
            listener.notify(categoryList);
        }
}

listener就是一路传过来的RegistryDirectory,进入RegistryDirectory.notify()-->refreshInvoker()

private void refreshInvoker(List<URL> invokerUrls){
           ..........................................
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 将URL列表转成Invoker列表
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
            // state change
            //如果计算错误,则不进行处理.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
                return ;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try{
                destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 关闭未使用的Invoker
            }catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
}

上面的方法提供者url信息转换成invoker,并与method对应起来。下面进入toInvokers(invokerUrls):

 private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
          //检查提供者相关信息.......
         ....................
          // 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // 缓存中没有,重新refer
                try {
                    ........................................
                    if (enabled) {
                        invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t);
                }
                if (invoker != null) { // 将新的引用放入缓存
                    newUrlInvokerMap.put(key, invoker);
                }
            }else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
}

下面调用protocol.refer(serviceType, url),此protocol为适配类,调用过程为ProtocolFilterWrapper.refer()-->ProtocolListenerWrapper.refer()-->DubboProtocol.refer():

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

然后调用getClients(url),创建ExchangeClient用来处理和提供端的链接,一般一个服务接口只会创建一个client

private ExchangeClient[] getClients(URL url){
        //是否共享连接
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        //如果connections不配置,则共享连接,否则每服务每连接
        if (connections == 0){
            service_share_connect = true;
            connections = 1;
        }
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect){
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

然后一路调用
Exchangers.connect()-->HeaderExchanger.connect()->Transporters.connect()-->NettyTransporter.connect(),最终初始化一个NettyClient,调用doopen()方法,完成网络客户端的初始化

    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ClientBootstrap(channelFactory);
        // config
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }

回到RegistryProtocol.doRefer()中,cluster.join(directory),此处调用过程为:MockClusterWrapper.join()-->FailfastCluster.join(),最终初始化一个MockClusterInvoker对象,此对象持有RegistryDirectory和FailfastClusterInvoker的引用,接下来调用JavassistProxyFactory.getProxy()创建代理:

 public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

此对象spring注入服务接口中,形成一个透明化的本地服务,当调用本地接口方法时,就会启动动态代理,执行InvokerInvocationHandler.invoke()方法:

 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        //调用MockClusterInvoker.invoke()方法
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,783评论 5 472
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,396评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,834评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,036评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,035评论 5 362
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,242评论 1 278
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,727评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,376评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,508评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,415评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,463评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,140评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,734评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,809评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,028评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,521评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,119评论 2 341

推荐阅读更多精彩内容

  • 首先还是Spring碰到dubbo的标签之后,会使用parseCustomElement解析dubbo标签,使用的...
    加大装益达阅读 6,791评论 0 11
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,566评论 18 139
  • 因我疏忽 令你受苦 个中滋味 灼心刻骨 破碎清空 填补如初 余下时光 倾力守护 安娜星 2017.11.29
    安娜的花园阅读 366评论 2 2
  • 小时候,第一次学车,摔了,从此我只剩下一只眼,来看人,倒也足矣。十七岁,粑粑给我买了个单车,我便谈了场恋爱,像是一...
    简鱼阅读 270评论 0 3
  • 任何的学习,都只有一个方向--------就是走向完整而独立-------完整的生活,独立的思考。 模仿是人的天性...
    笑看云起时a阅读 380评论 8 1