DruidDataSource源码解析
获取连接:获取连接时会先根据配置参数初始化连接池(init())。如果指定了maxWaitMillis,则在等待maxWaitMillis后仍无法获取连接时抛出GetConnectionTimeoutException异常。具体代码如下:
/**
 * Entry point for borrowing a connection.
 *
 * <p>Calls {@code init()} first, then either hands the request straight to
 * {@link #getConnectionDirect(long)} (no filters configured) or routes it through a
 * freshly created {@code FilterChainImpl}.
 *
 * @param maxWaitMillis wait limit forwarded unchanged to the direct or filtered path
 * @return the pooled connection produced by the chosen path
 * @throws SQLException propagated from initialization or from the chosen path
 */
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
    // make sure the pool has been initialized before anything is borrowed
    init();

    // no filters configured: skip the filter chain entirely
    if (filters.size() == 0) {
        return getConnectionDirect(maxWaitMillis);
    }

    return new FilterChainImpl(this).dataSource_connect(this, maxWaitMillis);
}
/**
 * Obtains a pooled connection without going through the filter chain, looping until a
 * connection passes the configured checks.
 *
 * <p>Per iteration: fetch a connection via {@code getConnectionInternal(maxWaitMillis)};
 * when test-on-borrow is enabled, validate it and discard/retry on failure; otherwise
 * discard/retry if the underlying connection is already closed and, when test-while-idle
 * is enabled, validate connections that have been idle for at least the eviction-run
 * interval. A connection that survives is optionally registered for abandoned-connection
 * tracking and has auto-commit disabled when the data source default is not auto-commit.
 *
 * @param maxWaitMillis wait limit passed to {@code getConnectionInternal}
 * @return a connection that passed the checks above
 * @throws SQLException declared by the method; raised by the calls it makes
 */
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
// NOTE(review): never read in the visible part of this excerpt; presumably consumed by
// the elided catch body below — confirm against the full source.
int notFullTimeoutRetryCnt = 0;
for (;;) {
// handle notFullTimeoutRetry
DruidPooledConnection poolableConnection;
try {
// If no connection can be obtained after waiting, a GetConnectionTimeoutException is thrown.
// NOTE(review): the trailing '/' on the next line looks like a truncated comment marker
// left over from the excerpt; as written it is a syntax error — confirm and remove.
poolableConnection = getConnectionInternal(maxWaitMillis);/
} catch (GetConnectionTimeoutException ex) {
// NOTE(review): the handler body is elided ("....") in this excerpt.
....
}
// When testOnBorrow is true, validate the connection; an invalid connection is
// discarded and the loop fetches another one.
if (isTestOnBorrow()) {
boolean validate = testConnectionInternal(poolableConnection.getConnection());
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
Connection realConnection = poolableConnection.getConnection();
discardConnection(realConnection);
continue;
}
} else {
Connection realConnection = poolableConnection.getConnection();
if (realConnection.isClosed()) {
discardConnection(null); // pass null to avoid closing the connection a second time
continue;
}
// When testWhileIdle is true and the connection has been idle for at least
// timeBetweenEvictionRunsMillis, validate it; an invalid connection is discarded.
if (isTestWhileIdle()) {
final long currentTimeMillis = System.currentTimeMillis();
final long lastActiveTimeMillis = poolableConnection.getConnectionHolder().getLastActiveTimeMillis();
final long idleMillis = currentTimeMillis - lastActiveTimeMillis;
long timeBetweenEvictionRunsMillis = this.getTimeBetweenEvictionRunsMillis();
// a non-positive setting falls back to the default interval
if (timeBetweenEvictionRunsMillis <= 0) {
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
}
if (idleMillis >= timeBetweenEvictionRunsMillis) {
boolean validate = testConnectionInternal(poolableConnection.getConnection());
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(realConnection);
continue;
}
}
}
}
// With removeAbandoned enabled, record where and when the connection was borrowed and
// register it in activeConnections (guarded by that map's monitor).
if (isRemoveAbandoned()) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.setConnectStackTrace(stackTrace);
poolableConnection.setConnectedTimeNano();
poolableConnection.setTraceEnable(true);
synchronized (activeConnections) {
activeConnections.put(poolableConnection, PRESENT);
}
}
// Only touch auto-commit when the data source default is "not auto-commit".
if (!this.isDefaultAutoCommit()) {
poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
初始化连接池
/**
 * Initializes the data source once: assigns ids, initializes filters, resolves the driver,
 * sets up checkers and statistics, pre-creates {@code getInitialSize()} connections and
 * starts the log/create/destroy workers.
 *
 * <p>The {@code inited} flag is checked before taking the lock and again after taking it,
 * so repeated calls return immediately once initialization has run.
 *
 * @throws SQLException if the lock wait is interrupted, if initialization fails, or if the
 *         initial connections could not be created and the pool ended up empty
 */
public void init() throws SQLException {
// fast path: already initialized, no locking needed
if (inited) {
return;
}
// Per the original author's note: the lock is non-fair by default and the policy can be
// changed through the DruidDataSource constructor.
final ReentrantLock lock = this.lock;
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
// NOTE(review): the thread's interrupt status is not restored here — confirm intent.
throw new SQLException("interrupt", e);
}
// true only for the call that actually performs the initialization (used for the final log line)
boolean init = false;
try {
// re-check under the lock: another thread may have finished initializing meanwhile
if (inited) {
return;
}
init = true;
initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
this.id = DruidDriver.createDataSourceId();
// For every data source after the first, shift the id seeds by (id - 1) * 100000;
// presumably so ids issued by different data sources do not overlap — confirm.
if (this.id > 1) {
long delta = (this.id - 1) * 100000;
this.connectionIdSeed.addAndGet(delta);
this.statementIdSeed.addAndGet(delta);
this.resultSetIdSeed.addAndGet(delta);
this.transactionIdSeed.addAndGet(delta);
}
if (this.jdbcUrl != null) {
this.jdbcUrl = this.jdbcUrl.trim();
initFromWrapDriverUrl();
}
// derive the database type from the URL when it was not configured explicitly
if (this.dbType == null || this.dbType.length() == 0) {
this.dbType = JdbcUtils.getDbType(jdbcUrl, null);
}
for (Filter filter : filters) {
filter.init(this);
}
// MySQL / MariaDB: if "cacheServerConfiguration" appears in the connect properties or
// in the URL, the property is (re)written as "true" in connectProperties.
if (JdbcConstants.MYSQL.equals(this.dbType) || //
JdbcConstants.MARIADB.equals(this.dbType)) {
boolean cacheServerConfigurationSet = false;
if (this.connectProperties.containsKey("cacheServerConfiguration")) {
cacheServerConfigurationSet = true;
// NOTE(review): jdbcUrl is dereferenced without a null check here although the code
// above allows it to be null — confirm it cannot be null when dbType is MySQL/MariaDB.
} else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
cacheServerConfigurationSet = true;
}
if (cacheServerConfigurationSet) {
this.connectProperties.put("cacheServerConfiguration", "true");
}
}
// NOTE(review): part of the method is elided (".....") in this excerpt.
.....
if (this.driverClass != null) {
this.driverClass = driverClass.trim();
}
initFromSPIServiceLoader();
// Resolve the driver instance: derive the class name from the URL when none is
// configured, use the MockDriver singleton for the mock driver class, otherwise
// instantiate the class through JdbcUtils.
if (this.driver == null) {
if (this.driverClass == null || this.driverClass.isEmpty()) {
this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
}
if (MockDriver.class.getName().equals(driverClass)) {
driver = MockDriver.instance;
} else {
driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
}
} else {
// a driver instance was supplied: only back-fill the class name
if (this.driverClass == null) {
this.driverClass = driver.getClass().getName();
}
}
initCheck(); // per the original note: performs the check when the database is Oracle or DB2
initExceptionSorter(); // initialize the ExceptionSorter
initValidConnectionChecker(); // initialize the ValidConnectionChecker
validationQueryCheck();
// Statistics: either share the global JdbcDataSourceStat (creating it on first use)
// or create one dedicated to this data source.
if (isUseGlobalDataSourceStat()) {
dataSourceStat = JdbcDataSourceStat.getGlobal();
if (dataSourceStat == null) {
dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
JdbcDataSourceStat.setGlobal(dataSourceStat);
}
if (dataSourceStat.getDbType() == null) {
dataSourceStat.setDbType(this.getDbType());
}
} else {
dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
}
dataSourceStat.setResetStatEnable(this.resetStatEnable);
// the pool's backing array is sized to maxActive
connections = new DruidConnectionHolder[maxActive];
// a failure while pre-creating connections is remembered and only rethrown at the end
// if the pool is still empty
SQLException connectError = null;
try {
// pre-populate the pool with getInitialSize() connections
for (int i = 0, size = getInitialSize(); i < size; ++i) {
// createPhysicalConnection() creates the physical connection (shown later in the
// article). NOTE(review): the original comment is truncated at this point.
Connection conn = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, conn);
connections[poolingCount] = holder;
incrementPoolingCount(); // per the original note: increases the available-connection count, i.e. poolingCount++
}
if (poolingCount > 0) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
connectError = ex;
}
createAndLogThread(); // per the original note: when timeBetweenLogStatsMillis is configured, starts a thread that logs pool state at that interval
createAndStartCreatorThread(); // create and start the worker that creates connections while threads are waiting for one
createAndStartDestroyThread(); // create and start the destroy worker
// wait until the workers started above have signalled through initedLatch.countDown()
initedLatch.await();
initedTime = new Date();
registerMbean();
// initial connection creation failed and nothing is in the pool: surface the error
if (connectError != null && poolingCount == 0) {
throw connectError;
}
} catch (SQLException e) {
LOG.error("dataSource init error", e);
throw e;
} catch (InterruptedException e) {
// NOTE(review): the thread's interrupt status is not restored here — confirm intent.
throw new SQLException(e.getMessage(), e);
} finally {
// Runs on every exit path, including failures: the data source is marked inited even
// when initialization threw, so later init() calls return at the first check.
inited = true;
lock.unlock();
if (init && LOG.isInfoEnabled()) {
LOG.info("{dataSource-" + this.getID() + "} inited");
}
}
}
创建物理连接:可以看到创建物理连接很简单,没有配置代理Filter时直接通过JDBC驱动(Driver.connect)创建Connection,否则经由FilterChainImpl创建。
/**
 * Opens one physical JDBC connection and bumps the creation counter.
 *
 * <p>Without proxy filters the driver is asked directly; with proxy filters the request
 * goes through a new {@code FilterChainImpl}. The counter is only incremented after the
 * connection has been obtained, so a failed attempt is not counted.
 *
 * @param url  JDBC URL handed to the driver on the direct path
 * @param info connection properties handed to the driver or to the filter chain
 * @return the newly opened connection
 * @throws SQLException if the driver or the filter chain fails to connect
 */
public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
    final boolean proxied = getProxyFilters().size() != 0;

    final Connection physical = proxied
            ? new FilterChainImpl(this).connection_connect(info)
            : getDriver().connect(url, info);

    createCount.incrementAndGet();
    return physical;
}
下面再看看createAndStartCreatorThread做了什么。可以看到,在没有配置createScheduler时,DruidDataSource会启动一个线程:当池中连接数小于等待获取连接的线程数,并且活动连接数+池中连接数小于maxActive时,该线程会创建新的物理连接并放入连接池中。
/**
 * Starts the dedicated connection-creating thread unless a {@code createScheduler} is set.
 *
 * <p>With a scheduler present no thread is started and {@code initedLatch} is counted
 * down right here; otherwise a {@code CreateConnectionThread} named after this data
 * source's identity hash code is created, stored in {@code createConnectionThread} and
 * started.
 */
protected void createAndStartCreatorThread() {
    // a scheduler is configured: nothing to start, just release the latch
    if (createScheduler != null) {
        initedLatch.countDown();
        return;
    }

    createConnectionThread =
            new CreateConnectionThread("Druid-ConnectionPool-Create-" + System.identityHashCode(this));
    createConnectionThread.start();
}
/**
 * Daemon thread that creates physical connections on demand and hands them to
 * {@code put(connection)}.
 *
 * <p>Each loop iteration first waits (under {@code lock}) until there is demand and room
 * for a new connection, then creates the connection outside the lock. The loop ends when
 * the thread is interrupted, when an {@code Error} is thrown during creation, or when
 * repeated SQL failures exceed the retry limit and {@code breakAfterAcquireFailure} is set.
 */
public class CreateConnectionThread extends Thread {
/**
 * @param name thread name; the thread is marked as a daemon
 */
public CreateConnectionThread(String name){
super(name);
this.setDaemon(true);
}
public void run() {
// signal the initializing thread that this worker is running
initedLatch.countDown();
// consecutive SQLException count; reset after every successful put()
int errorCount = 0;
for (;;) {
// addLast
try {
lock.lockInterruptibly();
} catch (InterruptedException e2) {
break;
}
try {
// only create a connection when some thread is waiting for one
// NOTE(review): the condition is checked with `if`, not re-checked in a loop after
// await() returns — confirm this is intended.
if (poolingCount >= notEmptyWaitThreadCount) {
empty.await();
}
// never create more than maxActive connections
if (activeCount + poolingCount >= maxActive) {
empty.await();
continue;
}
} catch (InterruptedException e) {
// remember why the creator stopped before leaving the loop
lastCreateError = e;
lastErrorTimeMillis = System.currentTimeMillis();
break;
} finally {
// runs for the normal path as well as for `continue` and `break` above
lock.unlock();
}
// the physical connection is created outside the lock
Connection connection = null;
try {
connection = createPhysicalConnection();
} catch (SQLException e) {
LOG.error("create connection error, url: " + jdbcUrl, e);
errorCount++;
// once the retry limit is exceeded (and a back-off interval is configured), either
// give up for good or sleep before the next attempt
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
if (breakAfterAcquireFailure) {
break;
}
try {
Thread.sleep(timeBetweenConnectErrorMillis);
} catch (InterruptedException interruptEx) {
break;
}
}
} catch (RuntimeException e) {
LOG.error("create connection error", e);
continue;
} catch (Error e) {
LOG.error("create connection error", e);
break;
}
// creation failed with an SQLException (connection stayed null): try again
if (connection == null) {
continue;
}
put(connection);
errorCount = 0; // reset errorCount
}
}
}
最后我们再看下createAndStartDestroyThread做了什么事情。该方法有两种回收连接的策略:第一种是通过作业调度(destroyScheduler),第二种是通过独立线程循环调用Thread.sleep。两种策略都按timeBetweenEvictionRunsMillis间隔调度执行destoryTask。destoryTask的shrink(true)主要根据配置参数判断连接池中连接的空闲时间idleMillis是否大于等于minEvictableIdleTimeMillis,如果是则关闭该连接;removeAbandoned()方法主要是在配置了连接泄露处理策略时,关闭泄露的连接。
/**
 * Creates the {@code DestroyTask} and arranges for it to run periodically.
 *
 * <p>Without a {@code destroyScheduler} a {@code DestroyConnectionThread} named after this
 * data source's identity hash code is started. With a scheduler the task is scheduled at a
 * fixed rate of {@code timeBetweenEvictionRunsMillis} (1000 ms when that value is not
 * positive), the resulting future is kept in {@code destroySchedulerFuture}, and
 * {@code initedLatch} is counted down here.
 */
protected void createAndStartDestroyThread() {
    destoryTask = new DestroyTask();

    // no scheduler configured: fall back to a dedicated thread
    if (destroyScheduler == null) {
        destroyConnectionThread =
                new DestroyConnectionThread("Druid-ConnectionPool-Destroy-" + System.identityHashCode(this));
        destroyConnectionThread.start();
        return;
    }

    // read the setting once, then substitute one second for a non-positive value
    final long configuredMillis = timeBetweenEvictionRunsMillis;
    final long intervalMillis = configuredMillis > 0 ? configuredMillis : 1000;

    destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(
            destoryTask, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    initedLatch.countDown();
}
/**
 * Periodic maintenance task: always calls {@code shrink(true)}, then calls
 * {@code removeAbandoned()} when {@code isRemoveAbandoned()} reports true.
 */
public class DestroyTask implements Runnable {

    @Override
    public void run() {
        shrink(true);

        // abandoned-connection handling is optional
        if (!isRemoveAbandoned()) {
            return;
        }
        removeAbandoned();
    }
}
public class DestroyConnectionThread extends Thread {
public DestroyConnectionThread(String name){
super(name);
this.setDaemon(true);
}
public void run() {
initedLatch.countDown();
for (;;) {
// 从前面开始删除
try {
if (closed) {
break;
}
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000); //
}
if (Thread.interrupted()) {
break;
}
destoryTask.run();
} catch (InterruptedException e) {
break;
}
}
}
removeAbandoned处理逻辑
/**
 * Closes connections that have been checked out for at least
 * {@code removeAbandonedTimeoutMillis} and are not currently running.
 *
 * <p>Works in two phases: first, under the {@code activeConnections} monitor, timed-out
 * connections are unregistered, have tracing switched off and are collected; then, outside
 * that monitor, each collected connection that is not disabled is closed, marked as
 * abandoned and counted. When {@code isLogAbandoned()} is true, the owner thread, the
 * connect time and the stack trace captured at borrow time are logged at error level.
 *
 * @return the number of connections abandoned by this call
 */
public int removeAbandoned() {
    final long nowNanos = System.nanoTime();
    final List<DruidPooledConnection> expired = new ArrayList<DruidPooledConnection>();

    // phase 1: pick out the timed-out connections while holding the map's monitor
    synchronized (activeConnections) {
        Iterator<DruidPooledConnection> cursor = activeConnections.keySet().iterator();
        while (cursor.hasNext()) {
            DruidPooledConnection candidate = cursor.next();
            if (!candidate.isRunning()) {
                long heldMillis = (nowNanos - candidate.getConnectedTimeNano()) / (1000 * 1000);
                if (heldMillis >= removeAbandonedTimeoutMillis) {
                    cursor.remove();
                    candidate.setTraceEnable(false);
                    expired.add(candidate);
                }
            }
        }
    }

    // phase 2: close what was collected, without holding the map's monitor
    int removeCount = 0;
    for (DruidPooledConnection victim : expired) {
        boolean alreadyDisabled;
        synchronized (victim) {
            alreadyDisabled = victim.isDisable();
        }
        if (alreadyDisabled) {
            continue;
        }

        JdbcUtils.close(victim);
        victim.abandond();
        removeAbandonedCount++;
        removeCount++;

        if (!isLogAbandoned()) {
            continue;
        }

        StringBuilder report = new StringBuilder();
        report.append("abandon connection, owner thread: ")
              .append(victim.getOwnerThread().getName())
              .append(", connected time nano: ")
              .append(victim.getConnectedTimeNano())
              .append(", open stackTrace\n");
        for (StackTraceElement frame : victim.getConnectStackTrace()) {
            report.append("\tat ").append(frame.toString()).append("\n");
        }
        LOG.error(report.toString());
    }
    return removeCount;
}