tomcat 8.x NioEndpoint之Acceptor组件浅析2

简书 杭州_mina

tomcat 8.x NioEndpoint核心组件浅析1

1. Acceptor 浅析

   /**
     * The background thread that listens for incoming TCP/IP connections and
     * hands them off to an appropriate processor.
     */
    protected class Acceptor extends AbstractEndpoint.Acceptor {

        @Override
        public void run() {

            int errorDelay = 0;

            // Loop until we receive a shutdown command
            // 一直循环直到接收到关闭命令
            while (running) {

                // paused 只有在unbind的时候会设置
                while (paused && running) {
                    //变更Acceptor的状态
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
                
                if (!running) {
                    break;
                }
                // Acceptor 设置成running 
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    //连接数达到最大,暂停线程。
                    //这里用到的是connectionLimitLatch锁,可以理解为一个闭锁
                    //我理解connectionLimitLatch和coundownlatch类似
                    //补充一点这个是先增加计数器、如果超过最大连接数则减少计时器、然后线程暂停,你这个计时器就是当前连接数
                    //程序的下面我会简单的讲一下这个方法的实现
                    countUpOrAwaitConnection();

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        //调用serversocket的accept方法
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        // We didn't get a socket
                        // 出错了要减去连接数
                        countDownConnection();
                        if (running) {
                            // Introduce delay if necessary
                            errorDelay = handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (running && !paused) {
                        // setSocketOptions() will hand the socket off to
                        // an appropriate processor if successful
                        // 把socket扔到poller中
                        if (!setSocketOptions(socket)) {
                            // 如果加入poller失败关闭连接
                            closeSocket(socket);
                        }
                    } else {
                        closeSocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }
    }

下面来看setSocketOptions方法中如何把SocketChannel加入到poller中

protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false); //设置成非阻塞
            //获取socket
            Socket sock = socket.socket();
           //配置socket信息
            socketProperties.setProperties(sock);
            //创建一个NioChannel 他封装了SocketChannel
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
                //如果为null 创建一个NioChannel 这里使用系统内存
               //使用系统内存可以省去一步从系统内存拷贝到堆内存的动作、性能上会有很大的提升,nioChannels初始化默认为128个 
               //当socket 关闭的重新清理NioChannel而不是销毁这个对象可以达到对象复用的效果、因为申请系统内存的开销比申请堆内存的开销要大很多
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                //如果不为null设置SocketChannel
                channel.setIOChannel(socket);
                //将channle复位 以上就是重置系统内存把指针重新定位到buff的开始位置
                channel.reset();
            }
            //丢入poller队列中
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

最后一步register方法

 public void register(final NioChannel socket) {
             //设置poller 对象
            socket.setPoller(this); 
            //设置一个附件待会把这个NioSocketWrapper注册到poller对象中的selector上去           
            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);//获取
            socket.setSocketWrapper(ka);
            ka.setPoller(this);
            ka.setReadTimeout(getSocketProperties().getSoTimeout());
            ka.setWriteTimeout(getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            ka.setReadTimeout(getSoTimeout());
            ka.setWriteTimeout(getSoTimeout());
            PollerEvent r = eventCache.pop();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            //生成一个PollerEvent对象 这个对象继承了Runnable
            //这个对象的主要作用就是把NioSocketWrapper注册到poller对象中的selector上去           
            //还有就是获取SelectionKey 该对象是用于跟踪这些被注册事件的句柄
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            //把pollerEvent放入到poller的队列中
           addEvent(r);
        }

2. LimitLatch 浅析

  • 2.1 AbstractEndpoint 封装了LimitLatch一些方法
    //默认最大连接数为10000
    private int maxConnections = 10000;
    //获取最大连接数
    public int  getMaxConnections() {
        return this.maxConnections;
    }
    //初始化闭锁
    protected LimitLatch initializeConnectionLatch() {
        if (maxConnections==-1) return null;
        if (connectionLimitLatch==null) {
            //
            connectionLimitLatch = new LimitLatch(getMaxConnections());
        }
        return connectionLimitLatch;
    }
    //将会释放所有的线程
    protected void releaseConnectionLatch() {
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) latch.releaseAll();
        connectionLimitLatch = null;
    }
    //增加计数,如果太大,那么等待
    protected void countUpOrAwaitConnection() throws InterruptedException {
        if (maxConnections==-1) return;
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) latch.countUpOrAwait();
    }
    //减少计数
    protected long countDownConnection() {
        if (maxConnections==-1) return -1;
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) {
            long result = latch.countDown();
            if (result<0) {
                getLog().warn(sm.getString("endpoint.warn.incorrectConnectionCount"));
            }
            return result;
        } else return -1
    }
  • 2.2 LimitLatch 源码浅析
/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.tomcat.util.threads;

import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * Shared latch that allows the latch to be acquired a limited number of times
 * after which all subsequent requests to acquire the latch will be placed in a
 * FIFO queue until one of the shares is returned.
 */
public class LimitLatch {

    private static final Log log = LogFactory.getLog(LimitLatch.class);
     //构建sync对象、主要用来同步、阻塞两个功能
    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }
        @Override
        protected int tryAcquireShared(int ignored) {
             //增加计数器
            long newCount = count.incrementAndGet();
            //如果计数器大于最大limit 则计数器减一 返回-1 否则返回 1 
            //这里的limit 其实就是maxConnections
            if (!released && newCount > limit) {
                // Limit exceeded
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            //计数器减一
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;
    //计数器
    private final AtomicLong count;
    //最大连接数
    private volatile long limit;
    //是否全部释放
    private volatile boolean released = false;

    /**
     * Instantiates a LimitLatch object with an initial limit.
     *
     * @param limit - maximum number of concurrent acquisitions of this latch
     */
    public LimitLatch(long limit) {
        this.limit = limit;
        this.count = new AtomicLong(0);
        this.sync = new Sync();
    }

    /**
     * Returns the current count for the latch
     *
     * @return the current count for latch
     */
    //获取当前计数器
    public long getCount() {
        return count.get();
    }

    /**
     * Obtain the current limit.
     *
     * @return the limit
     */
   //获取最大连接数
    public long getLimit() {
        return limit;
    }


    /**
     * Sets a new limit. If the limit is decreased there may be a period where
     * more shares of the latch are acquired than the limit. In this case no
     * more shares of the latch will be issued until sufficient shares have been
     * returned to reduce the number of acquired shares of the latch to below
     * the new limit. If the limit is increased, threads currently in the queue
     * may not be issued one of the newly available shares until the next
     * request is made for a latch.
     *
     * @param limit The new limit
     */
   //设置最大连接数
    public void setLimit(long limit) {
        this.limit = limit;
    }


    /**
     * Acquires a shared latch if one is available or waits for one if no shared
     * latch is current available.
     *
     * @throws InterruptedException If the current thread is interrupted
     */
    //增加计数器如果超过最大连接数、则等待并且计数器减一
    public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up[" + Thread.currentThread().getName() + "] latch=" + getCount());
        }
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Releases a shared latch, making it available for another thread to use.
     *
     * @return the previous counter value
     */
    //减少计数器其实就是减少连接数
    public long countDown() {
        sync.releaseShared(0);
        long result = getCount();
        if (log.isDebugEnabled()) {
            log.debug("Counting down[" + Thread.currentThread().getName() + "] latch=" + result);
        }
        return result;
    }

    /**
     * Releases all waiting threads and causes the {@link #limit} to be ignored
     * until {@link #reset()} is called.
     *
     * @return <code>true</code> if release was done
     */
    //释放所有线程
    public boolean releaseAll() {
        released = true;
        return sync.releaseShared(0);
    }

    /**
     * Resets the latch and initializes the shared acquisition counter to zero.
     *
     * @see #releaseAll()
     */
    //重置计数器
    public void reset() {
        this.count.set(0);
        released = false;
    }

    /**
     * Returns <code>true</code> if there is at least one thread waiting to
     * acquire the shared lock, otherwise returns <code>false</code>.
     *
     * @return <code>true</code> if threads are waiting
     */
    //是否有等待线程
    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * Provide access to the list of threads waiting to acquire this limited
     * shared latch.
     *
     * @return a collection of threads
     */
    //获取所有等待线程
    public Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }
}

3.总结Acceptor流程

虽然看上去代码有点复杂,但是实际上就是一句话概括。获取socket、并且对socket进行封装,扔到Poller的队列中。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,579评论 18 139
  • 说明本次redis集群安装在rhel6.8 64位机器上,redis版本为3.2.8,redis的gem文件版本为...
    读或写阅读 14,580评论 3 9
  • ¥开启¥ 【iAPP实现进入界面执行逐一显】 〖2017-08-25 15:22:14〗 《//首先开一个线程,因...
    小菜c阅读 6,344评论 0 17
  • 先来说说我昨天做的梦吧,昨天晚上一直做梦坐电梯,腾云驾雾,飞来飞去,毫无约束。突然嗖的一下,我的眼睛就睁开了。打开...
    怀瑾姑娘阅读 413评论 0 1
  • 今天离宝宝出生还有27天,三伏天持续高温,腹中又揣着小火炉,简直就是太痛苦了,不开空调睡不着,长时间开空调又由于空...
    小七碎碎念阅读 129评论 0 0