開發與維運

atomikos多數據源配置-在工作流(activiti)分庫時的事務管理實戰

配置多個數據源

yml配置兩個數據源, act和business:

# Two JDBC datasources. The Java configuration binds them with the prefixes
# "spring.datasource.act" and "spring.datasource.business", so this fragment
# presumably lives under a top-level `spring:` key that is not shown here.
datasource:
    # Database used by the Activiti engine.
    act:
      # NOTE(review): keys are camelCase here but kebab-case under `business`;
      # presumably both forms bind the same way - consider using one style.
      jdbcUrl: jdbc:mysql://localhost:3306/lmwy_product_act?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      username: root
      password: 123456
      driverClassName: com.mysql.jdbc.Driver
    # Database used by the workflow business data.
    business:
      jdbc-url: jdbc:mysql://localhost:3306/lmwy_product?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver

分別對應到兩個數據源的配置:

package com.zhirui.lmwy.flow.config;

import org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;


/**
 * Multi-datasource configuration.
 *
 * <ol>
 *   <li>Activiti database connection pool: registered as the default ({@code @Primary})
 *       datasource, because the Activiti sources cannot be modified to ask for a
 *       specific one.</li>
 *   <li>Workflow business database connection pool: has to be requested explicitly
 *       as {@code businessDataSource}.</li>
 * </ol>
 */
@Configuration
public class FlowDatasourceConfig extends AbstractProcessEngineAutoConfiguration {

    /**
     * Creates the primary datasource; its settings are bound from the
     * {@code spring.datasource.act} properties.
     *
     * @return the datasource for the Activiti database
     */
    @Primary
    @Bean
    @Qualifier("activitiDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.act")
    public DataSource activitiDataSource() {
        return DataSourceBuilder.create().build();
    }

    /**
     * Creates the business datasource; its settings are bound from the
     * {@code spring.datasource.business} properties.
     *
     * @return the datasource for the workflow business database
     */
    @Bean
    @Qualifier("businessDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.business")
    public DataSource businessDataSource() {
        return DataSourceBuilder.create().build();
    }
}

業務的springdata配置和事務管理器配置

package com.zhirui.lmwy.flow.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.HashMap;

/**
 * Spring Data JPA wiring for the workflow business database.
 *
 * <p>{@code @EnableJpaRepositories(basePackages = {...})} has to name the DAO package so
 * that the JPA repositories get registered. If Spring Boot then fails on startup with
 * "Not a managed type: class ...", the JPA entity package is missing from the
 * configuration; the original note suggests adding {@code @EntityScan("entity package")}
 * to the configuration class. In this class the entity package is supplied through
 * {@code setPackagesToScan}.
 */
@Configuration
@EnableJpaRepositories(
        // package that contains the DAO interfaces to be proxied
        basePackages = {"com.zhirui.lmwy.flow.dao"},
        entityManagerFactoryRef = "flowEntityManager",
        transactionManagerRef = "flowTransactionManager"
)
public class JpaRepositoriesConfig {

    @Autowired
    private Environment env;

    @Autowired
    @Qualifier("businessDataSource")
    private DataSource businessDataSource;

    /**
     * Creates the entity manager factory backed by the business datasource.
     *
     * @return the factory bean for the business persistence unit
     */
    @Primary
    @Bean
    public LocalContainerEntityManagerFactoryBean flowEntityManager() {
        // hibernate.hbm2ddl.auto (e.g. read via env.getProperty(...)) and
        // hibernate.dialect are not set here.
        HashMap<String, Object> jpaProps = new HashMap<>();
        jpaProps.put("hibernate.show_sql", "true");
        jpaProps.put("hibernate.format_sql", "true");
        jpaProps.put("hibernate.implicit_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringImplicitNamingStrategy");
        jpaProps.put("hibernate.physical_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy");

        LocalContainerEntityManagerFactoryBean factoryBean = new LocalContainerEntityManagerFactoryBean();
        factoryBean.setDataSource(businessDataSource);
        // The entity package must be configured, otherwise startup fails with:
        // No persistence units parsed from {classpath*:META-INF/persistence.xml}
        factoryBean.setPackagesToScan("com.zhirui.lmwy.flow.entity");
        factoryBean.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
        factoryBean.setJpaPropertyMap(jpaProps);
        return factoryBean;
    }

    /**
     * Creates the JPA transaction manager for the business entity manager factory.
     *
     * @return the primary transaction manager
     */
    @Bean
    @Primary
    public PlatformTransactionManager flowTransactionManager() {
        JpaTransactionManager txManager = new JpaTransactionManager();
        txManager.setEntityManagerFactory(flowEntityManager().getObject());
        return txManager;
    }
}

activiti中使用act數據源

package com.zhirui.lmwy.flow.config;

import lombok.AllArgsConstructor;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

/**
 * Activiti configuration: points the process engine at the {@code activitiDataSource}
 * bean and gives it a transaction manager for that same datasource.
 *
 * <p>The datasource is injected once, through an explicitly qualified constructor.
 * The previous combination of a Lombok all-args constructor (unqualified parameters,
 * including a datasource that was never used) and an {@code @Autowired} field injected
 * the same dependency twice.
 */
@Configuration
public class ActivitiConfig {

    private final DataSource activitiDataSource;

    /**
     * @param activitiDataSource the datasource registered under the qualifier
     *                           {@code activitiDataSource}
     */
    public ActivitiConfig(@Qualifier("activitiDataSource") DataSource activitiDataSource) {
        this.activitiDataSource = activitiDataSource;
    }

    /**
     * Creates the process engine configuration bound to the Activiti datasource.
     *
     * @return the engine configuration using {@link #activitiTransactionManager()}
     */
    @Bean
    public SpringProcessEngineConfiguration getProcessEngineConfiguration() {
        SpringProcessEngineConfiguration config = new SpringProcessEngineConfiguration();
        config.setDataSource(activitiDataSource);
        config.setTransactionManager(activitiTransactionManager());
        return config;
    }

    /**
     * Creates the transaction manager for the Activiti datasource.
     *
     * @return a transaction manager that manages only the Activiti datasource
     */
    @Bean
    public DataSourceTransactionManager activitiTransactionManager() {
        DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
        transactionManager.setDataSource(activitiDataSource);
        return transactionManager;
    }
}

事務管理問題

service方法中只能有一個事務管理器生效。business的事務管理器是"flowTransactionManager",且配置了@Primary,所以下面的方法默認使用的事務管理器就是"flowTransactionManager",那麼針對數據源act的操作將無法回滾。

@Override
@Transactional(rollbackFor = Exception.class)
public void commit(FlowTaskInstance flowTaskInstance, String tenantId) throws Exception {
    針對數據源act的操作
        針對數據源business的操作
        ...

可以將@Transactional(rollbackFor = Exception.class) 指定事務管理器名稱:

@Transactional(rollbackFor = Exception.class, transactionManager = "activitiTransactionManager")

那麼方法上使用的事務管理器為“activitiTransactionManager”,但是這樣一來,針對數據源business的操作又將無法回滾。

我們需要一種分佈式事務管理器。

* 如果是單應用單節點服務的多數據源事務,可以採用下文的 atomikos實現分佈式事務 的方案。

* 如果是微服務架構,那麼直接在該基礎上整合seata即可!

atomikos實現分佈式事務

先了解下spring中事務的管理器

PlatformTransactionManager頂級接口定義了最核心的事務管理方法,下面一層是AbstractPlatformTransactionManager抽象類,實現了PlatformTransactionManager接口的方法並定義了一些抽象方法,供子類拓展。最下面一層是2個經典事務管理器:

1.DataSourceTransactionManager: 即本地單資源事務管理器,也是spring默認的事務管理器。

2.JtaTransactionManager: 即多資源事務管理器(又叫做分佈式事務管理器),其實現了JTA規範,使用XA協議進行兩階段提交。

3. atomikos是JTA規範的一種具體實現(可配合JtaTransactionManager使用),目前比較流行。

pom配置

<!-- Distributed transaction management. Reference: https://blog.csdn.net/qq_35387940/article/details/103474353 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

yaml配置

# Atomikos XA datasources. The Java configuration binds them with the prefixes
# "spring.jta.atomikos.datasource.act" and "spring.jta.atomikos.datasource.business".
spring:
  jta:
    enabled: true
    atomikos:
      datasource:
        # XA datasource for the Activiti database.
        act:
          # NOTE(review): pinGlobalTxToPhysicalConnection=true is passed to the MySQL
          # driver through the URL - confirm it is needed for your driver version.
          xa-properties.url: jdbc:mysql://localhost:3306/lmwy_product_act?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&pinGlobalTxToPhysicalConnection=true
          xa-properties.user: root
          xa-properties.password: 123456
          # NOTE(review): this class name presumably matches an older MySQL driver
          # package layout - verify against the driver version on the classpath.
          xa-data-source-class-name: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
          unique-resource-name: act
          max-pool-size: 10
          min-pool-size: 1
          max-lifetime: 10000
          borrow-connection-timeout: 10000
        # XA datasource for the workflow business database.
        business:
          xa-properties.url: jdbc:mysql://localhost:3306/lmwy_product?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&pinGlobalTxToPhysicalConnection=true
          xa-properties.user: root
          xa-properties.password: 123456
          xa-data-source-class-name: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
          unique-resource-name: business
          max-pool-size: 10
          min-pool-size: 1
          max-lifetime: 10000
          borrow-connection-timeout: 10000

配置多數據源和事務管理器

package com.zhirui.lmwy.flow.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;


/**
 * Multi-datasource configuration backed by Atomikos.
 *
 * <ol>
 *   <li>Activiti database connection pool: the default ({@code @Primary}) datasource.</li>
 *   <li>Workflow business database connection pool: has to be requested explicitly
 *       as {@code businessDatasource}.</li>
 * </ol>
 */
@Configuration
public class AtomikosConfig extends AbstractProcessEngineAutoConfiguration {

    /**
     * Creates the primary Atomikos datasource; its settings are bound from the
     * {@code spring.jta.atomikos.datasource.act} properties.
     *
     * @return the datasource for the Activiti database
     */
    @Bean(name = "actDatasource")
    @Primary
    @Qualifier("actDatasource")
    @ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.act")
    public DataSource actDatasource() {
        return new AtomikosDataSourceBean();
    }

    /**
     * Creates the Atomikos datasource for the business data; its settings are bound
     * from the {@code spring.jta.atomikos.datasource.business} properties.
     *
     * @return the datasource for the workflow business database
     */
    @Bean(name = "businessDatasource")
    @Qualifier("businessDatasource")
    @ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.business")
    public DataSource businessDatasource() {
        return new AtomikosDataSourceBean();
    }

    /**
     * Creates the JTA transaction manager, registered as the primary bean under the
     * name {@code jtaTransactionManager}.
     *
     * @return a JTA transaction manager built on the Atomikos user transaction and
     *         transaction manager
     * @throws SystemException declared for callers; kept from the original signature
     */
    @Primary
    @Bean("jtaTransactionManager")
    public JtaTransactionManager activitiTransactionManager() throws SystemException {
        UserTransaction atomikosUserTx = new UserTransactionImp();
        UserTransactionManager atomikosTxManager = new UserTransactionManager();
        return new JtaTransactionManager(atomikosUserTx, atomikosTxManager);
    }
}

業務jpa綁定數據源

package com.zhirui.lmwy.flow.config;

import org.hibernate.engine.transaction.jta.platform.internal.AtomikosJtaPlatform;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
import java.util.HashMap;
import java.util.Map;

/**
 * Spring Data JPA wiring for the workflow business database, running under JTA.
 *
 * <p>{@code @EnableJpaRepositories(basePackages = {...})} has to name the DAO package so
 * that the JPA repositories get registered. If Spring Boot then fails on startup with
 * "Not a managed type: class ...", the JPA entity package is missing from the
 * configuration; the original note suggests adding {@code @EntityScan("entity package")}
 * to the configuration class. In this class the entity package is supplied through
 * {@code setPackagesToScan}.
 */
@Configuration
@EnableJpaRepositories(
        // package that contains the DAO interfaces to be proxied
        basePackages = {"com.zhirui.lmwy.flow.dao"},
        entityManagerFactoryRef = "flowEntityManager",
        // use the JTA transaction manager
        transactionManagerRef = "jtaTransactionManager"
)
public class JpaRepositoriesConfig {

    @Autowired
    private Environment env;

    @Autowired
    @Qualifier("businessDatasource")
    private DataSource businessDataSource;

    /**
     * Creates the entity manager factory, using the business datasource as a JTA
     * datasource.
     *
     * @return the factory bean for the business persistence unit
     */
    @Bean
    public LocalContainerEntityManagerFactoryBean flowEntityManager() {
        Map<String, Object> jpaProps = new HashMap<>();

        // JTA settings: transaction platform and transaction type.
        jpaProps.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
        jpaProps.put("javax.persistence.transactionType", "JTA");

        // hibernate.hbm2ddl.auto (e.g. read via env.getProperty(...)) and
        // hibernate.dialect are not set here.
        jpaProps.put("hibernate.show_sql", "true");
        jpaProps.put("hibernate.format_sql", "true");
        jpaProps.put("hibernate.implicit_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringImplicitNamingStrategy");
        jpaProps.put("hibernate.physical_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy");

        LocalContainerEntityManagerFactoryBean factoryBean = new LocalContainerEntityManagerFactoryBean();
        // Registered as the JTA datasource (setJtaDataSource), not via setDataSource.
        factoryBean.setJtaDataSource(businessDataSource);
        // The entity package must be configured, otherwise startup fails with:
        // No persistence units parsed from {classpath*:META-INF/persistence.xml}
        factoryBean.setPackagesToScan("com.zhirui.lmwy.flow.entity");
        factoryBean.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
        factoryBean.setJpaPropertyMap(jpaProps);
        return factoryBean;
    }

}

activiti綁定數據源

package com.zhirui.lmwy.flow.config;

import lombok.AllArgsConstructor;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;

/**
 * Activiti configuration: points the process engine at the {@code actDatasource} bean
 * and at the shared JTA transaction manager.
 *
 * <p>The datasource is injected once, through an explicitly qualified constructor with
 * the same {@code (DataSource)} signature the Lombok all-args constructor used to
 * generate. Previously the dependency was injected twice: unqualified through that
 * generated constructor and again through an {@code @Autowired} field.
 */
@Configuration
public class ActivitiConfig {

    private final DataSource activitiDataSource;

    /**
     * @param activitiDataSource the datasource registered under the qualifier
     *                           {@code actDatasource}
     */
    public ActivitiConfig(@Qualifier("actDatasource") DataSource activitiDataSource) {
        this.activitiDataSource = activitiDataSource;
    }

    /**
     * Creates the primary process engine configuration.
     *
     * @param jtaTransactionManager the JTA transaction manager the engine should use
     * @return the engine configuration bound to the Activiti datasource
     */
    @Bean
    @Primary
    public SpringProcessEngineConfiguration getProcessEngineConfiguration(JtaTransactionManager jtaTransactionManager) {
        SpringProcessEngineConfiguration config = new SpringProcessEngineConfiguration();
        config.setDataSource(activitiDataSource);
        config.setTransactionManager(jtaTransactionManager);
        return config;
    }

}

測試:

執行任務提交方法,報錯後都會進行回滾

// Demo of the task-submit method: per the article, both operations are rolled back
// once the error at the end is raised.
public void commit(){
    // Step 1: Activiti completes the task; this uses the act datasource
    taskService.complete(task.getId());
    ...
    // Step 2: update the business flow object and save it; this uses the business datasource
    flowInstanceDao.save(flowInstance);
    ...
    // Deliberate failure (integer division by zero) to trigger the rollback
    int a = 100 / 0;
}

參考:

http://www.manongjc.com/detail/6-anzuqtksygeselj.html

https://www.cnblogs.com/xkzhangsanx/p/11218453.html

https://www.jianshu.com/p/099c0850ba16

Leave a Reply

Your email address will not be published. Required fields are marked *