springboot整合动态多数据源+分布式事务(亲测可用)
2021-04-26 19:28
标签:数据 actor conf vfs interface targe iss contains turn。1. 导入相关的依赖;2. 相关的 yml 配置;3. 利用 AOP 原理来动态切换数据源(相关数据源的处理网上的基本差不多,本人也是参考了很多博主的文章得来的,至于具体是哪些博主就不太记得了,将就着看吧);4. 事务相关的处理:多数据源切换,支持事务。多数据源事务管理器是:根据数据源的不同类型,动态获取数据库连接,而不是从原来的缓存中获取(否则会导致数据源没法切换);支持 Service 内多数据源切换的 Factory。springboot整合动态多数据源+分布式事务(亲测可用) 标签:数据 actor conf vfs interface targe iss contains turn 原文地址:https://www.cnblogs.com/itliyh/p/13251572.html
datasource:
  type: com.alibaba.druid.pool.xa.DruidXADataSource
  driverClassName: com.mysql.cj.jdbc.Driver
  druid:
    # 主库数据源
    master:
      url: jdbc:mysql://localhost:3306/saas_master?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false
      username: root
      password:
    # 从库数据源
    slave:
      # 从数据源开关/默认关闭
      open: true
      type: com.alibaba.druid.pool.xa.DruidXADataSource
      driverClassName: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/saas_slave?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false
      username: root
      password:
package com.zt.common.annotation;
import com.zt.common.enums.DataSourceType;
import java.lang.annotation.*;
/**
 * Marks a method or a type as bound to a specific data source.
 *
 * <p>The annotation is retained at runtime, is inherited by subclasses of an
 * annotated class, and may be placed on methods or on types.
 *
 * @author lyh
 */
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface DataSource {

    /**
     * The data source to switch to; {@link DataSourceType#MASTER} when omitted.
     */
    DataSourceType value() default DataSourceType.MASTER;
}
package com.zt.common.aspect;
import com.zt.common.annotation.DataSource;
import com.zt.common.datasource.DynamicDataSourceContextHolder;
import com.zt.common.utils.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
/**
* 多数据源处理
*
* @author lyh
*/
@Aspect
@Order(1)
@Component
public class DataSourceAspect
{
protected Logger logger = LoggerFactory.getLogger(getClass());
@Pointcut("@annotation(com.zt.common.annotation.DataSource)"
+ "|| @within(com.zt.common.annotation.DataSource)")
public void dsPointCut()
{
}
@Around("dsPointCut()")
public Object around(ProceedingJoinPoint point) throws Throwable
{
DataSource dataSource = getDataSource(point);
if (StringUtils.isNotNull(dataSource))
{
DynamicDataSourceContextHolder.setDataSourceType(dataSource.value().name());
}
try
{
return point.proceed();
}
finally
{
// 销毁数据源 在执行方法之后
DynamicDataSourceContextHolder.clearDataSourceType();
}
}
/**
* 获取需要切换的数据源
*/
public DataSource getDataSource(ProceedingJoinPoint point)
{
MethodSignature signature = (MethodSignature) point.getSignature();
Class extends Object> targetClass = point.getTarget().getClass();
DataSource targetDataSource = targetClass.getAnnotation(DataSource.class);
if (StringUtils.isNotNull(targetDataSource))
{
return targetDataSource;
}
else
{
Method method = signature.getMethod();
DataSource dataSource = method.getAnnotation(DataSource.class);
return dataSource;
}
}
}
package com.zt.common.datasource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* 数据源切换处理
*
* @author
*/
public class DynamicDataSourceContextHolder
{
public static final Logger log = LoggerFactory.getLogger(DynamicDataSourceContextHolder.class);
/**
* 使用ThreadLocal维护变量,ThreadLocal为每个使用该变量的线程提供独立的变量副本,
* 所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
*/
private static final ThreadLocal
package com.zt.common.datasource;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import javax.sql.DataSource;
import java.util.Map;
/**
* 动态数据源
*
* @author
*/
public class DynamicDataSource extends AbstractRoutingDataSource
{
public DynamicDataSource(DataSource defaultTargetDataSource, Map
package com.zt.common.config;
import com.zt.common.datasource.DynamicDataSource;
import com.zt.common.enums.DataSourceType;
import com.zt.common.interceptor.PrepareInterceptor;
import com.zt.common.transaction.MultiDataSourceTransactionFactory;
import com.zt.common.transaction.PackagesSqlSessionFactoryBean;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.mybatis.spring.boot.autoconfigure.SpringBootVFS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.lang.Nullable;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* druid 配置多数据源
*
* @author lyh
*/
@Configuration
@EnableTransactionManagement //开启事务
//@MapperScan("com.zt.*.mapper")
@Import({PrepareInterceptor.class})
public class DruidMutilConfig {
@Autowired
PrepareInterceptor prepareInterceptor;
/**
 * Creates the Atomikos-managed XA data source for the master database.
 *
 * @param env environment the {@code spring.datasource.druid.master.*} settings are read from
 * @return the data source, registered under the unique resource name {@code master}
 */
@Bean(name = "masterDataSource")
public DataSource masterDataSource(Environment env) {
    Properties xaProps = build(env, "spring.datasource.druid.master.");
    AtomikosDataSourceBean master = new AtomikosDataSourceBean();
    master.setUniqueResourceName("master");
    // Use Druid's XA implementation as the underlying data source class.
    master.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
    master.setXaProperties(xaProps);
    master.setPoolSize(5);
    return master;
}
/**
 * Creates the Atomikos-managed XA data source for the slave database.
 *
 * @param env environment the {@code spring.datasource.druid.slave.*} settings are read from
 * @return the data source, registered under the unique resource name {@code slave}
 */
@Bean(name = "slaveDataSource")
public DataSource slaveDataSource(Environment env) {
    Properties xaProps = build(env, "spring.datasource.druid.slave.");
    AtomikosDataSourceBean slave = new AtomikosDataSourceBean();
    slave.setUniqueResourceName("slave");
    // Use Druid's XA implementation as the underlying data source class.
    slave.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
    slave.setXaProperties(xaProps);
    slave.setPoolSize(5);
    return slave;
}
/**
 * Collects the XA connection settings found under {@code prefix}.
 *
 * <p>{@code url}, {@code username} and {@code password} must be present in the
 * environment (an empty value is accepted). A missing key is reported by name
 * instead of surfacing as a bare {@code NullPointerException} from
 * {@link Properties#put}. {@code driverClassName} is optional and falls back
 * to an empty string.
 *
 * @param env    environment to read the settings from
 * @param prefix property prefix including the trailing dot,
 *               e.g. {@code spring.datasource.druid.master.}
 * @return the properties to hand to the XA data source
 * @throws IllegalStateException if a required key cannot be resolved
 */
private Properties build(Environment env, String prefix) {
    Properties prop = new Properties();
    prop.put("url", env.getRequiredProperty(prefix + "url"));
    prop.put("username", env.getRequiredProperty(prefix + "username"));
    prop.put("password", env.getRequiredProperty(prefix + "password"));
    prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));
    // Only the basic settings are copied here; add further entries as needed.
    return prop;
}
/**
* 动态数据源,在这继续添加 DataSource Bean
*/
@Bean(name = "dynamicDataSource")
@Primary
public DynamicDataSource dataSource(@Qualifier("masterDataSource") DataSource masterDataSource, @Nullable @Qualifier("slaveDataSource") DataSource slaveDataSource) {
Map
package com.zt.common.transaction;
import com.alibaba.druid.support.logging.Log;
import com.alibaba.druid.support.logging.LogFactory;
import com.zt.common.datasource.DynamicDataSourceContextHolder;
import org.apache.ibatis.transaction.Transaction;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.jdbc.datasource.DataSourceUtils;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.apache.commons.lang3.Validate.notNull;
/**
*
package com.zt.common.transaction;
import org.apache.ibatis.session.TransactionIsolationLevel;
import org.apache.ibatis.transaction.Transaction;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import javax.sql.DataSource;
/**
*
package com.zt.common.transaction;
import org.apache.commons.lang3.StringUtils;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.core.type.classreading.CachingMetadataReaderFactory;
import org.springframework.core.type.classreading.MetadataReader;
import org.springframework.core.type.classreading.MetadataReaderFactory;
import org.springframework.util.ClassUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/** 配置typeAliasesPackage支持通配符包路径扫描
* 通过继承重写包路径读取方式来实现支持通配符配置,以前的SqlSessionFactoryBean
* 不支持通配符设置包别名,所以重写该方法
* Create by lyh
*/
public class PackagesSqlSessionFactoryBean extends SqlSessionFactoryBean {
private static final Logger logger = LoggerFactory.getLogger(PackagesSqlSessionFactoryBean.class);
static final String DEFAULT_RESOURCE_PATTERN = "**/*.class";
@Override
public void setTypeAliasesPackage(String typeAliasesPackage) {
ResourcePatternResolver resolver = (ResourcePatternResolver) new PathMatchingResourcePatternResolver();
MetadataReaderFactory metadataReaderFactory = new CachingMetadataReaderFactory(resolver);
typeAliasesPackage = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX +
ClassUtils.convertClassNameToResourcePath(typeAliasesPackage) + "/" + DEFAULT_RESOURCE_PATTERN;
//将加载多个绝对匹配的所有Resource
//将首先通过ClassLoader.getResource("META-INF")加载非模式路径部分
//然后进行遍历模式匹配
try {
List
package com.zt.common.transaction;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
/**
 * Distributed (XA) transaction manager configuration.
 *
 * <p>Wires the Atomikos user transaction and transaction manager into a
 * Spring {@link JtaTransactionManager}, so that a failure while working on
 * several data sources rolls all of them back together.
 *
 * Create by lyh
 */
@Configuration
public class XATransactionManagerConfig {

    /**
     * Atomikos {@link UserTransaction} with a transaction timeout of 10000
     * (the JTA API defines this value in seconds).
     */
    @Bean(name = "userTransaction")
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp atomikosUserTx = new UserTransactionImp();
        atomikosUserTx.setTransactionTimeout(10000);
        return atomikosUserTx;
    }

    /**
     * Atomikos {@link TransactionManager} with forced shutdown disabled.
     */
    @Bean(name = "atomikosTransactionManager")
    public TransactionManager atomikosTransactionManager() throws Throwable {
        UserTransactionManager atomikosTxManager = new UserTransactionManager();
        atomikosTxManager.setForceShutdown(false);
        return atomikosTxManager;
    }

    /**
     * Spring transaction manager backed by the two Atomikos beans above.
     */
    @Bean(name = "transactionManager")
    @DependsOn({ "userTransaction", "atomikosTransactionManager" })
    public PlatformTransactionManager transactionManager() throws Throwable {
        UserTransaction userTx = userTransaction();
        TransactionManager txManager = atomikosTransactionManager();
        return new JtaTransactionManager(userTx, txManager);
    }
}
package com.zt.common.transaction;
import com.alibaba.druid.support.logging.Log;
import com.alibaba.druid.support.logging.LogFactory;
import com.zt.common.datasource.DynamicDataSourceContextHolder;
import org.apache.ibatis.transaction.Transaction;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.jdbc.datasource.DataSourceUtils;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.apache.commons.lang3.Validate.notNull;
/**
* 多数据源切换,支持事务
* 多数据源事务管理器是:根据数据源的不同类型,动态获取数据库连接,而不是从原来的缓存中获取导致数据源没法切换
* @author lyh
*/
public class MultiDataSourceTransaction implements Transaction {
private static final Log LOGGER = LogFactory.getLog(MultiDataSourceTransaction.class);
private final DataSource dataSource;
private Connection mainConnection;
private String mainDatabaseIdentification;
private ConcurrentMap
private boolean isConnectionTransactional;
private boolean autoCommit;
public MultiDataSourceTransaction(DataSource dataSource) {
notNull(dataSource, "No DataSource specified");
this.dataSource = dataSource;
otherConnectionMap = new ConcurrentHashMap();
mainDatabaseIdentification= DynamicDataSourceContextHolder.getDateSoureType();
}
/**
* 开启事务处理方法
*/
@Override
public Connection getConnection() throws SQLException {
String databaseIdentification = DynamicDataSourceContextHolder.getDateSoureType();
if (null==databaseIdentification||databaseIdentification.equals(mainDatabaseIdentification)) {
if (mainConnection != null) return mainConnection;
else {
openMainConnection();
mainDatabaseIdentification =databaseIdentification;
return mainConnection;
}
} else {
if (!otherConnectionMap.containsKey(databaseIdentification)) {
try {
Connection conn = dataSource.getConnection();
otherConnectionMap.put(databaseIdentification, conn);
} catch (SQLException ex) {
throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex);
}
}
return otherConnectionMap.get(databaseIdentification);
}
}
private void openMainConnection() throws SQLException {
this.mainConnection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.mainConnection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"JDBC Connection ["
+ this.mainConnection
+ "] will"
+ (this.isConnectionTransactional ? " " : " not ")
+ "be managed by Spring");
}
}
/**
* 提交处理方法
*/
@Override
public void commit() throws SQLException {
if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Committing JDBC Connection [" + this.mainConnection + "]");
}
this.mainConnection.commit();
for (Connection connection : otherConnectionMap.values()) {
connection.commit();
}
}
}
/**
* 回滚处理方法
*/
@Override
public void rollback() throws SQLException {
if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Rolling back JDBC Connection [" + this.mainConnection + "]");
}
this.mainConnection.rollback();
for (Connection connection : otherConnectionMap.values()) {
connection.rollback();
}
}
}
/**
* 关闭处理方法
*/
@Override
public void close() throws SQLException {
DataSourceUtils.releaseConnection(this.mainConnection, this.dataSource);
for (Connection connection : otherConnectionMap.values()) {
DataSourceUtils.releaseConnection(connection, this.dataSource);
}
}
@Override
public Integer getTimeout() throws SQLException {
return null;
}
}
文章标题:springboot整合动态多数据源+分布式事务(亲测可用)
文章链接:http://soscw.com/index.php/essay/79888.html