netty实战-自定义解码器处理半包消息

概述


在李林锋的Netty系列之Netty编解码框架分析中介绍了各种解码器,也推荐组合

LengthFieldBasedFrameDecoder
ByteToMessageDecoder

这两个解码器来处理业务消息。但是有时候为了灵活性,会直接选择继承

ByteToMessageDecoder

来处理业务消息,但是直接继承ByteToMessageDecoder,则需要自己处理半包问题。在李林锋的【netty权威指南】中,并没有描述如何自定义解码器来处理半包消息。下文会介绍这方面的知识。

在阅读本文内容之前,你至少需要了解以下两个知识点

1、 netty的ByteBuf类的基本api用法
2、什么是TCP半包

虽然JAVA NIO中也有个ByteBuffer类,但是在Netty程序中,基本都是直接用Netty的ByteBuf类,它包装了更多好用的接口,降低了使用缓冲区类的难度。

之前本人写过几篇关于处理半包消息的文章,读者也可以参看一下


自定义消息协议


目前自定义的消息协议用的最多的是在消息中头四个字节保存消息的长度,格式大概如下

这里写图片描述
len : 表示消息的长度,通常用4个字节保存
head : 消息头部
body : 消息内容

无论每次请求的业务数据多大,都是使用上面的消息格式来表示的。

注意

在实际的项目中,消息格式可能会增加一些标志,例如,开始标记,结束标志,消息序列号,消息的协议类型(json或者二进制等),这里为了描述的方便,就不讲附加的这些消息标志了。


自定义解码器处理半包数据


如上描述,直接继承ByteToMessageDecoder类,同时覆盖其decode方法,完整实现代码如下

服务端代码

package nettyinaction.encode.lengthfield.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SocketServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new SocketServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }
        finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}





package nettyinaction.encode.lengthfield.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;


public class SocketServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SelfDefineEncodeHandler());
        pipeline.addLast(new BusinessServerHandler());
    }
}







package nettyinaction.encode.lengthfield.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class SelfDefineEncodeHandler extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf bufferIn, List<Object> out) throws Exception {
        if (bufferIn.readableBytes() < 4) {
            return;
        }

        int beginIndex = bufferIn.readerIndex();
        int length = bufferIn.readInt();

        if (bufferIn.readableBytes() < length) {
            bufferIn.readerIndex(beginIndex);
            return;
        }

        bufferIn.readerIndex(beginIndex + 4 + length);

        ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);

        otherByteBufRef.retain();

        out.add(otherByteBufRef);
    }
}





package nettyinaction.encode.lengthfield.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BusinessServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        int length = buf.readInt();
        assert length == (8);

        byte[] head = new byte[4];
        buf.readBytes(head);
        String headString = new String(head);
        assert  "head".equals(headString);

        byte[] body = new byte[4];
        buf.readBytes(body);
        String bodyString = new String(body);
        assert  "body".equals(bodyString);
    }
}

客户端代码

package nettyinaction.encode.lengthfield.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SocketClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new SocketClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().closeFuture().sync();
        }
        finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}





package nettyinaction.encode.lengthfield.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class SocketClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SocketClientHandler());
    }
}





package nettyinaction.encode.lengthfield.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SocketClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
        ByteBuf buffer = allocator.buffer(20);
        buffer.writeInt(8);
        buffer.writeBytes("head".getBytes());
        buffer.writeBytes("body".getBytes());

        ctx.writeAndFlush(buffer);
    }
}

客户端一旦启动,会发送一条长度为8的消息到服务端,服务端首先使用SelfDefineEncodeHandler类对消息进行解码,处理半包问题。如果消息是有效的完整的消息,当SelfDefineEncodeHandler处理完消息后,会把消息转发给BusinessServerHandler处理,BusinessServerHandler只是简单的做个验证,判断消息内容是否符合预期。

运行上面的代码,代码如预期那样,可以正确的读取到消息并解析消息。

这个例子中,最为核心的类就是SelfDefineEncodeHandler了。里面用了很多的技巧,要理解里面的每行代码,需要分两种情况来分析,分别是拆包粘包

下面分别以拆包和粘包做两个小试验,来验证SelfDefineEncodeHandler是否能正常的处理半包问题。


拆包试验


先调整一下SocketClientHandler类中的channelActive方法中的代码,将body扩大几十倍,逼迫TCP发几次请求到达服务端,看看服务端的SelfDefineEncodeHandler能否正常处理。

UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
ByteBuf buffer = allocator.buffer(20);
buffer.writeInt(1604);
buffer.writeBytes("head".getBytes());
String longMsgBody = "";
for (int i = 0; i < 400; i++) {
    longMsgBody = longMsgBody + "body";
}
buffer.writeBytes(longMsgBody.getBytes());

 ctx.writeAndFlush(buffer);

使用一个for循环,将消息body的长度设置为1600,加上长度为4的head,总共消息长度为1604。

然后调整一下服务端类SelfDefineEncodeHandler类的代码,加上三行代码。
第一行代码是加入一个类变量count,统计一下decode方法的调用次数

private static int count = 0;

接着在decode方法中加入三行代码

System.out.println("decode call count="+ ++count);
System.out.println("bufferIn.readableBytes()="+bufferIn.readableBytes());
System.out.println("beginIndex="+beginIndex);

打印出count和bufferIn.readableBytes()的大小以及beginIndex

最后在BusinessServerHandler类加入

private static int count = 0;

成员变量以及在channelRead方法中加入

System.out.println("BusinessServerHandler call count="+ ++count);

运行代码,打印结果如下

decode call count=1
bufferIn.readableBytes()=1024
beginIndex=0

decode call count=2
bufferIn.readableBytes()=1608
beginIndex=0

BusinessServerHandler call count=1

这个结果说明了,虽然客户端只是发送了一条消息,但是其实TCP底层是分两个包发送给服务端,第一次发送了1024个字节,后面的一次请求,才把消息剩下的内容发送给服务端。

虽然decode方法被调用了两次,但是第一次读取到的信息不完整,因此ByteToMessageDecoder会静静的等待另外一个包的到来,第二次读取完整消息后,才把消息转发给BusinessServerHandler类,从打印的结果看,
BusinessServerHandler类的channelRead方法只被调用了一次。

到此我们知道SelfDefineEncodeHandler类的decode方法是可以应付拆包问题的,那到底是如何做到的呢?现在我们回头仔细看看decode方法中的代码。

第一部分代码

if (bufferIn.readableBytes() < 4) {
            return;
}

如果接收到的字节还不到4个字节,也即是连消息长度字段中的内容都不完整的,直接return。


第二部分代码

 int beginIndex = bufferIn.readerIndex();
 int length = bufferIn.readInt();

 if (bufferIn.readableBytes() < length) {
      bufferIn.readerIndex(beginIndex);
      return;
 }

对于拆包这种场景,由于还未读取到完整的消息,bufferIn.readableBytes() 会小于length,并重置bufferIn的readerIndex为0,然后退出,ByteToMessageDecoder会乖乖的等待下个包的到来。

由于第一次调用中readerIndex被重置为0,那么decode方法被调用第二次的时候,beginIndex还是为0的。


第三部分代码

bufferIn.readerIndex(beginIndex + 4 + length);

将readerIndex设置为最大。首先代码能执行到这里,针对拆包这种场景而言,已经是读取到一条有效完整的消息了。这个时候需要通知ByteToMessageDecoder类,bufferIn中的数据已经读取完毕了,不要再调用decode方法了。ByteToMessageDecoder类的底层会根据bufferIn.isReadable()方法来判断是否读取完毕。只有将readerIndex设置为最大,bufferIn.isReadable()方法才会返回false。


第四部分代码

ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);

当decode方法执行完后,会释放bufferIn这个缓冲区,如果将执行完释放操作的bufferIn传递给下个处理器的话,一旦下个处理器调用bufferIn的读或者写的方法时,会立刻报出IllegalReferenceCountException异常的。

因此slice操作后,必须加上一个retain操作,让bufferIn的引用计数器加1,这样ByteToMessageDecoder会刀下留人,先不释放bufferIn。


粘包试验


首先将SocketClientHandler类中的channelActive方法的实现改为

for (int i = 0; i < 20; i++) {
    UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
    ByteBuf buffer = allocator.buffer(20);
    buffer.writeInt(8);
    buffer.writeBytes("head".getBytes());
    buffer.writeBytes("body".getBytes());

    ctx.writeAndFlush(buffer);
}

客户端发起20个请求到服务器端。

接着注释掉SocketServerInitializer类中的

pipeline.addLast(new SelfDefineEncodeHandler());

代码,使请求不走SelfDefineEncodeHandler解码器。

运行代码,执行结果如下

BusinessServerHandler call count=1

说明客户端发送了粘包,服务端只接收到一次请求。现在把代码调整回来,走SelfDefineEncodeHandler解码器,运行代码,执行效果如下

decode call count=1
bufferIn.readableBytes()=240
beginIndex=0
BusinessServerHandler call count=1

decode call count=2
bufferIn.readableBytes()=228
beginIndex=12
BusinessServerHandler call count=2

decode call count=3
bufferIn.readableBytes()=216
beginIndex=24
BusinessServerHandler call count=3

decode call count=4
bufferIn.readableBytes()=204
beginIndex=36
BusinessServerHandler call count=4

decode call count=5
bufferIn.readableBytes()=192
beginIndex=48
BusinessServerHandler call count=5

decode call count=6
bufferIn.readableBytes()=180
beginIndex=60
BusinessServerHandler call count=6

decode call count=7
bufferIn.readableBytes()=168
beginIndex=72
BusinessServerHandler call count=7

decode call count=8
bufferIn.readableBytes()=156
beginIndex=84
BusinessServerHandler call count=8

decode call count=9
bufferIn.readableBytes()=144
beginIndex=96
BusinessServerHandler call count=9

decode call count=10
bufferIn.readableBytes()=132
beginIndex=108
BusinessServerHandler call count=10

decode call count=11
bufferIn.readableBytes()=120
beginIndex=120
BusinessServerHandler call count=11

decode call count=12
bufferIn.readableBytes()=108
beginIndex=132
BusinessServerHandler call count=12

decode call count=13
bufferIn.readableBytes()=96
beginIndex=144
BusinessServerHandler call count=13

decode call count=14
bufferIn.readableBytes()=84
beginIndex=156
BusinessServerHandler call count=14

decode call count=15
bufferIn.readableBytes()=72
beginIndex=168
BusinessServerHandler call count=15

decode call count=16
bufferIn.readableBytes()=60
beginIndex=180
BusinessServerHandler call count=16

decode call count=17
bufferIn.readableBytes()=48
beginIndex=192
BusinessServerHandler call count=17

decode call count=18
bufferIn.readableBytes()=36
beginIndex=204
BusinessServerHandler call count=18

decode call count=19
bufferIn.readableBytes()=24
beginIndex=216
BusinessServerHandler call count=19

decode call count=20
bufferIn.readableBytes()=12
beginIndex=228
BusinessServerHandler call count=20

结果符合预期,客户端发送20次,服务端BusinessServerHandler类的channelRead执行了20次。SelfDefineEncodeHandler类是如何做到这一点的呢?还是得回头仔细看看decode方法。

第一部分代码

if (bufferIn.readableBytes() < 4) {
            return;
}

如果接收到的字节还不到4个字节,也即是连消息长度字段中的内容都不完整的,直接return。


第二部分代码

 int beginIndex = bufferIn.readerIndex();
 int length = bufferIn.readInt();

 if (bufferIn.readableBytes() < length) {
      bufferIn.readerIndex(beginIndex);
      return;
 }

由于客户端发送了粘包,decode方法将会接收到一条聚合了多条业务消息的大消息,因此bufferIn.readableBytes()肯定大于length, bufferIn的readerIndex不会被重置。只是decode方法每被执行一次,beginIndex将会递增12,也即是(length+4)。


第三部分代码

bufferIn.readerIndex(beginIndex + 4 + length);

对于粘包这种场景,这行代码就不是表示将readerIndex升到最高,而是将readerIndex后移(length+4)位,让beginIndex递增(length+4)。


第四部分代码

ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);

slice操作,目的是从大消息中截取出一条有效的业务消息。


参考的文章

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,519评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,842评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,544评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,742评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,646评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,027评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,513评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,169评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,324评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,268评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,299评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,996评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,591评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,667评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,911评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,288评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,871评论 2 341

推荐阅读更多精彩内容