8.1 总述
由第七节的讲述可知ChannelHandler
并不处理事件,而由其子类代为处理:ChannelInboundHandler
拦截和处理入站事件,ChannelOutboundHandler
拦截和处理出站事件。ChannelHandler
和ChannelHandlerContext
通过组合或继承的方式关联到一起成对使用。事件通过ChannelHandlerContext
主动调用如fireXXX()
和write(msg)
等方法,将事件传播到下一个处理器。注意:入站事件在ChannelPipeline
双向链表中由头到尾正向传播,出站事件则方向相反。
当客户端连接到服务器时,Netty新建一个ChannelPipeline
处理其中的事件,而一个ChannelPipeline
中含有若干ChannelHandler
。如果每个客户端连接都新建一个ChannelHandler
实例,当有大量客户端时,服务器将保存大量的ChannelHandler
实例。为此,Netty提供了Sharable
注解,如果一个ChannelHandler
状态无关,那么可将其标注为Sharable
,如此,服务器只需保存一个实例就能处理所有客户端的事件。
8.2 源码分析
8.2.1 核心类
上图是
ChannelHandler
的核心类类图,其继承层次清晰,我们逐一分析。
1.ChannelHandler
ChannaleHandler
作为最顶层的接口,并不处理入站和出站事件,所以接口中只包含最基本的方法:
// Handler本身被添加到ChannelPipeline时调用
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
// Handler本身被从ChannelPipeline中删除时调用
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
// 发生异常时调用
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
其中也定义了Sharable
标记注解:
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
作为ChannelHandler
的默认实现,ChannelHandlerAdapter
有个重要的方法isSharable()
,代码如下:
public boolean isSharable() {
Class<?> clazz = getClass();
// 每个线程一个缓存
Map<Class<?>, Boolean> cache =
InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
// Handler是否存在Sharable注解
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
这里引入了优化的线程局部变量InternalThreadLocalMap
,将在以后分析,此处可简单理解为线程变量ThreadLocal
,即每个线程都有一份ChannelHandler
是否Sharable
的缓存。这样可以减少线程间的竞争,提升性能。
2.ChannelInboundHandler
ChannelInboundHandler处理入站事件,以及用户自定义事件:
// 类似的入站事件
void channeXXX(ChannelHandlerContext ctx) throws Exception;
// 用户自定义事件
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
ChannelInboundHandlerAdapter作为ChannelInboundHandler的实现,默认将入站事件自动传播到下一个入站处理器。其中的代码高度一致,如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
3.ChannelOutboundHandler
ChannelOutboundHandler处理出站事件:
// 类似的出站事件
void read(ChannelHandlerContext ctx) throws Exception;
同理,ChannelOutboundHandlerAdapter
作为ChannelOutboundHandler
的事件,默认将出站事件传播到下一个出站处理器:
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
4.ChannelDuplexHandler
ChannelDuplexHandler则同时实现了ChannelInboundHandler
和ChannelOutboundHandler
接口。如果一个所需的ChannelHandler
既要处理入站事件又要处理出站事件,推荐继承此类。
至此,ChannelHandler的核心类已分析完毕,接下来将分析一些Netty自带的Handler。
8.3 ChannelHandler实例
8.3.1 LoggingHandler
日志处理器LoggingHandler
是使用Netty进行开发时的好帮手,它可以对入站\出站事件进行日志记录,从而方便我们进行问题排查。首先看类签名:
@Sharable
public class LoggingHandler extends ChannelDuplexHandler
注解Sharable
说明LoggingHandler
没有状态相关变量,所有Channel可以使用一个实例。继承自ChannelDuplexHandler
表示对入站出站事件都进行日志记录。最佳实践:使用static
修饰LoggingHandler
实例,并在生产环境删除LoggingHandler
。
该类的成员变量如下:
// 实际使用的日志处理,slf4j、log4j等
protected final InternalLogger logger;
// 日志框架使用的日志级别
protected final InternalLogLevel internalLevel;
// Netty使用的日志级别
private final LogLevel level;
// 默认级别为Debug
private static final LogLevel DEFAULT_LEVEL = LogLevel.DEBUG;
看完成员变量,在移目构造方法,LoggingHandler
的构造方法较多,一个典型的如下:
public LoggingHandler(LogLevel level) {
if (level == null) {
throw new NullPointerException("level");
}
// 获得实际的日志框架
logger = InternalLoggerFactory.getInstance(getClass());
// 设置日志级别
this.level = level;
internalLevel = level.toInternalLevel();
}
在构造方法中获取用户实际使用的日志框架,如slf4j、log4j等,并日志设置记录级别。其他的构造方法也类似,不在赘述。
记录出站、入站事件的过程类似,我们以ChannelRead()
为例分析,代码如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logMessage(ctx, "RECEIVED", msg); // 记录日志
ctx.fireChannelRead(msg); // 传播事件
}
private void logMessage(ChannelHandlerContext ctx, String eventName, Object msg) {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, formatMessage(eventName, msg)));
}
}
protected String formatMessage(String eventName, Object msg) {
if (msg instanceof ByteBuf) {
return formatByteBuf(eventName, (ByteBuf) msg);
} else if (msg instanceof ByteBufHolder) {
return formatByteBufHolder(eventName, (ByteBufHolder) msg);
} else {
return formatNonByteBuf(eventName, msg);
}
}
其中的代码都简单明了,主要分析formatByteBuf()
方法:
protected String formatByteBuf(String eventName, ByteBuf msg) {
int length = msg.readableBytes();
if (length == 0) {
StringBuilder buf = new StringBuilder(eventName.length() + 4);
buf.append(eventName).append(": 0B");
return buf.toString();
} else {
int rows = length / 16 + (length % 15 == 0? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(eventName.length() +
2 + 10 + 1 + 2 + rows * 80);
buf.append(eventName)
.append(": ").append(length).append('B').append(NEWLINE);
appendPrettyHexDump(buf, msg);
return buf.toString();
}
其中的数字计算,容易让人失去耐心,使用逆向思维,放上结果反推:
有了这样的结果,请反推实现。需要注意的是其中的
appendPrettyHexDump()
方法,这是在ByteBufUtil
里的static
方法,当我们也需要查看多字节数据时,这是一个特别有用的展现方法,记得可在以后的Debug中可加以使用。
8.3.2 TimeoutHandler
在开发TCP服务时,一个常见的需求便是使用心跳保活客户端。而Netty自带的三个超时处理器IdleStateHandler
,ReadTimeoutHandler
和WriteTimeoutHandler
可完美满足此需求。其中IdleStateHandler
可处理读超时(客户端长时间没有发送数据给服务端)、写超时(服务端长时间没有发送数据到客户端)和读写超时(客户端与服务端长时间无数据交互)三种情况。这三种情况的枚举为:
public enum IdleState {
READER_IDLE, // 读超时
WRITER_IDLE, // 写超时
ALL_IDLE // 数据交互超时
}
以IdleStateHandler
的读超时事件为例进行分析,首先看类签名:
public class IdleStateHandler extends ChannelDuplexHandler
注意到此Handler没有Sharable
注解,这是因为每个连接的超时时间是特有的即每个连接有独立的状态,所以不能标注Sharable
注解。继承自ChannelDuplexHandler
是因为既要处理读超时又要处理写超时。
该类的一个典型构造方法如下:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds,
allIdleTimeSeconds, TimeUnit.SECONDS);
}
分别设定各个超时事件的时间阈值。以读超时事件为例,有以下相关的字段:
// 用户配置的读超时时间
private final long readerIdleTimeNanos;
// 判定超时的调度任务Future
private ScheduledFuture<?> readerIdleTimeout;
// 最近一次读取数据的时间
private long lastReadTime;
// 是否第一次读超时事件
private boolean firstReaderIdleEvent = true;
// 状态,0 - 无关, 1 - 初始化完成 2 - 已被销毁
private byte state;
// 是否正在读取
private boolean reading;
首先看初始化方法initialize()
:
private void initialize(ChannelHandlerContext ctx) {
switch (state) {
case 1: // 初始化进行中或者已完成
case 2: // 销毁进行中或者已完成
return;
}
state = 1;
lastReadTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
初始化的工作较为简单,设定最近一次读取时间lastReadTime
为当前系统时间,然后在用户设置的读超时时间readerIdleTimeNanos
截止时,执行一个ReaderIdleTimeoutTask
进行检测。其中使用的方法很简洁,如下:
long ticksInNanos() {
return System.nanoTime();
}
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task,
long delay, TimeUnit unit) {
return ctx.executor().schedule(task, delay, unit);
}
然后,分析销毁方法destroy()
:
private void destroy() {
state = 2; // 这里结合initialize对比理解
if (readerIdleTimeout != null) {
// 取消调度任务,并置null
readerIdleTimeout.cancel(false);
readerIdleTimeout = null;
}
}
可知销毁的处理也很简单,分析完初始化和销毁,再看这两个方法被调用的地方,initialize()
在三个方法中被调用:
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive() &&
ctx.channel().isRegistered()) {
initialize(ctx);
}
}
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive()) {
initialize(ctx);
}
super.channelRegistered(ctx);
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
initialize(ctx);
super.channelActive(ctx);
}
当客户端与服务端成功建立连接后,Channel被激活,此时channelActive
的初始化被调用;如果Channel被激活后,动态添加此Handler,则handlerAdded
的初始化被调用;如果Channel被激活,用户主动切换Channel的执行线程Executor,则channelRegistered
的初始化被调用。这一部分较难理解,请仔细体会。destroy()
则有两处调用:
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
destroy();
super.channelInactive(ctx);
}
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
destroy();
}
即该Handler被动态删除时,handlerRemoved
的销毁被执行;Channel失效时,channelInactive
的销毁被执行。
分析完这些,在分析核心的调度任务ReaderIdleTimeoutTask
:
private final class ReaderIdleTimeoutTask implements Runnable {
private final ChannelHandlerContext ctx;
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
protected void run() {
if (!ctx.channel().isOpen()) {
// Channel不再有效
return;
}
long nextDelay = readerIdleTimeNanos;
if (!reading) {
// nextDelay<=0 说明在设置的超时时间内没有读取数据
nextDelay -= ticksInNanos() - lastReadTime;
}
// 隐含正在读取时,nextDelay = readerIdleTimeNanos > 0
if (nextDelay <= 0) {
// 超时时间已到,则再次调度该任务本身
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos,
TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event =
newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event); // 模板方法处理
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// 注意此处的nextDelay值,会跟随lastReadTime刷新
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
这个读超时检测任务执行的过程中又递归调用了它本身进行下一次调度,请仔细品味该种使用方法。再列出channelIdle()
的代码:
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt)
throws Exception {
ctx.fireUserEventTriggered(evt);
}
本例中,该方法将写超时事件作为用户事件传播到下一个Handler,用户需要在某个Handler中拦截该事件进行处理。该方法标记为protect
说明子类通常可覆盖,ReadTimeoutHandler
子类即定义了自己的处理:
@Override
protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt)
throws Exception {
assert evt.state() == IdleState.READER_IDLE;
readTimedOut(ctx);
}
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!closed) {
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
}
可知在ReadTimeoutHandler
中,如果发生读超时事件,将会关闭该Channel。当进行心跳处理时,使用IdleStateHandler
较为麻烦,一个简便的方法是:直接继承ReadTimeoutHandler
然后覆盖readTimedOut()
进行用户所需的超时处理。