在实际网络应用中,我们接收和发送的数据都是以实际应用数据类型为单位的(比如一个Http数据体,或者一个ThriftObject)。而对于Socket而言,它处理的是TCP传输层的数据,在它接收或发送的一个TCP包中,可能正好对应一个ThriftObject,或者多个ThriftObject、ThriftObject的一部分,甚至可能由多个ThriftObject的多个部分组成。这就是TCP的粘包半包问题。
Netty提供了一种机制可以帮助我们方便地处理TCP半包和粘包问题,它是通过嵌入ChannelHandler来实现的。
下面以一段简单的代码实例来看一下,如何在Netty中处理TCP粘包和拆包问题。可以看到代码中添加了类型为FixedLengthFrameDecoder
的ChannelHandler,添加这段代码的效果就是每次会截取定量长度为1024的字节数据作为下层ChannelHandler的input处理对象。
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//处理TCP半包粘包问题
ch.pipeline().addLast(new FixedLengthFrameDecoder(1024));
}
})
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new SimpleNettyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
1. 源码分析
我们就以FixedLengthFrameDecoder
为例,它的实现非常简单,继承自类ByteToMessageDecoder
,并对父类的抽象方法decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
进行了实现。所以,要分析Netty对TCP粘包拆包的处理,核心逻辑在于ByteToMessageDecoder
。
public class FixedLengthFrameDecoder extends ByteToMessageDecoder{
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
{.....}
}
1.1 流程概述
4个核心方法:
ByteToMessageDecoder channelRead(..)
ByteToMessageDecoder callDecode(..)
ByteToMessageDecoder fireChannelRead(..)
FixedLengthFrameDecoder decode(..)
method | Class | 说明 |
---|---|---|
void channelRead( ChannelHandlerContext ctx, Object msg) | ByteToMessageDecoder | 入口方法,入参msg会是一个ByteBuf。主要过程就是整合一下组装成一个新的ByteBuf cumulation,然后对这个cumulation 进行callDecode |
void callDecode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out) | ByteToMessageDecoder | 对ByteBuf进行解码,解析成一组JavaObject到out中,会在该方法中调用实际的decode方法,并会调用fireChannelRead方法进行ChannelHandler的传递 |
void fireChannelRead( ChannelHandlerContext ctx, List<Object> msgs, int numElements) | ByteToMessageDecoder | 该方法会对解析生成的JavaObject进行下层ChannelHandler的传递 |
decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out) | FixedLengthFrameDecoder | 实际进行decode的方法,会将ByteBuf按照固定长度进行拆分成一组Object |
1.2 channelRead方法分析
通过文章 Netty源码分析-ChannelPipeline 的分析,channelRead
方法的调用是在ChannelPipeline中,入参msg会是一个ByteBuf。
第一步:首先会实例化一个空的CodecOutputList
,用于存放一会将要解码生成的对象。
第二步:核心在于赋值cumulation,cumulation的类型也是ByteBuf,它与入参的msg会有什么不同呢?如果cumulation为null,会直接将msg的地址赋值给cumulation,否则会将cumulator.cumulate(ctx.alloc(), cumulation, data)
方法返回值赋值给cumulation,看一下方法描述“Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
”,原来是将msg与cumulation进行一个merge。这就相当于将两个TCP包的数据进行了一个数据上的衔接。
第三步:调用callDecode
对cumulation进行解码。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
1.3 callDecode方法分析
callDecode的核心流程就是对ByteBuf进行遍历,遍历的过程中不断调用decode方法解析出Object对象,并对解析出的对象执行fireChannelRead方法,保证Pipeline的往下传递。
代码注释中会对过程进行分析。
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
//说明有解码好的对象,对这些对象进行Pipeline的往下传递
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
//真正调用实际decode方法进行解码的地方
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
// 说明没有解码到新对象,这时候如果ByteBuf没有移动,说明此次ByteBuf内容不足以解码,会直接break。
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
//解码到了对象,但是却没有移动ByteBuf,说明有问题
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
1.4 fireChannelRead方法分析
fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements)
方法的入参是已经解码后产生的List<Object> ,会遍历这些Object,分别调用ctx.fireChannelRead(final Object msg)
进行ChanelPipeline的往下传递。
下边也列出了ctx.fireChannelRead(final Object msg)
的代码实现,findContextInbound()会找到ctx的下一个AbstractChannelHandlerContext,将ChannelPipeline进行往后传递。
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
if (msgs instanceof CodecOutputList) {
fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
} else {
for (int i = 0; i < numElements; i++) {
ctx.fireChannelRead(msgs.get(i));
}
}
}
abstract class AbstractChannelHandlerContext{
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
}
1.5 decode方法分析
下面是类FixedLengthFrameDecoder的代码实现。可以看到非常简单:判断当前的ByteBuf长度够不够一个Frame的长度,如果不够不处理,否则会解码出一个Frame并添加至 List<Object> out 中,然后将ByteBuf指针前移frameLength长度。
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
2. 其他实现
上边的例子中,我们讲述了解决TCP粘包拆包的一个例子-分割成固定长度的Frame。在实际应用中,会根据应用层业务实体类型进行不同的decode解码,比如Http应用中需要解码出HttpRequest,thrift RPC调用中需要解码出ThriftObject等等。
实际Netty已经帮助使用者做了非常多的工作,像常用的HttpRequestDecoder可以帮助我们解码出Http对象,XmlDecoder可以帮助我们解码出XML对象,类似的还有json对象解码、WebSocket对象解码等等。
如果Netty已经提供的Decoder无法满足你的要求,你也可以实现自己的Decoder。过程非常简单,只需要继承ByteToMessageDecoder类并实现抽象方法decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
.