背景
在一些特定情况的业务开发中,一个应用可能需要连接多个数据源。
- 读写分离的数据源,一个读库和一个写库,读库负责各种查询操作,写库负责各种添加、修改、删除。
- 由于业务需要,一个应用需要操作 A、B 两个数据库的数据,可以是 CRUD(增删改查)操作,根据具体情况配置。
实现
例如:实现两种数据库类型的数据源,MySQL 和 PostgreSQL。
配置文件内容
spring.datasource.url = jdbc:mysql://127.0.0.1:3306/testdb
spring.datasource.username = test
spring.datasource.password = test
spring.datasource.driverClassName = com.mysql.jdbc.Driver
custom.spring.datasources.postgresql.url = jdbc:postgresql://127.0.0.1:10046/testdb
custom.spring.datasources.postgresql.username = test
custom.spring.datasources.postgresql.password = test
custom.spring.datasources.postgresql.driverClassName = org.postgresql.Driver
主数据源 key
/**
 * Names of the keys used to look up a target data source.
 */
public interface DataSourceKey {

    /** Key of the primary (default) data source. */
    String DEFAULT = "default";

}
动态切换数据源注解
/**
 * Declares which data source key a type or a method should use.
 *
 * <p>The annotation is retained at runtime so it can be discovered reflectively.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
@Documented
public @interface TargetDataSource {

    /**
     * Data source key to switch to.
     *
     * @return the key; {@link DataSourceKey#DEFAULT} when not specified
     */
    String value() default DataSourceKey.DEFAULT;

}
自定义数据源配置
/**
 * Binds the {@code custom.spring.*} configuration subtree.
 *
 * <p>This class is intentionally NOT annotated with {@code @Configuration}/{@code @Component}:
 * it is registered as a bean through {@code @EnableConfigurationProperties}. Adding a stereotype
 * annotation on top of that registers a second bean of the same type, which makes by-type
 * injection ambiguous.
 */
@Data
@ConfigurationProperties("custom.spring")
public class CustomDataSourceProperties {

    /**
     * Additional data sources keyed by name, bound from
     * {@code custom.spring.datasources.<key>.*}.
     */
    private Map<String, DruidDataSource> datasources;

}
多数据源配置
/**
 * Wires the default data source, the routing ("dynamic") data source, and the MyBatis and
 * transaction infrastructure built on top of the routing data source.
 */
@Configuration
@EnableConfigurationProperties({MybatisProperties.class, CustomDataSourceProperties.class})
public class DataSourceConfiguration {

    /** Name under which the default data source is looked up in the injected data source map. */
    private static final String DEFAULT_DATA_SOURCE = "defaultDataSource";

    @Autowired
    private MybatisProperties mybatisProperties;

    /**
     * Creates the default Druid data source, configured from {@code spring.datasource.*}.
     *
     * @return the default data source
     */
    @Bean
    @ConfigurationProperties("spring.datasource")
    public DataSource defaultDataSource() {
        return DruidDataSourceBuilder.create().build();
    }

    /**
     * Creates the primary routing data source. It contains the default data source (also
     * registered under {@link DataSourceKey#DEFAULT}) plus every custom data source from
     * {@link CustomDataSourceProperties}.
     *
     * @param customDataSourceProperties holder of the additional, key-named data sources
     * @param dataSourceMap              data sources by name; must contain the default one
     * @return the routing data source
     */
    @Bean
    @Primary
    public DataSource dynamicDataSource(CustomDataSourceProperties customDataSourceProperties, Map<String, DataSource> dataSourceMap) {
        DynamicDataSource dynamicDataSource = new DynamicDataSource();

        // Default data source
        DataSource defaultDataSource = dataSourceMap.get(DataSourceConfiguration.DEFAULT_DATA_SOURCE);
        dynamicDataSource.setDefaultTargetDataSource(defaultDataSource);

        // Collect all routable data sources into a map we own. The map returned by
        // Collectors.toMap carries no mutability guarantee, so it must not be used as the
        // target of the put() below.
        Map<String, DruidDataSource> customDatasources = customDataSourceProperties.getDatasources();
        boolean hasCustom = MapUtils.isNotEmpty(customDatasources);
        Map<Object, Object> targetDataSources =
                Maps.newHashMapWithExpectedSize(hasCustom ? customDatasources.size() + 1 : 1);
        if (hasCustom) {
            targetDataSources.putAll(customDatasources);
        }
        // Registered last so the default key always points at the default data source.
        targetDataSources.put(DataSourceKey.DEFAULT, defaultDataSource);

        dynamicDataSource.setTargetDataSources(targetDataSources);
        return dynamicDataSource;
    }

    /**
     * Builds the MyBatis session factory on top of the routing data source, using
     * {@link MultiDataSourceTransactionFactory} for its transactions.
     *
     * @param dynamicDataSource the routing data source
     * @return the session factory
     * @throws Exception if the factory bean fails to build the session factory
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource dynamicDataSource) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dynamicDataSource);
        sqlSessionFactoryBean.setMapperLocations(mybatisProperties.resolveMapperLocations());
        sqlSessionFactoryBean.setTypeAliasesPackage(mybatisProperties.getTypeAliasesPackage());
        sqlSessionFactoryBean.setConfiguration(mybatisProperties.getConfiguration());
        sqlSessionFactoryBean.setTransactionFactory(new MultiDataSourceTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }

    /**
     * Creates the {@link SqlSessionTemplate} backed by the given session factory.
     *
     * @param sqlSessionFactory the MyBatis session factory
     * @return the session template
     */
    @Bean
    public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

    /**
     * Creates the transaction manager operating on the routing data source.
     *
     * @param dynamicDataSource the routing data source
     * @return the transaction manager
     */
    @Bean
    public PlatformTransactionManager transactionManager(DataSource dynamicDataSource) {
        return new DataSourceTransactionManager(dynamicDataSource);
    }

}
配置多数据源事务(无法解决异常情况下多数据源事务的一致性问题)
/**
 * MyBatis {@link Transaction} that keeps one JDBC connection per data source key.
 *
 * <p>The connection for {@link DataSourceKey#DEFAULT} (the "main" connection) is obtained through
 * {@link DataSourceUtils}; connections for every other key are taken directly from the data
 * source and tracked in a map. Commit and rollback are applied to each connection in turn, so
 * they are NOT atomic across data sources.
 */
@Slf4j
public class MultiDataSourceTransaction implements Transaction {

    private final DataSource dataSource;

    /** Connection used for the default key; opened lazily. */
    private Connection mainConnection;

    /** Connections for non-default keys, by data source key. */
    private final Map<String, Connection> otherConnectionMap = new ConcurrentHashMap<>();

    /** Whether the main connection is bound to a Spring-managed transaction. */
    private boolean isConnectionTransactional;

    /** Auto-commit mode read from the main connection; {@code false} until that is opened. */
    private boolean autoCommit;

    public MultiDataSourceTransaction(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Returns the connection for the data source key currently held by {@link DataSourceHolder},
     * opening it on first use. An empty key is treated as {@link DataSourceKey#DEFAULT}.
     *
     * @return the connection for the current key
     * @throws SQLException if the main connection's state cannot be read
     * @throws CannotGetJdbcConnectionException if a non-default connection cannot be obtained
     */
    @Override
    public Connection getConnection() throws SQLException {
        String dataSourceKey = DataSourceHolder.getDataSourceType();
        if (StringUtils.isEmpty(dataSourceKey)) {
            dataSourceKey = DataSourceKey.DEFAULT;
        }

        if (DataSourceKey.DEFAULT.equals(dataSourceKey)) {
            if (Objects.isNull(this.mainConnection)) {
                openMainConnection();
            }
            return this.mainConnection;
        }

        // Obtain (or reuse) the connection of a non-default data source
        Connection existing = this.otherConnectionMap.get(dataSourceKey);
        if (Objects.nonNull(existing)) {
            return existing;
        }
        Connection conn = openOtherConnection();
        this.otherConnectionMap.put(dataSourceKey, conn);
        return conn;
    }

    /** Checks out a connection and applies the auto-commit mode; closes it again on failure. */
    private Connection openOtherConnection() {
        Connection conn = null;
        try {
            conn = this.dataSource.getConnection();
            conn.setAutoCommit(this.autoCommit);
            return conn;
        } catch (SQLException ex) {
            // The connection is not tracked yet, so nobody else would close it.
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException closeEx) {
                    ex.addSuppressed(closeEx);
                }
            }
            throw new CannotGetJdbcConnectionException("Could not get JDBC otherConnection", ex);
        }
    }

    /** Opens the main connection and records its auto-commit and transactional state. */
    private void openMainConnection() throws SQLException {
        this.mainConnection = DataSourceUtils.getConnection(this.dataSource);
        this.autoCommit = this.mainConnection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource);
        log.debug("JDBC mainConnection [{}] will ({}) be managed by spring", this.mainConnection, this.isConnectionTransactional ? "" : "not");
    }

    /**
     * Commits the main connection (when eligible) and then every other connection. Stops at the
     * first failure.
     *
     * @throws SQLException if any commit fails
     */
    @Override
    public void commit() throws SQLException {
        // NOTE(review): MyBatis-Spring's SpringManagedTransaction commits only when the connection
        // is NOT Spring-transactional; this condition is the opposite. Behavior is left unchanged
        // here; confirm it is intended.
        if (this.mainConnection != null && this.isConnectionTransactional && !this.autoCommit) {
            log.debug("Committing JDBC mainConnection [{}]", this.mainConnection);
            this.mainConnection.commit();
        }
        for (Connection connection : this.otherConnectionMap.values()) {
            log.debug("Committing JDBC otherConnection [{}]", connection);
            connection.commit();
        }
    }

    /**
     * Rolls back the main connection (when eligible) and every other connection. Every
     * connection is attempted even if an earlier one fails; the first failure is rethrown with
     * later ones attached as suppressed exceptions.
     *
     * @throws SQLException if any rollback fails
     */
    @Override
    public void rollback() throws SQLException {
        SQLException failure = null;
        if (Objects.nonNull(this.mainConnection) && this.isConnectionTransactional && !this.autoCommit) {
            log.debug("Rolling back JDBC mainConnection [{}]", this.mainConnection);
            try {
                this.mainConnection.rollback();
            } catch (SQLException ex) {
                failure = ex;
            }
        }
        for (Connection connection : this.otherConnectionMap.values()) {
            log.debug("Rolling back JDBC otherConnection [{}]", connection);
            try {
                connection.rollback();
            } catch (SQLException ex) {
                if (failure == null) {
                    failure = ex;
                } else {
                    failure.addSuppressed(ex);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Releases the main connection and all other connections, then forgets the latter. */
    @Override
    public void close() {
        DataSourceUtils.releaseConnection(this.mainConnection, this.dataSource);
        for (Connection connection : this.otherConnectionMap.values()) {
            DataSourceUtils.releaseConnection(connection, this.dataSource);
        }
        // Do not keep references to released connections.
        this.otherConnectionMap.clear();
    }

    /**
     * Returns the remaining time to live of the connection holder bound to the data source.
     *
     * @return seconds to live, or {@code null} when no holder with a timeout is bound
     */
    @Override
    public Integer getTimeout() {
        ConnectionHolder holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        if (Objects.nonNull(holder) && holder.hasTimeout()) {
            return holder.getTimeToLiveInSeconds();
        }
        return null;
    }

}
/**
 * Transaction factory that produces {@link MultiDataSourceTransaction} instances.
 */
public class MultiDataSourceTransactionFactory extends SpringManagedTransactionFactory {

    /**
     * Creates a {@link MultiDataSourceTransaction} for the given data source.
     *
     * @param dataSource data source handed to the new transaction
     * @param level      not used by this implementation
     * @param autoCommit not used by this implementation
     * @return a new transaction
     */
    @Override
    public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
        final Transaction transaction = new MultiDataSourceTransaction(dataSource);
        return transaction;
    }

}
动态切换数据源 AOP
/**
 * Aspect that selects the data source key before a mapper method runs and clears it afterwards.
 *
 * <p>The key comes from {@link TargetDataSource}, looked up on the invoked method first and on
 * the mapper type second. Without an annotation, {@link DataSourceKey#DEFAULT} is used.
 */
@Aspect
@Order(1)
@Component
public class DataSourceAspect {

    /** Matches every method of types in a {@code mapper} package below {@code com.demo}. */
    @Pointcut("execution(* com.demo..mapper.*.*(..))")
    public void pointcut() {
    }

    /**
     * Resolves the data source key for the intercepted call and stores it in
     * {@link DataSourceHolder}.
     *
     * @param point the intercepted join point
     * @throws Exception declared for signature compatibility
     */
    @Before("pointcut()")
    public void before(JoinPoint point) throws Exception {
        // Method-level annotation has the highest priority.
        TargetDataSource targetDataSource = AnnotationUtils.findAnnotation(
                ((MethodSignature) point.getSignature()).getMethod(), TargetDataSource.class);

        // Type level: inspect the target object's class. findAnnotation also traverses the
        // interfaces a class implements, so an annotation on the mapper interface is found
        // even when the target is a proxy instance. (point.getClass() would be the JoinPoint
        // implementation class, which never carries the annotation.)
        if (Objects.isNull(targetDataSource)) {
            targetDataSource = findOnTypeOf(point.getTarget());
        }

        // Last resort: the proxy the call went through. This needs no reflection on
        // proxy internals.
        if (Objects.isNull(targetDataSource)) {
            targetDataSource = findOnTypeOf(point.getThis());
        }

        DataSourceHolder.setDataSourceType(Objects.nonNull(targetDataSource) ? targetDataSource.value() : DataSourceKey.DEFAULT);
    }

    /** Looks up {@link TargetDataSource} on the runtime class of {@code candidate}; null-safe. */
    private static TargetDataSource findOnTypeOf(Object candidate) {
        if (Objects.isNull(candidate)) {
            return null;
        }
        return AnnotationUtils.findAnnotation(candidate.getClass(), TargetDataSource.class);
    }

    /** Clears the data source key once the intercepted method has finished. */
    @After("pointcut()")
    public void after() {
        DataSourceHolder.clearDataSourceType();
    }

}
/**
 * Static holder of the data source key selected for the current thread.
 */
public final class DataSourceHolder {

    // NOTE(review): a TransmittableThreadLocal is presumably used so the key is also visible in
    // threads started or pooled on behalf of the current one; confirm this is intended.
    private static final ThreadLocal<String> HOLDER = new TransmittableThreadLocal<>();

    /** Static utility class; not meant to be instantiated. */
    private DataSourceHolder() {
    }

    /**
     * Stores the data source key.
     *
     * @param dataSourceType the key to store
     */
    public static void setDataSourceType(String dataSourceType) {
        HOLDER.set(dataSourceType);
    }

    /**
     * Returns the stored data source key.
     *
     * @return the stored key, as returned by the underlying thread-local
     */
    public static String getDataSourceType() {
        return HOLDER.get();
    }

    /** Removes the stored data source key. */
    public static void clearDataSourceType() {
        HOLDER.remove();
    }

}
/**
 * Routing data source whose lookup key is the value currently held by {@link DataSourceHolder}.
 */
public class DynamicDataSource extends AbstractRoutingDataSource {

    /**
     * Supplies the lookup key for the current call.
     *
     * @return the key held by {@link DataSourceHolder}
     */
    @Override
    protected Object determineCurrentLookupKey() {
        final String currentKey = DataSourceHolder.getDataSourceType();
        return currentKey;
    }

}
Mapper
/**
 * BaseTKMapper is a custom class and can be ignored here.
 * The key "postgresql" corresponds to custom.spring.datasources.postgresql;
 * in general, for custom.spring.datasources.xxx, xxx is the key.
 */
@TargetDataSource("postgresql")
public interface TestMapper extends BaseTKMapper<Test> {
}
PS:
- 不限于 TKMybatis,通用 MyBatis 都可用,`@TargetDataSource` 配置在 MyBatis Mapper 接口类上或者方法上即可,优先级:方法 > 类;若是主数据源,可不配置注解。
- Druid 管理界面可看到多个数据源配置。
- 无法保证多事务提交或回滚的一致性,异常情况下有可能会导致部分数据源没提交或者回滚,需使用分布式事务解决该问题