概念篇
1.Netty 是什么?
Netty 是一款基于 NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于 BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高。难能可贵的是,在保证快速和易用性的同时,并没有丧失可维护性和性能等优势。
2.Netty 的特点是什么?
高并发:Netty 是一款基于 NIO(Nonblocking IO,非阻塞IO)开发的网络通信框架,对比于 BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高。
传输快:Netty 的传输依赖于零拷贝特性,尽量减少不必要的内存拷贝,实现了更高效率的传输。
封装好:Netty 封装了 NIO 操作的很多细节,提供了易于使用调用接口。
3.什么是 Netty 的零拷贝?
Netty 的零拷贝主要包含三个方面:
1,Netty 的接收和发送 ByteBuffer 采用 DIRECT BUFFERS,使用堆外直接内存进行 Socket 读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行 Socket 读写,JVM 会将堆内存 Buffer 拷贝一份到直接内存中,然后才写入 Socket 中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
2,Netty 提供了组合 Buffer 对象,可以聚合多个 ByteBuffer 对象,用户可以像操作一个 Buffer 那样方便的对组合 Buffer 进行操作,避免了传统通过内存拷贝的方式将几个小 Buffer 合并成一个大的 Buffer。
3,Netty 的文件传输采用了 transferTo 方法,它可以直接将文件缓冲区的数据发送到目标 Channel,避免了传统通过循环 write 方式导致的内存拷贝问题。
4.Netty 高性能表现在哪些方面?
IO 线程模型:同步非阻塞,用最少的资源做更多的事。
内存零拷贝:尽量减少不必要的内存拷贝,实现了更高效率的传输。
内存池设计:申请的内存可以重用,主要指直接内存。内部实现是用一颗二叉查找树管理内存分配情况。
串形化处理读写:避免使用锁带来的性能开销。
高性能序列化协议:支持 protobuf 等高性能序列化协议。
5.Netty 中有那种重要组件?
Channel:Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 等。
EventLoop:主要是配合 Channel 处理 I/O 操作,用来处理连接的生命周期中所发生的事情。
ChannelFuture:Netty 框架中所有的 I/O 操作都为异步的,因此我们需要 ChannelFuture 的 addListener()注册一个 ChannelFutureListener 监听事件,当操作执行成功或者失败时,监听就会自动触发返回结果。
ChannelHandler:充当了所有处理入站和出站数据的逻辑容器。ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
ChannelPipeline:为 ChannelHandler 链提供了容器,当 channel 创建时,就会被自动分配到它专属的 ChannelPipeline,这个关联是永久性的。
6.Netty 发送消息有几种方式?
Netty 有两种发送消息的方式:
1,直接写入 Channel 中,消息从 ChannelPipeline 当中尾部开始移动;
2,写入和 ChannelHandler 绑定的 ChannelHandlerContext 中,消息从 ChannelPipeline 中的下一个 ChannelHandler 中移动。
7.默认情况 Netty 起多少线程?何时启动?
Netty 默认是 CPU 处理器数的两倍,bind 完之后启动。
8.Netty 支持哪些心跳类型设置?
readerIdleTime:为读超时时间(即测试端一定时间内未接受到被测试端消息)。
writerIdleTime:为写超时时间(即测试端一定时间内向被测试端发送消息)。
allIdleTime:所有类型的超时时间。
实战篇
使用netty实现一个在线聊天的功能
- Server
public class Server {
public static void main(String[] args) throws InterruptedException {
//ServerHandler sHandler = new ServerHandler();
InetSocketAddress localSocket = new InetSocketAddress("127.0.0.1", 9990);
ServerBootstrap serverBootstrap = new ServerBootstrap ();
serverBootstrap.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.localAddress(localSocket)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加译码器解码器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ServerHanlder());
}
});
final ChannelFuture future = serverBootstrap.bind().sync();
//future.channel().closeFuture().sync();
// future.channel().closeFuture().sync();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// TODO Auto-generated method stub
if (f.isSuccess()) {
System.out.println("服务器开启成功");
} else {
System.out.println("服务器开启失败");
f.cause().printStackTrace();
}
}
});
}
}
- ServerHanlder
public class ServerHanlder extends ChannelInboundHandlerAdapter {
private static final int MAX_CONN = 2;//指定最大连接数
private int connectNum = 0;//当前连接数
//channelHandlerContext表
private Vector<ChannelHandlerContext> contexts = new Vector<>(3);
// private Map<String ,String> connects = new
// private ChannelHandlerContext channelHandlerContext ;
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("new connection");
contexts.add(ctx);
//channelHandlerContext =ctx;
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// int currentIndex = contexts.indexOf(ctx);
// int anotherIndex = Math.abs(currentIndex - 1);
System.out.println("客户端:" + msg.toString());
InputStreamReader is = new InputStreamReader(System.in);
BufferedReader br = new BufferedReader(is);
String result = "result";
try {
result = br.readLine();
} catch (IOException e) {
e.printStackTrace();
}
//contexts.get(currentIndex).writeAndFlush(result);
ctx.write(result);//给客户端回复
ctx.flush();
}
}
- Client
public class Client1 {
public static void main(String[] args) throws InterruptedException {
//ServerHandler sHandler = new ServerHandler();
InetSocketAddress localSocket = new InetSocketAddress("127.0.0.1", 9990);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.remoteAddress(localSocket)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加译码器解码器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new Client1Hanlder());
}
});
ChannelFuture future = bootstrap.connect().sync();
// future.channel().writeAndFlush("这里是客户端,请求连接服务端!");
//future.channel().closeFuture().sync();
//final ChannelFuture future = bootstrap.connect("127.0.0.1",9990).sync();
// future.channel().closeFuture().sync();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// TODO Auto-generated method stub
if (f.isSuccess()) {
System.out.println("连接服务器成功");
} else {
System.out.println("连接服务器失败");
f.cause().printStackTrace();
}
}
});
}
}
- Client1Hanlder
public class Client1Hanlder extends ChannelInboundHandlerAdapter {
private ChannelHandlerContext chc = null;
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//保存当前ChannelHandlerContext
chc = ctx;
Scanner in = new Scanner(System.in);
ctx.writeAndFlush(Unpooled.copiedBuffer(in.toString(), CharsetUtil.UTF_8));
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务端: "+msg);
InputStreamReader is = new InputStreamReader(System.in);
BufferedReader br = new BufferedReader(is);
String result ="result";
try{
result = br.readLine();
}
catch(IOException e){
e.printStackTrace();
}
ctx.write(result);//给服务端回复
ctx.flush();
}
}