Netty源码-客户端启动过程

1 概述

在介绍了Netty服务端启动之后(参考笔者文章Netty源码-服务端启动过程),再看Netty的客户端启动会发现二者十分类似,服务端启动通过调用了ServerBootstrap.bind方法开启,而客户端启动则通过调用Bootstrap.connect方法启动。

本文的介绍比较简单,因为许多操作和服务端启动一致,就没有详细介绍,读者可集合服务端启动过程一起理解。

2 客户端的典型编码

和介绍服务端一样,我们先看一下客户端的典型编码:

public class TimeClient {
    public void connect(int port, String host) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            //客户端只要准备一个group即可
            b.group(group)
            //注册客户端使用的channel
            .channel(NioSocketChannel.class)
            //设置客户端channel选项和属性
            .option(ChannelOption.TCP_NODELAY, true)
            .attr(AttributeKey.valueOf("attrKey"), "attrValue")
            //注册客户端pipelne中的handler
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            //调用connect启动客户端
            ChannelFuture f = b.connect(host, port).sync();
        } finally {
            //优雅停机
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        new TimeClient().connect(8080, "127.0.0.1");
    }

}

3 一些配置函数

在第1节我们提到了BootstrapServerBootstrap类,Bootstrap主要负责客户端的启动,而ServerBootstrap则主要负责服务端的启动,我们在Netty源码-服务端启动过程第3节一些配置函数介绍了ServerBootstrap的一些常用配置函数,因为ServerBootstrap即需要配置Accept线程和Server channel,又需要配置客户端连接的线程和客户端channel,所以ServerBootstrap的配置方法都是optionchildOptionhandlerchildHandler这样成对出现的,因为Bootstrap主要负责客户端启动,所以只需要配置客户端线程和channel即可,所以其配置方法则没有child*这一类。相关配置方法在文章Netty源码-服务端启动过程也都介绍过,本文也就不再介绍了。

4 客户端启动

客户端的启动由Bootstrap.connect方法开启,下面看其源码:

//Bootstrap
/**
* Connect a {@link Channel} to the remote peer.
*/
public ChannelFuture connect(String inetHost, int inetPort) {
    return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}

/**
* Connect a {@link Channel} to the remote peer.
*/
public ChannelFuture connect(SocketAddress remoteAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    //验证一些必要配置是否都已经配置过
    validate();
    //进行地址解析和实际的连接
    return doResolveAndConnect(remoteAddress, config.localAddress());
}

@Override
public Bootstrap validate() {
    //父类中的验证主要是保证group和channelFactory不为空
    super.validate();
    //验证设置了handler
    if (config.handler() == null) {
        throw new IllegalStateException("handler not set");
    }
    return this;
}

//客户端的启动也分为三个步骤,第一为初始化通道,第二为向
//EventLoopGroup中的某个NioEventLoop持有的Selector注册
//通道,第三个为解析地址和连接
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    //这里完成了第一和第二步骤,即初始化和注册
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        //这里完成第三个步骤:解析地址和连接
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

4.1 Channel初始化和注册

其实客户端Channel初始化和注册实现与服务端基本一样,在初始化时会调用AbstractBootstrap.init方法,这个方法根据在具体的子类中进行了重写,客户端的子类为Bootstrap,其实现如下:

//Bootstrap
//逻辑比较简单,首先向客户端channel的pipeline添加handler
//然后进行选项和attr的设置
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());

    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    }
}

除此之外,Channel初始化和注册与服务端一样,可参见笔者文章Netty源码-服务端启动过程4.1节相关内容,这里不再介绍。

doResolveAndConnect0方法主要完成第三个步骤,解析服务器地址并连接:

//Bootstrap
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    try {
        final EventLoop eventLoop = channel.eventLoop();
        //首先进行域名解析(如果指定的服务端地址时域名而不是
        //IP时需要进行解析)
        final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

        if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
            // Resolver has no idea about what to do with the specified remote address or it's resolved already.
            //解析成功之后进行连接操作
            doConnect(remoteAddress, localAddress, promise);
            return promise;
        }

        final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

        if (resolveFuture.isDone()) {
            final Throwable resolveFailureCause = resolveFuture.cause();

            if (resolveFailureCause != null) {
                // Failed to resolve immediately
                channel.close();
                promise.setFailure(resolveFailureCause);
            } else {
                // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                doConnect(resolveFuture.getNow(), localAddress, promise);
            }
            return promise;
        }

        // Wait until the name resolution is finished.
        resolveFuture.addListener(new FutureListener<SocketAddress>() {
            @Override
            public void operationComplete(Future<SocketAddress> future) throws Exception {
                if (future.cause() != null) {
                    channel.close();
                    promise.setFailure(future.cause());
                } else {
                    doConnect(future.getNow(), localAddress, promise);
                }
            }
        });
    } catch (Throwable cause) {
        promise.tryFailure(cause);
    }
    return promise;
}

4.2 地址解析和连接

在完成第一和第二个步骤之后,通道初始化和注册都已经完成,Bootstrap就会调用doResolveAndConnect0方法解析服务器地址并连接。

4.2.1 地址解析

因为我们在指定服务端地址不仅可以使用IP,还可以使用域名,所以我们在指定域名时,就需要Netty将其解析为IP地址,实现逻辑也比较简单,默认的解析器为DefaultNameResolver,根据域名解析出IP调用的方法为java.net.InetAddress.getAllByName(hostname),这里也不再展开介绍。

4.2.2 连接

在将域名(如果配置的服务端地址为域名而不是IP时会进行解析操作)解析为IP之后,会调用doConnect进行连接操作:

//Bootstrap
private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                //直接调用通道的connect方法
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}

从上面的源码可以看出,连接操作直接通过调用Channel.connect方法完成,Channel.connect方法我们直接看起子类AbstractChannel中的实现:

//AbstractChannel
//connect都是直接通过调用Pipeline的connect进行连接操作
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
    return pipeline.connect(remoteAddress);
}

@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return pipeline.connect(remoteAddress, localAddress);
}

根据Netty源码-ChannelPipeline和ChannelHandler中的介绍,connect方法属于Outbound事件,所以最终会调用HeadContext.connect方法:

//HeadContext
 @Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

HeadContext.connect方法调用了Unsafe.connect方法,我们看其在子类AbstractNioUnsafe中的实现:

//AbstractNioUnsafe
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    try {
        if (connectPromise != null) {
            // Already a connect in process.
            throw new ConnectionPendingException();
        }

        boolean wasActive = isActive();
        //这里调用了外部类AbstractNioChannel.doConnect
        //方法执行实际的连接动作
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

因为这里介绍的是客户端的启动,所以我们看AbstractNioChannel.doConnect在其子类NioSocketChannel中的实现:

//NioSocketChannel
 @Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    //如何本地地址不为空,表示要进行本地地址绑定
    if (localAddress != null) {
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        //最终调用了java channel.connect方法
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

//本地地址绑定,调用java channel的bind方法进行绑定
 private void doBind0(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        SocketUtils.bind(javaChannel(), localAddress);
    } else {
        SocketUtils.bind(javaChannel().socket(), localAddress);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容