肝了三晚,终于吃透了Druid连接池
前言
作为一个java程序员,数据库的JDBC几乎每天都在做,数据库连接池Druid每天也在使用,但可能用起来太简单了(spring中引入依赖即可),往往忽略了连接池的意义和优化
本文从源码的角度分析Druid的常用配置及原理
连接
当我们程序需要访问数据库时,需要创建一个本地到数据库服务的网络连接,此时本地代码就相当于一个数据库的客户端,可以通过这个连接去访问数据、执行sql,如下
Driver driver = new com.mysql.cj.jdbc.Driver();
// 创建连接
Connection con = driver.connect(JDBC_URL, props);
Statement statement = con.createStatement();
ResultSet resultSet = statement.executeQuery("show tables");
while (resultSet.next()) {
System.out.println(resultSet.getString(1));
}
con.close();
池化技术
由于我们的代码需要不断与数据库交互读取数据,如果每次请求数据都创建一个连接的话,网络开销是很大的,也会导致我们的程序比较慢,同时连接如果太多也会给数据库造成压力
为了解决这个问题,就有了池化技术,把创建好的连接放在池里,用时去池里获取,节省了创建连接的时间,也可以通过配置来限定池的最大连接数等
连接池最常用的工具基本就是阿里的Druid了,简单使用如下
// druid 数据源
DruidDataSource druidDataSource = new DruidDataSource();
// 数据源配置
druidDataSource.setUrl(JDBC_URL);
druidDataSource.setUsername(USERNAME);
druidDataSource.setPassword(PASSWORD);
// 初始化
druidDataSource.init();
// 获取表名
Connection con = druidDataSource.getConnection();
Statement statement = con.createStatement();
ResultSet resultSet = statement.executeQuery("show tables");
while (resultSet.next()) {
System.out.println(resultSet.getString(1));
}
con.close();
可以看到使用了Druid,获取连接不再是直接使用驱动创建连接,而是通过DruidDataSource
对象获取连接
DruidDataSource
接下来就分析DruidDataSource的源码,从三个方面入手:配置、存储、线程
配置
首先作为一个连接池工具,首先要支持重要参数的可配置,以下只列举一部分常用的配置和其简单含义,后面的源码分析会实际的分析每个配置的作用
- maxActive 最大连接数
- initialSize 初始化连接数
- minIdle 最小空闲数
- keepAlive 是否保持连接
- asyncInit 是否异步初始化
- timeBetweenEvictionRunsMillis 回收连接任务运行的频率
- minEvictableIdleTimeMillis 最小闲置时间,连接闲置时间小于这个时间不会被回收,大于有可能被回收
- maxEvictableIdleTimeMillis 最大闲置时间,连接闲置时间超过这个数是一定被回收的
- validationQuery 测试是否有效的sql
- phyTimeoutMillis 连接物理超时时间
有很多配置都是和其它配置配合使用的,所以很多配置单独拿出来说它的作用没有意义,还是要结合代码看一下
存储
DruidDataSource
作为一个连接池,内部一定会有一个容器来存储连接,这应该是最重要的属性
private volatile DruidConnectionHolder[] connections; // 当前的所有连接
connections
存储的就是所有的数据库连接对象,并封装了一个连接的持有对象DruidConnectionHolder
,在持有物理连接的同时,也记录了一些连接的其它属性,比如:
- connectTimeMillis 连接建立的时间
- lastActiveTimeMillis 连接上一次被使用的时间
还有非常重要的一点,这个存储连接的容器是有排序的,每次使用连接都从最后拿,这就导致容器尾部的连接是最活跃的,也就导致前面的连接闲置时间肯定是要高于后面的
计数
同时,池内部有很多计数器来存储当前各种维度的数量值
private int poolingCount = 0; // 可用连接数
private int activeCount = 0; // 正在使用连接数
private volatile long discardCount = 0; // 丢弃连接数
private int notEmptyWaitThreadCount = 0; // 等待连接的线程数
线程
DruidDataSource中有几个线程,在初始化方法init被创建并运行,它们分别承担不同的工作
public void init() throws SQLException {
// ...
createAndLogThread(); // 开启负责日志统计的线程
createAndStartCreatorThread(); // 开启负责创建连接的线程
createAndStartDestroyThread(); // 开启负责负责销毁连接的线程
// ...
}
实际上,DruidDataSource
就是依靠这些线程来维护整个线程池中连接的创建和销毁任务,它们可以看做是线程池的维护人员
小结
所以Druid池简单来说就是一个连接的容器(connections),可配的参数,状态/计数的存储组成的一个类,在初始化方法中会创建多个线程,这些线程在连接池的生命周期一直运行并监控这当前线程池的状态,并根据配置和计数数据在需要的时候在容器中创建/销毁线程
连接池中这几个线程是可以被替代的,如果我们设置了调度器,则可以按我们自己的方式去调度创建销毁连接的任务,这属于比较高级的用法了,本文不做探讨
线程源码分析
协调
线程池内部运行的两个主要线程:创建连接的线程和销毁连接的线程,池外部还有我们用户代码中想要获取连接的线程(在此统一称之为用户线程)
各个线程可能都要访问和修改各种计数和连接容器,为了达到线程安全,DruidDataSource
内部提供了一个统一的ReentrantLock锁
protected ReentrantLock lock;
各线程也少不了沟通,比如某用户线程想获取连接,如何通知创建线程去创建连接,创建线程创建完连接有如何告知用户线程,为解决这个问题,DruidDataSource
内部提了两个主要的Condition
protected Condition notEmpty;
protected Condition empty;
其中empty代表空条件,创建线程通过empty.await()
即可等待空信号,而用户线程通过empty.signal()
即可发送空信号给创建线程,此时用户线程notEmpty.await()
开始等待非空条件,而创建线程一般会创建连接,创建完成后通过notEmpty.signal()
通知线程创建完毕
创建连接的线程
CreateConnectionThread是专门负责创建连接的,可以说DruidDataSource中的连接基本都是由它负责实际创建的(也会有特例,比如默认情况下initialSize设置的连接数是在init方法中直接创建的)
大部分情况下CreateConnectionThread是在empty条件上等待空信号,即empty.wait()
,当得到信号时再创建连接
接下来就看一下CreateConnectionThread的源码
public class CreateConnectionThread extends Thread {
public CreateConnectionThread(String name){
super(name);
// 设置守护线程
this.setDaemon(true);
}
public void run() {
initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
// 线程一直运行着
for (;;) {
// 一.判断是否需要创建连接
// 获取锁
lock.lockInterruptibly();
// 当前被丢弃的连接数
long discardCount = DruidDataSource.this.discardCount;
// 对比上一次记录被丢弃的连接数,看看是否有变化
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;
try {
// 标志是否需要等待空信号
boolean emptyWait = true;
// 存在异常,当前池连接数为0,且没有新丢弃的连接
if (createError != null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}
// 如果设置了异步初始化,且当前创建的连接数少于设置初始连接数,则跳过等待直接创建连接
if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}
// 如果没有跳过等待,并不是实际的去等待,而是还有一层判断
if (emptyWait) {
// 有三种情况可以跳过这一步的等待
// 1.等待使用连接的线程数大于当前可用连接数
// 2.设置了keeplive=true且当前池的总连接数小于设置最小连接数
// 3.连续失败isFailContinuous(这一项先忽略)
// 跳过这一步等待并不代表可以直接创建,还要进行下一步的是否到达最大设置数量的判断
if (poolingCount >= notEmptyWaitThreadCount //
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()
) {
// 等待空信号
empty.await();
}
// 如果当前连接数量已超过设置最大数量,则等待空信号,否则就可以去创建连接了
if (activeCount + poolingCount >= maxActive) {
empty.await();
// 等待到了空信号,并不是直接创建连接,而是重新判断一次是否需要等待,因为连接数是绝对不能超越maxActive的,所以为了安全,必须重新判断一次
continue;
}
}
} catch (InterruptedException e) {
//...
} finally {
// 释放锁
lock.unlock();
}
// 二.开始创建连接
PhysicalConnectionInfo connection = null;
try {
// 创建物理连接
connection = createPhysicalConnection();
} catch (SQLException e) {
//...
}
// 加入连接池的连接列表,即connections
boolean result = put(connection);
// 如果连接池关闭,创建连接线程也停止
if (closing || closed) {
break;
}
}
}
}
代码看起来还是比较复杂,简单总结一下:
<特殊情况>
创建连接的线程有两种特殊情况,这两种情况主要是异步初始化化和处理异常,这种情况下直接跳过等待,也不需考虑maxActive
,直接创建连接,这种情况相对特殊暂不做考虑
<常规情况>
大部分情况下,创建连接的线程要根据minIdle
,maxActive
等配置以及线程池的状态来判断是否需要等待,如果不需要等待也会创建连接
常规情况下有三种条件,满意任意一种就可以不需等待直接创建连接,但还有个大前提就是池中的连接总数不能超过maxActive
设置的数量
三种条件分别是
- 当等待使用连接的线程数(
notEmptyWaitThreadCount
)大于池中可用连接数(poolingCount
),即供不应求时 - 当线程池设置保持连接(
keepAlive=true
),且当前池中的总连接数(activeCount + poolingCount
)小于设置最小连接数(minIdle
),即池中没有保持足够的最小连接数时 - isFailContinuous 连续失败时
三种条件如果都不满足,则在empty
条件上等待索要连接的信号,得到信号则创建连接(还需要判断最大连接数)
如果三个条件满足任意一个,但连接数已到达maxActive
,依然在empty
条件上等待信号,得到信号重新再判断一次,是为了确保连接数不超过最大配置
画个图梳理一下
用一句话总结一下:
CreateConnectionThread负责给线程池创建连接,当线程池中供不应求、最小保持连接数不足、连续错误时线程会主动创建连接,否则就会休息节省体力,得到需求信号再创建连接,创建完成后重新开始审视创建的工作, ps:整个过程确保连接数不能超出设定范围
销毁连接的线程
与CreateConnectionThread对应,DestroyConnectionThread承担销毁连接的任务,主要根据配置的参数和当前的技术器,销毁掉需要销毁的连接
public class DestroyConnectionThread extends Thread {
public DestroyConnectionThread(String name) {
super(name);
// 设置守护线程
this.setDaemon(true);
}
public void run() {
initedLatch.countDown();
// 不断执行
for (;;) {
try {
//...
// 根据配置timeBetweenEvictionRunsMillis决定销毁任务执行的间隔
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000);
}
//...
// 执行销毁任务
destroyTask.run();
} catch (InterruptedException e) {
break;
}
}
}
}
销毁连接的任务实时性要求并不是太高,所以可能会隔一段时间才去计算并销毁一次,这个间隔的时间就是配置timeBetweenEvictionRunsMillis
其中DestroyTask的run方法定义如下
public void run() {
shrink(true, keepAlive);
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
主要调用的方法即shrink,意指收缩线程池,重点看一下这个方法:
public void shrink(boolean checkTime, boolean keepAlive) {
// 获取锁
lock.lockInterruptibly();
// 是否需要补充
boolean needFill = false;
// 驱逐的数量
int evictCount = 0;
// 需要保活的数量
int keepAliveCount = 0;
int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
fatalErrorCountLastShrink = fatalErrorCount;
try {
// 未初始化完成不执行
if (!inited) {
return;
}
// 池中可用连接数超出最小连接数的数量
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
// 循环池中可用连接
for (int i = 0; i < poolingCount; ++i) {
DruidConnectionHolder connection = connections[i];
// 异常的处理,暂不做考虑
if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
keepAliveConnections[keepAliveCount++] = connection;
continue;
}
// 如果检查时间,销毁线程传入的是true
if (checkTime) {
// 如果设置了物联连接超时时间
if (phyTimeoutMillis > 0) {
// 当前连接连接时间过过了超时时间,加入要待回收集合中
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
// 计算当前连接已闲置的时间
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
// 如果连接闲置时间比较短,则可不被回收,可以直接跳出循环,因为连接池是尾部更活跃,后面的肯定更短不需要判断了
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
) {
break;
}
// 如果连接闲置时间超出了设置的 最小闲置时间
if (idleMillis >= minEvictableIdleTimeMillis) {
// 如果当前连接的位置在checkCount以内,则加入待回收集合
if (checkTime && i < checkCount) {
evictConnections[evictCount++] = connection;
continue;
// 否则如果已超出最大闲置时间,也要加入待回收集合
} else if (idleMillis > maxEvictableIdleTimeMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
// 如果闲置时间超出保活检测时间,且设置了keepAlive,则加入待验证保活的集合中
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
}
} else {
//...
}
}
// 要删除的连接总数,实际上keepAliveCount只是有可能被删除,还没有最终定论,这里做法是先删除掉,如果验证连接可用后续再加回来即可
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
// 删除连接池中的废弃连接,由于废弃的连接一定是前removeCount个连接,所以直接使用复制即可删除
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
// 当前可用连接数变小
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
// 如果设置了保活,且总连接数小于最小连接数,则需要补充
if (keepAlive && poolingCount + activeCount < minIdle) {
needFill = true;
}
} finally {
lock.unlock();
}
// 如果有要回收的连接
if (evictCount > 0) {
// 循环
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
// 关闭连接
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
// 清空需要回收的连接集合
Arrays.fill(evictConnections, null);
}
// 如果有要进行保活的连接
if (keepAliveCount > 0) {
// 循环要保活的连接
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
// 验证链接是否有效,此时要用到配置的validationQuery来验证连接的有效性,如果没设置,就默认有效
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
}
boolean discard = !validate;
// 如果连接有效
if (validate) {
holer.lastKeepTimeMillis = System.currentTimeMillis();
// 重新加入连接池最左侧
boolean putOk = put(holer, 0L, true);
if (!putOk) {
discard = true;
}
}
// 如果连接无效
if (discard) {
try {
// 关闭连接
connection.close();
} catch (Exception e) {
// skip
}
lock.lock();
try {
// 记录被丢弃的连接数+1
discardCount++;
// 如果且总连接数小于最小连接数,发出空信号
if (activeCount + poolingCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
// 清空需要保活的连接集合
Arrays.fill(keepAliveConnections, null);
}
// 如果需要补充
if (needFill) {
lock.lock();
try {
// 计算需要补充的数量,createTaskCount是使用自定义调度时的逻辑,暂时忽略
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
// 发出空信号
for (int i = 0; i < fillCount; ++i) {
emptySignal();
}
} finally {
lock.unlock();
}
} else if (onFatalError || fatalErrorIncrement > 0) {
// 异常处理 忽略..
}
}
核心代码依然相当复杂,还是尝试总结一下
(一) 销毁任务实时性不高,销毁线程执行是一个定时任务,时间间隔可配
(二) 销毁线程只考虑数目为poolingCount
的池中可用连接,正在使用的连接不可能被销毁(其实也已不在池中)
(三) 销毁线程会从前往后循环查看所有的池中连接,主要判断是否需要销毁或者保活,主要包含如下逻辑:
- 循环前会提前计算当前可用连接超出最小限制连接的数量,为
checkCount
,这个数量其实就是线程池中多余连接的数量,而且按照容器的排序,越前面的连接越不活跃,所以前checkCount
就是多余连接,但多余连接不一定会被移除,有可能因为闲置时间(说明刚用完不久)较短而被暂时保留 - 如果当前连接闲置时间比较短,不需要进行销毁或保活测试,直接跳出循环,因为后面的连接活跃度更高
- 如果连接闲置时间比较长,比如超过了设置的最大闲置时间,或超过最小闲置时间且当前连接本身就是多余连接,就会从池中移出至待销毁的集合中
- 如果连接闲置时间比较长,超过了保活测试的设定时间(且keepAlive),就会从池中移出至待测试有效性的集合中
- 待销毁集合的连接后续会被直接关闭,待测试有效性集合的连接需要测试连接是否可用,如果不可用直接销毁,通过校验加回至连接池中
- 由于销毁了很多连接,可能导致keepAlive情况下最小连接数不够了,所以需要通过empty.signal通知创建线程补充连接
再画个示意图
用户线程
用户线程主要是去池中获取连接,上文也提到过,是从最后拿连接,重点方法takeLast
DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
try {
while (poolingCount == 0) {
// 发送空信号,让创建线程创建连接
emptySignal(); // send signal to CreateThread create connection
// 增加等待线程数
notEmptyWaitThreadCount++;
// 等待非空信号
try {
notEmpty.await(); // signal by recycle or creator
} finally {
notEmptyWaitThreadCount--;
}
//...
}
} catch (InterruptedException ie) {
//...
}
// 有了可用连接
// 可用连接减一,因为要拿出用了
decrementPoolingCount();
// 取出最后一个连接
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
// 返回
return last;
}
逻辑就是取池中最后一个连接,如果没有通知创建线程创建连接
最后
费了好大劲,基本捋明白了Druid连接池的重要代码,感觉真的很复杂
总结一下Druid的优点
- 连接的创建销毁异步执行,保证效率
- 连接池的固定最大连接数避免了连接的过度创建
- 连接池中连接的存活时间可配置,保证高并发下连接不会被回收,可重复利用
- 连接池的保活机制,可以固定维持一定数量的连接长期保留在池中,还可以定时检测连接的有效性,固定维持的连接可以在并发骤增的情况下提前预热,避免一次性建立过多连接
其实还是有很多地方并没有想太明白,而且很多结论也很难测试,如果有误,欢迎指正
共有 0 条评论