一、代码执行流程梯形图
NioEventLoop.run()
--> processSelectedKeysOptimized()
// 遍历 selectedKeys
--> selectionKey.attachment() // NioServerSocketChannel
--> processSelectedKey(SelectionKey k, AbstractNioChannel ch) // ch = NioServerSocketChannel
--> NioUnsafe unsafe = ch.unsafe() // unsafe 实例 = NioMessageUnsafe
// 判断 k 的键值,此处是 ACCEPT
--> NioMessageUnsafe.read()
<!-- 1. 创建 NioSocketChannel -->
--> NioServerSocketChannel.doReadMessages(List<Object> buf)
--> SocketChannel ch = serverSocketChannel.accept() // ch = java.nio.channels.SocketChannel
--> new NioSocketChannel(this, ch) // this = NioServerSocketChannel ch = SocketChannel
--> AbstractChannel(Channel parent) // parent=NioServerSocketChannel
--> CloseFuture closeFuture = new CloseFuture(this)
--> ChannelId id = DefaultChannelId.newInstance()
--> Unsafe unsafe = new NioSocketChannelUnsafe() // 每一个 Channel 都有一个 Unsafe 对象
--> ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this)
--> DefaultChannelPipeline pipeline = new DefaultChannelPipeline(this) // 每一个 Channel 都有一个 ChannelPipeline 对象
--> this.channel = = this
--> AbstractChannelHandlerContext tail = new TailContext(this)
--> boolean inbound = true
--> boolean outbound = false
--> this.pipeline = pipeline
--> AbstractChannelHandlerContext head = new HeadContext(this)
--> boolean inbound = true
--> boolean outbound = true
--> this.pipeline = pipeline
--> Unsafe unsafe = pipeline.channel().unsafe() // NioSocketChannelUnsafe
--> head<->tail 组建双链表 // 每一个 ChannelPipeline 对象都有一条 ChannelHandlerContext 组成的双向链表,每一个handler都由ChannelHandlerContext包裹
--> SelectableChannel ch = channel // channel = java.nio.channels.SocketChannel
--> int readInterestOp = 1 // SelectionKey.OP_READ
--> ch.configureBlocking(false) // 配置 ServerSocketChannel 为非阻塞
--> NioSocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket()) // tcp 参数配置类
--> RecvByteBufAllocator rcvBufAllocator = new AdaptiveRecvByteBufAllocator()
--> ByteBufAllocator allocator = PooledByteBufAllocator(preferDirect = true)
--> int connectTimeoutMillis = 30000
--> WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT // low=32 * 1024 hign=64 * 1024
--> Channel channel = this,即 NioServerSocketChannel
--> setTcpNoDelay(true) // 只要不是安卓就设置 SocketChannel.tcpNoDelay 为 true
<!-- 2. 使用 ServerBootstrapAcceptor 为 NioSocketChannel 设置属性 + 注册 NioSocketChannel 到 workerGroup 中的一个 NioEventLoop 上 -->
--> DefaultChannelPipeline.fireChannelRead(Object msg) // msg = NioSocketChannel
--> ServerBootstrapAcceptor.channelRead(ChannelHandlerContext ctx, Object msg)
--> child.pipeline().addLast(childHandler) // child = NioSocketChannel,将 childHandler 添加到 NioSocketChannel 的 pipeline 中
--> 设置 childOptions 和 childAttrs
--> childGroup.register(child) // childGroup = workerGroup
--> EventExecutor eventLoop = PowerOfTwoEventExecutorChooser.next() // 从 workerGroup 中选择一条 NioEventLoopGroup
--> SingleThreadEventLoop.register(Channel channel)
--> new DefaultChannelPromise(channel, this)
--> EventExecutor executor = eventLoop
--> Channel channel = NioSocketChannel
--> promise.channel().unsafe().register(this, promise) // this=eventLoop
--> AbstractChannel$AbstractUnsafe.register(this, promise)
--> EventLoop eventLoop = this // 同一个 channel 只由一个 EventLoop 来处理
--> 因为当前线程是 worker 的一个NioEventLoop线程,不是当前 eventLoop 的那条NioEventLoop线程,所以将 register0(promise) 封装为 task,丢入 eventLoop 任务队列
--> SingleThreadEventExecutor.execute(Runnable task) // task register0
--> addTask(Runnable task)
--> offerTask(Runnable task)
--> taskQueue.offer(task) // 如果添加失败(例如队列容量满了),则使用回绝处理器,此处是抛出RejectedExecutionException
--> startThread()
--> 封装线程启动task
// 该 task 是在下一步创建线程并启动之后执行的
--> NioEventLoop.Thread thread = 下一步创建出来的线程
--> NioEventLoop.run()
--> processSelectedKeys() // 处理 NIO 感兴趣的事件
--> runAllTasks(long timeoutNanos) // 处理队列中的任务
--> fetchFromScheduledTaskQueue() // 将到期的 scheduledTaskQueue 中的 task 加入 taskQueue
--> Runnable task = pollTask() // taskQueue.poll()
--> safeExecute(task) // task.run() 此时会执行 register0 任务
--> afterRunningAllTasks()
--> runAllTasksFrom(tailTasks) // 执行 tailTasks 中的所有 task
--> ThreadPerTaskExecutor.execute(Runnable command) // threadFactory.newThread(command).start() 创建线程并启动
--> wakeup(boolean inEventLoop) // inEventLoop=false
--> selector.wakeup() // 唤醒阻塞
register0 task
--> doRegister()
--> selectionKey = javaChannel().register(Selector sel, // eventLoop().unwrappedSelector()
int ops, // 0
Object att) // this
--> pipeline.invokeHandlerAddedIfNeeded() // 会执行一些 handlerAdded() 事件
--> callHandlerAddedForAllHandlers() // 执行 pendingHandlerCallback 链表
--> childHandler.initChannel(final Channel ch)
--> pipeline.addLast(new ServerHandler()) // 添加 ServerHandler
--> 从 pipeline 删除 childHandler(因为此时的 childHandler 的职责已经结束)
--> pipeline.fireChannelRegistered() // 会执行一些 channelRegister() 事件
--> pipeline.fireChannelActive()
--> HeadContext.channelActive(ChannelHandlerContext ctx)
--> ctx.fireChannelActive() // 传播执行 channelActive() 事件
--> readIfIsAutoRead()
--> AbstractChannel.read()
--> pipeline.read()
--> TailContext.read()
--> unsafe.beginRead() // unsafe = AbstractUnsafe
--> AbstractNioChannel.doBeginRead()
--> selectionKey.interestOps(interestOps | readInterestOp) // 绑定 OP_READ 事件
总结:
在 Netty源码分析6 - 服务端启动流程 中,服务端启动成功,启动了一个 bossGroup 中的 NioEventLoop 的 thread,该 thread 一直死循环等待 selectedKey 和 taskQueue 中的任务到来,这里就是 accept 的入口。当一个 connect 请求到来时,判断 selectedKey 是 OP_ACCEPT 事件,则首先从 selectedKey.attachment() 获取在注册的时候注册上去的 Channel,之后从 Channel 中获取 Unsafe,最终执行 Unsafe 的 read() 方法:
- 创建 NioSocketChannel
- 使用 serverSocketChannel.accept() 创建 java.nio.channels.SocketChannel,NioSocketChannel 是其包装类,创建 Channel 的后续操作与创建 NioServerSocketChannel 类似
- 为 NioSocketChannel 设置唯一ID
- 为 NioSocketChannel 设置 Unsafe 实例(服务端是
NioSocketChannelUnsafe
),Unsafe 是真正的进行底层 IO 操作的类 - 每一个 Channel 都有一个 Unsafe 对象- 为 NioSocketChannel 设置 ChannelPipeline 对象 - 每一个 Channel 都有一个 ChannelPipeline 对象,每一个 ChannelPipeline 都包含一条由 ChannelHandlerContext 组成的双向链表(这条双向链表至少有两个节点 HeadContext 和 TailContext),每个 ChannelHandlerContext 内部都包裹着一个 ChannelHandler。
- 设置 java.nio.channels.SocketChannel 为非阻塞
- 记录感兴趣事件为 READ 事件
只要不是安卓就设置 SocketChannel.tcpNoDelay 为 true
- 使用 ServerBootstrapAcceptor 为 NioSocketChannel 设置属性 + 注册 NioSocketChannel 到 workerGroup 中的一个 NioEventLoop 上
- 将 EchoServer$childHandler(是一个 ChannelInitialzer 类) 添加到 NioSocketChannel 的 pipeline 中
- 设置 childOptions 和 childAttrs
- 执行注册逻辑
- 使用 PowerOfTwoEventExecutorChooser 从 workerGroup 中选择一条 NioEventLoopGroup
Z
- 后续的注册逻辑与 Netty源码分析6 - 服务端启动流程 相同
- 注册任务:
- 将 NioSocketChannel 中的 java.nio.channels.ServerChannel 注册到
Z
的 Selector 上,选择键为0(此时不监听任何事件),attachment 为 NioSocketChannel 本身- pipeline.invokeHandlerAddedIfNeeded() 会执行到
EchoServer$childHandler
,首先执行其 initChannel,此时将业务逻辑处理器 ServerHandler 添加到 ChannelPipeline,如下HeadContext <-> EchoServer$childHandler <-> ServerHandler <-> TailContext
,最后删除 EchoServer$childHandler,最终的 ChannelPipeline 链是HeadContext <-> ServerHandler <-> TailContext
- pipeline.fireChannelRegistered() 注册完毕之后 执行 channelRegister() 事件
- pipeline.fireChannelActive() 服务端 bind 完成之后,会触发 channelActive 此时配置的是 OP_ACCEPT 事件;在 SocketChannel 注册完成之后,也会调用 pipeline.fireChannelActive(),此时配置的是 OP_READ 事件
- 执行 channelActive() 事件
- 绑定 OP_READ 事件到 SocketChannel 上