Netty源码分析7 - 服务端 accept 原理

一、代码执行流程梯形图

NioEventLoop.run()
--> processSelectedKeysOptimized()
  // 遍历 selectedKeys
  --> selectionKey.attachment() // NioServerSocketChannel
  --> processSelectedKey(SelectionKey k, AbstractNioChannel ch) // ch = NioServerSocketChannel
    --> NioUnsafe unsafe = ch.unsafe() // unsafe 实例 = NioMessageUnsafe
    // 判断 k 的键值,此处是 ACCEPT
    --> NioMessageUnsafe.read()
      <!-- 1. 创建 NioSocketChannel -->
      --> NioServerSocketChannel.doReadMessages(List<Object> buf)
        --> SocketChannel ch = serverSocketChannel.accept() // ch = java.nio.channels.SocketChannel
        --> new NioSocketChannel(this, ch) // this = NioServerSocketChannel ch = SocketChannel
          --> AbstractChannel(Channel parent) // parent=NioServerSocketChannel
            --> CloseFuture closeFuture = new CloseFuture(this)
            --> ChannelId id = DefaultChannelId.newInstance()
            --> Unsafe unsafe = new NioSocketChannelUnsafe()    // 每一个 Channel 都有一个 Unsafe 对象
              --> ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this)
            --> DefaultChannelPipeline pipeline = new DefaultChannelPipeline(this) // 每一个 Channel 都有一个 ChannelPipeline 对象
              --> this.channel = = this
              --> AbstractChannelHandlerContext tail = new TailContext(this)
                --> boolean inbound = true
                --> boolean outbound = false
                --> this.pipeline = pipeline
              --> AbstractChannelHandlerContext head = new HeadContext(this)
                --> boolean inbound = true
                --> boolean outbound = true
                --> this.pipeline = pipeline
                --> Unsafe unsafe = pipeline.channel().unsafe() // NioSocketChannelUnsafe
              --> head<->tail 组建双链表  // 每一个 ChannelPipeline 对象都有一条 ChannelHandlerContext 组成的双向链表,每一个handler都由ChannelHandlerContext包裹
          --> SelectableChannel ch = channel // channel = java.nio.channels.SocketChannel
          --> int readInterestOp = 1 // SelectionKey.OP_READ
          --> ch.configureBlocking(false) // 配置 ServerSocketChannel 为非阻塞
          --> NioSocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket()) // tcp 参数配置类
            --> RecvByteBufAllocator rcvBufAllocator = new AdaptiveRecvByteBufAllocator()
            --> ByteBufAllocator allocator = PooledByteBufAllocator(preferDirect = true)
            --> int connectTimeoutMillis = 30000
            --> WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT // low=32 * 1024 hign=64 * 1024
            --> Channel channel = this,即 NioServerSocketChannel
            --> setTcpNoDelay(true) // 只要不是安卓就设置 SocketChannel.tcpNoDelay 为 true
      <!-- 2. 使用 ServerBootstrapAcceptor 为 NioSocketChannel 设置属性 + 注册 NioSocketChannel 到 workerGroup 中的一个 NioEventLoop 上 -->
      --> DefaultChannelPipeline.fireChannelRead(Object msg) // msg = NioSocketChannel
        --> ServerBootstrapAcceptor.channelRead(ChannelHandlerContext ctx, Object msg)
          --> child.pipeline().addLast(childHandler) // child = NioSocketChannel,将 childHandler 添加到 NioSocketChannel 的 pipeline 中
          --> 设置 childOptions 和 childAttrs
          --> childGroup.register(child) // childGroup = workerGroup
            --> EventExecutor eventLoop = PowerOfTwoEventExecutorChooser.next() // 从 workerGroup 中选择一条 NioEventLoopGroup
            --> SingleThreadEventLoop.register(Channel channel)
              --> new DefaultChannelPromise(channel, this)
                --> EventExecutor executor = eventLoop
                --> Channel channel = NioSocketChannel
              --> promise.channel().unsafe().register(this, promise) // this=eventLoop
                --> AbstractChannel$AbstractUnsafe.register(this, promise)
                  --> EventLoop eventLoop = this  // 同一个 channel 只由一个 EventLoop 来处理
                  --> 因为当前线程是 worker 的一个NioEventLoop线程,不是当前 eventLoop 的那条NioEventLoop线程,所以将 register0(promise) 封装为 task,丢入 eventLoop 任务队列
                  --> SingleThreadEventExecutor.execute(Runnable task) // task register0
                    --> addTask(Runnable task)
                      --> offerTask(Runnable task)
                        --> taskQueue.offer(task) // 如果添加失败(例如队列容量满了),则使用回绝处理器,此处是抛出RejectedExecutionException
                    --> startThread()
                      --> 封装线程启动task
                        // 该 task 是在下一步创建线程并启动之后执行的
                        --> NioEventLoop.Thread thread = 下一步创建出来的线程
                        --> NioEventLoop.run()
                          --> processSelectedKeys() // 处理 NIO 感兴趣的事件
                          --> runAllTasks(long timeoutNanos) // 处理队列中的任务
                            --> fetchFromScheduledTaskQueue() // 将到期的 scheduledTaskQueue 中的 task 加入 taskQueue
                            --> Runnable task = pollTask() // taskQueue.poll()
                            --> safeExecute(task) // task.run() 此时会执行 register0 任务
                            --> afterRunningAllTasks()
                              --> runAllTasksFrom(tailTasks) // 执行 tailTasks 中的所有 task
                      --> ThreadPerTaskExecutor.execute(Runnable command) // threadFactory.newThread(command).start() 创建线程并启动
                    --> wakeup(boolean inEventLoop) // inEventLoop=false
                      --> selector.wakeup() // 唤醒阻塞

register0 task
--> doRegister()
  --> selectionKey = javaChannel().register(Selector sel, // eventLoop().unwrappedSelector()
                                            int ops,      // 0
                                            Object att)   // this
--> pipeline.invokeHandlerAddedIfNeeded() // 会执行一些 handlerAdded() 事件
  --> callHandlerAddedForAllHandlers() // 执行 pendingHandlerCallback 链表
    --> childHandler.initChannel(final Channel ch)  
      --> pipeline.addLast(new ServerHandler())  // 添加 ServerHandler
    --> 从 pipeline 删除 childHandler(因为此时的 childHandler 的职责已经结束)
--> pipeline.fireChannelRegistered() // 会执行一些 channelRegister() 事件
--> pipeline.fireChannelActive()
  --> HeadContext.channelActive(ChannelHandlerContext ctx)
    --> ctx.fireChannelActive() // 传播执行 channelActive() 事件
    --> readIfIsAutoRead()
      --> AbstractChannel.read()
        --> pipeline.read()
          --> TailContext.read()
            --> unsafe.beginRead() // unsafe = AbstractUnsafe
              --> AbstractNioChannel.doBeginRead()
                --> selectionKey.interestOps(interestOps | readInterestOp) // 绑定 OP_READ 事件

总结:

Netty源码分析6 - 服务端启动流程 中,服务端启动成功,启动了一个 bossGroup 中的 NioEventLoop 的 thread,该 thread 一直死循环等待 selectedKey 和 taskQueue 中的任务到来,这里就是 accept 的入口。当一个 connect 请求到来时,判断 selectedKey 是 OP_ACCEPT 事件,则首先从 selectedKey.attachment() 获取在注册的时候注册上去的 Channel,之后从 Channel 中获取 Unsafe,最终执行 Unsafe 的 read() 方法:

  1. 创建 NioSocketChannel
  • 使用 serverSocketChannel.accept() 创建 java.nio.channels.SocketChannel,NioSocketChannel 是其包装类,创建 Channel 的后续操作与创建 NioServerSocketChannel 类似
  • 为 NioSocketChannel 设置唯一ID
  • 为 NioSocketChannel 设置 Unsafe 实例(服务端是 NioSocketChannelUnsafe),Unsafe 是真正的进行底层 IO 操作的类 - 每一个 Channel 都有一个 Unsafe 对象
  • 为 NioSocketChannel 设置 ChannelPipeline 对象 - 每一个 Channel 都有一个 ChannelPipeline 对象,每一个 ChannelPipeline 都包含一条由 ChannelHandlerContext 组成的双向链表(这条双向链表至少有两个节点 HeadContext 和 TailContext),每个 ChannelHandlerContext 内部都包裹着一个 ChannelHandler。
  • 设置 java.nio.channels.SocketChannel 为非阻塞
  • 记录感兴趣事件为 READ 事件
  • 只要不是安卓就设置 SocketChannel.tcpNoDelay 为 true
  1. 使用 ServerBootstrapAcceptor 为 NioSocketChannel 设置属性 + 注册 NioSocketChannel 到 workerGroup 中的一个 NioEventLoop 上
  • 将 EchoServer$childHandler(是一个 ChannelInitialzer 类) 添加到 NioSocketChannel 的 pipeline 中
  • 设置 childOptions 和 childAttrs
  • 执行注册逻辑
  • 将 NioSocketChannel 中的 java.nio.channels.ServerChannel 注册到 Z 的 Selector 上,选择键为0(此时不监听任何事件),attachment 为 NioSocketChannel 本身
  • pipeline.invokeHandlerAddedIfNeeded() 会执行到 EchoServer$childHandler,首先执行其 initChannel,此时将业务逻辑处理器 ServerHandler 添加到 ChannelPipeline,如下 HeadContext <-> EchoServer$childHandler <-> ServerHandler <-> TailContext,最后删除 EchoServer$childHandler,最终的 ChannelPipeline 链是 HeadContext <-> ServerHandler <-> TailContext
  • pipeline.fireChannelRegistered() 注册完毕之后 执行 channelRegister() 事件
  • pipeline.fireChannelActive() 服务端 bind 完成之后,会触发 channelActive 此时配置的是 OP_ACCEPT 事件;在 SocketChannel 注册完成之后,也会调用 pipeline.fireChannelActive(),此时配置的是 OP_READ 事件
  • 执行 channelActive() 事件
  • 绑定 OP_READ 事件到 SocketChannel 上
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,924评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,781评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,813评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,264评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,273评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,383评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,800评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,482评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,673评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,497评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,545评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,240评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,802评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,866评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,101评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,673评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,245评论 2 341

推荐阅读更多精彩内容