Let's start by looking at the Netty client startup flow.
Taking EchoClient as the example, its main method looks like this:
EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
     .channel(NioSocketChannel.class)
     .option(ChannelOption.TCP_NODELAY, true)
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             if (sslCtx != null) {
                 p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
             }
             //p.addLast(new LoggingHandler(LogLevel.INFO));
             p.addLast(new EchoClientHandler());
         }
     });
    // Start the client.
    ChannelFuture f = b.connect(HOST, PORT).sync();
    // Wait until the connection is closed.
    f.channel().closeFuture().sync();
} finally {
    // Shut down the event loop to terminate all threads.
    group.shutdownGracefully();
}
The connect call chain is Bootstrap.connect -> Bootstrap.doConnect -> Bootstrap.doConnect0 -> channel.connect.
Bootstrap.doConnect is as follows:
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister(); // (1)
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    final ChannelPromise promise = channel.newPromise();
    if (regFuture.isDone()) {
        doConnect0(regFuture, channel, remoteAddress, localAddress, promise); // (2)
    } else {
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
            }
        });
    }
    return promise;
}
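The "connect only after registration" ordering in doConnect can be illustrated without Netty. The following is a minimal sketch using only JDK classes: a single-threaded executor stands in for one EventLoop and CompletableFuture stands in for ChannelFuture/ChannelPromise. The class name RegisterThenConnectSketch and the event labels are hypothetical and are not part of Netty.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RegisterThenConnectSketch {

    /** Returns the order in which the two steps ran on the single loop thread. */
    public static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        // Assumption: one single-threaded executor plays the role of one EventLoop.
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for initAndRegister(): registration is queued on the loop thread.
            CompletableFuture<Void> regFuture =
                    CompletableFuture.runAsync(() -> events.add("register"), loop);
            CompletableFuture<Void> promise = new CompletableFuture<>();

            // Stand-in for doConnect0(): always hop onto the loop thread before connecting.
            Runnable doConnect0 = () -> loop.execute(() -> {
                if (regFuture.isCompletedExceptionally()) {
                    promise.completeExceptionally(new IllegalStateException("registration failed"));
                } else {
                    events.add("connect");
                    promise.complete(null);
                }
            });

            // Same branching as doConnect: run now if registration is done, else wait for it.
            if (regFuture.isDone()) {
                doConnect0.run();
            } else {
                regFuture.whenComplete((ignored, cause) -> doConnect0.run());
            }

            promise.get(5, TimeUnit.SECONDS);
            return new ArrayList<>(events);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because both steps run on the same thread and the connect task is only queued once registration has finished, "connect" can never be recorded before "register".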
(1) initAndRegister is defined as follows:
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        /**
         * Create the channel object: NioServerSocketChannel on the server side, NioSocketChannel on the client side.
         */
        channel = channelFactory().newChannel(); // (3)
        /**
         * init mainly installs the initialization handler supplied by the user; this handler is a ChannelInitializer.
         * On the server side it is passed in via ServerBootstrap.childHandler; besides that handler, init also adds
         * the handler passed in via ServerBootstrap.handler to the pipeline.
         * On the client side it is passed in via Bootstrap.handler.
         */
        init(channel); // (4)
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    /**
     * For the server:
     * the channel (NioServerSocketChannel) is registered with the boss EventLoopGroup.
     * One EventLoop is chosen from the boss EventLoopGroup to register the channel -> SingleThreadEventLoop.register -> channel.unsafe().register
     * For the client:
     * there is no boss/worker split, only a single EventLoopGroup, and the channel (NioSocketChannel) is registered with it.
     * One EventLoop is chosen from the EventLoopGroup to register the channel -> SingleThreadEventLoop.register -> channel.unsafe().register
     */
    ChannelFuture regFuture = group().register(channel); // (5)
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
    return regFuture;
}
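(3) creates the NioSocketChannel object. At this point the object has only been created; it is neither connected to the server nor registered.
(4) initializes the channel, mainly by installing the initialization handler that the user set through Bootstrap: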
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    /**
     * handler() returns the handler the user set via Bootstrap.handler. It is a ChannelInitializer, not a real
     * business handler: its job is to set up the pipeline of business handlers, and its initChannel method
     * contains the user's pipeline initialization logic.
     */
    p.addLast(handler());
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
            try {
                if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + channel, t);
            }
        }
    }
    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    }
}
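The role of the initializer (added in init, replaced by the real handlers after registration) can be shown with a toy model. This is only a sketch: the pipeline is modelled as a plain list of handler names, and InitializerSketch, init and onRegistered are hypothetical names, not Netty APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class InitializerSketch {
    static final String INITIALIZER = "ChannelInitializer";

    /** Models init(channel): only the initializer placeholder is added, in front of tail. */
    public static List<String> init() {
        List<String> pipeline = new ArrayList<>(Arrays.asList("head", "tail"));
        pipeline.add(pipeline.size() - 1, INITIALIZER);
        return pipeline;
    }

    /** Models what happens after registration: run the user's initChannel logic, then drop the initializer. */
    public static void onRegistered(List<String> pipeline, Consumer<List<String>> initChannel) {
        List<String> userHandlers = new ArrayList<>();
        initChannel.accept(userHandlers);
        // "addLast" in this toy model means "insert just before tail".
        pipeline.addAll(pipeline.size() - 1, userHandlers);
        pipeline.remove(INITIALIZER);
    }

    public static void main(String[] args) {
        List<String> pipeline = init();
        System.out.println(pipeline); // [head, ChannelInitializer, tail]
        onRegistered(pipeline, handlers -> handlers.add("EchoClientHandler"));
        System.out.println(pipeline); // [head, EchoClientHandler, tail]
    }
}
```

The point of the indirection is that the user's handlers are only installed once the channel has an EventLoop, while init itself stays generic.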
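(5) registers the channel with the selector. The call chain is group().register(channel) -> SingleThreadEventLoop.register -> channel.unsafe().register. For NioSocketChannel the unsafe is a NioByteUnsafe; register invokes the register method of its parent class AbstractUnsafe, which in turn calls register0, shown below: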
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        /**
         * Register the channel with the selector.
         */
        doRegister();
        neverRegistered = false;
        registered = true;
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        /**
         * 1. For NioServerSocketChannel:
         * after the NioServerSocketChannel is registered with the selector for the first time, the
         * invokeHandlerAddedIfNeeded method of its pipeline is executed.
         * That method wraps the LoggingHandler set via ServerBootstrap.handler and the ServerBootstrapAcceptor
         * in DefaultChannelHandlerContext instances, adds them to the pipeline,
         * and removes the ChannelInitializer that was used for initialization.
         * What ultimately runs is the initChannel method of the ChannelInitializer implementation.
         * 2. For NioSocketChannel:
         * after the NioSocketChannel is registered with the selector for the first time, the
         * invokeHandlerAddedIfNeeded method of its pipeline is executed.
         * That method wraps the business handlers (set via ServerBootstrap.childHandler for a channel accepted
         * on the server side, or via Bootstrap.handler for a client channel) in DefaultChannelHandlerContext
         * instances, adds them to the pipeline,
         * and removes the ChannelInitializer that was used for initialization.
         * What ultimately runs is the initChannel method of the ChannelInitializer implementation.
         */
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        /**
         * On the server side:
         * for NioServerSocketChannel, isActive is false at first registration because it is not bound yet;
         * for NioSocketChannel, first registration means the connection with the client is already
         * established, so isActive is true.
         * On the client side:
         * for NioSocketChannel, the connection to the server is not established yet at first registration,
         * so isActive is false.
         */
        if (isActive()) {
            /**
             * firstRegistration marks the first registration; the channel active event is only
             * propagated on the first registration.
             */
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                /**
                 * Netty uses auto read by default, so a read is triggered once the channel is active.
                 */
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
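register0 registers the channel with the selector and installs the real business handlers. Because no connection to the server exists yet, isActive is false, so pipeline.fireChannelActive() cannot be executed here as it is for channels accepted on the server side.
After the channel is registered, however, Bootstrap.doConnect goes on to call doConnect0, shown below: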
private static void doConnect0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                if (localAddress == null) {
                    channel.connect(remoteAddress, promise);
                } else {
                    channel.connect(remoteAddress, localAddress, promise);
                }
                promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
The call chain of channel.connect is NioSocketChannel.connect -> AbstractChannel.connect -> pipeline.connect -> tail.connect -> AbstractChannelHandlerContext.connect. AbstractChannelHandlerContext.connect is as follows:
@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    if (!validatePromise(promise, false)) {
        // cancelled
        return promise;
    }
    final AbstractChannelHandlerContext next = findContextOutbound(); // (6)
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise); // (7)
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        connect(remoteAddress, localAddress, promise);
    }
}
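(6) searches from tail toward head for the first outbound context, and (7) invokes the connect method of that context's handler. In this example (with no SSL handler installed), HeadContext is the only outbound context. HeadContext's connect method is as follows: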
@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}
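The tail-to-head search for the first outbound context can be sketched with a small doubly linked list. This is a simplified model under stated assumptions: Ctx and link are hypothetical helpers, each context carries only a name and an outbound flag, and the null check is an addition for the sketch (in a real pipeline head is always outbound, so the search always terminates there at the latest).

```java
public class OutboundSearchSketch {

    /** Hypothetical, stripped-down stand-in for a handler context in the pipeline. */
    public static final class Ctx {
        public final String name;
        public final boolean outbound;
        public Ctx prev;
        public Ctx next;

        public Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    /** Links the contexts in head-to-tail order and returns the last one (tail). */
    public static Ctx link(Ctx... contexts) {
        for (int i = 1; i < contexts.length; i++) {
            contexts[i - 1].next = contexts[i];
            contexts[i].prev = contexts[i - 1];
        }
        return contexts[contexts.length - 1];
    }

    /** Walks backwards from the given context until an outbound context is found. */
    public static Ctx findContextOutbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while (ctx != null && !ctx.outbound);
        return ctx;
    }

    public static void main(String[] args) {
        Ctx tail = link(
                new Ctx("head", true),
                new Ctx("EchoClientHandler", false),
                new Ctx("tail", false));
        System.out.println(findContextOutbound(tail).name); // head
    }
}
```

If an outbound handler such as an SSL handler sat between head and tail, the same search would stop at that handler first, and it would be responsible for passing the connect call further toward head.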
This ends up calling unsafe.connect. The connect method of AbstractNioUnsafe is as follows:
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }
    try {
        if (connectPromise != null) {
            // Already a connect in process.
            throw new ConnectionPendingException();
        }
        boolean wasActive = isActive();
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;
            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}
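It mainly calls doConnect and fulfillConnectPromise. doConnect is provided by NioSocketChannel and calls the connect method of the JDK NIO channel. Because the channel is non-blocking, connect may return before the connection has been established. If the connection is not established immediately, interest in the connect event (OP_CONNECT) is registered, and fulfillConnectPromise is executed once the NioEventLoop event loop picks up the connect-completed event. If the connection is established immediately, fulfillConnectPromise is called directly. doConnect is as follows: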
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        doBind0(localAddress);
    }
    boolean success = false;
    try {
        boolean connected = javaChannel().connect(remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}
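The same non-blocking connect pattern can be tried with plain JDK NIO, without Netty. The sketch below is an illustration only: it connects to a throwaway ServerSocketChannel bound to an ephemeral loopback port, and the class and method names (NonBlockingConnectSketch, connectToLocalServer) as well as the 5 second select timeout are assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NonBlockingConnectSketch {

    /** Connects to a local throwaway server and reports whether the connection was established. */
    public static boolean connectToLocalServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port

            client.configureBlocking(false);
            // Register first with no interest ops, then add OP_CONNECT only if needed.
            SelectionKey key = client.register(selector, 0);

            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                // Not finished yet: ask the selector to report when the connect completes.
                key.interestOps(SelectionKey.OP_CONNECT);
                while (!connected) {
                    if (selector.select(5000) == 0) {
                        return false; // timed out waiting for the connect event
                    }
                    if (key.isConnectable()) {
                        connected = client.finishConnect();
                    }
                    selector.selectedKeys().clear();
                }
                // The connect event is no longer of interest once the connection is up.
                key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
            }
            return client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectToLocalServer());
    }
}
```

Both outcomes of connect are handled: an immediate true skips the selector entirely, while false defers completion to finishConnect after the selector reports the key as connectable.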
fulfillConnectPromise is as follows:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (promise == null) {
        // Closed via cancellation and the promise has been notified already.
        return;
    }
    // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
    // We still need to ensure we call fireChannelActive() in this case.
    boolean active = isActive();
    // trySuccess() will return false if a user cancelled the connection attempt.
    boolean promiseSet = promise.trySuccess();
    // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
    // because what happened is what happened.
    if (!wasActive && active) {
        pipeline().fireChannelActive();
    }
    // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
    if (!promiseSet) {
        close(voidPromise());
    }
}
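wasActive is the state before the connection was established, which is false here. If the connection has now been established, pipeline().fireChannelActive() is executed, which ends up calling HeadContext's channelActive method, shown below: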
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
    readIfIsAutoRead();
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}
This then calls channel.read. The channel is a NioSocketChannel, and the call chain is NioSocketChannel.read -> pipeline.read -> tail.read. tail.read is as follows:
@Override
public ChannelHandlerContext read() {
    /**
     * Searching from tail (inbound) through the user's inbound handlers (the ChannelInitializer has already
     * been removed by this point) to head (outbound), the first outbound context found is HeadContext.
     */
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeRead();
    } else {
        Runnable task = next.invokeReadTask;
        if (task == null) {
            next.invokeReadTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeRead();
                }
            };
        }
        executor.execute(task);
    }
    return this;
}
HeadContext's read method is as follows:
@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}
It calls AbstractUnsafe's beginRead, as follows:
@Override
public final void beginRead() {
    assertEventLoop();
    if (!isActive()) {
        return;
    }
    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}
This finally calls AbstractNioChannel's doBeginRead, which registers read interest on the NioSocketChannel's selectionKey so that data sent by the server can be detected:
/**
 * Called when the channel becomes active and when a read has completed.
 * Registers the interest op: for NioServerSocketChannel readInterestOp is the accept event,
 * for NioSocketChannel readInterestOp is the read event.
 */
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    if (inputShutdown) {
        return;
    }
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
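The bit arithmetic at the end of doBeginRead can be looked at in isolation. The sketch below applies the same check-then-OR step to plain int masks using the JDK's SelectionKey constants; InterestOpsSketch and withReadInterest are hypothetical names introduced for the example.

```java
import java.nio.channels.SelectionKey;

public class InterestOpsSketch {

    /** Returns interestOps with readInterestOp added, leaving the mask unchanged if it is already set. */
    public static int withReadInterest(int interestOps, int readInterestOp) {
        if ((interestOps & readInterestOp) == 0) {
            return interestOps | readInterestOp;
        }
        return interestOps;
    }

    public static void main(String[] args) {
        // Client channel: read interest is OP_READ, added on top of whatever is already set.
        int ops = withReadInterest(SelectionKey.OP_WRITE, SelectionKey.OP_READ);
        System.out.println((ops & SelectionKey.OP_READ) != 0);  // true
        System.out.println((ops & SelectionKey.OP_WRITE) != 0); // true

        // Server channel: the "read" interest is actually OP_ACCEPT.
        System.out.println(withReadInterest(0, SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT); // true
    }
}
```

Using OR rather than plain assignment preserves any other interest bits already set on the key, and the preceding check avoids touching the key when nothing would change.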