简书 杭州_mina
《tomcat 8.x NioEndpoint核心组件浅析1》
《tomcat 8.x NioEndpoint之Acceptor组件浅析2》
1.poller 浅析
Poller实现了Runnable,它在NioEndpoint的startInternal中被启动。我们先来回顾一下startInternal中的实现:
// Start poller threads: allocate one Poller per slot and run each one on its
// own daemon thread named "<endpoint name>-ClientPoller-<index>".
// NOTE(review): the original annotation says the default count is 2 - confirm
// against getPollerThreadCount().
pollers = new Poller[getPollerThreadCount()];
for (int slot = 0; slot < pollers.length; slot++) {
    final Poller poller = new Poller();
    pollers[slot] = poller;

    final String threadName = getName() + "-ClientPoller-" + slot;
    final Thread worker = new Thread(poller, threadName);
    worker.setPriority(threadPriority);
    // Mark as a daemon thread before starting it
    worker.setDaemon(true);
    // Start the thread, which begins executing the poller's run() loop
    worker.start();
}
我们主要来看Poller中的run方法。
/**
* The background thread that adds sockets to the Poller, checks the
* poller for triggered events and hands the associated socket off to an
* appropriate processor as events occur.
* <p>
* Each iteration of the loop: drains the pending event queue via
* {@code events()}, performs a select on {@code selector}, passes every
* selected key that still has an attachment to {@code processKey()}, and
* finally calls {@code timeout()}. Once the {@code close} flag is observed
* the remaining events are processed, the selector is closed and the loop
* ends, after which the stop latch is counted down.
*/
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
// Normal operation: the close flag is not set
if (!close) {
// Process everything currently queued; the return value records
// whether at least one event was handled
hasEvents = events();
// Swap the counter to -1 for the duration of the select. A previous
// value greater than 0 selects the non-blocking path below.
// NOTE(review): presumably other threads increment wakeupCounter to
// request a wake-up - confirm against the code that updates it.
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
keyCount = selector.selectNow();
} else {
// Nothing pending: block in select for at most selectorTimeout
keyCount = selector.select(selectorTimeout);
}
// Select finished: reset the counter
wakeupCounter.set(0);
}
// Re-checked separately (not an else) so a close flag that became set
// while this iteration was running is handled right away
if (close) {
// Process any events still queued, call timeout(0, false), then close
// the selector and leave the loop
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
// Skip key processing and the timeout pass for this round and start
// the next iteration
continue;
}
//either we timed out or we woke up, process events first
// No keys were selected: run the event queue again. The non-short-circuit
// '|' guarantees events() is invoked even when hasEvents is already true.
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
// Only fetch the selected-key iterator when select reported at least one
// key; otherwise iterator is null and the loop below is skipped
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
// Remove the key from the selected set first, then hand the key and
// its NioSocketWrapper to processKey()
iterator.remove();
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
}//while
// Reached only through the break above, i.e. after the selector was closed
getStopLatch().countDown();
}
我们先来看一下events这个方法,以及它所调用的PollerEvent.run方法
/**
 * Drains the Poller's event queue, running each queued {@code PollerEvent}
 * in turn.
 * <p>
 * After an event has run it is reset and, while the endpoint is running and
 * not paused, pushed onto {@code eventCache} so the instance can be handed
 * out again instead of being re-allocated. A failure while handling one
 * event is logged and does not stop the remaining events from being
 * processed.
 *
 * @return <code>true</code> if some events were processed,
 *         <code>false</code> if queue was empty
 */
public boolean events() {
    boolean processedAny = false;
    for (PollerEvent event = events.poll(); event != null; event = events.poll()) {
        processedAny = true;
        try {
            event.run();
            // Clear the event's state so the instance is safe to recycle
            event.reset();
            if (!running || paused) {
                // Not accepting recycled events right now; just drop it
                continue;
            }
            eventCache.push(event);
        } catch (Throwable failure) {
            log.error("", failure);
        }
    }
    return processedAny;
}
/**
 * Applies this event to the poller's selector.
 * <p>
 * When {@code interestOps} is {@code OP_REGISTER}, the socket's channel is
 * registered with the selector for {@code OP_READ}, with
 * {@code socketWrapper} as the key attachment. Otherwise
 * {@code interestOps} is OR-ed into the interest set of the channel's
 * existing key; a missing key counts down the endpoint's connection count,
 * and a key without an attachment is cancelled.
 */
@Override
public void run() {
    if (interestOps == OP_REGISTER) {
        try {
            // Register the channel for read-readiness notifications
            socket.getIOChannel().register(
                    socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
        } catch (Exception x) {
            log.error(sm.getString("endpoint.nio.registerFail"), x);
        }
        return;
    }

    final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
    try {
        if (key == null) {
            // The key was cancelled (e.g. due to socket closure)
            // and removed from the selector while it was being
            // processed. Count down the connections at this point
            // since it won't have been counted down when the socket
            // closed.
            socket.socketWrapper.getEndpoint().countDownConnection();
            return;
        }

        final NioSocketWrapper attached = (NioSocketWrapper) key.attachment();
        if (attached == null) {
            // No wrapper is attached to the key any more: cancel it
            socket.getPoller().cancelledKey(key);
            return;
        }

        // Add the requested operations to those already registered and
        // record the merged set on both the wrapper and the key
        final int mergedOps = key.interestOps() | interestOps;
        attached.interestOps(mergedOps);
        key.interestOps(mergedOps);
    } catch (CancelledKeyException ckx) {
        try {
            socket.getPoller().cancelledKey(key);
        } catch (Exception ignore) {
            // Best-effort cleanup: a failure while cancelling is deliberately ignored
        }
    }
}
继续看processKey方法
/**
 * Handles one selected key.
 * <p>
 * The key is cancelled when the poller is closing, when the key is no
 * longer valid, or when no wrapper is supplied. A readable or writable key
 * is either passed to {@code processSendfile()} (when the wrapper carries
 * sendfile data) or has its ready operations unregistered and is then
 * dispatched through {@code processSocket()}, read before write. If a
 * dispatch reports failure the key is cancelled.
 *
 * @param sk         the selected key
 * @param attachment the socket wrapper attached to the key
 */
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if (close || !sk.isValid() || attachment == null) {
            // Shutting down, invalid key, or nothing attached: cancel the key
            cancelledKey(sk);
            return;
        }
        if (!sk.isReadable() && !sk.isWritable()) {
            // Neither a read nor a write is ready; nothing to dispatch
            return;
        }
        if (attachment.getSendfileData() != null) {
            processSendfile(sk, attachment, false);
            return;
        }

        // Remove the operations that are ready from the set this key is
        // interested in before dispatching them
        unreg(sk, attachment, sk.readyOps());

        // Read goes before write
        boolean dispatched = true;
        if (sk.isReadable()) {
            dispatched = processSocket(attachment, SocketEvent.OPEN_READ, true);
        }
        if (dispatched && sk.isWritable()) {
            dispatched = processSocket(attachment, SocketEvent.OPEN_WRITE, true);
        }
        if (!dispatched) {
            // A dispatch failed: cancel the key's registration
            cancelledKey(sk);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("",t);
    }
}
来啊!!! 继续啊死磕processSocket方法
/**
 * Processes the given socket event, either on the endpoint's executor or
 * inline on the calling thread.
 * <p>
 * A processor is taken from {@code processorCache} and reset for this
 * socket and event, or created when the cache has none available.
 *
 * @param socketWrapper the socket to process; {@code null} yields
 *                      {@code false}
 * @param event         the socket event to process
 * @param dispatch      {@code true} to hand the processor to the executor
 *                      when one is available, {@code false} to run it on
 *                      the calling thread
 * @return {@code true} if the processor was submitted or run without an
 *         error; {@code false} if the wrapper was {@code null}, the
 *         executor rejected the task, or any other error occurred
 */
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    if (socketWrapper == null) {
        return false;
    }
    try {
        // Prefer a recycled processor; create a new one only when the cache is empty
        SocketProcessorBase<S> task = processorCache.pop();
        if (task != null) {
            task.reset(socketWrapper, event);
        } else {
            task = createSocketProcessor(socketWrapper, event);
        }

        final Executor pool = getExecutor();
        if (!dispatch || pool == null) {
            // No dispatch requested or no executor available: run the
            // processor directly on the calling thread
            task.run();
        } else {
            pool.execute(task);
        }
        return true;
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
}
2.总结poller流程
poller代码虽然很多,但是总归就是取出Poller.events队列中的PollerEvent对象,并运行PollerEvent.run()方法。在PollerEvent.run()方法中,如果是OP_REGISTER事件,则会在Poller.selector上注册SocketChannel对象的OP_READ就绪事件。随后不停地执行selector.select(),如果发现有感兴趣的事件就绪,就扔给Executor处理。