在上一篇文章中对于客户端的启动做了阐述,在本文则将对服务端的启动做说明。其实服务端和客户端启动的过程是比较相似的,如果对客户端启动比较了解,那么接下来的旅程将会比较轻松。
同样的,我们先看下服务端的代码:
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
- EventLoopGroup: 指定了两个,bossGroup和workGroup,bossGroup是为了处理请求链接,而workGroup是为了处理与各个客户端的操作。
- ChannelType: 指定 Channel 的类型. 因为是客户端, 因此使用了 NioServerSocketChannel.
- Handler: 设置数据的处理器。
接下来让我们看看服务端启动的基本流程:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
创建了两个NioEventLoopGroup,这两个对象可以看做是传统IO编程模型的两大线程组,bossGroup表示监听端口,accept 新连接的线程组,workerGroup表示处理每一条连接的数据读写的线程组
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
给引导类配置两大线程组,这个引导类的线程模型也就定型了。
.channel(NioServerSocketChannel.class)
和客户端一样,在这里设置IO模型,为NIO。
.option(ChannelOption.SO_BACKLOG, 100)
这个是给服务端的channel配置一些属性,SO_BACKLOG表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数。
.handler(new LoggingHandler(LogLevel.INFO))
handler()用于指定在服务端启动过程中的一些逻辑。
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
我们调用childHandler()方法,给这个引导类创建一个ChannelInitializer,这里主要就是定义后续每条连接的数据读写,业务处理逻辑。
EventLoopGroup初始化
服务端的EventLoopGroup初始化和客户端的是有区别的,bossGroup是负责监听客户端连接的,如果它发现客户端有连接来到时,会初始化各项资源,接着在workerGroup中取出一个EeventLoop绑定到这个客户端中,处理这个客户端的各种请求。具体的初始化过程在上一篇文章中已经有比较详细的说明,这里就不再详述。比较需要注意的是二者是在哪里和channel关联起来的。首先来看下一下路径:
ServerBootstrap.bind()-->AbstractBootstrap.doBind()-->AbstractBootstrap.initAndRegister()
仔细看下initAndRegister()方法中的关键代码:
final ChannelFuture initAndRegister() {
channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
}
从上述代码可以看出,这部分和客户端的实现如出一辙,要注意的是config().group()返回的是我们初始化EventLoopGroup的哪个,这个从b.group(bossGroup, workerGroup)方法中可以看出:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
得出结论: config().group().register(channel);中的group是parentGroup,即bossGroup,用于负责监听客户端连接。跟踪register的结果自然和客户端的一样,最终代码如下:
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
即是NioServerSocketChannsl与bossGroup关联上,注册到一个selector。接下来我们需要找到workerGroup是在哪里与NioSocketChannel关联是在哪里。继续往下看。
init(channel)
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
从上面代码可以看出,ChannelPipeline会添加一个handler ServerBootstrapAcceptor,在ServerBootstrapAcceptor会重写一个方法:channelRead,请看channelRead方法关键代码:
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
这里的childGroup即是currentChildGroup为workGroup,在这里workGroup会和某个channel关联,完成channel注册到selector。所以接下来的疑问在于channelRead方法是在哪里调用的。这里可以提下,会调用NioEventLoop.run方法,这个方法中调用processSelectedKeys方法,最后会调用到unsafe.read()中的doReadMessages,在这个方法中buf.add(new NioSocketChannel(this, ch));再结合一下代码可知,最后通过pipeline机制会调用到channelRead。
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
handler的添加
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
自定义handler的添加过程和客户端的如出一辙,差别在于此处的handler和childHandler分别设置了handler和childHandler,handler字段与accept过程有关, handler负责处理客户端的连接请求; 而 childHandler 就是负责和客户端的连接的 IO 交互。看代码理解:
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
在上述的initChannel方法中通过config.handler()获取到LoggingHandler,然后加入pipeline中,然后再加入ServerBootstrapAcceptor
最后形成结果如下:
然后ServerBootstrapAcceptor.channelRead方法中会为新建的Channel 设置 handler 并注册到一个eventLoop。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
上述的childHandler就是服务端启动的childHandler。当这个客户端连接 Channel 注册后, 就会触发 ChannelInitializer.initChannel 方法的调用, 此后的客户端连接的 ChannelPipeline 状态如下:
最后我们来总结一下服务器端的 handler 与 childHandler 的区别与联系:
- 在服务器 NioServerSocketChannel 的 pipeline 中添加的是 handler 与 ServerBootstrapAcceptor.
- 当有新的客户端连接请求时, ServerBootstrapAcceptor.channelRead 中负责新建此连接的 NioSocketChannel 并添加 childHandler 到 NioSocketChannel 对应的 pipeline 中, 并将此 channel 绑定到 workerGroup 中的某个 eventLoop 中.
- handler 是在 accept 阶段起作用, 它处理客户端的连接请求.
-
childHandler 是在客户端连接建立以后起作用, 它负责客户端连接的 IO 交互.