6. ChannelHandler and ChannelPipeline
6.1 The ChannelHandler family
6.1.1 Channel的生命周期
ChannelUnregistered
已创建,但是还没有被注册到EventLoop上。
ChannelRegistered
已创建,并且已经注册到EventLoop。
ChannelActive
连接上远程主机。
ChannelActive
没有连接到远程主机。
Channel状态的变化会触发相应的事件。
6.1.2 ChannelHandler的生命周期
handlerAdd
添加handler
handlerRemove
删除handler
exceptionCaught
发生异常
ChannelHandler有两个重要的子接口:ChannelInboundHandler和ChannelOutboundHandler。
6.1.3 ChannelInboundHandler接口
接受到数据或者Channel的状态发生改变会调用ChannelInboundHandler中的方法。注意,当ChannelInboundHandler中的channelRead()方法被overwrite,需要对ByteBuf实例持有的资源进行显示释放。
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}
可以使用SimpleChannelInboundHandler,它会自动释放资源,无需人工干预:
@Sharable
public class SimpleDiscardHandler
extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
// No need to do anything special
}
}
6.1.4 ChannelOutboundHandler接口
它一个比较强大的功能是延迟执行。
CHANNELPROMISE VS. CHANNELFUTURE
CHANNELPROMISE是CHANNELFUTURE的子接口,CHANNELFUTURE是不可写的,CHANNELPROMISE是可写的(例如setSuccess(),setFailure()方法)
6.1.5 ChannelHandler adapters
6.1.6 资源管理
要注意ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()要释放相应的资源,否则会产生内存泄漏。netty使用引用计数法来管理内存资源。可以使用netty提供的ResourceLeakDetector来发现潜在的内存泄漏问题。
java -Dio.netty.leakDetectionLevel=ADVANCED
leakDetectionLevel可以为DISABLED、SIMPLE(默认)、ADVANCED和PARANOID。
6.2 ChannelPipeline接口
ChannelPipeline可以看成由ChannelHandler组成的链表,I/O事件会在ChannelPipeline上传播。每个新Channel会绑定一个新ChannelPipeline,两者是一对一关系。
事件传播的时候,会判断ChannelHandler的类型(implements Inbound还是OutBound的接口)和事件传播的方向是否一致,不一致跳过。
6.2.1 ChannelPipeline修改
ChannelPipeline中ChannelHandler可以动态地被添加、删除或者替换。
6.2.2 Firing events
会调用ChannelPipeline中下一个ChannelHandler里的方法。
代码示例:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class HttpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new MyHandler());
ch.pipeline().addLast(new MyHandler2());
}
});
ChannelFuture future = bootstrap.bind(8080).sync();
future.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
class MyHandler extends SimpleChannelInboundHandler<String>{
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("in MyHandler1 , messageReceived invoked");
for(int i = 0;i < 10 ; i++) {
ctx.fireChannelInactive();//调用fireChannelInactive 10次
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("in MyHandler1 , channelInactive invoked");
}
}
class MyHandler2 extends SimpleChannelInboundHandler<String>{
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("in MyHandler2 ,messageReceived invoked");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("in MyHandler2 , channelInactive invoked");
}
}
输出:
6.3 ChannelHandlerContext接口
ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的联系,无论何时,添加一个ChannelHandler到ChannelPipeline就会创建一个ChannelHandlerContext。ChannelHandlerContext的主要功能是和所在ChannelPipeline的其他ChannelHandler交互。
ChannelHandlerContext有很多方法,大部分方法在Channel和ChannelPipeline里都出现过,但是这里有一个非常大的区别,调用Channel和ChannelPipeline里的方法,会在整个pipeline里传播(从头到尾),而ChannelHandlerContext里同名的方法,是从当前ChannelHandler开始传播。
6.3.1 Using ChannelHandlerContext
6.3.2 ChannelHandler和ChannelHandlerContext的高级用法。
- ChannelHandlerContext的pipeline()方法可以获取ChannelPipeline的引用,这样我们可以通过这个引用操作ChannelHandler,实现动态协议。
- 可以把ChannelHandlerContext的引用缓存起来,在ChannelHandler方法外面用,甚至在一个不同的线程里使用。下面提供了一个示例。
- 可以将一个ChannelHandler实例可能会被添加到不同的ChannelPipeline里,但是需要使用@Sharable注解,此外还需注意的是,这个Sharable的ChannelHandler需要是线程安全的。
为什么需要@Sharable的ChannelHandler,一个需求就是通过这个@Sharable来统计多个Channel的数据。
6.4 异常处理
6.4.1 Inbound异常处理
由于exception默认会从触发异常的ChannelHandler继续向后流动,所以图中的这种处理逻辑,我们一般放在最后ChannelPipeline的末尾。这样就可以确保,无论是哪个ChannelHandler触发异常,都能够被捕获并处理。如果不对异常做捕获处理操作,netty会打印异常未被捕获的日志。
6.4.2 outbound异常处理
进行outbound操作,要想知道结果(正常完成还是发生异常),需要这样做:
每个outbound操作都会返回一个ChannelFuture。添加到ChannelFuture上的监听器会收到成功或者错误通知。
ChannelOutboundHandler中的方法绝大多数都会ChannelPromise类型的参数。ChannelPromise也可以添加监听来接受异步通知。ChannelPromise是可写的,可以通过它的setSucess()方法或者setFailure(Throwable cause)立即发布通知。
如果ChannelOutboundHandler自己抛出异常,netty会通知添加到ChannelPromise上的监听器。
7. EventLoop and threading model
7.1 Threading model overview
JDK早期版本多线程编程的方式是create
新线程再start
。JDK5推出了Executor API
,它的线程池技术通过缓存和重用大大提高了性能。
- 有任务(
Runnable实现
)的时候,从线程池里挑选出一个空闲线程,把任务submit
给它。 - 任务执行完毕了,线程变成空闲,回到线程池,等待下一次挑选使用。
线程池不能解决上下文切换开销的问题,上下文的开销在heavy load下会很大。
7.2 EventLoop接口
EventLoop
是一个用来处理事件的任务,基本思想如下图所示:
EventLoop
接口的API分为两类:concurrent和networking。
- concurrent
基于java.util.concurrent
包,提供thread executors - networking
io.netty.channel
继承了EventLoop接口,提供了和Channel事件交互的能力。
7.2.1 Netty 4中I/O事件的处理
7.3.1 JDK 任务调度API
JDK5之前,任务调度只能用java.util.Timer
,Timer就是一个后台线程,有很多限制:
- 如果执行多个定时任务,一个任务发生异常没有捕获,整个Timer线程会挂掉(其他所有任务都会down掉)
- 假如某个任务的执行时间过长,超过一些任务的间隔时间,会导致这些任务执行推迟。
JDK后续推出了java.util.concurrent
,其中定义的ScheduleExecutorService
克服了这些缺陷。
ScheduledExecutorService executor =Executors.newScheduledThreadPool(10);
ScheduledFuture<?> future = executor.schedule(
new Runnable() {
@Override
public void run() {
System.out.println("60 seconds later");
}
}, 60, TimeUnit.SECONDS);
//to do
executor.shutdown();
尽管ScheduledExecutorSevice
挺好用的,但是在负载大的时候有较大的性能耗费,netty进行了优化。
7.3.2 使用EventLoop进行任务调度
ScheduledExecutorService
也有一些限制,例如会创建额外创建一些线程来管理线程池,这在任务调度非常激烈的情况下,会成为性能的瓶颈。netty没有直接使用ScheduledExecutorService
,使用了继承于ScheduledExecutorService
,自己实现的EventLoop
。
Channel ch = ...
ScheduledFuture<?> future = ch.eventLoop().schedule(
new Runnable() {
@Override
public void run() {
System.out.println("60 seconds later");
}
}, 60, TimeUnit.SECONDS);
重复定时执行:
Channel ch = ...
ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
System.out.println("Run every 60 seconds");
}
}, 60, 60, TimeUnit.Seconds);
7.4 实现细节
7.4.1 线程管理
netty线程模型的优越之处是在于它会确定当前执行线程的身份,再进行相应操作。如果当前执行线程被绑定到当前的Channel
和EventLoop
,会被直接执行,否则会被放到EventLoop
的队列里,每个EventLoop
有自己单独的队列。
Never put a long-running task in the execution queue, because it will block any other task from executing on the same thread.” If you must make blocking calls or execute long-running tasks, we advise the use of a dedicated EventExecutor.
7.4.2 EventLoop/Thread分配
EventLoopGroup
包含了EventLoops
和Channels
,EventLoops
创建方式取决于使用哪种I/O.
异步I/O
异步I/O仅仅使用少量的EventLoops
,这些EventLoops
被很多的Channels
共享,这样就可以用最少的线程接受很多的Channels
,而不是一个线程一个Channel
。
阻塞I/O
共同点:每个Channel
的I/O事件只会被一个线程处理。
8. Bootstrapping
bootstrapping an application is the process of configuring it to run
8.1 Bootstrap classes
Namely, a server devotes a parent channel to accepting connections from clients and
creating child channels for conversing with them, whereas a client will most likely
require only a single, non-parent channel for all network interactions. (As we’ll see, this
applies also to connectionless transports such as UDP , because they don’t require a
channel for each connection.)
server需要一个parent channel来接受客户端连接,需要创建多个child channels来应答客户端。
client只需要一个单独的channel,不需要parent channel。
服务端处理使用ServerBootstrap
,客户端使用Bootstrap
。
Why are the bootstrap classes Cloneable?
You’ll sometimes need to create multiple channels that have similar or identical settings. To support this pattern without requiring a new bootstrap instance to be created and configured for each channel, AbstractBootstrap has been marked Cloneable . Calling clone() on an already configured bootstrap will return another bootstrap instance that’s immediately usable. Note that this creates only a shallow copy of the bootstrap’s EventLoopGroup , so the latter will be shared among all of the cloned channels. This is acceptable, as the cloned channels are often short-lived, a typical case being a channel created to make an HTTP request.
8.2 Bootstrapping clients and connectionless protocols
Bootstrap
主要用来给客户端和使用面向无连接的应用创建Channels
。
Bootstraping a client:
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channeRead0(
ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf) throws Exception {
System.out.println("Received data");
}
} );
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture)throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Connection established");
} else {
System.err.println("Connection attempt failed");
channelFuture.cause().printStackTrace();
}
}
} );
8.2.2 Channel和EventLoopGroup的兼容性
you can’t mix components having different
prefixes, such as NioEventLoopGroup and OioSocketChannel . The following listing
shows an attempt to do just that.
Channel
和EventLoopGroup
的前缀要一样。否则会抛出IllegalStateException
8.3 Bootstraping servers
ServerBootstrap类
A ServerBootstrap creating a ServerChannel on bind() , and the ServerChannel managing a number of child Channels.
相比 Bootstrap
类,增加了childHandler()
,childAttr()
,childOption()
方法。ServerChannel
来创建许许多多的子Channel
,代表接受的连接。ServerBootstrap
提供了这些方法来简化对子Channel
的配置。
NioEventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx,ByteBuf byteBuf) throw Exception {
System.out.println("Received data");
}
} );
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Server bound");
} else {
System.err.println("Bound attempt failed");
channelFuture.cause().printStackTrace();
}
}
} );
8.4 Bootstrapping clients from a Channel
Suppose your server is processing a client request that requires it to act as a client to
a third system. This can happen when an application, such as a proxy server, has to
integrate with an organization’s existing systems, such as web services or databases. In
such cases you’ll need to bootstrap a client Channel from a ServerChannel
作为服务端接受连接,同时又作为客户端,请求远程服务器(类似于proxy),最容易想到的办法是再创建一个客户端的Bootstrap
,但是这样需要另外一个EventLoop
来处理客户端角色的Channel
,发生在服务端Channel
和客户端Channel
之间数据交换引起的上文切换也会带来额外的性能损耗。
最好的办法是创建的客户端Channel
和服务端Channel
,共享同一个EventLoop
:
ServerBootstrap bootstrap = new ServerBootstrap();
//Sets the EventLoopGroups that provide EventLoops for processing Channel events
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()).channel(NioServerSocketChannel.class)
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
ChannelFuture connectFuture;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//Creates a Bootstrap to connect to remote host
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class).handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
System.out.println("Received data");
}
});
//Uses the same EventLoop as the one assigned to the accepted channel
bootstrap.group(ctx.channel().eventLoop());
connectFuture = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
throws Exception {
if (connectFuture.isDone()) {
// do something with the data
//When the connection is complete performs some data operation (such as proxying)
}
}
});
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Server bound");
} else {
System.err.println("Bind attempt failed");
channelFuture.cause().printStackTrace();
}
}
});
8.5 Adding multiple ChannelHandlers during a bootstrap
在bootstrap
的时候,如何添加多个ChannelHandler
?
netty提供了ChannelInboundHandlerAdapter
的特殊子类ChannelInitializer
:
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
ChannelInitializer
提供了initChannel()
可以轻松添加ChannelHandlers
到ChannelPipeline
。
protected abstract void initChannel(C ch) throws Exception;
一旦Channel
注册到EventLoop
,我们实现的initChannel()
就会被调用。当initChannel()
返回的时候,ChannelInitializer
实例会把自己从ChannelPipeline
中删除。
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializerImpl());
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.sync();
对应ChannelInitializerImpl
的实现:
final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
}
}
8.6 Using Netty ChannelOptions and attributes
不需要我们手工配置每个Channel
,netty提供了option()
方法来把ChannelOptions
应用到bootstrap
,ChannelOptions
中的配置会自动地应用到所有Channel
Netty的Channel
和bootstrap
类,提供了AttributeMap
抽象集合和AttributeKey<T>
泛型类,用来insert和retrieve属性值。使用这些工具,我们可以安全地把任意类型的数据和Channel
关联起来。
Attribute
的一个使用场景是,服务端应用需要追踪用户和Channels
的关系。可以把用户的ID作为一个属性存到Channel
里。这样就可以实现根据ID来路由消息和Channel
不活跃自动关闭等功能。
final AttributeKey<Integer> id = new AttributeKey<Integer>("ID");
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
Integer idValue = ctx.channel().attr(id).get();
// do something with the idValue
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
throws Exception {
System.out.println("Received data");
}
});
bootstrap.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
bootstrap.attr(id, 123456);
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
future.syncUninterruptibly();
8.7 Bootstrapping DatagramChannels
之前的bootstrap
示例代码都是基于TCP-based的SocketChannel
,bootstrap
也可以配置为无连接协议。
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new OioEventLoopGroup()).channel(OioDatagramChannel.class)
.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
// Do something with the packet
}
});
ChannelFuture future = bootstrap.bind(new InetSocketAddress(0));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Channel bound");
} else {
System.err.println("Bind attempt failed");
channelFuture.cause().printStackTrace();
}
}
});
8.8 Shutdown
Alternatively, you can call Channel.close() explicitly on all active channels before calling EventLoopGroup.shutdownGracefully() . But in all cases, remember to shut down the EventLoopGroup itself.
EventLoopGroup.shutdownGracefully()
,它的返回值是一个future
,这也是一个异步操作。
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class);
...
Future<?> future = group.shutdownGracefully();
// block until the group has shutdown
future.syncUninterruptibly();
9 Unit testing
Netty提供了embedded transport
来测试ChannelHandlers
,embedded transport
是EmbeddedChannel
(一种特殊的Channel
实现) 的特色功能,可以简单地实现在pipeline
中传播事件。
我们可以写入inbound
或者outbound
数据到EmbeddedChannel
,然后检查是否有东西传输到ChannelPipeline
的末尾。我们还可以确定消息是否被编解码,是否有ChannelHandler
被触发。
Inbound data
会被ChannelInboundHandlers
处理,代表着从远程主机读取的数据。
outbound data
会被ChannelOutboundHandlers
处理,代表将要发送到远程主机的数据。
相关API:
图9.1展示了数据在EmbededChannel
的流动情况。我们可以:
使用
writeOutbound()
,写入消息到Channel
,让消息以outbound
方向在pipeline
中传递。后续,我们可以使用readOutbound()
读取处理过后的数据,判断结果是否与预期一致。使用
writeInbound()
,写入消息到Channel
,让消息以inbound
方向在pipeline
中传递。后续,我们可以使用readInbound()
读取处理过后的数据,判断结果是否与预期一致。
9.2 Testing ChannelHandlers with EmbeddedChannel
9.2.1 Testing inbound messages
图9.2 展示了一个简单的ByteToMessageDecoder
实现。如果有足够的数据,这个Decoder会产生固定大小的frame。如果没有足够的数据,没有达到这个固定的size值,它会等待接下来的数据,继续判断能否接着产生frame。
具体代码实现如下:
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0) {
throw new IllegalArgumentException(
"frameLength must be a positive integer: " + frameLength);
}
this.frameLength = frameLength;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
while (in.readableBytes() >= frameLength) {
ByteBuf buf = in.readBytes(frameLength);
out.add(buf);
}
}
}
那么如何进行单元测试呢,测试代码如下:
public class FixedLengthFrameDecoderTest {
@Test
public void testFramesDecoded() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(
new FixedLengthFrameDecoder(3));
// write bytes
assertTrue(channel.writeInbound(input.retain()));
assertTrue(channel.finish());
// read messages
ByteBuf read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
assertNull(channel.readInbound());
buf.release();
}
@Test
public void testFramesDecoded2() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(
new FixedLengthFrameDecoder(3));
assertFalse(channel.writeInbound(input.readBytes(2)));
assertTrue(channel.writeInbound(input.readBytes(7)));
assertTrue(channel.finish());
ByteBuf read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
assertNull(channel.readInbound());
buf.release();
}
}
9.2.2 Testing outbound messages
我们需要测试一个编码器:AbsIntegerEncoder
,它是Netty的MessageToMessageEncode
的一个实现,功能是将整数取绝对值。
我们的流程如下:
EmbeddedChannel
会将一个四字节负数按照outbound方向写入Channel
。编码器会从到来的
ByteBuf
读取每个负数,调用Math.abs()
获得绝对值。-
编码器将绝对值写入到
ChannelHandlerPipe
。
编码器代码实现:
public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
ByteBuf in, List<Object> out) throws Exception {
while (in.readableBytes() >= 4) {
int value = Math.abs(in.readInt());
out.add(value);
}
}
}
怎么测试?请看下文:
public class AbsIntegerEncoderTest {
@Test
public void testEncoded() {
ByteBuf buf = Unpooled.buffer();
for (int i = 1; i < 10; i++) {
buf.writeInt(i * -1);
}
EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
assertTrue(channel.writeOutbound(buf));
assertTrue(channel.finish());
// read bytes
for (int i = 1; i < 10; i++) {
assertEquals(i, channel.readOutbound());
}
assertNull(channel.readOutbound());
}
}
9.3 Testing exception handling
为了测试异常处理,我们有如下的示例。
为防止资源耗尽,当我们读取到的数据多于某个数值,我们会抛出一个TooLongFrameException
在图9.4中,最大frame的大小为3字节,当一个frame的字节数大于3,它会被忽略,并且会抛出
TooLongFrameException
,其他的pipeline
里的其他ChannelHandlers
要么覆写exceptionCaught()
进行捕获处理,要么会忽略这个异常。
解码器代码:
public class FrameChunkDecoder extends ByteToMessageDecoder {
private final int maxFrameSize;
public FrameChunkDecoder(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
int readableBytes = in.readableBytes();
if (readableBytes > maxFrameSize) {
// discard the bytes
in.clear();
throw new TooLongFrameException();
}
ByteBuf buf = in.readBytes(readableBytes);
out.add(buf);
}
}
如何测试,请看:
public class FrameChunkDecoderTest {
@Test
public void testFramesDecoded() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));
assertTrue(channel.writeInbound(input.readBytes(2)));
try {
channel.writeInbound(input.readBytes(4));
Assert.fail();
} catch (TooLongFrameException e) {
// expected exception
}
assertTrue(channel.writeInbound(input.readBytes(3)));
assertTrue(channel.finish());
// Read frames
ByteBuf read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(2), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.skipBytes(4).readSlice(3), read);
read.release();
buf.release();
}
}
10.The codec framework
encoder
,将outbound
消息转换成易于传输的方式(大部分是字节流)。
decoder
,将inbound
网络字节流转回成应用程序消息格式。
10.2 Decoders
两种场景需要使用到Decoders
:
- 将字节流解码成消息--
ByteToMessageDecoder
和ReplayingDecoder
- 将一种消息类型解码成另一种类型--
MessageToMessageDecoder
10.2.1 ByteToMessageDecoder抽象类
功能: 将字节流解码成消息或者另一种字节流。
使用示例ToIntegerDecoder
:
每次从ByteBuf
读取四个字节,解码成int
,添加到List
里。当没有更多的数据添加到List
,List
里的内容会传递到下一个ChannelInboundHandler
。
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() >= 4) {
out.add(in.readInt());
}
}
}
编解码框架里,消息处理完了,会自动调用ReferenceCountUtil.release(message)
,资源会自动释放。
Reference counting in codecs
As we mentioned in chapters 5 and 6, reference counting requires special attention. In the case of encoders and decoders, the procedure is quite simple: once a mes- sage has been encoded or decoded, it will automatically be released by a call toReferenceCountUtil.release(message)
. If you need to keep a reference for later use you can callReferenceCountUtil.retain(message)
. This increments the reference count, preventing the message from being released.
10.2.2 ReplayingDecoder抽象类
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder
继承于ByteToMessageDecoder
,特点是我们不再需要调用readableBytes()
,省了判断数据是否足够的逻辑。
注意:
不是所有的
ByteBuf
的操作都被支持。如果不支持会抛出UnsupportedOperationException
异常。ReplayingDecoder
会比ByteToMessageDecoder
稍慢。
ToIntegerDecoder2
:
public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
out.add(in.readInt());
}
}
更多的解码工具可以在io.netty.handler.codec
下找到。
io.netty.handler.codec.LineBasedFrameDecoder
,通过换行符(\n
或者\r\n
)来解析消息。io.netty.handler.codec.http.HttpObjectDecoder
,解析HTTP数据。
10.2.3 MessageToMessageDecoder抽象类
消息格式互相转换,如把一种类型的POJO转换成另外一种。
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
API差不多
示例:IntegerToStringDecoder
public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
@Override
public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
out.add(String.valueOf(msg));
}
}
一个更贴切详细的例子是io.netty.handler.codec.http.HttpObjectAggregator
用 TooLongFrameException
防止资源耗尽:
public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
private static final int MAX_FRAME_SIZE = 1024;
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readable = in.readableBytes();
if (readable > MAX_FRAME_SIZE) {
in.skipBytes(readable);
throw new TooLongFrameException("Frame too big!");
}
// do something
}
}
10.3 Encoders
与解码器类似,Encoders
分为两种:
- 将消息编码成字节流。
- 将一种消息编码成另一种格式的消息。
10.3.1 MessageToByteEncoder抽象类
示例ShortToByteEncoder
:
public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
@Override
public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
out.writeShort(msg);
}
}
更具体的应用实践可以参见io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder
10.4 编解码抽象类
既能encode
,又能decode
,二合一。
10.4.1 ByteToMessageCodec抽象类
Any request/response protocol could be a good candidate for using the
ByteToMessageCodec
. For example, in an SMTP implementation, the codec would read incoming bytes and decode them to a custom message type, saySmtpRequest
. On the receiving side, when a response is created, anSmtpResponse
will be produced, which will be encoded back to bytes for transmission.
10.4.2 MessageToMessageCodec抽象类
public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>
public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {
@Override
protected void encode(ChannelHandlerContext ctx,
WebSocketConvertHandler.MyWebSocketFrame msg, List<Object> out)
throws Exception {
ByteBuf payload = msg.getData().duplicate().retain();
switch (msg.getType()) {
case BINARY:
out.add(new BinaryWebSocketFrame(payload));
break;
case TEXT:
out.add(new TextWebSocketFrame(payload));
break;
case CLOSE:
out.add(new CloseWebSocketFrame(true, 0, payload));
break;
case CONTINUATION:
out.add(new ContinuationWebSocketFrame(payload));
break;
case PONG:
out.add(new PongWebSocketFrame(payload));
break;
case PING:
out.add(new PingWebSocketFrame(payload));
break;
default:
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
}
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg,
List<Object> out) throws Exception {
ByteBuf payload = msg.getData().duplicate().retain();
if (msg instanceof BinaryWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY,
payload));
} else if (msg instanceof CloseWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE,
payload));
} else if (msg instanceof PingWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING,
payload));
} else if (msg instanceof PongWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG,
payload));
} else if (msg instanceof TextWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT,
payload));
} else if (msg instanceof ContinuationWebSocketFrame) {
out.add(new MyWebSocketFrame(
MyWebSocketFrame.FrameType.CONTINUATION, payload));
} else {
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
}
public static final class MyWebSocketFrame {
private final FrameType type;
private final ByteBuf data;
public WebSocketFrame(FrameType type, ByteBuf data) {
this.type = type;
this.data = data;
}
public FrameType getType() {
return type;
}
public ByteBuf getData() {
return data;
}
public enum FrameType {BINARY,
CLOSE,
PING,
PONG,
TEXT,
CONTINUATION;
}
}
}
10.4.3 CombinedChannelDuplexHandler类
将编码器解码器放在一块影响代码的重用性。CombinedChannelDuplexHandler
可以解决这个问题。我们可以使用它而不直接使用codec抽象类。
方法签名:
public class CombinedChannelDuplexHandler <I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
下面是一个使用范例:
解码器例子ByteToCharDecoder
功能是一次读取2个字节,解码成char
写到List
里
public class ByteToCharDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
while (in.readableBytes() >= 2) {
out.add(in.readChar());
}
}
}
编码器例子CharToByteEncoder
public class CharToByteEncoder extends MessageToByteEncoder<Character> {
@Override
public void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out)
throws Exception {
out.writeChar(msg);
}
}
是时候combine
了:
public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
public CombinedByteCharCodec() {
super(new ByteToCharDecoder(), new CharToByteEncoder());
}
}