零拷贝,NIO,Reactor和Netty

1. NIO

阅读本节前,请先阅读我的NIO基础文章:https://www.jianshu.com/nb/8788241

NIO是Java中的一种同步非阻塞IO,NIO是面向buffer的非阻塞IO。其中最重要的的三个核心概念是:Channel,Buffer和Selector。

Channel

Channel类似于BIO中的流,可以从中读取或者写入数据。但它和流有以下区别:

  1. Channel是双向的,既可以读又可以写,而流是单向的。
  2. Channel可以进行异步的读写。
  3. 对Channel的读写必须通过buffer对象。

在Java NIO中Channel主要有如下几种类型:

  • FileChannel:从文件读取数据的
  • DatagramChannel:读写UDP网络协议数据
  • SocketChannel:读写TCP网络协议数据
  • ServerSocketChannel:可以监听TCP连接

Buffer

Buffer是NIO中用于存放待读写数据的容器。数据总是从Channel中读取到Buffer中,或者从Buffer中写入到Channel中。

常见Buffer的实现类包括:ByteBuffer,CharBuffer,DoubleBuffer,FloatBuffer,IntBuffer,LongBuffer,ShortBuffer等。

其中ByteBuffer又包含两个实现类:HeapBuffer和MappedByteBuffer(以及其实现类DirectBuffer)。

HeapBuffer这种缓冲区是分配在堆上面的,而DirectBuffer则是直接指向了一块堆外直接内存。

零拷贝技术

下图展示了一个IO读写流程:

image.png
  1. DMAread读取磁盘文件内容到内核缓冲区
  2. 拷贝内核缓冲区数据到应用进程缓冲区(内核态和用户态的切换)
  3. 从应用进程缓冲区copy数据到socket缓冲区(内核态和用户态的切换)
  4. DMA copy给网卡发送

可以清楚得看到,上述IO流程包含两次用户态和内核态上下文切换,在高并发场景下,这些会很致命。因此,Linux提出了零拷贝的概念:即避免用户态和内核态的切换,直接在内核中进行数据传递。Linux提供了两个函数mmapsendfile来实现零拷贝:

  • mmap: 内存映射文件,即将文件的一段直接映射到内存,内核和应用进程共用同一块内存地址
  • sendfile: 从上图的内核缓冲区直接复制到socket缓冲区, 不需要向应用进程缓冲区拷贝

mmap

传统的IO操作都是在内核准备好数据后,将数据从内核中拷贝一份到用户空间中。而直接内存(mmap技术)将文件直接映射到内核空间的内存,返回一个操作地址(address),省去了内核空间拷贝到用户空间这一步操作。如下图所示:

mmap.PNG

在NIO中,MappedByteBuffer则对应着mmap技术。下面是MappedByteBuffer的使用例子:

FileChannel f1 = new FileInputStream(file1).getChannel();
// FileOutputStream打开的FileChannel只能写入
FileChannel f2 = new FileOutputStream(file2).getChannel();) 
// 将file1的数据全部映射成ByteBuffer
MappedByteBuffer mbb = f1.map(MapMode.READ_ONLY, 0, file.length());
// 将buffer里的数据写入到file2中
f2.write(mbb);
mbb.clear();

HeapBuffer的数据结构类似于:

public Class HeapBuffer {
    byte[] data;
    int position, limit, int capacity;
}

而DirectBuffer则直接指向一个内存地址:

public Class DirectBuffer {
    long address;
    int position, limit, int capacity;
}

当我们把一个Direct Buffer写入Channel的时候,就好比是“内核缓冲区”的内容直接写入了Channel,这样显然快了,减少了数据拷贝。而当我们把一个Heap Buffer写入Channel的时候,实际上底层实现会先构建一个临时的Direct Buffer,然后把Heap Buffer的内容复制到这个临时的Direct Buffer上,再把这个Direct Buffer写出去。当然,如果我们多次调用write方法,把一个Heap Buffer写入Channel,底层实现可以重复使用临时的Direct Buffer,这样不至于因为频繁地创建和销毁Direct Buffer影响性能。

Direct Buffer创建和销毁的代价很高,所以要用在尽可能重用的地方。 比如周期长传输文件大采用Direct Buffer,不然一般情况下就直接用heap buffer 就好。

sendfile

sendfile不存在内存映射, 同时保留了mmap的不需要来回拷贝优点,适用于应用进程不需要对读取的数据做任何处理的场景。如图:

sendfile.PNG

Java中Channel.transferTo(Channel destination)对应着sendfile技术。

Selector

Selector用于监听多个Channel的事件。

2. Reactor模型

Reactor模型中主要有三种角色:

  • Reactor:内部封装了一个selector,循环调用select方法获得就绪channel。然后将就绪channel dispatch给对应handler执行真的读写逻辑。
  • Acceptor:监听客户端连接,并为客户端的SocketChannel向Reactor注册对应的handler。
  • Handlers:真正执行非阻塞读/写任务逻辑。

Reactor模型从复杂程度又可以分为三种:单Reactor单线程模型,单Reactor多线程模型和多Reactor多线程模型。

2.1 单Reactor单线程模型

单Reactor单线程模型.PNG

下面是其实现:

/**
    * 等待事件到来,分发事件处理
    */
  class Reactor implements Runnable {
​
      private Reactor() throws Exception {
​
          SelectionKey sk =
                  serverSocket.register(selector,
                          SelectionKey.OP_ACCEPT);
          // attach Acceptor 处理新连接
          sk.attach(new Acceptor());
      }
​
      public void run() {
          try {
              while (!Thread.interrupted()) {
                  selector.select();
                  Set selected = selector.selectedKeys();
                  Iterator it = selected.iterator();
                  while (it.hasNext()) {
                      it.remove();
                      //分发事件处理
                      dispatch((SelectionKey) (it.next()));
                  }
              }
          } catch (IOException ex) {
              //do something
          }
      }
​
      void dispatch(SelectionKey k) {
          // 若是连接事件获取是acceptor
          // 若是IO读写事件获取是handler
          Runnable runnable = (Runnable) (k.attachment());
          if (runnable != null) {
              runnable.run();
          }
      }
​
  }
  /**
    * 连接事件就绪,处理连接事件
    */
  class Acceptor implements Runnable {
      @Override
      public void run() {
          try {
              SocketChannel c = serverSocket.accept();
              if (c != null) {// 注册读写
                  new Handler(c, selector);
              }
          } catch (Exception e) {
​
          }
      }
  }
  /**
    * 处理读写业务逻辑
    */
  class Handler implements Runnable {
      public static final int READING = 0, WRITING = 1;
      int state;
      final SocketChannel socket;
      final SelectionKey sk;
​
      public Handler(SocketChannel socket, Selector sl) throws Exception {
          this.state = READING;
          this.socket = socket;
          sk = socket.register(selector, SelectionKey.OP_READ);
          sk.attach(this);
          socket.configureBlocking(false);
      }
​
      @Override
      public void run() {
          if (state == READING) {
              read();
          } else if (state == WRITING) {
              write();
          }
      }
​
      private void read() {
          process();
          //下一步处理写事件
          sk.interestOps(SelectionKey.OP_WRITE);
          this.state = WRITING;
      }
​
      private void write() {
          process();
          //下一步处理读事件
          sk.interestOps(SelectionKey.OP_READ);
          this.state = READING;
      }
​
      /**
        * task 业务处理
        */
      public void process() {
          //do something
      }
  }

这是最基本的单Reactor单线程模型。其中Reactor线程,负责多路分离Socket,有新的客户端连接触发accept事件之后,Reactor交由Acceptor进行处理。当有IO读写事件就绪后则交给Hanlder 处理。

Acceptor主要任务就是构建Handler ,在获取到和client相关的SocketChannel之后 ,注册读写事件到Reactor(Selector)上,并绑定对应的Hanlder。对应的SocketChannel有读写事件之后,Reactor再交给对应的Hanlder进行处理。

2.2 单Reactor多线程模型

单Reactor多线程模型.PNG

单Reactor多线程模型,在单Reactor的基础上,增加了一个Worker线程池,用于Handler的执行。

/**
    * 多线程处理读写业务逻辑
    */
  class MultiThreadHandler implements Runnable {
      public static final int READING = 0, WRITING = 1;
      int state;
      final SocketChannel socket;
      final SelectionKey sk;
​
      //多线程处理业务逻辑
      ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
​
​
      public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {
          this.state = READING;
          this.socket = socket;
          sk = socket.register(selector, SelectionKey.OP_READ);
          sk.attach(this);
          socket.configureBlocking(false);
      }
​
      @Override
      public void run() {
          if (state == READING) {
              read();
          } else if (state == WRITING) {
              write();
          }
      }
​
      private void read() {
          //任务异步处理
          executorService.submit(() -> process());
​
          //下一步处理写事件
          sk.interestOps(SelectionKey.OP_WRITE);
          this.state = WRITING;
      }
​
      private void write() {
          //任务异步处理
          executorService.submit(() -> process());
​
          //下一步处理读事件
          sk.interestOps(SelectionKey.OP_READ);
          this.state = READING;
      }
​
      /**
        * task 业务处理
        */
      public void process() {
          //do IO ,task,queue something
      }
  }

相对于第一种单线程的模式来说,在处理业务逻辑,也就是获取到IO的读写事件之后,交由线程池来处理,这样可以减小主reactor的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。

2.3 多Reactor多线程模式

多Reactor多线程模型.PNG

相比较于第二种模型,多Reactor多线程模式将Reactor分为两种:

  • MainReactor:负责监听socket连接,用来处理新连接的建立,将建立的socketChannel指定注册给SubReactor。
  • SubReactor:维护自己的selector, 基于MainReactor注册的socketChannel,监听读写就绪事件,读写就绪后将Handler扔给worker线程池来完成。
/**
    * 多work 连接事件Acceptor,处理连接事件
    */
  class MultiWorkThreadAcceptor implements Runnable {
​
      // cpu线程数相同多work线程
      int workCount =Runtime.getRuntime().availableProcessors();
      SubReactor[] workThreadHandlers = new SubReactor[workCount];
      volatile int nextHandler = 0;
​
      public MultiWorkThreadAcceptor() {
          this.init();
      }
​
      public void init() {
          nextHandler = 0;
          for (int i = 0; i < workThreadHandlers.length; i++) {
              try {
                  workThreadHandlers[i] = new SubReactor();
              } catch (Exception e) {
              }
​
          }
      }
​
      @Override
      public void run() {
          try {
              SocketChannel c = serverSocket.accept();
              if (c != null) {// 注册读写
                  synchronized (c) {
                      // 顺序获取SubReactor,然后注册channel 
                      SubReactor work = workThreadHandlers[nextHandler];
                      work.registerChannel(c);
                      nextHandler++;
                      if (nextHandler >= workThreadHandlers.length) {
                          nextHandler = 0;
                      }
                  }
              }
          } catch (Exception e) {
          }
      }
  }
  /**
    * 多work线程处理读写业务逻辑
    */
  class SubReactor implements Runnable {
      final Selector mySelector;
​
      //多线程处理业务逻辑
      int workCount =Runtime.getRuntime().availableProcessors();
      ExecutorService executorService = Executors.newFixedThreadPool(workCount);
​
​
      public SubReactor() throws Exception {
          // 每个SubReactor 一个selector 
          this.mySelector = SelectorProvider.provider().openSelector();
      }
​
      /**
        * 注册chanel
        *
        * @param sc
        * @throws Exception
        */
      public void registerChannel(SocketChannel sc) throws Exception {
          sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
      }
​
      @Override
      public void run() {
          while (true) {
              try {
              //每个SubReactor 自己做事件分派处理读写事件
                  selector.select();
                  Set<SelectionKey> keys = selector.selectedKeys();
                  Iterator<SelectionKey> iterator = keys.iterator();
                  while (iterator.hasNext()) {
                      SelectionKey key = iterator.next();
                      iterator.remove();
                      if (key.isReadable()) {
                          read();
                      } else if (key.isWritable()) {
                          write();
                      }
                  }
​
              } catch (Exception e) {
​
              }
          }
      }
​
      private void read() {
          //任务异步处理
          executorService.submit(() -> process());
      }
​
      private void write() {
          //任务异步处理
          executorService.submit(() -> process());
      }
​
      /**
        * task 业务处理
        */
      public void process() {
          //do IO ,task,queue something
      }
  }

在多Reactor多线程模型中,MainReactor 主要是用来处理网络IO建立连接操作,而SubReactor则主要复杂监听IO就绪事件,分派任务执行。此种模型中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大量的提升,支持的可并发客户端数量可达到上百万级别。

参考文章:https://juejin.im/post/5b4570cce51d451984695a9b

3. Netty

首先关于Netty的使用demo,请参考:https://www.jianshu.com/p/e58674eb4c7a

Netty的架构类似于多Reactor多线程模型,但是Netty默认不使用Worker线程池执行Handler,而是直接使用IO线程执行读写任务。下图是Netty的线程模型:

Netty.PNG

如图所示,Netty中包含两个NioEventLoopGroup,一个是boss,另一个是worker。boss负责监听网络连接,而worker负责分发读写事件。每一个NioEventLoopGroup都包含多个NioEventLoop,一个NioEventLoop本质上是一个包含了一个Selector的SingleThreadPool。

事实上,boss类型的NioEventLoopGroup通常只包含一个NioEventLoop。

每个boss NioEventLoop循环执行的任务包含3步:

  • 第1步:轮询accept事件;
  • 第2步:处理io任务,即accept事件,与client建立连接,生成NioSocketChannel,并将NioSocketChannel注册到某个worker NioEventLoop的selector上;
  • 第3步:处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用eventloop.execute或schedule执行的任务,或者其它线程提交到该eventloop的任务。

每个worker NioEventLoop循环执行的任务包含3步:

  • 第1步:轮询read、write事件;
  • 第2步:处理io任务,即read、write事件,在NioSocketChannel可读、可写事件发生时进行处理;
  • 第3步:处理任务队列中的任务,runAllTasks。

Client端的Netty架构图如下:

client netty.PNG

client端启动时connect到server,建立NioSocketChannel,并注册到某个NioEventLoop的selector上。client端只包含1个NioEventLoopGroup,每个NioEventLoop循环执行的任务包含3步:

  • 第1步:轮询connect、read、write事件;
  • 第2步:处理io任务,即connect、read、write事件,在NioSocketChannel连接建立、可读、可写事件发生时进行处理;
  • 第3步:处理非io任务,runAllTasks。

3.1 Netty模式

下面是多Reactor的使用模式:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

bossGroup中只有一个线程(EventLoop),而workerGroup中的线程是 CPU 核心数乘以2, 因此对应的到 Reactor 线程模型中,我们知道,这样设置的 NioEventLoopGroup 其实就是多Reactor模型。

下面是单Reactor单线程的使用模式:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup).channel(NioServerSocketChannel.class)

注意, 我们实例化了一个 NioEventLoopGroup,构造器参数是1,表示 NioEventLoopGroup 的线程池大小是1。然后接着我们调用 b.group(bossGroup) 设置了服务器端的 EventLoopGroup。此时bossGroup和workerGroup就是同一个NioEventLoopGroup,并且这个 NioEventLoopGroup只有一个线程(EventLoop),那么对应到Reactor的线程模型中,就相当于单Reactor单线程模型。

3.2 耗时任务

由于Netty中的EventLoop既要处理IO,又要执行Handler。因此需要使用特殊手段执行耗时任务。主要有两种方式:

  • Handler中加入自定义线程池
  • Pipeline中加入线程池

方法一:自定义线程池

public class ServerBusinessThreadPoolHandler extends SimpleChannelInboundHandler { 
    public static final ChannelHandler INSTANCE = new ServerBusinessThreadPoolHandler(); 
    private static ExecutorService threadPool = Executors.newFixedThreadPool(1000); 

    @Override 
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
        ByteBuf data = Unpooled.directBuffer(); 
        data.writeBytes(msg); 
        threadPool.submit(() -> { 
            try { 
                //耗时的操作 
               Thread.sleep(1 * 1000); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            } 
           Object result = getResult(data); 
           ctx.channel().writeAndFlush(result); 
         }); 
    } 
}

方法二:Pipeline中加入线程池

ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
bootstrap.group(boss, worker);
EventLoopGroup businessGroup = new NioEventLoopGroup(1000);  //大小为1000的线程池
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();
        p.addLast(businessGroup, new NettyServerHandler()); // 添加NettyServerHandler,用来处理Server端接收和处理消息的逻辑
     }
});

如图,通过ChannelPipeline.addLast(EventExecutorGroup group, ChannelHandler handler)API为对应的Handler提供了优先选择的executor。如果直接ChannelPipeline.addLast(ChannelHandler handler)方法,那么Handler执行时默认使用对应的NioEventLoop来执行。而通过ChannelPipeline.addLast(EventExecutorGroup group, ChannelHandler handler)API Netty将使用给定的EventExecutorGroup来执行handler。

3.3 Netty避免线程切换

为了尽可能的提升性能,Netty在很多地方进行了无锁化设计,例如在IO线程内部进行串行操作,避免多线程竞争导致的性能下降问题。表面上看,串行化设计似乎CPU利用率不高,并发程度不够,但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列---多个工作线程的模型性能更优。

Netty的NioEventLoop读取到消息之后,直接调用ChannelPipeline的fireChannelRead(Object msg)。只要用户不主动切换线程,一直都是由NioEventLoop调用用户的Handler,期间不进行线程切换。这种串行处理方式避免了多线程操作导致的锁的竞争,从性能角度看是最优的。

4. tomcat

Tomcat也采用了Reactor模型的设计理念,如下图所示:

Tomcat.PNG
Tomcat模型.PNG

如图所示,Tomcat线程模型中包含四个关键角色:

  • Acceptor:负责处理Socket连接。获得SocketChannel对象,然后封装在一个tomcat的实现类org.apache.tomcat.util.net.NioChannel对象中。然后将NioChannel对象封装在一个PollerEvent对象中,并将PollerEvent对象压入Poller Event Queue里。
  • Poller:每一个Poller线程都维护了一个Selector对象,主要负责消费Event Queue中的数据,并注册到内部的Selector上,然后不断监听Socket读写就绪事件。读写就绪后,将就绪的SocketChannel传递给Worker线程池执行读写任务。
  • Poller Event Queue:存储PollerEvent对象的消息队列。注意,这个消息队列实际上存储在Poller中,Poller中包含了一个private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();属性用于存储PollerEvent。并且Poller开放了void addEvent(PollerEvent event)方法,从而Acceptor能够将PollerEvent传递给Poller。
  • Worker:实际的IO读写线程。

多个Worker线程,有时候也叫IO线程,就是专门负责IO读写的。一种实现方式就是像Netty一样,每个Worker线程都有自己的Selector,可以负责多个连接的IO读写事件,每个连接归属于某个线程。另一种方式实现方式就是有专门的线程负责IO事件监听,这些线程有自己的Selector,一旦监听到有IO读写事件,并不是像第一种实现方式那样(自己去执行IO操作),而是将IO操作封装成一个Runnable交给Worker线程池来执行,这种情况每个连接可能会被多个线程同时操作,相比第一种并发性提高了,但是也可能引来多线程问题,在处理上要更加谨慎些。tomcat的NIO模型就是第二种。

参考文章:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容