netty极简教程(五):Netty的Reactor模型演进及JDK nio聊天室实现

介绍了jdk实现nio的关键Selector以及SelectableChannel,了解了它的原理,就明白了netty为什么是事件驱动模型:(netty极简教程(四):Selector事件驱动以及SocketChannel
的使用
,接下来将它的使用更深入一步, nio reactor模型演进以及聊天室的实现;


示例源码: https://github.com/jsbintask22/netty-learning

nio server

对于io消耗而言,我们知道提升效率的关键在于服务端对于io的使用;而nio压榨cpu的关键在于使用Selector实现的reactor事件模型以及多线程的加入时机:

单线程reactor模型

image

省略Selector以及ServerSocketChannel的获取注册; 将所有的操作至于reactor主线程

 while (true) {   // 1
    if (selector.select(1000) == 0) {   // 2
        continue;
    }

    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();    // 3
    while (selectedKeys.hasNext()) {
        SelectionKey selectionKey = selectedKeys.next();
        SelectableChannel channel = selectionKey.channel();

        if (selectionKey.isAcceptable()) {    // 4
            ServerSocketChannel server = (ServerSocketChannel) channel;
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(CLIENT_BUFFER_SIZE));
            String serverGlobalInfo = "系统消息:用户[" + client.getRemoteAddress() + "]上线了";
            System.err.println(serverGlobalInfo);

            forwardClientMsg(serverGlobalInfo, client);   //  5
        } else if (selectionKey.isReadable()) {

                SocketChannel client = (SocketChannel) channel;
                SocketAddress remoteAddress = null;
                try {
                    remoteAddress = client.getRemoteAddress();
                    String clientMsg = retrieveClientMsg(selectionKey);
                    if (clientMsg.equals("")) {
                        return;
                    }
                    System.err.println("收到用户[" + remoteAddress + "]消息:" + clientMsg);

                    forwardClientMsg("[" + remoteAddress + "]:" + clientMsg, client);   // 6
                } catch (Exception e) {
                    String msg = "系统消息:" + remoteAddress + "下线了";
                    forwardClientMsg(msg, client);            
                    System.err.println(msg);
                    selectionKey.cancel();    // 7
                    try {
                        client.close();
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                }
        }

        selectedKeys.remove();
    }
}
  1. 开启一个while循环,让Selector不断的询问操作系统是否有对应的事件已经准备好
  2. Selector检查事件(等待时间为1s),如果没有直接开启下一次循环
  3. 获取已经准备好的事件(SelectionKey),然后依次循环遍历处理
  4. 如果是Accept事件,说明是ServerSocketChannel注册的,说明新的连接已经建立好了,从中获取新的连接并将新连接再次注册到Selector
  5. 注册后,然后生成消息给其它Socket,表示有新用户上线了
  6. 如果是Read事件,说明客户端Socket有新的数据可读取,读取然后广播该消息到其它所有客户端
  7. 如果发生异常,表示该客户端断开连接了(粗略的处理),同样广播一条消息,并且将该Socket从Selector上注销

读取以及广播消息方法如下:

SocketChannel client = (SocketChannel) selectionKey.channel();
        ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
        int len = client.read(buffer);
        if (len == 0) {
            return "";
        }
        buffer.flip();
        byte[] data = new byte[buffer.remaining()];
        int index = 0;
        while (len != index) {
            data[index++] = buffer.get();
        }
        buffer.clear();
        return new String(data, StandardCharsets.UTF_8);

Set<SelectionKey> allClient = selector.keys();
allClient.forEach(selectionKey -> {
    SelectableChannel channel = selectionKey.channel();
    if (!(channel instanceof ServerSocketChannel) && channel != client) {  // 1
        SocketChannel otherClient = (SocketChannel) channel;
        try {
            otherClient.write(ByteBuffer.wrap(clientMsg.getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
});

从Selector上获取所有注册的Channel然后遍历,如果不是ServerSocketChannel或者当前消息的Channel,就将消息发送出去.


以上,所有代码放在同一线程中,对于单核cpu而言,相比于bio的Socket编程,我们主要有一个方面的改进

  • 虽然accept方法依然是阻塞的,可是我们已经知道了肯定会有新的连接进来,所以调用改方法不会再阻塞而是直接获取一个新连接
  • 对于read方法而言同样如此,虽然该方法依然是一个阻塞的方法,可是我们已经知道了接下来调用必定会有有效数据,这样cpu不用再进行等待
  • 通过Selector在一个线程中便管理了多个Channel

而对于多核cpu而言,Selector虽然能够有效规避accept和read的无用等待时间,可是它依然存在一些问题;

  1. 上面的操作关键在于Selector的select操作,该方法必须能够快速循环调用,不宜和其它io读取写入放在一起
  2. channel的io(read和write)操作较为耗时,不宜放到同一线程中处理

多线程reactor模型

Reactor多线程模型

基于上面的单线程问题考虑,我们可以将io操作放入线程池中处理:

  1. 将accept事件的广播放入线程池中处理
  2. 将read事件的所有io操作放入线程池中处理
if (selectionKey.isAcceptable()) {
        ServerSocketChannel server = (ServerSocketChannel) channel;
        SocketChannel client = server.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(CLIENT_BUFFER_SIZE));
        String serverGlobalInfo = "系统消息:用户[" + client.getRemoteAddress() + "]上线了";
        System.err.println(serverGlobalInfo);

        executorService.submit(() -> {    // 1
            forwardClientMsg(serverGlobalInfo, client);
        });
    } else if (selectionKey.isReadable()) {

        executorService.submit(() -> {    // 2
            SocketChannel client = (SocketChannel) channel;
            SocketAddress remoteAddress = null;
            try {
                remoteAddress = client.getRemoteAddress();
                String clientMsg = retrieveClientMsg(selectionKey);
                if (clientMsg.equals("")) {
                    return;
                }
                System.err.println("收到用户[" + remoteAddress + "]消息:" + clientMsg);

                forwardClientMsg("[" + remoteAddress + "]:" + clientMsg, client);  
            } catch (Exception e) {
                String msg = "系统消息:" + remoteAddress + "下线了";
                forwardClientMsg(msg, client);
                System.err.println(msg);
                selectionKey.cancel();
                try {
                    client.close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        });
    }

    selectedKeys.remove();
}

在 1与2处,我们加入了线程池处理,不再在reactor主线程中做任何io操作。 这便是reactor多线程模型


虽然模型2有效利用了多核cpu优势,可是依然能够找到瓶颈

  • 虽然广播消息是在一个独立线程中,可是我们需要将Selector上注册的所有的channel全部遍历,如果Selector注册了太多的channel,依旧会有效率问题
  • 因为Selector注册了过多的Channel,所以在进行select选取时对于主线程而言依旧会有很多的循环操作,存在瓶颈

基于以上问题,我们可以考虑引入多个Selector,这样主Selector只负责读取accept操作,而其他的io操作均有子Selector负责,这便是多Reactor多线程模型

多Reactor多线程模型

Reactor多线程模型

基于上面的思考,我们要在单Reactor多线程模型上主要需要以下操作

  1. 对于accept到的新连接不再放入主Selector,将其加入多个子Selector
  2. 子Selector操作应该在异步线程中进行.
  3. 所有子Selector只进行read write操作

基于以上,会增加一个子Selector列表,并且将原来的accept以及读取广播分开;
private List<Selector> subSelector = new ArrayList<>(8); 定义一个包含8个子selector的列表并进行初始化

image


如图,分别开启了一个reactor主线程,以及8个子selector子线程,其中,主线程现在只进行accept然后添加至子selector

 while (true) {
    if (mainSelector.select(1000) == 0) {
        continue;
    }

    Iterator<SelectionKey> selectedKeys = mainSelector.selectedKeys().iterator();
    while (selectedKeys.hasNext()) {
        SelectionKey selectionKey = selectedKeys.next();
        SelectableChannel channel = selectionKey.channel();

        if (selectionKey.isAcceptable()) {

            ServerSocketChannel server = (ServerSocketChannel) channel;
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            client.register(subSelector.get(index++), SelectionKey.OP_READ,     // 1
                    ByteBuffer.allocate(CLIENT_BUFFER_SIZE));
            if (index == 8) {   // 2
                index = 0;
            }

            String serverGlobalInfo = "系统消息:用户[" + client.getRemoteAddress() + "]上线了";
            System.err.println(serverGlobalInfo);

            forwardClientMsg(serverGlobalInfo, client);
        }
    }

    selectedKeys.remove();
}
  1. 将新连接注册至从Selector.
  2. 如果当前的selector已经全部添加了一遍则重新从第一个开始

所有的从Selector只进行io操作,并且本身已经在异步线程中运行

while (true) {
    if (subSelector.select(1000) == 0) {
        continue;
    }

    Iterator<SelectionKey> selectedKeys = subSelector.selectedKeys().iterator();
    while (selectedKeys.hasNext()) {
        SelectionKey selectionKey = selectedKeys.next();
        SelectableChannel channel = selectionKey.channel();

        if (selectionKey.isReadable()) {
            SocketChannel client = (SocketChannel) channel;
            SocketAddress remoteAddress = null;
            try {
                remoteAddress = client.getRemoteAddress();
                String clientMsg = retrieveClientMsg(selectionKey);  // 1
                if (clientMsg.equals("")) {
                    return;
                }
                System.err.println("收到用户[" + remoteAddress + "]消息:" + clientMsg);
            
                forwardClientMsg("[" + remoteAddress + "]:" + clientMsg, client);  // 2
            } catch (Exception e) {
                String msg = "系统消息:" + remoteAddress + "下线了";
                forwardClientMsg(msg, client);
                System.err.println(msg);
                selectionKey.cancel();
                try {
                    client.close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }

        selectedKeys.remove();
    }
  1. 读取消息
  2. 广播消息
    启动server,并且打开三个客户端:


    image

    image

    image

    如图所示,上线通知,消息转发,下线通知成功, 主Selector与从Selector交互成功

netty线程模型思考

事实上,在netty的线程模型中,与上方的多Reactor多线程模型类似,一个改进版的多路复用多Reactor模型; Reactor主从线程模型

  1. 一个主线程不断轮询进行accept操作,将channel注册至子Selector
  2. 一个线程持有一个Selector
  3. 一个子Selector又可以管理多个channel
  4. 在断开连接前,一个channel总是在同一个线程中进行io操作处理

基于以上思考,我们将在后面在netty源码中进行一一验证。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342