多数据源跳库组件及分析
- 连接池介绍
- 多数据源使用
- 多数据源应用场景
- 多数据源配置
- spring + druid 多数据源配置
- springboot + druid 多数据源配置
- 多数据源跳库组件化
- 多数据源问题及解决办法
连接池介绍
首先说下连接池:(应该都不陌生)
数据源即DataSource, 在java中有很多对DataSource的实现, 如C3P0, Druid. 从连接池中获取某个数据库的连接, 减少创建和关闭连接带来的性能消耗。 同时基本上所有的连接池实现都有对于连接池的一些配置如 用户名密码,url, maxWait,maxActive 等等。
一个数据源管理一组相同属性的数据库的连接
由于数据源的配置在网上都有详细的说明,在这里就不重复了, 根据自己业务场景使用合适的参数。
多数据源的应用场景
在业务中往往采用1个关系型数据库如mysql.
到目前为止, 遇到过下面几种多数据源的应用场景。
- 分库分表
当业务变得越来越复杂, 数据变得越来月庞大, 即使简单的sql查询性能也不见有多好, 这时候我们可以采用分库分表的方式。 当然切分的方式有很多,比如按照业务垂直切分, 在这里不做讨论。
- 读写分离
对数据库的更新以及删除等操作和查询操作是在不同数据库进行的。
在一个数据库进行操作, 对于开发是最理想的, 拆分数据库在一定程度会增大模块的复杂性,根据自己的需求, 恰当使用多数据源。 当然最理想的情况,对于开发来讲, 即是多数据源的切换是透明的。
多数据源使用
环境 mybatis, spring
多数据源即在一个应用中配置多个不同的连接池。
对于数据源的使用是相对容易的。
由于某些架构或业务中, 有时候是要进行多个数据库的操作,这就涉及到多个数据源的创建以及数据源的切换。
在这里我先讲对于多数据源的创建, 也就是数据源是在项目启动时读取xml的配置创建数据源, 后面我会介绍如何动态创建数据源。
配置
多个 druid 数据源配置
<bean id="druiddataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
...
这里是数据源的配置 如 user_name
</bean>
针对多数据源, 要创建多个数据源, 可以通过parent来配置,
<bean parent="druiddataSource" id="xxx">
<!-- 数据库基本信息配置 -->
<property name="url" value="${jdbc.url}" />
</bean>
<bean parent="druiddataSource" id="xxx2">
<!-- 数据库基本信息配置 -->
<property name="url" value="${jdbc.url2}" />
</bean>
<bean parent="druiddataSource" id="xxx3">
<!-- 数据库基本信息配置 -->
<property name="url" value="${jdbc.url3}" />
</bean>
... 其他数据源
以上就是多数据源的配置, 以上的配置仅是url配置不同, 其他配置相同, 如果其他连接池配置不同, 设置相应的property即可
在jdbc编程中我们知道, 获取到对应的连接池, 我们就可以调用dataSource.getConnection() 方法获取连接, 但是对于任何与数据库打交道, 我们并不需要关心连接的获取与关闭,以及对于连接的其他细节, 这时候诸如Mybatis, hibernate, jdbcTemplate 的框架就出现了, 这些框架屏蔽了底层的细节。 我们就能很容易使用controller-service-dao 3层模型来进行开发, 下面就介绍一下 使用Mybatis来动态切换数据源。
- mybatis 配置 + 切换配置
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource" /> <!--注意这里的dataSource-->
<!--其他mybatis配置 如mybatisLocation, 这里就不再提了-->
</bean>
<!--这里的dataSource 不能直接是druid这类的dataSource, 而是需要一个能够动态获取的dataSource, 在后面我会进行源码分析。-->
<bean class="com.xxx.datasource.DynamicDataSource" id="dataSource">
<property name="targetDataSources">
<map key-type="java.lang.String">
<entry value-ref="xxx1" key="value1"/> <!--这里的xxx1就是之前配置的数据源, 要使用几个数据源就配置几个-->
...
</map>
</property>
</bean>
那么再看一下com.xxx.datasource.DynamicDataSource这个是怎么样的呢
public class DynamicDataSource extends AbstractRoutingDataSource {
/**
* 获取线程下当前的数据源
* @return
*/
@Override
protected Object determineCurrentLookupKey() {
return DBContextHolder.getDBType();
}
}
//在这里说明 返回的Object 是跟之前设置map里的key 进行匹配, 这样才能通过key来取出,
//在切换的时候只用设置DBContextHolder.setDBType(“要的”)就可以了 ,DBContextHolder 是一个本地线程。
// 其实该DynamicDataSource也是java.sql.DataSource。 只不过它将多个数据源通过key-value的形式放在绑定。
key就是我们切换数据源的依据。
以上是spring xml+ druid的配置, 再来看看springboo的配置
druid + springboot 配置
使用springboot, 也就不用xml, 如何配置数据源呢? 如果在了解了上面的原理后,并且有一定的spring 基础, 那么配置动态数据源将变得轻松。
@Configuration
public class DevDruidConfiguration implements ApplicationContextAware{
// 默认数据源
@ConfigurationProperties("spring.datasource.druid.key1")
@Bean
public DataSource key1(){
return DruidDataSourceBuilder.create().build();
}
@Primary
@ConfigurationProperties("spring.datasource.druid.key2")
@Bean
public DataSource key2(){
return DruidDataSourceBuilder.create().build();
}
public ApplicationContext getContext() {
return context;
}
// dynamic DataSource bean 配置
@Bean
public DynamicDataSource dynamicDataSource() {
DynamicDataSource dataSource = new DynamicDataSource();
ApplicationContext c = getContext();
Map<Object,Object> target = new HashMap<>();
target.put("key1" , "key1" );
target.put("key2" , "key2" );
dataSource.setTargetDataSources(target);
dataSource.setDataSourceLookup(new BeanFactoryDataSourceLookup(c.getAutowireCapableBeanFactory()));
dataSource.setDefaultTargetDataSource("hpdes");
return dataSource;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
}
}
mybatis 配置
@Configuration
public class MybatisConfiguration extends MybatisAutoConfiguration {
// public SqlSessionFactory sqlSessionFactory
public MybatisConfiguration(MybatisProperties properties, ObjectProvider<Interceptor[]> interceptorsProvider, ResourceLoader resourceLoader, ObjectProvider<DatabaseIdProvider> databaseIdProvider, ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizersProvider) {
super(properties, interceptorsProvider, resourceLoader, databaseIdProvider, configurationCustomizersProvider);
}
@Bean("sqlSessionFactory")
public SqlSessionFactory sqlSessionFactory(@Qualifier("dynamicDataSource")DataSource dynamicDataSource) throws Exception {
return super.sqlSessionFactory(dynamicDataSource);
}
}
springbooot application.yml 配置
spring:
datasource:
druid:
key1:
url: jdbc:mysql://@mysqlUrl@:3306/db1?characterEncoding=utf-8
name: key1
key2:
url: jdbc:mysql://@mysqlUrl@:3306/db2?characterEncoding=utf-8
name: key2
initial-size: 1 # 初始化
min-idle: 1 #
...
动态数据源切换源码分析
说了这么多, 不从源码级别说,估计很多人都是蒙圈的, 但是在此之前我先概括一下, 既然是动态切换, 你首先要设置一个标识符说明要哪个数据源, 那么再获取的时候, 通过这个标识符去找对应的数据源, 而最容易找的集合就是通过Map进行存储 key是标志符号, value是DataSource. 下面就进行源码分析了
- Mybatis 管理数据源
public void setDataSource(DataSource dataSource) {
if (dataSource instanceof TransactionAwareDataSourceProxy) {
// If we got a TransactionAwareDataSourceProxy, we need to perform
// transactions for its underlying target DataSource, else data
// access code won't see properly exposed transactions (i.e.
// transactions for the target DataSource).
this.dataSource = ((TransactionAwareDataSourceProxy) dataSource).getTargetDataSource();
} else {
this.dataSource = dataSource;
}
}
这里的DataSource 就是DynamicDataSource。
- 获取相应的数据源: 从事务中获取连接
先大致说一下mybatis的调用顺序
SqlSession -> Executor -> Handler
在Executor 中调用getConnection()
protected Connection getConnection(Log statementLog) throws SQLException {
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
return ConnectionLogger.newInstance(connection, statementLog, queryStack);
} else {
return connection;
}
}
// transaction.getConnection() 如果是SpringMannagedTransaction, 则会从DataSourceUtils中获取
private void openConnection() throws SQLException {
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = isConnectionTransactional(this.connection, this.dataSource);
if (logger.isDebugEnabled()) {
logger.debug(
"JDBC Connection ["
+ this.connection
+ "] will"
+ (this.isConnectionTransactional ? " " : " not ")
+ "be managed by Spring");
}
}
// 在DataSourceUtils.getConnection(this.dataSource) 会先从本地线程中根据数据源获取连接, 见后面多数据源源码分析
若本地线程中没有, 则会调用dataSource.getConnection(); 此时这里是DynamicDataSource, 其getConnection()方法如下
@Override
public Connection getConnection() throws SQLException {
return determineTargetDataSource().getConnection();
}
// 关键来了, 在determineTargetDataSource中正是调用determineCurrentLookupKey来获取数据源的key
protected DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
Object lookupKey = determineCurrentLookupKey(); // 当前要切换的数据源的key
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
return dataSource;
}
对于数据源是如何切换的, 我们通过分析已经很清楚, 要注意, key的获取是根据我们的实现获取到的, 最简单的方法就是本地线程, 但是在异步或多线程调用可能就会出现问题。
以上我们创建的数据源是在项目启动时就会创建好, 并且如果再添加新的数据源, 必须修改配置, 并且重新部署应用, 这对分库的应用来说是个大麻烦。
动态切换多数据源组件化
前面我们知道切换数据源之前总是要调用DataSource.setDB("....") 来进行切换, 并且如果该调用只是某个业务逻辑的一部分, 在切换数据源后的调用还需要在切换回来, 并且还要考虑调用发生异常的情况, 为此增大了开发的易错性。 那么有什么好的方法能够避免这样的切换吗?
我初步的设计了以下的组件,不需要考虑切换的步骤, 只需要关心当前我需要切换到什么数据库和执行逻辑:
代码实现如下:
public class DataSourceService {
/**
* 跳库执行
* @param source 跳到某个租户下
* @param exec 执行的逻辑
*/
public static <E> void skip(String source, Execution<E> exec) throws Throwable {
DBContextHolder.setDBType(source, true);
try{
exec.execute();
}finally {
DBContextHolder.recover();
}
}
/**
* 跳库执行返回结果
*/
public static<E> E skipCall(String source, Execution<E> exec) throws Throwable {
DBContextHolder.setDBType(source, true);
try {
E result = exec.execute();
return result;
}finally {
DBContextHolder.recover();
}
}
}
public class DBContextHolder {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();
public static final ThreadLocal<Boolean> isPropagation = new ThreadLocal<Boolean>(){
@Override
protected Boolean initialValue() {
return false;
}
};
public static final ThreadLocal<SourceInfo> preSourceHolder = new ThreadLocal<>();
/**
* 跳库不传播, 在传播后调用该接口无效
* @param dbType
*/
public static void setDBType(String dbType) {
if (isPropagation.get()!=Boolean.TRUE) {
updateSourceInfo();
contextHolder.set(dbType);
}
}
/**
* 跳库, 接下来都为切换成该库
* @param dbType
* @param isProp
*/
public static void setDBType(String dbType, boolean isProp) {
updateSourceInfo();
contextHolder.set(dbType);
isPropagation.set(isProp);
}
public static String getDBType() {
return contextHolder.get();
}
public static void clearDBType() {
contextHolder.remove();
isPropagation.remove();
updateSourceInfo();
}
public static void recover(){
if (preSourceHolder.get()!=null){
contextHolder.set(preSourceHolder.get().contextHolder);
isPropagation.set(preSourceHolder.get().isPropagation);
preSourceHolder.remove();
}
}
public static boolean isPropagation(){
return isPropagation.get();
}
private static void updateSourceInfo(){
SourceInfo sourceInfo = new SourceInfo();
sourceInfo.contextHolder = contextHolder.get();
sourceInfo.isPropagation = isPropagation.get();
preSourceHolder.set(sourceInfo);
}
public static String getCurrentDB(){
return DBContextHolder.getDBType();
}
}
调用示例
Entity obj = DataSourceService.skipCall("sourceKey", new Execution<Entity>() {
@Override
public Entity execute() throws Throwable {
return serviceImpl.doSql("....."); // 业务逻辑执行
}
});
如果使用Java8, 使用lamdba 表达式
Entity obj = DataSourceService.skipCall("sourceKey", ()->{
return serviceImpl.doSql("....."); // 业务逻辑执行
});
如何使用多数据源(多情况分析)
controller 拦截跳库
往往(大部分情况下)页面的一次操作都在一个数据库中,所以使用拦截器进行跳库是不错的选择, 如果本次操作涉及多个数据库, 可以使用上述组件进行数据源切换。
拦截器大致如下
public class DataSourceIntercetor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o) throws Exception {
HttpServletRequest request = httpServletRequest;
String tenant = getDataSouceKeyFromRequest(request);
return DBContextHolder.setDB(tenant);
// 上述getDataSouceKeyFromRequest 根据自己的需求实现,如果信息保存在token中, 则解析token取出, 如果存在cookie中(并不是很安全),
则查询cookie,
}
@Override
public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
}
@Override
public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
DBContextHolder.clearDBType();// 这里最好清空一下本地线程
}
}
多数据源问题考虑
在解决跳库这些问题, 对于跳库, 有些问题是我们需要注意的
- 在多数据源的情况下, 事务如何保证?
分布式事务解决方式
用于对事务性要求不高,允许数据“最终一致”
首先来看一下如果用原来的事务处理会出现什么情况?
在这里主要为==spring transaction== 和 ==mybatis== 一起使用的情况。
假如有以下2个service方法
service1.doMethod1(); // 加入向数据库1插入一条记录
service2.doMethod2(); // 向数据库2插入一条记录
在另一个service3 调用了service1.method1() 和service2.method2()。
大致代码如下:
method3(){
service1.doMethod1();
service2.doMethod2();
}
场景1: 不采用事务切面
在调用method3前不开启事务
假如method1调用失败, 由于service1是对A数据库的操作, 开启事务,则service1事务回滚, 没有什么问题。
但是method1调用成功, service1 事务提交, 然而service2调用失败,service2事务回滚, 然而service1无法事务回滚, 导致不一致性。
场景2: 无法切换数据源
即对method3采用@Transactional 注解或进行 tx:advice 切面配置
在调用method3 之前开启事务, 由于事务已经开启, 导致doMethod1和doMethod2的连接不再重新获取而是从事务中的连接池获取
使得2个方法的执行的连接都是在事务开始时获取的数据源连接。导致切换数据源失败
原因: 源码分析
我们在平时通常使用spring 的 TransactionManager, 通过aop 和 tx:advice 来进行事务管理, 很方便。
首先来看一下spring aop 事务切面核心代码:
final PlatformTransactionManager tm = determineTransactionManager(txAttr); //txAttr是PROPAGATION_REQUIRED 等事务隔离性的实体封装
final String joinpointIdentification = methodIdentification(method, targetClass); // 便于日志记录的识别号
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex); // 会回滚事务
throw ex;
}
finally {
cleanupTransactionInfo(txInfo); // 清空事务相关的状态
}
commitTransactionAfterReturning(txInfo); // 提交事务
return retVal;
}
## 在createTransactionIfNecessary中会进行事务的开始。
下面进行核心代码的
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction(); //从resources 中获取事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition); // 开始事务
prepareSynchronization(status, definition);
return status;
}
}
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
if (txObject.getConnectionHolder() == null ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection(); // 从数据源中获取连接,动态数据源会根据此时的contextHolder 来获取到某个连接
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
//这一步很关键 也就是说连接池和连接会以map的形式绑定到本地线程中, //将来所有的连接都是通过连接池对应的连接,所以这里动态数据源与此时的一个连接绑定了,
//在本次事务内都是对该连接进行操作
}
}
从中看出, spring在开启事务时, 数据源(DynamicDatasource)会和当前的连接绑定, 导致以后的连接不是通过动态数据源获取,而是通过key找value, 而value永远是之前的数据源。
如果在多数据源上加上了事务, 以上方法通过DynamicDataSource似乎就不可行了。
事务思路
调用service1 开启事务, 但是方法1调用成功不能提交事务,
调用service2 开启事务, 但是方法2调用成功不能提交事务
service3 调用service1和service2 都成功, 同时提交service1和service2。
如何结局
其实多个数据源已经不能用数据库的事务来解决了, 我们其实可以把多数据源理解成分布式的架构, 多余分布式事务控制有很多解决办法, 有心的可以去网上查询相应的办法, 今天就先讲到这里。