五.Selector

一.Selector是什么

  • Selector选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。
  • 一个通道可以被注册到多个Selector选择器上,但对每个选择器而言只能被注册一次。
  • SelectionKey:选择键封装了特定的通道与特定的Selector选择器的注册关系
connect:客户端连接服务端事件,对应值为SelectionKey.OP_CONNECT
accept:服务端接收客户端连接事件,对应值为SelectionKey.OP_ACCEPT
read:读事件,对应值为SelectionKey.OP_READ
write:写事件,对应值为SelectionKey.OP_WRITE

二.层次图

image.png
  • 1.Selector实现open方法,我们发现SelectorProvider类。SocketChannel、ServerSocketChannel和Selector的实例初始化都通过SelectorProvider类实现
  public static Selector open() throws IOException {  
        return SelectorProvider.provider().openSelector();  
    } 
  • 2.AbstractSelector取消的key放在一个set集合中,对集合进行添加操作时,必须同步取消key set集合。反注册选择key完成的实际工作是,将key从key对应的通道的选择key数组中移除。
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
 void cancel(SelectionKey k) {                       // package-private
        synchronized (cancelledKeys) {
            cancelledKeys.add(k);
        }
    }
  • 3.SelectorImpl:
    • 其他线程获取选择器的就绪key和key集合,实际上返回的是key集合的代理publicKeys和就绪key集合的代理publicSelectedKeys。
protected Set selectedKeys;//已经操作事件准备就绪的选择key(为解决1.4bug而存在) 
protected HashSet keys;//与选择器关联的key集合(为解决1.4bug而存在)  
private Set publicKeys;//外部访问key集合的代理,真正使用的  
private Set publicSelectedKeys;//外部访问就绪key集合代理,真正使用的    ;
public Set<SelectionKey> selectedKeys() {...return this.publicSelectedKeys; }}
public Set<SelectionKey> keys() { ....return this.publicKeys;}
  • select方法(准备就绪事件数):委托给为lockAndDoSelect同步方法,获取key集合代理publicKeys和就绪key代理集合publicSelectedKeys,然后交给doSelect(long l)方法,这个方法为抽象方法,待子类扩展。
 public int select() throws IOException {
        return this.select(0L);
    }
  private int lockAndDoSelect(long var1//**超时时间**/) throws IOException {
        synchronized(this) {
            if(!this.isOpen()) {
                throw new ClosedSelectorException();
            } else {
                Set var4 = this.publicKeys;
                int var10000;
                synchronized(this.publicKeys) {
                    Set var5 = this.publicSelectedKeys;
                    synchronized(this.publicSelectedKeys) {
                        var10000 = this.doSelect(var1);
                    }
                }

                return var10000;
            }
        }
    }
 protected abstract int doSelect(long var1) throws IOException;
  • register方法:
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
        if(!(var1 instanceof SelChImpl)) {
            throw new IllegalSelectorException();
        } else {
            SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
           //设置key的附加物  
            var4.attach(var3);
            Set var5 = this.publicKeys;
            synchronized(this.publicKeys) {
            //完成实际的注册工作 
                this.implRegister(var4);
            }
            //设置key的兴趣事件集 ,var2是 SelectionKey关系值
            var4.interestOps(var2);
            return var4;
        }
    }
    //待子类实现  
   protected abstract void implRegister(SelectionKeyImpl selectionkeyimpl);

背景知识

1.FileDescriptor:文件描述符用于描述系统底层的特殊的结构句柄,可以被用来表示开放文件、开放
套接字等,包含 private int fd;//文件描述值 和private long handle;//初始化文件描述句柄
2.句柄:标识应用程序中的不同对象应用程序能够通过句柄访问相应的对象的信息,但是句柄不是指针,程序不能利用句柄来直接阅读文件中的信息。如果句柄不在IO文件中,它是毫无用处的。 句柄是Windows用来标志应用程序中建立的或是使用的唯一整数,大量使用了句柄来标识对象。

  • 4.WindowsSelectorImpl
    全局变量
    private final int INIT_CAP = 8;//选择key集合,key包装集合初始化容量  
    private static final int MAX_SELECTABLE_FDS = 1024;//最大选择key数量  
    private SelectionKeyImpl channelArray[];//选择器关联通道集合  
    private PollArrayWrapper pollWrapper;//存放所有文件描述对象(选择key,唤醒管道的源与sink通道)的集合  
   private final List threads = new ArrayList();//选择操作线程集合  
    private final FdMap fdMap = new FdMap();//存放选择key文件句柄与选择key映射关系的Map  
   private final SubSelector subSelector = new SubSelector();//子选择器  
    private int totalChannels;//注册到选择器的通道数量  
    private int threadsCount;//选择线程数  
    private final Pipe wakeupPipe = Pipe.open();//唤醒等待选择操的管道  
    private final int wakeupSourceFd;//唤醒管道源通道文件句柄
    private final int wakeupSinkFd;//唤醒管道sink通道文件句柄
  //四个同步锁
    private Object closeLock;//选择器关闭同步锁  
    private final Object interruptLock = new Object();//中断同步锁,在唤醒选择操作线程时,用于同步  
    private final StartLock startLock = new StartLock();//选择操作开始锁  
    private final FinishLock finishLock = new FinishLock();//选择操作结束锁  
  //初始化
 WindowsSelectorImpl(SelectorProvider var1) throws IOException {
        super(var1);
        this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();//获取句柄
        SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
        var2.sc.socket().setTcpNoDelay(true);
        this.wakeupSinkFd = var2.getFDVal();//获取句柄
        this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
    }

fdMap:存放选择key文件句柄与选择key的HashMap

   private WindowsSelectorImpl.MapEntry put(SelectionKeyImpl var1) {
            return (WindowsSelectorImpl.MapEntry)this.put(new Integer(var1.channel.getFDVal()), new WindowsSelectorImpl.MapEntry(var1));
        }

pollWrapper: 存放选择key和通道及其相关兴趣事件到本地内存

 private static final short EVENT_OFFSET = 4;//兴趣事件开始位置  
 static short SIZE_POLLFD = 8;//句柄长度int(4)+兴趣事件(4) 
 void addWakeupSocket(int var1//索引, int var2//句柄) {
        this.putDescriptor(var2, var1);
        this.putEventOps(var2, 1);
    }
//将文件描述放在索引var2上
void putDescriptor(int var1, int var2) {
        this.pollArray.putInt(SIZE_POLLFD * var1 + 0, var2);
    }
//存放索引文件描述信息的兴趣操作事件 
void putEventOps(int var1, int var2) {
        this.pollArray.putShort(SIZE_POLLFD * var1 + 4, (short)var2);
    }

image.png

背景知识
在JDK1.5 update10和linux core2.6以上版本,sun优化了Selctor的实现,底层使用epoll替换了select/poll。在linux2.6(准确来说是2.5.44)由内核直接支持的方法。epoll解决了select和poll的缺点。epoll每次注册新的事件到epoll中,会把所有的fd(文件标识符)拷贝进内核,而不是在等待的时候重复拷贝,保证了每个fd在整个过程中只会拷贝1次。epoll它所支持的fd上限是最大可以打开文件的数目,具体数目可以cat /proc/sys/fs/file-max查看,一般来说这个数目和系统内存关系比较大。epoll在注册新的事件时,为每个fd指定一个回调函数,当设备就绪的时候,调用这个回调函数,这个回调函数就会把就绪的fd加入一个就绪表中。(所以epoll实际只需要遍历就绪表)。
参考:Linux下I/O多路复用select, poll, epoll 三种模型的Python使用

doSelect方法:其中 subSelector.poll() 是select的核心,由native函数poll0实现,SubSelector主要有两个方法以poll从pollWrapper拉取关注读写事件的选择key。每个SelectThread使用,SubSelector从当前注册到选择器的通道中选取SubSelector索引所对应的批次的通道已经就绪的通道并更新操作事件。整个选择过程有startLock和finishLock来控制。再有在一个选择操作的所有子选择线程执行完,才释放finishLock

 protected int doSelect(long var1) throws IOException {
       ......
       this.subSelector.poll();        
       .....
    }
 private int poll() throws IOException {
        return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
 }
 private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);

SelectionKeyImpl保存注册时的channel、selector、event以及保存在pollWrapper的偏移位置index。

implRegister方法:首先同步关闭锁,以防在注册的过程中,选择器被关闭;检查选择器是否关闭,没有关闭,则检查是否扩容,需要则扩容为pollWrapper为原来的两倍;检查过后,添加选择key到选择器通道集合,设置key在选择器通道集合的索引,添加选择key到文件描述fdMap,添加key到key集合,将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper,通道计数器自增。

protected void implRegister(SelectionKeyImpl selectionkeyimpl)  
{  
    //同步关闭锁,以防在注册的过程中,选择器被关闭  
    synchronized(closeLock)  
    {  
        if(pollWrapper == null)  
            //文件描述包装集合为null,即选器已关闭  
            throw new ClosedSelectorException();  
        growIfNeeded();//  
        channelArray[totalChannels] = selectionkeyimpl;//添加到选择器通道集合  
        selectionkeyimpl.setIndex(totalChannels);//设置key在选择器通道集合的索引  
        fdMap.put(selectionkeyimpl);//添加选择key到文件描述fdMap  
        keys.add(selectionkeyimpl);//添加key到key集合  
     //将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper  
        pollWrapper.addEntry(totalChannels, selectionkeyimpl);  
        totalChannels++;//通道计数器自增  
    }  
}  
void addEntry(int var1, SelectionKeyImpl var2) {
      //epoll每次注册新的事件到epoll中,会把所有的fd(文件标识符)拷贝进内核,
        this.putDescriptor(var1, var2.channel.getFDVal());
    }

参考:
NIO源码分析
深入浅出NIO Socket实现机制

三.总结

  • Selector是通过implRegister方法把每次注册新的SelectionKeyImpl事件拷贝到pollWrapper内存数组中,通过doSelect()的native函数poll0()拉取读写就绪的SelectionKeyImpl事件,如果之前没有发生事件,程序就阻塞在select处,当然不会一直阻塞,因为epoll在timeout时间内如果没有事件,也会返回。
  • ServerSocketChannelImpl的初始化主要是初始化ServerSocket通道线程thread,地址绑定,接受连接同步锁,默认创建ServerSocketChannelImpl的状态为未初始化,文件描述和文件描述id,如果使用本地地址,则获取本地地址。bind首先检查ServerSocket是否关闭,是否绑定地址,如果既没有绑定也没关闭,则检查绑定的socketaddress是否正确或合法;然后通过Net工具类的bind(native)和listen(native),完成实际的ServerSocket地址绑定和开启监听,如果绑定是开启的参数小于1,则默认接受50个连接。accept方法主要是调用accept0(native)方法接受连接,并根据接受来接
    一旦有对应的事件发生,poll0方法就会返回。
  • SocketChannelImpl构造主要是初始化读写及状态锁和通道socket文件描述。
    connect连接方法首先同步读锁和写锁,确保socket通道打开,并没有连接;然后检查socket地址的正确性与合法性,然后检查当前线程是否有Connect方法的访问控制权限,最后尝试连接socket地址。从缓冲区读取字节序列写到通道write(ByteBuffer),首先确保通道打开,且输出流没有关闭,然后委托给IOUtil写字节序列;IOUtil写字节流过程为首先通过Util从当前线程的缓冲区获取可以容下字节序列的临时缓冲区(DirectByteBuffer),如果没有则创建一个DirectByteBuffer,将字节序列写到临时的DirectByteBuffer中,然后将写操作委托给nativedispatcher(SocketDispatcher),将DirectByteBuffer添加到当前线程的缓冲区, 以便重用,因为DirectByteBuffer实际上是存在物理内存中,频繁的分配将会消耗更多的资源。
    channel的源码笔者还未细看,有兴趣的可以看参考部分。
    参考:SocketChannelImpl 解析一(通道连接,发送数据)

四.实例(聊天室的实现)

public class TestNonBlockingNIO1 {

    //客户端
    @Test
    public void client() throws IOException{
        //1. 获取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

        //2. 切换非阻塞模式
        sChannel.configureBlocking(false);

        //3. 分配缓冲区
        ByteBuffer buf = ByteBuffer.allocate(1024);

        //4. 发送数据给服务端
        Scanner scan = new Scanner(System.in);

        while(scan.hasNext()){
            String str = scan.next();

            buf.put((new Date().toString() + "\n" + str).getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }

        //5. 关闭通道
        sChannel.close();
    }

    //服务端
    @Test
    public void server() throws IOException{
        //1. 获取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();

        //2. 切换非阻塞模式
        ssChannel.configureBlocking(false);

        //3. 绑定端口号
        ssChannel.bind(new InetSocketAddress(9898));

        //4. 获取选择器
        Selector selector = Selector.open();

        //5. 将通道注册到选择器上, 并且指定“监听事件”
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);

        //6. 轮询监听选择器上的“准备就绪”的事件
        while(selector.select() > 0){

            //7. 获取当前选择器上所有“准备就绪”的选择键(监听事件)
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();

            while(it.hasNext()){
                //8. 获取当前准备就绪的选择键
                SelectionKey sk = it.next();

                //9. 判断具体是哪个事件“准备就绪”
                if(sk.isAcceptable()){
                    //10.若接收状态就绪,获取当前客户端的连接
                    SocketChannel sChannel = ssChannel.accept();

                    //11.切换非阻塞式
                    sChannel.configureBlocking(false);

                    //12.将该通道注册到选择器上
                    sChannel.register(selector, SelectionKey.OP_READ);
                }else if(sk.isReadable()){
                    //13.若“读就绪”,获取当前选择器上就绪状态的通道
                    SocketChannel sChannel = (SocketChannel) sk.channel();

                    //14.读取数据
                    ByteBuffer buf = ByteBuffer.allocate(1024);

                    int len = 0;
                    while((len = sChannel.read(buf)) > 0){
                        buf.flip();
                        System.out.println(new String(buf.array(), 0, len));
                        buf.clear();
                    }
                }

                //15.取消选择键
                it.remove();
            }
        }
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,830评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,992评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,875评论 0 331
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,837评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,734评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,091评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,550评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,217评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,368评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,298评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,350评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,027评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,623评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,706评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,940评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,349评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,936评论 2 341

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,566评论 18 139
  • 作者: 一字马胡 转载标志 【2017-11-24】 更新日志 一、Java OIO Java OIO (Jav...
    一字马胡阅读 1,333评论 0 12
  • Java NIO(New IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java I...
    JackChen1024阅读 7,521评论 1 143
  • NIO 操作系统背景知识 unix提供了5中io模型,其中java的底层实现依赖的是操作系统的io复用模型。lin...
    江江的大猪阅读 546评论 0 2
  • 生活中总有许多我们搞不清楚的事情,那么在别人忙着自己的事情顾不上我们的时候,你会大胆向他们询问吗? 如果是我来回答...
    一只正在成长的狮子阅读 231评论 1 2