Netty in Action ——— The codec framework

本文是Netty文集中“Netty in action”系列的文章。主要是对Norman Maurer and Marvin Allen Wolfthal 的 《Netty in action》一书简要翻译,同时对重要点加上一些自己补充和扩展。

本章含盖

  • 解码器、编码器、编解码器综述
  • Netty 的编解码类

Netty提供可以简化各种协议的自定义编解码器创建的组件。

什么是编解码器?

每个网络应用都会定义端之间传输的二进制字节该如何被解析和转换,从发送端到目标程序的数据类型。这个转换逻辑通过编解码器来完成,编解码器包含了一个编码器和一个解码器,每个编解码器将一个字节流从一个格式转换为另一个格式。那么怎么区分它们了?
一个编码器转换消息为一个适当的格式用于传输(大部分情况下是一个字节流);对应的解码器转换网络流为一个程序的消息格式。一个编码器操作一个出站数据(outbound data),一个解码器处理一个入站数据(inbound data)。

Decoders

Decoder 实现了ChannelInboundHandler。
解码器类包含了两个不同的使用场景:

  • 解码字节到消息 —— ByteToMessageDecoder 和 ReplayingDecoder
  • 解码一种消息类型到另外一种消息类型 —— MessageToMessageDecoder
    因为解码器的责任是转换入站数据从一种格式到另一种格式,Netty的解码器实现了ChannelInboundHandler。
    因为Netty的ChannelPipeline的设计,你能够链接多个解码器去实现任意复杂的转换逻辑,这是Netty支持代码模块化和重用的一个很好的例子。
ByteToMessageDecoder 抽象类

由于你不知道远端是否会一次性发送一个完整的数据,ByteToMessageDecoder类缓存入站数据直到数据准备好可用于处理。



注意,decodeLast()方法是在当ByteBuf还有可读数据时,默认调用decode()方法。

    /**
     * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
     * {@link #channelInactive(ChannelHandlerContext)} was triggered.
     *
     * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
     * override this for some special cleanup operation.
     */
    protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.isReadable()) {
            // Only call decode() if there is something left in the buffer to decode.
            // See https://github.com/netty/netty/issues/4386
            decodeRemovalReentryProtection(ctx, in, out);
        }
    }

示例:


尽管ByteToMessageDecoder使得此模式实现简单,你可能发现有一点比较烦人,就是在调用readInt()前必须校验input ByteBuf是否有足够的数据。下一章我们将讨论ReplayingDecoder,一个特殊的解码器,它能够消除这个一步骤,只需要消耗很小的性能损耗。

编解码器中的引用计数
正如我们在第五章和第六章所提到的,引用计数是需要特别注意的。在编码器和解码器情况下,这个过程是相当简单的:一旦一个消息被编码或解码,它将自动被释放通过调用ReferenceCountUtil.release(message)。如果你需要持有该引用以便后面使用,你能调用ReferenceCountUtil.retain(message)。该调用会增加引用计数,防止消息被释放。

ReplayingDecoder 抽象类

ReplayingDecoder继承了ByteToMessageDecoder,并将我们从调用readableBytes()中解放。它实现这个通过使用一个自定义ByteBuf的实现(ReplayingDecoderBuffer)来封装入站ByteBuf。ReplayingDecoderBuffer在内部执行时调用。
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
参数S指定用于状态管理的类型,Void表示不使用状态管理。

示例:基于ReplayingDecoder来实现ToIntegerDecoder

👆int从ByteBuf中提取,然后加到List中。如果有效字节不足,readInt()方法的实现会抛出一个Error异常,该异常会被捕获并在基类中得到处理。decode()方法将再次被调用当更多的数据准备好读取时


请注意ReplayingDecoder的这些方面:

  • 不是所有的ByteBuf操作都支持。如果一个不支持的方法被调用了,那么将抛出一个UnsupportedOperationException异常。
  • ReplayingDecoder略慢与ByteToMessageDecoder
    在现实环境总,在复杂情况下是使用ByteToMessageDecoder还是ReplayingDecoder的区别是很大的。

更多关于解码器
下面的类处理更复杂的使用情况:

  • io.netty.handler.codec.LineBasedFrameDecoder —— 这个类用于Netty内部,使用'结束换行'控制字符( \n or \r\n )来解析消息数据
  • io.netty.handler.codec.http.HttpObjectDecoder —— 用于Http数据的解码器
MessageToMessageDecoder 抽象类

使用MessageToMessageDecoder进行消息格式间的转换。
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
参数 “I" 指定了输入消息的类型,作为decode的输入消息类型参数,decode()是你唯一需要实现的方法。

示例:

一个更复杂的例子,请看io.netty.handler.codec.http.HttpObjectAggregator,该类间接继承了MessageToMessageDecoder<HttpObject>.

TooLongFrameException 类

因为Netty是一个异步框架,你需要去缓存字节到内存中直到你能够去解析它。所以,你不应该允许你的解码器去缓存足够的数据来耗尽可用内存。为了解决这个共同关心的问题,Netty提供了一个TooLongFrameException,如果一个帧的大小超过了指定大小则抛出该异常。
为了避免内存被耗尽,你能够设置一个最大字节数的阈值,如果超过了这个阈值,将导致一个TooLongFrameException异常抛出( 并被ChannelHandler.exceptionCaught()捕获)。然后由解码器的用户来决定如果处理该异常。一些协议,例如HTTP,允许你返回一个特殊的响应。在其他情况下,唯一的选择可能就是关闭连接。

👆显示了如何使用TooLongFrameException来通知ChannelPipeline中的其他ChannelHandlers遇到一个超过帧大小的限度。注意,这种情况保护是特别重要的,如果你工作的协议具有任意帧大小。

Encoders

Encoder 实现了ChannelOutboundHandler。
Netty提供了一个集合的类来帮助你写支持如下功能的编码器:

  • 将一消息编码为字节
  • 将一个消息编码为另一个消息
MessageToByteEncoder 抽象类

你可能已经注意到,这个类只有一个方法,但decoder有两个。这是因为解码器经常需要产生一个最后消息在channel已经关闭前( 因此有了 decodeLast() 方法 )(注意,decodeLast会在channelInactive之前被调用)。我们清楚的知道编码器是没有这种情况的 —— 没有必要在连接断开后去产生一个消息。

示例:
MessageToMessageEncoder 抽象类

示例:

如果对特化的MessageToMessageEncoder感兴趣,可以查看io.netty.handler.codec.protobuf.ProtobufEncoder类

codec抽象类

Netty的codec抽象类,将一个编码器和解码器捆绑成一对用于同时管理入站和出站消息的转换。codec同时实现了ChannelInboundHandler 和 ChannelOutboundHandler。
为什么我们不是用这个复合类在所有时候,而是更倾向于将解码和编码分开了?因为将这两个功能分开,无论何时都能最大程度上来保持代码的重用性和可扩展性,这是Netty的一个基本理念。

ByteToMessageCodec 抽象类

ByteToMessageCodec 合并了ByteToMessageDecoder 和 MessageToByteEncoder。


任何 请求/响应 协议都适合使用ByteToMessageCodec。

MessageToMessageCodec 抽象类

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>

decode()方法转换一个INBOUND_IN消息为一个OUTBOUND_IN类型,encode()则相反。
INBOUND_IN消息作为写操作发送出去的类型,OUTBOUND_IN消息作为被应用处理的类型。

CombinedChannelDuplexHandler 类

如我们早前说的,合并一个解码器和一个编码器可能会对复用性造成影响。然后,这里提供了一个方式去避免这个损失且不用牺牲配置一个解码器和一个编码器为一个单元的便利性。使用CombinedChannelDuplexHandler来解决这个问题
public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
该类扮演一个包含有ChannelInboundHandler和一个ChannelOutboundHandler的容器。通过分别提供一个docoder类和一个encoder类,我们能够实现编解码器而不需要直接继承一个codec抽象类。
也就是说,CombinedChannelDuplexHandler使用组合的方式,复用已经存在的Decoder和Encoder来实现编解码器,这样就保持了代码的重用性和可扩展性。而如果是直接实现一个Codec抽象类的话,则是通过直接实现相关的encode、decode方法来实现编解码器,这使得程序失去了代码的重用性和可扩展性。

示例:



扩展

Q:ReplayingDecoder是如何做到,不用判断字节数是否足够就直接调用readXXX操作并能保证正确的逻辑了?
A:我们来简单看下ReplayingDecoder中的一些实现:



callDecode()是一个写循环实现,每次都会先记录ByteBuf in当前的读索引位置,然后将ByteBuf in封装成一个ReplayingDecoderByteBuf对象(这是一个特殊的ByteBuf实现,它重写了ByteBuf的各种readXXX、getXXX。这些方法在获取真实的数据前会先判断字节是否足够,如果不足够则会抛出一个Signal异常。)。然后将封装好的ReplayingDecoderByteBuf对象传递给decodeRemovalReentryProtection方法(decodeRemovalReentryProtection方法底层会调用decode()方法),这样一来当readXXX操作的时候数据不足的话就会抛出一个Signal异常。在catch{}语块中就会将ByteBuf的readerIndex重置为本次解码前的位置。
但也正是因为如此,在某些情况下ReplayingDecoder可能存在较差的性能。如,在网络很慢且消息格式较复杂的情况下。比如,有个一消息格式为:“消息头”+“消息体”两部分组成一个完整的消息包。我们需要根据消息头获取消息体数据长度以获取我们所需的数据。 但是了,因为网络比较慢的关系,我们读取到的ByteBuf可能不是一个完整的消息格式包(可能包含了消息头以及部分的消息体),本次decode就无法解析出一个消息包(但是我们已经成功解码处理消息头的数据了),那么就会在catch中将ByteBuf的readerIndex重置。那么下次decdoe的时候,又需要重新解析一次消息(即,消息头数据又需要重新进行一次解析)。如果依旧无法获取一个完整的消息包,那么前面的操作将再执行一次。。。
当然,我们也是有办法来解决这个问题的,那就是使用ReplayingDecoderByteBuf的checkpoint(T)方法来管理解码器的状态。嗯,这里举个java doc中的例子来说明checkpoint(T)的使用。

public enum MyDecoderState {
    READ_LENGTH,
    READ_CONTENT;
}

public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {

    private int length;

   public IntegerHeaderFrameDecoder() {
     // Set the initial state.
     super(MyDecoderState.READ_LENGTH);
   }

    @Override
   protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
     switch (state()) {
     case READ_LENGTH:
       length = buf.readInt();
       checkpoint(MyDecoderState.READ_CONTENT);
     case READ_CONTENT:
       ByteBuf frame = buf.readBytes(length);
       checkpoint(MyDecoderState.READ_LENGTH);
       out.add(frame);
       break;
     default:
       throw new Error("Shouldn't reach here.");
     }
   }
 }

首先,我们根据自定协议声明好协议的各个状态。这里就是“READ_LENGTH”和“READ_CONTENT”两个状态。然后在IntegerHeaderFrameDecoder解码器的时候,设置初始状态为“MyDecoderState.READ_LENGTH”。在decode方法中,我们根据不同的状态来进行相应的操作:
一开始state为READ_LENGTH,则先进行消息头部分的数据获取,如果此时ByteBuf中的数据不足以获取到消息头的数据那么就会抛出一个Signal异常,由基类根据上面的逻辑进行readerIndex的重置;如果ByteBuf的数据足以获取到消息头,那么在获取到消息头的值后,执行『checkpoint(MyDecoderState.READ_CONTENT);』这步非常的重要,checkpoint方法会完成两个操作:① 将调用decode方法前记录的readerIndex初始值修改为当前ByteBuf的readerIndex值,② 将state状态修改为MyDecoderState.READ_CONTENT。然后继续state为MyDecoderState.READ_CONTENT情况的处理(注意,这里你会发现switch-case中没有break语句,所以流程会走到下一个状态)。这样一来,当ByteBuf中的数据不足以读取到完整的消息体的内容,基类在重置readerIndex的时候,不再是重置到读取消息头之前的位置了,而是重置到读取完消息头之后的位置。这样,当decode再次被调用时,我们就无需再解码一次消息头了,这时state()方法返回的值已经是MyDecoderState.READ_CONTENT(因为我们上面在解码完消息头后通过checkpoint方法设置了状态值为MyDecoderState.READ_CONTENT),流程也会从解码消息体开始继续进行。

后记

若文章有任何错误,望大家不吝指教:)

参考

《Netty in action》
圣思园《精通并发与Netty》

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,519评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,842评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,544评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,742评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,646评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,027评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,513评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,169评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,324评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,268评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,299评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,996评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,591评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,667评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,911评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,288评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,871评论 2 341

推荐阅读更多精彩内容