最近需要做一个长连接的设备管理,使用netty可以方便的做到,还可以配置心跳及解码器
自定义长度解码器
LengthFieldBasedFrameDecoder解码器自定义长度解决TCP粘包黏包问题。所以又称为: 自定义长度解码器
TCP粘包和黏包现象
TCP粘包是指发送方发送的若干个数据包到接收方时粘成一个包。从接收缓冲区来看,后一个包数据的头紧接着前一个数据的尾。
当TCP连接建立后,Client发送多个报文给Server,TCP协议保证数据可靠性,但无法保证Client发了n个包,服务端也按照n个包接收。Client端发送n个数据包,Server端可能收到n-1或n+1个包。
为什么出现粘包现象?
发送方原因: TCP默认会使用Nagle算法。而Nagle算法主要做两件事:1)只有上一个分组得到确认,才会发送下一个分组;2)收集多个小分组,在一个确认到来时一起发送。所以,正是Nagle算法造成了发送方有可能造成粘包现象。
接收方原因: TCP接收方采用缓存方式读取数据包,一次性读取多个缓存中的数据包。自然出现前一个数据包的尾和后一个收据包的头粘到一起。
如何解决粘包现象
就是要选择相应的解码器
- 添加特殊符号,接收方通过这个特殊符号将接收到的数据包拆分开 - DelimiterBasedFrameDecoder特殊分隔符解码器
- 每次发送固定长度的数据包 - FixedLengthFrameDecoder定长编码器
- 在消息头中定义长度字段,来标识消息的总长度 - LengthFieldBasedFrameDecoder自定义长度解码器
LengthFieldBasedFrameDecoder参数
自定义长度解码器,所以构造函数中6个参数,基本都围绕那个定义长度域,进行的描述。
- maxFrameLength - 发送的数据帧最大长度
- lengthFieldOffset - 定义长度域位于发送的字节数组中的下标。换句话说:发送的字节数组中下标为${lengthFieldOffset}的地方是长度域的开始地方
- lengthFieldLength - 用于描述定义的长度域的长度。换句话说:发送字节数组bytes时, 字节数组bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength]域对应于的定义长度域部分
- lengthAdjustment - 满足公式: 发送的字节数组bytes.length - lengthFieldLength = bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength] + lengthFieldOffset + lengthAdjustment
- initialBytesToStrip - 接收到的发送数据包,去除前initialBytesToStrip位
- failFast - true: 读取到长度域超过maxFrameLength,就抛出一个 TooLongFrameException。false: 只有真正读取完长度域的值表示的字节之后,才会抛出 TooLongFrameException,默认情况下设置为true,建议不要修改,否则可能会造成内存溢出
- ByteOrder - 数据存储采用大端模式或小端模式
举例解释参数如何写
客户端多次发送"HELLO, WORLD"字符串给服务端。"HELLO, WORLD"共12字节(12B)。长度域中的内容是16进制的值,如下:
0x000c -----> 12
0x000e -----> 14
场景1
数据包大小: 14B = 长度域2B + "HELLO, WORLD"
解释:
如上图,长度域的值为12B(0x000c)。希望解码后保持一样,根据上面的公式,参数应该为:
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0 = 数据包长度(14) - lengthFieldOffset - lengthFieldLength - 长度域的值(12)
initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据
场景2
数据包大小: 14B = 长度域2B + "HELLO, WORLD"
解释:
上图中,解码后,希望丢弃长度域2B字段,所以,只要initialBytesToStrip = 2即可。其他与场景1相同
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0 = 数据包长度(14) - lengthFieldOffset - lengthFieldLength - 长度域的值(12)
initialBytesToStrip = 2 解码过程中,丢弃2个字节的数据
场景3
数据包大小: 14B = 长度域2B + "HELLO, WORLD"。与场景1不同的是:场景3中长度域的值为14(0x000E)
解释:
如上图,长度域的值为14(0x000E)。希望解码后保持一样,根据上面的公式,参数应该为:
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = -2 = 数据包长度(14) - lengthFieldOffset - lengthFieldLength - 长度域的值(14)
initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据
场景4
场景4在长度域前添加2个字节的Header。长度域的值(0x00000C) = 12。总数据包长度: 17=Header(2B) + 长度域(3B) + "HELLO, WORLD"
解释
如上图。编码解码后,长度保持一致,所以initialBytesToStrip = 0。参数应该为:
lengthFieldOffset = 2
lengthFieldLength = 3
lengthAdjustment = 0 = 数据包长度(17) - lengthFieldOffset(2) - lengthFieldLength(3) - 长度域的值(12)
initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据
场景5
与场景4不同的地方是: Header与长度域的位置换了。总数据包长度: 17=长度域(3B) + Header(2B) + "HELLO, WORLD"
解释
如上图。编码解码后,长度保持一致,所以initialBytesToStrip = 0。参数应该为:
lengthFieldOffset = 0
lengthFieldLength = 3
lengthAdjustment = 2 = 数据包长度(17) - lengthFieldOffset(0) - lengthFieldLength(3) - 长度域的值(12)
initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据
场景6
如下图,"HELLO, WORLD"域前有多个字段。总数据长度: 16 = HEADER1(1) + 长度域(2) + HEADER2(1) + "HELLO, WORLD"
lengthFieldOffset = 1
lengthFieldLength = 2
lengthAdjustment = 1 = 数据包长度(16) - lengthFieldOffset(1) - lengthFieldLength(2) - 长度域的值(12)
initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据
自定义协议
很多时候并不能按照以上参数的方式去解析数据,所以需要自定义协议,LengthFieldBasedFrameDecoder解码器自定义协议.通常,协议的格式如下:
通常来说,使用
ByteToMessageDocoder
这个编码器,我们要分别解析出Header,length,body这几个字段.而使用LengthFieldBasedFrameDecoder
,我们就可以直接接收想要的一部分,相当于在原来的基础上包上了一层,有了这层之后,我们可以控制我们每次只要读想读的字段,这对于自定义协议来说十分方便.
- MyProtocolDecoder的定义
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {
private static final int HEADER_SIZE = 6;
/**
*
* @param maxFrameLength 帧的最大长度
* @param lengthFieldOffset length字段偏移的地址
* @param lengthFieldLength length字段所占的字节长
* @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段
* @param initialBytesToStrip 解析时候跳过多少个长度
* @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异
*/
public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要body部分
in = (ByteBuf) super.decode(ctx,in);
if(in == null){
return null;
}
if(in.readableBytes()<HEADER_SIZE){
throw new Exception("字节数不足");
}
//读取type字段
byte type = in.readByte();
//读取flag字段
byte flag = in.readByte();
//读取length字段
int length = in.readInt();
if(in.readableBytes()!=length){
throw new Exception("标记的长度不符合实际长度");
}
//读取body
byte []bytes = new byte[in.readableBytes()];
in.readBytes(bytes);
return new MyProtocolBean(type,flag,length,new String(bytes,"UTF-8"));
}
}
在上述的代码中,调用父类的方法,实现截取到自己想要的字段,如可以判断数据必须以xx开头。
- 协议实体的定义
public class MyProtocolBean {
//类型 系统编号 0xA 表示A系统,0xB 表示B系统
private byte type;
//信息标志 0xA 表示心跳包 0xC 表示超时包 0xC 业务信息包
private byte flag;
//内容长度
private int length;
//内容
private String content;
public MyProtocolBean(byte flag, byte type, int length, String content) {
this.flag = flag;
this.type = type;
this.length = length;
this.content = content;
}
}
3.服务端的实现
public class Server {
private static final int MAX_FRAME_LENGTH = 1024 * 1024; //最大长度
private static final int LENGTH_FIELD_LENGTH = 4; //长度字段所占的字节数
private static final int LENGTH_FIELD_OFFSET = 2; //长度偏移
private static final int LENGTH_ADJUSTMENT = 0;
private static final int INITIAL_BYTES_TO_STRIP = 0;
private int port;
public Server(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
ch.pipeline().addLast(new ServerHandler());
};
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,开始接收进来的连接
ChannelFuture future = sbs.bind(port).sync();
System.out.println("Server start listen at " + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new Server(port).start();
}
}
- 服务端Hanlder
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MyProtocolBean myProtocolBean = (MyProtocolBean)msg; //直接转化成协议消息实体
System.out.println(myProtocolBean.getContent());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
}
服务端Handler没什么特别的地方,只是输出接收到的消息
- 客户端
public class Client {
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyProtocolEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = b.connect(HOST, PORT).sync();
future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
- 客户端Handler
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, "Hello,Netty".length(), "Hello,Netty");
ctx.writeAndFlush(myProtocolBean);
}
}
客户端Handler实现发送消息.
- 客户端编码器
public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocolBean> {
@Override
protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception {
if(msg == null){
throw new Exception("msg is null");
}
out.writeByte(msg.getType());
out.writeByte(msg.getFlag());
out.writeInt(msg.getLength());
out.writeBytes(msg.getContent().getBytes(Charset.forName("UTF-8")));
}
}
编码的时候,只需要按照定义的顺序依次写入到ByteBuf中.
小结
若是上面的参数直接可以满足要求,可以直接使用参数,若不可以则通过自定义的方式去实现,更加灵活。