dubbo剖析:三 网络通信之 -- Server实现

引子:

  • dubbo剖析:一 服务发布 中,我们讲到了 RegistryProtocol.export过程中有一个关键步骤,即调用doLocalExport(final Invoker<T> originInvoker)生成Exporter,其最终调用了DubboProtocolexport()方法。
  • DubboProtocolexport()方法:完成了 "启动并监听网络服务" 的工作,具体是通过HeaderExchangerbind()方法创建了一个HeaderExchangerServer实现的。
  • 本章我们就来介绍HeaderExchangerServer设计架构功能实现

一、入口流程

服务发布流程中,RegistryProtocol会调用到DubboProtocolexport()方法,用于完成网络服务的启动和监听。

1.1 入口代码

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        //step1 export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //...省略部分代码...

        //step2 创建ExchangeServer
        openServer(url);

        return exporter;
    }
    private void openServer(URL url) {
        String key = url.getAddress();
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                //调用createServer
                serverMap.put(key, createServer(url));
            } else {
                server.reset(url);
            }
        }
    }
    private ExchangeServer createServer(URL url) {
        //...省略部分代码,参数解析之类的...
        ExchangeServer server;
        try {
            //关键代码,使用HeaderExchanger.bind创建HeaderExchangeServer
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        return server;
    }

1.2 流程图解

DubboProtocol.export()流程图

第一步:生成Exporter:

  • AbstractProxyInvoker作为构造参数,new出一个DubboExporter;
  • DubboExporter维护了AbstractProxyInvoker的生命周期;
  • 服务url、DubboExporter放入DubboProtocol的缓存map供消息接收处理使用;

第二步:创建ExchangeServer:

  • AbstractProxyInvoker中获取服务url;
  • 服务url作为入参、同时使用DubboProtocol包含的网络事件处理器requestHandler,调用HeaderExchangerbind()方法创建ExchangeServer
  • HeaderExchanger中又依赖了NettyTransporter,使用其bind()方法创建NettyServer
  • NettyServer是启动网络服务的核心类。HeaderExchangerServer使用NettyServer作为构造参数,扩展了它的功能;

二、server端网络层结构

server端网络层类图关系说明

2.1 网络传输层

  • EndPoint为网络端点的抽象接口,定义了获取网络端点地址、连接、及最原始的发送消息的方法。
  • ChannelHandler为网络事件处理器接口,定义了Server端监听到各种类型的网络事件时的处理方法(connected、disconnected、sent、received、caught),Netty中也有类似定义。
  • Server为网络服务端的抽象接口,继承了EndPoint的功能,并扩展了获取与服务端建连的通道Channel的方法。
  • Transporter为网络传输层的抽象接口,核心作用就是提供了创建ServerClient两个核心接口实现类的方法。

2.2 信息交换层

  • ExchangeHandler,在ChannelHandler接口基础上,添加了 响应请求 的方法。
  • ExchangeServer,在Server接口基础上,将获取Channel的方法扩展为获取ExchangeChannel的方法。
  • Exchanger为信息交换层的抽象接口,核心作用就是提供了创建ExchangeServerExchangeClient两个核心接口实现类的方法。

2.3 网络通道Channel

网络通道接口定义
  • Channel为网络通道的抽象接口,继承了EndPoint的功能,并扩展了绑定获取属性和获取通道对端地址的方法。
  • ExchangeChannel,在Channel接口的基础上,扩展了请求响应模式的功能,并能获取绑定在通道上的网络事件监听器。

三、HeaderExchangeServer & NettyServer实现详解

Server实现层次结构图

3.1 网络层

AbstractPeer类(网络事件处理器和网络节点的通用实现):

  • 定义了属性ChannelHandlerURL,作为构造方法入参注入;
  • 实现了ChannelHandlerEndPoint接口,ChannelHandler接口的相关方法依赖其channelHandler属性完成实现;

AbstractEndPoint类(加入编解码功能):

  • 定义了构造方法,入参包含属性ChannelHandlerURL
  • 定义了属性Codec2,用于编解码,通过SPI动态注入;
  • 定义了timeout/connectTimeout相关超时属性,由URL解析赋值;
  • 对外暴露了获取Codec2和超时相关属性的方法,供上层依赖调用;

AbstractServer类(网络服务端通用抽象,抽象出openclosesend的公共流程,并提供了doOpendoClose的实现扩展):

  • 定义了构造方法,入参包含属性ChannelHandlerURL,并触发doOpen()扩展;
  • 重写EndPoint接口的close()方法,触发doClose()扩展;
  • 实现EndPoint接口的send()方法,遍历并调用Channel.send()

NettyServer类(网络服务端Netty实现类,实现了doOpendoClosegetChannels三个具体扩展):

  • 实现了doOpen()扩展方法,使用Netty的ServerBootstrap完成服务启动监听,其网络世界处理器为this包装成的NettyHandler
  • 实现了doClose()扩展方法,调用Netty的boostrapchannel完成网络资源释放;
  • 实现了getChannels()方法,channels的值由网络事件处理器在connect、disconnect事件触发时变动维护;
    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }
    @Override
    protected void doClose() throws Throwable {
        try {
            if (channel != null) {
                // unbind.
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
            if (channels != null && channels.size() > 0) {
                for (com.alibaba.dubbo.remoting.Channel channel : channels) {
                    try {
                        channel.close();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (bootstrap != null) {
                // release external resource.
                bootstrap.releaseExternalResources();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (channels != null) {
                channels.clear();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

3.2 交换层

HeaderExchangeServer类(交换层服务端,将网络层的Channel扩展为交换层的ExchangeChannel,并加入心跳检测功能):

  • 定义了构造方法,入参包含属性Server,用于实现服务端网络层功能;
  • 定义了属性定时任务线程池scheduled,用于执行“定时心跳收发及心跳超时监测”任务;
  • 定义了hearbeat/heartbeatTieout相关心跳属性,由URL解析赋值;
  • 构造方法中启动“定时心跳收发及心跳超时监测”任务,超时时“Server断连、Client断连重连”;
  • 将网络层Channel扩展为交换层ExchangeChannel,具体实现后续另辟章节;
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容