ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

多数据源切换和分布式事务控制

2021-03-10 23:59:24  阅读:464  来源: 互联网

标签:return String 数据源 切换 new 分布式 public dataSource


SpringBoot2.x+Druid+Mybatis-plus+Atomikos实现多数据源切换和分布式事务

前言

  • 简介
    本次项目的数据库结构为三台服务器,且每台服务器的mysql上分别都有十几个数据库。
    复杂的业务场景下,可能不仅要在各个服务器内切换不同的数据源,还要能切换到其他服务器的数据库连接上。
    所以我们需要考虑如何实现动态的切换数据源,并保证业务失败时能同时回滚所有的事务,防止数据出错。

  • 环境:
    SpringBoot + Mysql + Mybatis-plus + Druid + Atomikos

引入依赖

这里需要注意的地方就是数据库连接:mysql-connector的版本,跟后面你选择的数据库连接池(Druid或Hikari)以及构建的数据源DataSource类型有关。某些情况下用SpringBoot2.x自带的8.0.x版本的会报错,需要降低版本

<properties>
    <mysql-connector>6.0.6</mysql-connector>
    <druid.version>1.1.18</druid.version>
    <mybatis-plus.version>3.2.0</mybatis-plus.version>
</properties>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql-connector}</version>
</dependency>
<!--数据库连接池-->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>${druid.version}</version>
</dependency>
<!-- mybatis-plus -->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>${mybatis-plus.version}</version>
</dependency>
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-generator</artifactId>
    <version>${mybatis-plus.version}</version>
</dependency>
<!--atomikos transaction management-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<!-- Spring AOP -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

注意:对于使用mysql jdbc 6.0.x的必须更新druid到最新的1.1.6,否则druid无法支持分布式事务。感兴趣的可查看官方的release说明

yml配置

数据库结构
在这里插入图片描述在这里插入图片描述

spring:
  # 默认数据库连接
  datasource:
    username: root
    password: root
    url: jdbc:mysql://192.168.153.129:3306/local_itoyoung?useSSL=false&allowMultiQueries=true&userUnicode=true&characterEncoding=utf8&pinGlobalTxToPhysicalConnection=true
    # SpringBoot2.x默认数据库连接池,后续数据源类型只能选择MysqlXADataSource
    # type: com.zaxxer.hikari.HikariDataSource

    # Druid连接池,可根据自己选择mysql-connector-java版本选择数据源类型
    # type: com.alibaba.druid.pool.DruidDataSource
    type: com.alibaba.druid.pool.xa.DruidXADataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
  # atomikos事务控制
  jta:
    atomikos:
      properties:
        enable-logging: true
        default-jta-timeout: 600000
        max-timeout: 600000
    transaction-manager-id: txManager

# 其他数据源配置
# 数据源切换的key值为:type_dbName
slave:
  datasource:
    types: local,local127
    dbNames: itoyoung,becychen

# 两台服务器中都有local_itoyoung的数据库
# 数据库名:prefix_dbName
local_datasource:
  prefix: local
  username: root
  password: root
  url: jdbc:mysql://192.168.153.129:3306/dbname?serverTimezone=GMT%2B8&useSSL=false&allowMultiQueries=true&userUnicode=true&characterEncoding=utf8&pinGlobalTxToPhysicalConnection=true

local127_datasource:
  prefix: local
  username: root
  password: root
  url: jdbc:mysql://127.0.0.1:3306/dbname?serverTimezone=GMT%2B8&useSSL=false&allowMultiQueries=true&userUnicode=true&characterEncoding=utf8&pinGlobalTxToPhysicalConnection=true

数据源配置和注册

/**
 * 采用的策略是在项目启动时就加载数据源
 * 实现 EnvironmentAware 用于读取application.yml配置
 *
 * @author itoyoung
 * @date 2020-06-12 14:34
 */
@Configuration
@Slf4j
public class DynamicDataSourceRegister implements EnvironmentAware {

    private final static String MAPPER_LOCATION = "classpath:/mapper/**/*.xml";

    private DataSource defaultDataSource;

    private static Map<Object, Object> dataSourceMap = new ConcurrentHashMap<>();

    private static SqlSessionFactory defaultSqlSessionFactory;

    private static Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new ConcurrentHashMap<>();

    @Override
    public void setEnvironment(Environment env) {
        log.info("开始注册数据源");
        initDefaultDataSource(env);
        initAllDataSource(env);
    }

    private void initDefaultDataSource(Environment env) {
        String prefix = "spring.datasource.";
        String url = env.getProperty(prefix + "url");
        String username = env.getProperty(prefix + "username");
        String password = env.getProperty(prefix + "password");

        defaultDataSource = getDataSource(url, username, password, "default_datasource");
        defaultSqlSessionFactory = getSqlSessionFactory(defaultDataSource, "default_datasource");
    }

    private void initAllDataSource(Environment env) {
        String types = env.getProperty("slave.datasource.types");
        String dbNames = env.getProperty("slave.datasource.dbNames");

        String[] typeArr = types.split(",");
        String[] dbNameArr = dbNames.split(",");

        CountDownLatch countDownLatch = new CountDownLatch((dbNameArr.length + 1) * typeArr.length);
        ExecutorService executor = Executors.newFixedThreadPool(8);

        for (String type : typeArr) {
            String prefix = env.getProperty(type + "_datasource.prefix");
            String username = env.getProperty(type + "_datasource.username");
            String password = env.getProperty(type + "_datasource.password");
            String url = env.getProperty(type + "_datasource.url");

            executor.submit(() -> {
                DataSource defaultDs = getDataSource(url.replace("dbname", prefix), username, password, type);
                dataSourceMap.put(type, defaultDs);
                SqlSessionFactory defaultSqlSession = getSqlSessionFactory(defaultDs, type);
                sqlSessionFactoryMap.put(type, defaultSqlSession);
                countDownLatch.countDown();
            });

            for (String dbName : dbNameArr) {
                executor.submit(() -> {
                    String datasourceKey = type + "_" + dbName;
                    DataSource dataSource = getDataSource(url.replace("dbname", prefix + "_" + dbName), username, password, datasourceKey);
                    dataSourceMap.put(datasourceKey, dataSource);
                    SqlSessionFactory sqlSessionFactory = getSqlSessionFactory(dataSource, datasourceKey);
                    sqlSessionFactoryMap.put(datasourceKey, sqlSessionFactory);
                    countDownLatch.countDown();
                });
            }

            try {
                countDownLatch.await();
                executor.shutdown();
            } catch (InterruptedException e) {
                log.error("初始化数据源失败:", e);
            }
        }
    }

    /**
     * 构造数据源对象
     *
     * @param url           数据库连接
     * @param username      用户名
     * @param password      密码
     * @param dataSourceKey 数据源切换的key值
     * @return DataSource
     */
    private DataSource getDataSource(String url, String username, String password, String dataSourceKey) {
        /*
         *  注意:
         *      如果项目使用的alibaba的Druid的连接池且这里使用的是DruidXADataSource
         *      则需要将mysql-connector-java降低到6.0.X版本
         *      否则SpringBoot2.x自带的8.0.x版本的会报错:
         *      XAConnectionFactory: failed to create pooled connection - DBMS down or unreachable
         *
         *      如果使用MysqlXADataSource
         *      那不管是SpringBoot自带的Hikari,还是Druid
         *      都不会有问题,也不用降低mysql-connector-java版本
         *
         */
        DruidXADataSource dataSource = new DruidXADataSource();
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        // 以下的配置信息和AtomikosDataSourceBean中的配置信息二选一即可
        // 初始化时建立物理连接的个数
        dataSource.setInitialSize(5);
        // 最小空闲连接
        dataSource.setMinIdle(5);
        // 最大连接数
        dataSource.setMaxActive(20);
        // 获取连接时最大等待时间,单位毫秒
        dataSource.setMaxWait(60000);
        // 有两个含义:
        // 1. Destroy线程会检测连接的间隔时间,如果连接空闲时间大于等于minEvictableIdleTimeMillis则关闭物理连接。
        // 2.testWhileIdle的判断依据,详细看testWhileIdle属性的说明
        dataSource.setTimeBetweenEvictionRunsMillis(60000);
        // 连接保持空闲而不被驱逐的最小时间,单位是毫秒
        dataSource.setMinEvictableIdleTimeMillis(300000);
        // 配置一旦重试多次失败后等待多久再继续重试连接,单位是毫秒
        dataSource.setTimeBetweenConnectErrorMillis(6000);
        // 用该SQL语句检查链接是否可用。如果validationQuery=null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。
        dataSource.setValidationQuery("SELECT 1 FROM DUAL");
        dataSource.setTestWhileIdle(true);
        // 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。
        dataSource.setTestOnBorrow(false);
        // 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。
        dataSource.setTestOnReturn(false);
        dataSource.setPoolPreparedStatements(false);
        dataSource.setMaxPoolPreparedStatementPerConnectionSize(-1);
        dataSource.setUseGlobalDataSourceStat(true);
        // 通过connectProperties属性来打开mergeSql功能;慢SQL记录
        dataSource.setConnectionProperties("druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500");
        
        
        // MysqlXADataSource dataSource = new MysqlXADataSource();
        // dataSource.setUrl(url);
        // dataSource.setPassword(password);
        // dataSource.setUser(username);

        // mysql-connector-java 8.0.x版本才可选
        // try {
        //     dataSource.setPinGlobalTxToPhysicalConnection(true);
        // } catch (SQLException e) {
        //     e.printStackTrace();
        // }

        AtomikosDataSourceBean dataSourceBean = new AtomikosDataSourceBean();
        try {
            dataSourceBean.setXaDataSource(dataSource);
            dataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.Driver");
            dataSourceBean.setUniqueResourceName(dataSourceKey);

            //最小空闲连接数
            dataSourceBean.setMinPoolSize(10);
            //最大连接数
            dataSourceBean.setMaxPoolSize(100);
            //连接最大存活时间
            dataSourceBean.setMaxLifetime(30000);
            dataSourceBean.setBorrowConnectionTimeout(30);
            dataSourceBean.setLoginTimeout(30);
            dataSourceBean.setMaintenanceInterval(60);
            //最大空闲时间
            dataSourceBean.setMaxIdleTime(60);
            dataSourceBean.setReapTimeout(2000);
            dataSourceBean.setMaintenanceInterval(60);
            dataSourceBean.setTestQuery("SELECT 1 FROM DUAL");
            log.info("DataSource: build dataSource : " ,dataSourceKey);
            return dataSourceBean;
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 构造SqlSessionFactory
     *
     * @param dataSource 数据源
     * @return SqlSessionFactory
     */
    private SqlSessionFactory getSqlSessionFactory(DataSource dataSource, String dataSourceKey) {
        try {
            // Mybatis-plus必须用MybatisSqlSessionFactoryBean构造SqlSessionFactory
            // 否则Mybatis-plus自带的方法不可用
            MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
            sqlSessionFactoryBean.setDataSource(dataSource);
            sqlSessionFactoryBean.setVfs(SpringBootVFS.class);

            MybatisConfiguration configuration = new MybatisConfiguration();

            // 如果选择的是Mybatis,则可选择下面的配置
            // SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
            // sqlSessionFactoryBean.setDataSource(dataSource);
            // sqlSessionFactoryBean.setVfs(SpringBootVFS.class);

            // org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
            configuration.setMapUnderscoreToCamelCase(true);

            sqlSessionFactoryBean.setConfiguration(configuration);

            //多数据源需要手动配置分页插件
            Interceptor interceptor = new PaginationInterceptor();
            Properties properties = new Properties();
            properties.setProperty("helperDialect", "mysql");
            interceptor.setProperties(properties);
            Interceptor[] plugins = {interceptor};
            sqlSessionFactoryBean.setPlugins(plugins);

            sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATION));
            log.info("SqlSessionFactory: build SqlSessionFactory: ", dataSourceKey);
            return sqlSessionFactoryBean.getObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /*
     * 构造数据源动态切换路由
     * ImportBeanDefinitionRegistrar接口的实现方法,通过该方法可以按照自己的方式注册bean
     * 然后再在启动类上加上注解:@Import(value = DynamicDataSourceRegister.class)
     * 但是注意,这个方式只会运行setEnvironment(env)和registerBeanDefinitions(..)方法
     * 其他的bean是不会帮你注入的
     * ps:
     *  如果在实现接口的基础上,再加上@Configuration注解
     *  setEnvironment(env)中的内容会被执行两次
     *
     * @param annotationMetadata
     * @param beanDefinitionRegistry
     */
//    @Override
//    public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
//        // 获取所有数据源配置
//        Map<Object, Object> targetDataSources = new HashMap<Object, Object>();
//        //添加默认数据源
//        targetDataSources.put("dataSource", this.defaultDataSource);
//        DynamicDataSourceContextHolder.dataSourceIds.add("dataSource");
//        //添加其他数据源
//        targetDataSources.putAll(dataSourceMap);
//
//        DynamicDataSourceContextHolder.dataSourceIds.addAll(dataSourceMap.keySet());
//
//        //创建DynamicDataSource
//        GenericBeanDefinition dataSourceBeanDefinition = new GenericBeanDefinition();
//        dataSourceBeanDefinition.setBeanClass(DynamicRoutingDataSource.class);
//        dataSourceBeanDefinition.setSynthetic(true);
//        MutablePropertyValues mpv = dataSourceBeanDefinition.getPropertyValues();
//        mpv.addPropertyValue("defaultTargetDataSource", this.defaultDataSource);
//        mpv.addPropertyValue("targetDataSources", targetDataSources);
//        //注册 - BeanDefinitionRegistry
//        beanDefinitionRegistry.registerBeanDefinition("dataSource", dataSourceBeanDefinition);
//
//        log.info("Dynamic DataSource Registry");
//        log.info("注册数据源成功,一共注册{}个数据源", dataSourceMap.keySet().size() + 1);
//    }

    /**
     * 构造数据源动态切换路由
     *
     * @return DynamicRoutingDataSource
     */
    @Bean("dynamicRoutingDataSource")
    public DynamicRoutingDataSource buildRouting() {
        Set<Object> keySet = dataSourceMap.keySet();
        for (Object o : keySet) {
            DynamicDataSourceContextHolder.dataSourceIds.add(String.valueOf(o));
        }
        log.info("Dynamic DataSource Registry");
        log.info("注册数据源成功,一共注册{}个数据源", keySet.size() + 1);
        return new DynamicRoutingDataSource(defaultDataSource, dataSourceMap);
    }

    /**
     * 自定义sqlSessionTemplate支持多数据源的分布式事务
     *
     * @return CustomSqlSessionTemplate
     */
    @Bean("sqlSessionTemplate")
    public CustomSqlSessionTemplate sqlSessionTemplate() {
        //构造函数中SqlSessionFactory即默认对象
        CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(defaultSqlSessionFactory);
        customSqlSessionTemplate.setTargetSqlSessionFactorys(new HashMap<>(sqlSessionFactoryMap));
        log.info("Dynamic sqlSessionTemplate Registry");
        log.info("注册sqlSessionTemplate成功,一共注册{}个sqlSessionTemplate", sqlSessionFactoryMap.size() + 1);
        return customSqlSessionTemplate;
    }

    /*
     * 创建事务管理器,如果yml文件已配置,这里就不用配置了
     *
     */
    // @Bean("transactionManager")
    // public JtaTransactionManager regTransactionManager () {
    //     UserTransactionManager userTransactionManager = new UserTransactionManager();
    //     UserTransaction userTransaction = new UserTransactionImp();
    //     return new JtaTransactionManager(userTransaction, userTransactionManager);
    // }
}

自定义SqlSessionTemplate

解决分布式事务控制下数据源无法动态切换的问题,这里在我整合shiro时出现了一个大坑,
且不仅于shiro,可能任何一个引入了某个XXXService的Configuration,
都有可能会导致服务的分布式事务回滚失败,这里涉及到Bean注入到容器中的顺序问题。
自定义的SqlSessionTemplate一定要在任何一个需要实现事务控制的Service之前注入。
可是shiro这里我尝试用@ConditionalOnBean(name = "sqlSessionTemplate")
使shirConfig加载在其注入之后,但是后面又发现登录直接失败了,因为其中的SecurityManager注入失败了…

 /**
 * from https://github.com/igool/spring-jta-mybatis
 * 自定义sqlSessionTemplate,为了支持多数据源的分布式事务
 *
 * @author itoyoung
 * @date 2020-06-12 14:46
 */
public class CustomSqlSessionTemplate extends SqlSessionTemplate {

    private final SqlSessionFactory sqlSessionFactory;
    private final ExecutorType executorType;
    private final SqlSession sqlSessionProxy;
    private final PersistenceExceptionTranslator exceptionTranslator;

    private Map<Object, SqlSessionFactory> targetSqlSessionFactorys;
    private SqlSessionFactory defaultTargetSqlSessionFactory;

    public void setTargetSqlSessionFactorys(Map<Object, SqlSessionFactory> targetSqlSessionFactorys) {
        this.targetSqlSessionFactorys = targetSqlSessionFactorys;
    }

    public void setDefaultTargetSqlSessionFactory(SqlSessionFactory defaultTargetSqlSessionFactory) {
        this.defaultTargetSqlSessionFactory = defaultTargetSqlSessionFactory;
    }

    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());
    }

    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {
        this(sqlSessionFactory, executorType, new MyBatisExceptionTranslator(sqlSessionFactory.getConfiguration().getEnvironment().getDataSource(), true));
    }

    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {

        super(sqlSessionFactory, executorType, exceptionTranslator);

        this.sqlSessionFactory = sqlSessionFactory;
        this.executorType = executorType;
        this.exceptionTranslator = exceptionTranslator;

        this.sqlSessionProxy = (SqlSession) newProxyInstance(SqlSessionFactory.class.getClassLoader(), new Class[] { SqlSession.class }, new SqlSessionInterceptor());

        this.defaultTargetSqlSessionFactory = sqlSessionFactory;
    }

    @Override
    public SqlSessionFactory getSqlSessionFactory() {
        SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys.get(DynamicDataSourceContextHolder.getDataSourceRouterKey());
        if (targetSqlSessionFactory != null) {
            return targetSqlSessionFactory;
        } else if (defaultTargetSqlSessionFactory != null) {
            return defaultTargetSqlSessionFactory;
        } else {
            Assert.notNull(targetSqlSessionFactorys, "Property 'targetSqlSessionFactorys' or 'defaultTargetSqlSessionFactory' are required");
            Assert.notNull(defaultTargetSqlSessionFactory, "Property 'defaultTargetSqlSessionFactory' or 'targetSqlSessionFactorys' are required");
        }
        return this.sqlSessionFactory;
    }

    // 省略。。。

    /**
     * Proxy needed to route MyBatis method calls to the proper SqlSession got from Spring's Transaction Manager It also unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to pass a {@code PersistenceException} to the
     * {@code PersistenceExceptionTranslator}.
     */
    private class SqlSessionInterceptor implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            final SqlSession sqlSession = getSqlSession(CustomSqlSessionTemplate.this.getSqlSessionFactory(), CustomSqlSessionTemplate.this.executorType, CustomSqlSessionTemplate.this.exceptionTranslator);
            try {
                Object result = method.invoke(sqlSession, args);
                if (!isSqlSessionTransactional(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory())) {
                    // force commit even on non-dirty sessions because some databases require
                    // a commit/rollback before calling close()
                    sqlSession.commit(true);
                }
                return result;
            } catch (Throwable t) {
                Throwable unwrapped = unwrapThrowable(t);
                if (CustomSqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
                    Throwable translated = CustomSqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped);
                    if (translated != null) {
                        unwrapped = translated;
                    }
                }
                throw unwrapped;
            } finally {
                closeSqlSession(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory());
            }
        }
    }

}

数据源动态切换路由

/**
 * 数据源动态切换路由配置,只需要继承AbstractRoutingDataSource
 * 重写determineCurrentLookupKey()即可
 *
 * @author itoyoung
 * @date 2020-06-12 14:51
 */
@Slf4j
public class DynamicRoutingDataSource extends AbstractRoutingDataSource {

    @Override
    protected Object determineCurrentLookupKey() {
        String dataSourceKey = DynamicDataSourceContextHolder.getDataSourceRouterKey();
        log.info("当前数据源是: {}",dataSourceKey);
        return dataSourceKey;
    }

    public DynamicRoutingDataSource() {
    }

    /**
     * 配置好多个数据源的信息以及默认数据源信息
     *
     * @param defaultTargetDataSource 默认数据源
     * @param targetDataSources       目标数据源
     */
    public DynamicRoutingDataSource(DataSource defaultTargetDataSource, Map<Object,Object> targetDataSources) {
        super.setDefaultTargetDataSource(defaultTargetDataSource);
        super.setTargetDataSources(targetDataSources);
        super.afterPropertiesSet();
    }
}

协助切换数据源的操作类

/**
 * 数据源动态切换操作类
 *
 * @author itoyoung
 * @date 2020-06-12 15:02
 */
@Slf4j
public class DynamicDataSourceContextHolder {


    /**
     * 存储所有的数据源key
     */
    public static List<String> dataSourceIds = new ArrayList<>();

    /**
     * 线程级别的私有变量
     * <p>
     * ThreadLocal 用于提供线程局部变量,在多线程环境可以保证各个线程里的变量独立于其它线程里的变量。
     * 也就是说 ThreadLocal 可以为每个线程创建一个【单独的变量副本】,相当于线程的 private static 类型变量。
     */
    private static final ThreadLocal<String> HOLDER = new ThreadLocal<>();

    private static final ThreadLocal<String> DBNAME_HOLDER = new ThreadLocal<>();

    private static final ThreadLocal<String> SYSTEM_HOLDER = new ThreadLocal<>();

    public static String getDataSourceRouterKey() {
        return HOLDER.get();
    }

    /**
     * 获取当前db源 数据库
     *
     * @return
     */
    public static String getCurrentDataSourceDbName() {
        return DBNAME_HOLDER.get();
    }

    public static void setDataSourceRouterKey(String dataSourceRouterKey) {
        if (!containsDataSource(dataSourceRouterKey)) {
            log.info("数据源[{}]不存在,使用当前数据源>{}", dataSourceRouterKey, HOLDER.get());
            return;
        }
        if (!dataSourceRouterKey.equals(HOLDER.get())) {
            log.info("切换至{}数据源", dataSourceRouterKey);
            HOLDER.set(dataSourceRouterKey);
        }
    }

    public static void setDataSourceRouterKey(String system, String dbName) {
        String dataSourceRouterKey = system + "_" + dbName;
        setDataSourceRouterKey(dataSourceRouterKey);
    }

    /**
     * 移除数据源
     */
    public static void remove() {
        HOLDER.remove();
    }

    /**
     * 判断指定DataSrouce当前是否存在
     *
     * @param dataSourceId
     * @return
     */
    public static boolean containsDataSource(String dataSourceId) {
        return dataSourceIds.contains(dataSourceId);
    }

    static public String getRootDbNameParam() {
        return DBNAME_HOLDER.get();
    }

    /**
     * 手动切换到当前请求的组织对应的库
     *
     * @param dataSourceType 数据源类型,tn tw wms
     */
    public static void setDataSourceWithRootDbName(DataSourceType dataSourceType) {
        String dataSourceRouterKey = dataSourceType.getCode() + "_" + getRootDbNameParam();
        setDataSourceRouterKey(dataSourceRouterKey);
    }

    /**
     * 手动切换到当前请求的原始数据源
     */
    public static void setRootDataSource() {
        String dataSourceRouterKey = SYSTEM_HOLDER.get() + "_" + getRootDbNameParam();
        setDataSourceRouterKey(dataSourceRouterKey);
    }

    /**
     * 将请求中的组织参数存储在线程变量中,避免后续多线程获取不到请求参数
     *
     * @param dbName
     */
    public static void setDbNameHolder(String dbName) {
        DBNAME_HOLDER.set(dbName);
    }

    public static String getDbNameHolder() {
        return DBNAME_HOLDER.get();
    }

    /**
     * 将请求中的system参数存储在线程变量中,避免后续多线程获取不到请求参数
     *
     * @param system 服务器
     */
    public static void setSystemHolder(String system) {
        SYSTEM_HOLDER.set(system);
    }

    public static String getSystemHolder() {
        return SYSTEM_HOLDER.get();
    }
}

AOP动态切换数据源

  • 自定义数据源动态切换注解
/**
 * 切换数据注解 可以用于类或者方法级别 方法级别优先级 > 类级别
 * 
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataSource {
    /**
     * 值为自定义dataSource的key
     */
    String value() default "";

    DataSourceType dataSourceType() default DataSourceType.LOCAL;

    String dbName() default "";
}

/**
 * 数据源类型
 *
 */
@Getter
@AllArgsConstructor
public enum DataSourceType {

    LOCAL("local","本地虚拟机数据库"),
    LOCAL127("local127","本地数据库"),
    ROOT("root","当前请求默认的组织");

    private String code;
    private String value;
}
  • AOP动态切换
/**
 * 数据源动态切换操作类
 *
 * @author itoyoung
 * @date 2020-06-13 10:02
 */
@Aspect
@Component
public class DynamicDataSourceAspect {

    private static final Logger logger = LoggerFactory.getLogger(DynamicDataSourceAspect.class);

    @Pointcut("@annotation(com.itoyoung.datasource.annotation.DataSource)")
    public void dataSource() {
    }

    /**
     * 在进入加上注解的类或方法时切换到对应的数据源
     *
     * @param joinPoint
     * @param ds
     */
    @Before("@annotation(ds)")
    public void changeDataSource(JoinPoint joinPoint, DataSource ds) {
        String dataSourceRouterKey = getDataSourcRoutereKey(ds);
        if (!DynamicDataSourceContextHolder.containsDataSource(dataSourceRouterKey)) {
            //使用默认数据源或者抛出异常
            logger.info("数据源[{}]不存在,使用默认数据源>{}", dataSourceRouterKey, joinPoint.getSignature());
        } else {
            logger.info("使用数据源:{} > {}", dataSourceRouterKey, joinPoint.getSignature());
            DynamicDataSourceContextHolder.setDataSourceRouterKey(dataSourceRouterKey);
        }
    }

    /**
     * 在加上注解的类或方法运行之后移除数据源
     *
     * @param joinPoint
     * @param ds
     */
    @After("@annotation(ds)")
    public void restoreDataSource(JoinPoint joinPoint, DataSource ds) {
        // 如果数据源移除后则需要自己手动切换,看需求
        // String dataSourceRouterKey = getDataSourcRoutereKey(ds);
        // if (DynamicDataSourceContextHolder.containsDataSource(dataSourceRouterKey)) {
        //     logger.info("移除数据源:{} > {}", dataSourceRouterKey, joinPoint.getSignature());
        //     DynamicDataSourceContextHolder.remove();
        // }
    }

    private String getRootDbName() {
        return DynamicDataSourceContextHolder.getDbNameHolder();
    }

    private String getDataSourcRoutereKey(DataSource ds) {
        String dataSourceRouterKey = ds.value();
        if (StringUtil.isEmpty(dataSourceRouterKey)) {
            DataSourceType dataSourceType = ds.dataSourceType();
            String dbName = ds.dbName();
            dataSourceRouterKey = dbName.isEmpty() ? dataSourceType.getCode() : ("root".equals(dbName) ? dataSourceType.getCode() + "_" + getRootDbName() : dataSourceType.getCode() + "_" + dbName);
        }
        return dataSourceRouterKey;
    }
}

标签:return,String,数据源,切换,new,分布式,public,dataSource
来源: https://blog.csdn.net/qq_37163392/article/details/114649057

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有