Spring Boot + TKMybatis + Druid 实现多数据源多事务

背景

在一些特定情况的业务开发中,一个应用可能需要连接多个数据源。

  • 读写分离的数据源,一个读库和一个写库,读库负责各种查询操作,写库负责各种添加、修改、删除。
  • 由于业务需要,一个需要操作 A、B 两个数据库的数据,可以是 CRUD(增删改查)操作,根据具体情况配置。

实现

eg:实现两个数据库类型数据源,mysql 和 postgresql。

配置文件内容

spring.datasource.url = jdbc:mysql://127.0.0.1:3306/testdb
spring.datasource.username = test
spring.datasource.password = test
spring.datasource.driverClassName = com.mysql.jdbc.Driver

custom.spring.datasources.postgresql.url = jdbc:postgresql://127.0.0.1:10046/testdb
custom.spring.datasources.postgresql.username = test
custom.spring.datasources.postgresql.password = test
custom.spring.datasources.postgresql.driverClassName = org.postgresql.Driver

主数据源 key

public interface DataSourceKey {
    String DEFAULT = "default";
}

动态切换数据源注解

@Documented
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface TargetDataSource {

    String value() default DataSourceKey.DEFAULT;
}

自定义数据源配置

@Data
@Configuration
@ConfigurationProperties("custom.spring")
public class CustomDataSourceProperties {

    private Map<String, DruidDataSource> datasources;
}

多数据源配置

@Configuration
@EnableConfigurationProperties({MybatisProperties.class, CustomDataSourceProperties.class})
public class DataSourceConfiguration {

    private static final String DEFAULT_DATA_SOURCE = "defaultDataSource";

    @Autowired
    private MybatisProperties mybatisProperties;

    @Bean
    @ConfigurationProperties("spring.datasource")
    public DataSource defaultDataSource() {
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @Primary
    public DataSource dynamicDataSource(CustomDataSourceProperties customDataSourceProperties, Map<String, DataSource> dataSourceMap) {
        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        // 默认数据源
        DataSource defaultDataSource = dataSourceMap.get(DataSourceConfiguration.DEFAULT_DATA_SOURCE);
        dynamicDataSource.setDefaultTargetDataSource(defaultDataSource);
        // 配置多数据源
        Map<Object, Object> datasources;
        Map<String, DruidDataSource> customDatasources = customDataSourceProperties.getDatasources();
        if (MapUtils.isNotEmpty(customDatasources)) {
            datasources = customDatasources.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        } else {
            datasources = Maps.newHashMapWithExpectedSize(1);
        }
        datasources.put(DataSourceKey.DEFAULT, defaultDataSource);
        dynamicDataSource.setTargetDataSources(datasources);
        return dynamicDataSource;
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource dynamicDataSource) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dynamicDataSource);
        sqlSessionFactoryBean.setMapperLocations(mybatisProperties.resolveMapperLocations());
        sqlSessionFactoryBean.setTypeAliasesPackage(mybatisProperties.getTypeAliasesPackage());
        sqlSessionFactoryBean.setConfiguration(mybatisProperties.getConfiguration());
        sqlSessionFactoryBean.setTransactionFactory(new MultiDataSourceTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }

    @Bean
    public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

    @Bean
    public PlatformTransactionManager transactionManager(DataSource dynamicDataSource) {
        return new DataSourceTransactionManager(dynamicDataSource);
    }
}

配置多数据源事务(无法解决异常情况下多数据源事务)

@Slf4j
public class MultiDataSourceTransaction implements Transaction {

    private final DataSource dataSource;
    private Connection mainConnection;
    private final Map<String, Connection> otherConnectionMap = new ConcurrentHashMap<>();
    private boolean isConnectionTransactional;
    private boolean autoCommit;

    public MultiDataSourceTransaction(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public Connection getConnection() throws SQLException {
        String databaseldentification = DataSourceHolder.getDataSourceType();
        if (StringUtils.isEmpty(databaseldentification)) {
            databaseldentification = DataSourceKey.DEFAULT;
        }
        if (DataSourceKey.DEFAULT.equals(databaseldentification)) {
            if (Objects.nonNull(this.mainConnection)) {
                return this.mainConnection;
            } else {
                openMainConnection();
                return this.mainConnection;
            }
        } else {
            // 获取其他的数据源连接
            if (!this.otherConnectionMap.containsKey(databaseldentification)) {
                try {
                    Connection conn = this.dataSource.getConnection();
                    conn.setAutoCommit(this.autoCommit);
                    this.otherConnectionMap.put(databaseldentification, conn);
                } catch (SQLException ex) {
                    throw new CannotGetJdbcConnectionException("Could bot get JDBC otherConnection", ex);
                }
            }
            return this.otherConnectionMap.get(databaseldentification);
        }
    }

    private void openMainConnection() throws SQLException {
        this.mainConnection = DataSourceUtils.getConnection(this.dataSource);
        this.autoCommit = this.mainConnection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource);
        log.debug("JDBC mainConnection [{}] will ({}) be managed by spring", this.mainConnection, this.isConnectionTransactional ? "" : "not");
    }

    @Override
    public void commit() throws SQLException {
        if (this.mainConnection != null && this.isConnectionTransactional && !this.autoCommit) {
            log.debug("Committing JDBC mainConnection [{}]", this.mainConnection);
            this.mainConnection.commit();
        }
        for (Connection connection : this.otherConnectionMap.values()) {
            log.debug("Committing JDBC otherConnection [{}]", connection);
            connection.commit();
        }
    }

    @Override
    public void rollback() throws SQLException {
        if (Objects.nonNull(this.mainConnection) && this.isConnectionTransactional && !this.autoCommit) {
            log.debug("Rolling back JDBC mainConnection [{}]", this.mainConnection);
            this.mainConnection.rollback();
        }
        for (Connection connection : this.otherConnectionMap.values()) {
            log.debug("Rolling back JDBC otherConnection [{}]", connection);
            connection.rollback();
        }
    }

    @Override
    public void close() {
        DataSourceUtils.releaseConnection(this.mainConnection, this.dataSource);
        for (Connection connection : otherConnectionMap.values()) {
            DataSourceUtils.releaseConnection(connection, this.dataSource);
        }
    }

    @Override
    public Integer getTimeout() {
        ConnectionHolder holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        if (Objects.nonNull(holder) && holder.hasTimeout()) {
            return holder.getTimeToLiveInSeconds();
        }
        return null;
    }
}

public class MultiDataSourceTransactionFactory extends SpringManagedTransactionFactory {

    @Override
    public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
        return new MultiDataSourceTransaction(dataSource);
    }
}

动态切换数据源 AOP

@Aspect
@Order(1)
@Component
public class DataSourceAspect {

    @Pointcut("execution(* com.demo..mapper.*.*(..))")
    public void pointcut() {
    }

    @Before("pointcut()")
    public void before(JoinPoint point) throws Exception {
        TargetDataSource targetDataSource = AnnotationUtils.findAnnotation(((MethodSignature) point.getSignature()).getMethod(), TargetDataSource.class);
        if (Objects.isNull(targetDataSource)) {
            targetDataSource = AnnotationUtils.findAnnotation(point.getClass(), TargetDataSource.class);
            if (Objects.isNull(targetDataSource)) {
                Object proxy = point.getThis();
                Field h = FieldUtils.getDeclaredField(proxy.getClass().getSuperclass(), "h", true);
                AopProxy aopProxy = (AopProxy) h.get(proxy);
                ProxyFactory advised = (ProxyFactory) FieldUtils.readDeclaredField(aopProxy, "advised", true);
                Class<?> targetClass = advised.getProxiedInterfaces()[0];
                targetDataSource = AnnotationUtils.findAnnotation(targetClass, TargetDataSource.class);
            }
        }
        DataSourceHolder.setDataSourceType(Objects.nonNull(targetDataSource) ? targetDataSource.value() : DataSourceKey.DEFAULT);
    }

    @After("pointcut()")
    public void after() {
        DataSourceHolder.clearDataSourceType();
    }
}
public final class DataSourceHolder {

    private static final ThreadLocal<String> HOLDER = new TransmittableThreadLocal<>();

    public static void setDataSourceType(String dataSourceType) {
        HOLDER.set(dataSourceType);
    }

    public static String getDataSourceType() {
        return HOLDER.get();
    }

    public static void clearDataSourceType() {
        HOLDER.remove();
    }
}
public class DynamicDataSource extends AbstractRoutingDataSource {

    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceHolder.getDataSourceType();
    }
}

Mapper

/**
  * BaseTKMapper 为自定义类,可忽略
  * postgresql 对应 custom.spring.datasources.postgresql
  * custom.spring.datasources.xxx,xxx 为 key
  */
@TargetDataSource("postgresql")
public interface TestMapper extends BaseTKMapper<Test> {
}
PS:
  • 不限于 TKMybatis,通用 Mybatis 都可用,@TargetDataSource 配置在 Mybaits Mapper 接口类上或者方法上即可,优先级方法 > 类,若是主数据源,可不配置注解。
  • Druid 管理界面可看到多个数据源配置。
  • 无法保证多事务提交或回滚的一致性,异常情况下有可能会导致部分数据源没提交或者回滚,需使用分布式事务解决该问题
对代码不明白地方可联系讨论或对写的不好的地方望不吝指教。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容