// 1. 配置 EventLoopGroup
final static EventLoopGroup group = new NioEventLoopGroup();
public static void main(String[] args) throws Exception {
// 2. 创建并配置客户端启动辅助类 Bootstrap
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
// 4. 阻塞连接服务端
ChannelFuture f = b.connect("127.0.0.1", 8081).sync();
f.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
group.shutdownGracefully();
}
});
}
一、代码执行流程梯形图
/*********************************************** 1. 创建 NioEventLoopGroup ***********************************************/
/*********************************************** 2. 创建并设置 Bootstrap ***********************************************/
new Bootstrap()
--> Map<ChannelOption<?>, Object> options = new LinkedHashMap<>()
--> Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<>()
--> BootstrapConfig config = new BootstrapConfig(this)
// 设置 group
AbstractBootstrap.group(group)
--> EventLoopGroup group = group
// 设置 channel
AbstractBootstrap.channel(NioSocketChannel.class)
--> ChannelFactory channelFactory = new ReflectiveChannelFactory(Class<? extends T> clazz) // 设置channel创建工厂,反射建立 channel
// 设置 option
AbstractBootstrap.option(ChannelOption.SO_BACKLOG, 100)
--> Map<ChannelOption<?>, Object> options.put
// 设置 handler
AbstractBootstrap.handler(new ChannelInitializer<SocketChannel>{})
--> ChannelInitializer channelInitializer = new ChannelInitializer<SocketChannel>(){}
--> ChannelHandler handler = channelInitializer
/*********************************************** 3. connect ***********************************************/
Bootstrap.connect(String inetHost, int inetPort)
--> doResolveAndConnect(SocketAddress remoteAddress, SocketAddress localAddress) // localAddress=null
--> ChannelFuture regFuture = initAndRegister()
/********** 3.1 创建 NioSocketChannel *********/
--> Channel channel = channelFactory.newChannel() // channelFactory=ReflectiveChannelFactory
--> new NioSocketChannel()
--> newSocket(SelectorProvider provider)
--> provider.openSocketChannel() // 创建 java.nio.channels.SocketChannel
--> new NioSocketChannel(null, ch) // parent = null ch = SocketChannel
--> AbstractChannel(Channel parent) // parent=NioServerSocketChannel
--> CloseFuture closeFuture = new CloseFuture(this)
--> ChannelId id = DefaultChannelId.newInstance()
--> Unsafe unsafe = new NioSocketChannelUnsafe() // 每一个 Channel 都有一个 Unsafe 对象
--> ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this)
--> DefaultChannelPipeline pipeline = new DefaultChannelPipeline(this) // 每一个 Channel 都有一个 ChannelPipeline 对象
--> this.channel = = this
--> AbstractChannelHandlerContext tail = new TailContext(this)
--> boolean inbound = true
--> boolean outbound = false
--> this.pipeline = pipeline
--> AbstractChannelHandlerContext head = new HeadContext(this)
--> boolean inbound = true
--> boolean outbound = true
--> this.pipeline = pipeline
--> Unsafe unsafe = pipeline.channel().unsafe() // NioSocketChannelUnsafe
--> head<->tail 组建双链表 // 每一个 ChannelPipeline 对象都有一条 ChannelHandlerContext 组成的双向链表,每一个handler都由ChannelHandlerContext包裹
--> SelectableChannel ch = channel // channel = java.nio.channels.SocketChannel
--> int readInterestOp = 1 // SelectionKey.OP_READ
--> ch.configureBlocking(false) // 配置 SocketChannel 为非阻塞
--> NioSocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket()) // tcp 参数配置类
--> RecvByteBufAllocator rcvBufAllocator = new AdaptiveRecvByteBufAllocator()
--> ByteBufAllocator allocator = PooledByteBufAllocator(preferDirect = true)
--> int connectTimeoutMillis = 30000
--> WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT // low=32 * 1024 hign=64 * 1024
--> Channel channel = this,即 NioServerSocketChannel
--> setTcpNoDelay(true) // 只要不是安卓就设置 SocketChannel.tcpNoDelay 为 true
/********** 3.2 初始化 NioSocketChannel 属性 *********/
--> Bootstrap.init(Channel channel)
--> channel.pipeline().addLast(config.handler()) // 添加 handler(ChannelInitializer)
--> channel.config().setOption((ChannelOption<Object>) option, value) // SocketChannelConfig 设置 option 配置
--> channel.attr(key).set(e.getValue()) // 设置 attr
/********** 3.3 执行注册:此时启动Nio线程 + 注册 channel 到 selector *********/
--> ChannelFuture MultithreadEventLoopGroup.register(Channel channel) // channel=NioSocketChannel
--> EventExecutor eventLoop = PowerOfTwoEventExecutorChooser.next()
--> SingleThreadEventLoop.register(Channel channel)
--> new DefaultChannelPromise(channel, this)
--> EventExecutor executor = eventLoop
--> Channel channel = NioSocketChannel
--> promise.channel().unsafe().register(this, promise) // this=eventLoop
--> AbstractChannel$AbstractUnsafe.register(this, promise)
--> EventLoop eventLoop = this // 同一个 channel 只由一个 EventLoop 来处理
--> 将 register0(promise) 封装为 task,丢入 eventLoop 任务队列
--> SingleThreadEventExecutor.execute(Runnable task) // task register0
--> addTask(Runnable task)
--> offerTask(Runnable task)
--> taskQueue.offer(task) // 如果添加失败(例如队列容量满了),则使用回绝处理器,此处是抛出RejectedExecutionException
--> startThread()
--> 封装线程启动task
// 该task 是在下一步创建线程并启动之后执行的
--> NioEventLoop.Thread thread = 下一步创建出来的线程
--> NioEventLoop.run()
--> processSelectedKeys() // 处理 NIO 感兴趣的事件
--> runAllTasks(long timeoutNanos) // 处理队列中的任务
--> fetchFromScheduledTaskQueue() // 将到期的 scheduledTaskQueue 中的 task 加入 taskQueue
--> Runnable task = pollTask() // taskQueue.poll()
--> safeExecute(task) // task.run() 此时会执行 register0 任务
--> afterRunningAllTasks()
--> runAllTasksFrom(tailTasks) // 执行 tailTasks 中的所有 task
--> ThreadPerTaskExecutor.execute(Runnable command) // threadFactory.newThread(command).start() 创建线程并启动
--> wakeup(boolean inEventLoop) // inEventLoop=false
--> selector.wakeup() // 唤醒阻塞
--> Channel channel = regFuture.channel()
--> ChannelPromise promise = channel.newPromise()
--> pipeline.newPromise()
--> new DefaultChannelPromise(channel)
--> doResolveAndConnect0(channel, remoteAddress, localAddress, promise)
--> doConnect(remoteAddress, localAddress, connectPromise)
--> 将 channel.connect 封装为 task,丢入 eventLoop 任务队列
--> channel.eventLoop().execute(connect 任务) // connect task
register0 task
--> doRegister()
--> selectionKey = javaChannel().register(Selector sel, // eventLoop().unwrappedSelector()
int ops, // 0
Object att) // this
--> pipeline.invokeHandlerAddedIfNeeded() // 会执行一些 handlerAdded() 事件
--> callHandlerAddedForAllHandlers() // 执行 pendingHandlerCallback 链表
--> EchoClient$handler.initChannel(final Channel ch)
--> pipeline.addLast(new EchoClientHandler()) // 添加 EchoClientHandler
--> 从 pipeline 删除 EchoClient$handler(因为此时的 EchoClient$handler 的职责已经结束)
--> pipeline.fireChannelRegistered() // 会执行一些 channelRegister() 事件
connect task
--> connect(SocketAddress remoteAddress, ChannelPromise promise)
--> pipeline.connect(SocketAddress remoteAddress, ChannelPromise promise)
--> AbstractNioUnsafe.connect(remoteAddress, localAddress, promise)
--> doConnect(remoteAddress, localAddress)
--> boolean connected = socketChannel.connect(remoteAddress) // nio, 如果没有立即连接成功,则 connected = false,则需要设置OP_CONNECT
--> selectionKey().interestOps(SelectionKey.OP_CONNECT)
--> ScheduledFuture<?> connectTimeoutFuture = eventLoop().schedule(new Runnable() {...}) // 设置连接超时任务,后续若在超时时间内连接成功,则直接connectTimeoutFuture.cancel()
NioEventLoop.run()
--> processSelectedKeysOptimized()
// 遍历 selectedKeys
--> selectionKey.attachment() // NioSocketChannel
--> processSelectedKey(SelectionKey k, AbstractNioChannel ch) // ch = NioSocketChannel
--> NioUnsafe unsafe = ch.unsafe() // unsafe 实例 = NioSocketchannelUnsafe
// 判断 k 的键值,此处是 CONNECT
--> AbstractNioUnsafe.finishConnect()
--> NioSocketChannel.doFinishConnect() // 检测是否完成连接 javaChannel().finishConnect(),若没有,直接抛出 Error,
--> AbstractNioUnsafe.fulfillConnectPromise(ChannelPromise promise, boolean wasActive)
--> pipeline().fireChannelActive()
--> HeadContext.channelActive(ChannelHandlerContext ctx)
--> ctx.fireChannelActive() // 传播执行 channelActive() 事件
--> readIfIsAutoRead()
--> AbstractChannel.read()
--> pipeline.read()
--> TailContext.read()
--> unsafe.beginRead() // unsafe = AbstractUnsafe
--> AbstractNioChannel.doBeginRead()
--> selectionKey.interestOps(interestOps | readInterestOp) // 绑定 OP_READ 事件
--> connectTimeoutFuture.cancel // 取消连接超时任务
总结:
在 Netty源码分析6 - 服务端启动流程 中,服务端启动成功,客户端就可以进行连接操作了。
总结:
- 创建 NioEventLoopGroup
- 创建并设置 Bootstrap
- options、attrs、handler 都是针对 SocketChannel 起作用的;(客户端只有一种 Channel)
- 设置 channel 时,创建了 ReflectiveChannelFactory,用于反射创建 NioSocketChannel
- 连接操作
第一步与 bind 操作一样,都是执行 initAndRegister()
- 使用 ReflectiveChannelFactory 反射创建 NioSocketChannel
- 创建 java.nio.channels.SocketChannel,NioSocketChannel 是其包装类
- 为 NioSocketChannel 设置唯一ID
- 为 NioSocketChannel 设置 Unsafe 实例(客户端是 NioSocketChannelUnsafe),Unsafe 是真正的进行底层 IO 操作的类 - 每一个 Channel 都有一个 Unsafe 对象
- 为 NioSocketChannel 设置 ChannelPipeline 对象 - 每一个 Channel 都有一个 ChannelPipeline 对象,每一个 ChannelPipeline 都包含一条由 ChannelHandlerContext 组成的双向链表(这条双向链表至少有两个节点 HeadContext 和 TailContext),每个 ChannelHandlerContext 内部都包裹着一个 ChannelHandler。
- 设置 java.nio.channels.SocketChannel 为非阻塞
- 记录感兴趣事件为 OP_READ 事件
只要不是安卓就设置 SocketChannel.tcpNoDelay 为 true
- 初始化 NioSocketChannel 属性
- 将 EchoClient$handler(是一个 ChannelInitialzer 类) 添加到 NioSocketChannel 的 pipeline 中
- 设置 options、attrs 到 NioSocketChannel
- 执行注册:此时启动Nio线程 + 注册 channel 到 selector
- 使用 NioEventLoopGroup 的线程选择器从 group 中选出一个 NioEventLoop
X
- 最终调用 NioSocketChannelUnsafe 执行注册,由于当前执行线程不是当前 eventLoop 的那条 NioEventLoop 线程,所以创建注册任务,并加入
X
的 taskQueue 中,然后创建线程,赋值给X
的 thread 属性,之后启动线程,执行 NIO 死循环- NIO 死循环会按照 ioRatio 计算出来的时间比分别执行 “处理 NIO 感兴趣的事件” 和 “处理队列中的任务”(会将到期的 scheduledTaskQueue 中的 task 加入 taskQueue,之后统一从 taskQueue pollTask),此时会执行注册任务
- 注册任务:
- 将 NioSocketChannel 中的 java.nio.channels.SocketChannel 注册到
X
的 Selector 上,选择键为0(此时不监听任何事件),attachment 为 NioSocketChannel 本身- pipeline.invokeHandlerAddedIfNeeded() 会执行到
EchoClient$handler
,首先执行其 initChannel,此时将业务逻辑处理器 ClientHandler 添加到 ChannelPipeline,如下HeadContext <-> EchoClient$childHandler <-> ClientHandler <-> TailContext
,最后删除 EchoClient$childHandler,最终的 ChannelPipeline 链是HeadContext <-> ClientHandler <-> TailContext
- 注册完毕之后 执行 channelRegister() 事件
- connect 连接
- 与注册操作一样连接操作也是非当前 Channel 所属的 NioEventLoop 线程发起的,所以也要封装为任务,加入到
X
的 taskQueue 中(每加入队列一个任务,都会做一次 selector.wakeup 操作,起到及时执行任务的作用)
- 连接任务的内容:pipeline.connect(SocketAddress remoteAddress, ChannelPromise promise),最终执行到
AbstractNioUnsafe.connect(remoteAddress, localAddress, promise)
- 执行 java 底层的 NIO 连接操作:
socketChannel.connect(remoteAddress)
,如果没有立即 connect 成功,则设置当前的 socketChannel 的兴趣键为 OP_CONNECT,并且创建连接超时任务并添加到 eventLoop 的 scheduledTaskQueue(此时会创建优先队列,赋值给 scheduledTaskQueue,后续如果在连接超时时间之内连接成功,连接超时任务会被取消);如果立即连接成功,则执行 AbstractNioUnsafe.fulfillConnectPromise(ChannelPromise promise, boolean wasActive)- 为 NioServerSocketChannel 设置感兴趣的监听键为创建 NioServerSocketChannel 时所存储的 ACCEPT 事件
注册连接事件之后,客户端被选取的 NioEventLoop 同样死循环 select() 和 runAllTask(),当到来的感兴趣事件是 OP_CONNECT 时,则首先从 selectedKey.attachment() 获取在注册的时候注册上去的 Channel,之后从 Channel 中获取 Unsafe,最终执行 AbstractNioUnsafe.finishConnect():
- 检测是否完成连接 javaChannel().finishConnect(),若没有,直接抛出 Error
- 执行 AbstractNioUnsafe.fulfillConnectPromise(ChannelPromise promise, boolean wasActive)
- pipeline.fireChannelActive()
- 执行 channelActive() 事件
- 绑定 OP_READ 事件到 SocketChannel 上
- 取消连接超时任务
总结:pipeline.fireChannelActive() 的执行时机:
- 服务端 bind 完成之后,会触发 channelActive,此时配置的是 OP_ACCEPT 事件
- 服务端在 SocketChannel 注册完成之后,会触发 channelActive ,此时配置的是 OP_READ 事件
- 客户端在 SocketChannel 连接完成之后,会触发 channelActive ,此时配置的是 OP_READ 事件