Selector
(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。
首先看一下Selector相关API:
1.selector 的创建:
Selector selector = Selector.open(); //SelectorProvider对象的openSelector方法来创建选择器
2.向selector注册通道:
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,Selectionkey.OP_READ);
与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。
channel 可注册的事件集合 interest包含:
SelectionKey.OP_CONNECT (1<<3)
SelectionKey.OP_ACCEPT(1<<4)
SelectionKey.OP_READ(1<<0)
SelectionKey.OP_WRITE(1<<2)
使用二进制位表示的好处就是方便进行二进制运算,从而知道channel感兴趣的事件。
-
SelectionKey
:当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。这个对象包含了一些你感兴趣的属性:
interest集合
:key关联的channel所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
ready集合
:通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个ready set。
int readySet = selectionKey.readyOps();
Channel + Selector
:从SelectionKey访问Channel和Selector很简单
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
附加的对象(可选)
:可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。
//方法一
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
//方法二
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
4.通过Selector返回已经准备就绪的那些通道:
select():阻塞到至少有一个通道在你注册的事件上就绪了。
select(long timeout):和select()一样,除了最长会阻塞timeout毫秒(参数)。
selectNow():不会阻塞,不管什么通道就绪都立刻返回
5.selectedKeys:返回准备就绪的keys
Set selectedKeys = selector.selectedKeys();
6.wakeup():使第一个selection但还没有返回的selector马上返回
某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。
如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。
7.close()
用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。
完整的selector使用:
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
int n = selector.select();
if (n == 0) continue;
Iterator ite = this.selector.selectedKeys().iterator();
while(ite.hasNext()){
SelectionKey key = (SelectionKey)ite.next();
if (key.isAcceptable()){
SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
clntChan.configureBlocking(false);
//将选择器注册到连接到的客户端信道,
//并指定该信道key值的属性为OP_READ,
//同时为该信道指定关联的附件
clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
}
if (key.isReadable()){
handleRead(key);
}
if (key.isWritable() && key.isValid()){
handleWrite(key);
}
if (key.isConnectable()){
System.out.println("isConnectable = true");
}
ite.remove();
}
}
selector中维护的三个重要属性:
keys
:所有注册到Selector的Channel所表示的SelectionKey都会存在于该集合中。keys元素的添加会在Channel注册到Selector时发生。
selectedKeys
:该集合中的每个SelectionKey都是其对应的Channel在上一次操作selection期间被检查到至少有一种SelectionKey中所感兴趣的操作已经准备好被处理。该集合是keys的一个子集。
cancelledKeys
:执行了取消操作的SelectionKey会被放入到该集合中。该集合是keys的一个子集。
1.Selector构建源码分析:
Selector.open()
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
SelectorProvider.provider()
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
① 如果配置了“java.nio.channels.spi.SelectorProvider”属性,则通过该属性值load对应的SelectorProvider对象,如果构建失败则抛异常。
② 如果provider类已经安装在了对系统类加载程序可见的jar包中,并且该jar包的源码目录META-INF/services包含有一个java.nio.channels.spi.SelectorProvider提供类配置文件,则取文件中第一个类名进行load以构建对应的SelectorProvider对象,如果构建失败则抛异常。
③ 如果上面两种情况都不存在,则返回系统默认的SelectorProvider,即,sun.nio.ch.DefaultSelectorProvider.create();
④ 随后在调用该方法,即SelectorProvider.provider()。则返回第一次调用的结果。
Linux下的sun.nio.ch.DefaultSelectorProvider为:
public class DefaultSelectorProvider {
/**
* Prevent instantiation.
*/
private DefaultSelectorProvider() { }
/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
return new sun.nio.ch.EPollSelectorProvider();
}
}
其中sun.nio.ch.EPollSelectorProvider类型的SelectorProvider,这里对应于linux系统的epoll模式。
在得到sun.nio.ch.EPollSelectorProvider后调用openSelector()方法构建Selector,这里会构建一个EPollSelectorImpl对象。
class EPollSelectorImpl
extends SelectorImpl
{
// File descriptors used for interrupt
protected int fd0;
protected int fd1;
// The poll object
EPollArrayWrapper pollWrapper;
// Maps from file descriptors to keys
private Map<Integer,SelectionKeyImpl> fdToKey;
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
try {
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
} catch (Throwable t) {
try {
FileDispatcherImpl.closeIntFD(fd0);
} catch (IOException ioe0) {
t.addSuppressed(ioe0);
}
try {
FileDispatcherImpl.closeIntFD(fd1);
} catch (IOException ioe1) {
t.addSuppressed(ioe1);
}
throw t;
}
}
EPollSelectorImpl构造函数完成:
① EPollArrayWrapper的构建,EpollArrayWapper将Linux的epoll相关系统调用封装成了native方法供EpollSelectorImpl使用。
② 通过EPollArrayWrapper向epoll注册中断事件
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
③ fdToKey:构建文件描述符-SelectionKeyImpl映射表,所有注册到selector的channel对应的SelectionKey和与之对应的文件描述符都会放入到该映射表中。
EPollArrayWrapper
:完成了对epoll文件描述符的构建,以及对linux系统的epoll指令操纵的封装。维护每次selection操作的结果,即epoll_wait结果的epoll_event数组
。
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events;
epoll_data_t data;
};
epoll_event的数据成员(epoll_data_t data)包含有与通过epoll_ctl将文件描述符注册到epoll时设置的数据相同的数据。这里data.fd为我们注册的文件描述符。这样我们在处理事件的时候持有有效的文件描述符了。
// The fd of the epoll driver
private final int epfd;
// The epoll_event array for results from epoll_wait
private final AllocatedNativeObject pollArray;
// Base address of the epoll_event array
private final long pollArrayAddress;
// 用于存储已经注册的文件描述符和其注册等待改变的事件的关联关系。在epoll_wait操作就是要检测这里文件描述法注册的事件是否有发生。
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
private final Map<Integer,Byte> eventsHigh = new HashMap<>();
EPollArrayWrapper() throws IOException {
// creates the epoll file descriptor
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
}
EPoolArrayWrapper构造函数,创建了epoll文件描述符。构建了一个用于存放epoll_wait返回结果的epoll_event数组。
EPollArrayWrapper将Linux的epoll相关系统调用封装成了native方法供EpollSelectorImpl使用。
private native int epollCreate();
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
对应着Linux下epoll相关的三个系统调用
int epoll_create(int size)
epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
epoll_ctl可以操作epoll_create创建的epoll,如将socket句柄加入到epoll中让其监控,或把epoll正在监控的某个socket句柄移出epoll。
int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout)
epoll_wait在调用时,在给定的timeout时间内,所监控的句柄中有事件发生时,就返回用户态的进程。
ServerSocketChannel的构建:
// Our file descriptor
private final FileDescriptor fd;
// fd value needed for dev/poll. This value will remain valid
// even after the value in the file descriptor object has been set to -1
private int fdVal;
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}
将ServerSocketChannel注册到Selector
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}
EPollSelectorImpl. implRegister
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}
① 将channel对应的fd(文件描述符)和对应的SelectionKeyImpl放到fdToKey映射表中。
② 将channel对应的fd(文件描述符)添加到EPollArrayWrapper中,并强制初始化fd的事件为0 ( 强制初始更新事件为0,因为该事件可能存在于之前被取消过的注册中。)
③ 将selectionKey放入到keys集合中。
select()
int n = selector.select();
public int select() throws IOException {
return select(0);
}
//最终会调到EPollSelectorImpl.doSelect
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
① 先处理注销的selectionKey队列
② 进行底层的epoll_wait操作
③ 再次对注销的selectionKey队列进行处理
④ 更新被选择的selectionKey
processDeregisterQueue():
void processDeregisterQueue() throws IOException {
Set var1 = this.cancelledKeys();
synchronized(var1) {
if (!var1.isEmpty()) {
Iterator var3 = var1.iterator();
while(var3.hasNext()) {
SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next();
try {
this.implDereg(var4);
} catch (SocketException var12) {
IOException var6 = new IOException("Error deregistering key");
var6.initCause(var12);
throw var6;
} finally {
var3.remove();
}
}
}
}
}
执行processDeregisterQueue()后cancelledKeys集合会为空
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert (ski.getIndex() >= 0);
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
fdToKey.remove(Integer.valueOf(fd));
pollWrapper.remove(fd);
ski.setIndex(-1);
keys.remove(ski);
selectedKeys.remove(ski);
deregister((AbstractSelectionKey)ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
注销会完成下面的操作:
① 将已经注销的selectionKey从fdToKey( 文件描述与SelectionKeyImpl的映射表 )中移除
② 将selectionKey所代表的channel的文件描述符从EPollArrayWrapper中移除
③ 将selectionKey从keys集合中移除,这样下次selector.select()就不会再将该selectionKey注册到epoll中监听
④ 也会将selectionKey从对应的channel中注销
⑤ 最后如果对应的channel已经关闭并且没有注册其他的selector了,则将该channel关闭
完成上面的的操作后,注销的SelectionKey就不会出现先在keys、selectedKeys以及cancelKeys这3个集合中的任何一个。
int poll(long timeout) throws IOException {
updateRegistrations();
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
updateRegistrations()方法会将已经注册到该selector的事件(eventsLow或eventsHigh)通过调用epollCtl(epfd, opcode, fd, events); 注册到linux系统中。
这里epollWait就会调用linux底层的epoll_wait方法,并返回在epoll_wait期间有事件触发的entry的个数
updateSelectedKeys():
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
该方法会从通过EPollArrayWrapper pollWrapper 以及 fdToKey( 构建文件描述符-SelectorKeyImpl映射表 )来获取有事件触发的SelectionKeyImpl对象,然后将SelectionKeyImpl放到selectedKey集合( 有事件触发的selectionKey集合,可以通过selector.selectedKeys()方法获得 )中,即selectedKeys。并重新设置SelectionKeyImpl中相关的readyOps值。
但是,这里要注意两点:
① 如果SelectionKeyImpl已经存在于selectedKeys集合中,并且发现触发的事件已经存在于readyOps中了,则不会使numKeysUpdated++;这样会使得我们无法得知该事件的变化。
上面这点说明了为什么我们要在每次从selectedKey中获取到Selectionkey后,将其从selectedKey集合移除,就是为了当有事件触发使selectionKey能正确到放入selectedKey集合中,并正确的通知给调用者。
② 如果发现channel所发生I/O事件不是当前SelectionKey所感兴趣,则不会将SelectionKeyImpl放入selectedKeys集合中,也不会使numKeysUpdated++
epoll的实现:
epoll初始化时,会向内核注册一个文件系统,用于存储被监控的句柄文件,调用epoll_create时,会在这个文件系统中创建一个file节点。同时epoll会开辟自己的内核高速缓存区,以红黑树的结构保存句柄,以支持快速的查找、插入、删除。还会再建立一个list链表,用于存储准备就绪的事件。
当执行epoll_ctl时,除了把socket句柄放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后,就把socket插入到就绪链表里。
当epoll_wait调用时,仅仅观察就绪链表里有没有数据,如果有数据就返回,否则就sleep,超时时立刻返回。
epoll的两种工作模式:
LT:level-trigger,水平触发模式,只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket。
ET:edge-trigger,边缘触发模式,只有某个socket从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该socket。
在Linux系统中JDK NIO使用的是 LT ,而Netty epoll使用的是 ET。ET模式可以提供一些参数选择,如 TCP_CORK, SO_REUSEPORT等。
ps:wakeup()如何实现这次调用 ,下次select可以马上返回,是因为Linux中会创建一个pipe,一头读,一头写,wakeup会往通道中写入一个字节,所以下次调用可以马上返回。
参考资料:
http://ifeve.com/selectors/
https://www.jianshu.com/p/0d497fe5484a
http://www.importnew.com/26258.html
https://stackoverflow.com/questions/23465401/why-native-epoll-support-is-introduced-in-netty
http://senlinzhan.github.io/2017/02/10/Linux%E7%9A%84TCP-CORK/
http://jm.taobao.org/2010/10/22/380/
http://goldendoc.iteye.com/blog/1152079