Apache ShardingSphere 默認的 XA 事務管理器為 Atomikos。
j2ee對分佈式事務定義了標準的規範,分別是JTA和JTS,JTA(Java Transaction API)是根據XA規範制定的java版本的接口規範,Atomikos就是jta的一種實現。JTA中約定了幾種主要的程序角色:客戶端、應用服務器、事務管理器、資源管理器。
JTA和JTS有什麼關係呢?事務管理器要和資源管理器要進行事務上下文傳播的交互,其中應用服務器和事務管理器之間也有傳播事務上下文的交互,JTS就是定義各個程序角色之間如何傳遞事務上下文的規範。
JTA從框架的角度來約定實現者需要實現的接口,JTS約定了具體程序角色應該怎樣去進行交互。
XA是X/Open CAE Specification (Distributed Transaction Processing)模型中定義的TM(Transaction Manager)與RM(Resource Manager)之間進行通信的接口。
在XA規範中,數據庫充當RM角色,應用需要充當TM的角色,即生成全局的txId,調用XAResource接口,把多個本地事務協調為全局統一的分佈式事務。
二階段提交是XA的標準實現,它將分佈式事務的提交拆分為2個階段:prepare和commit/rollback。
開啟XA全局事務後,所有子事務會按照本地默認的隔離級別鎖定資源,並記錄undo和redo日誌,然後由TM發起prepare投票,詢問所有的子事務是否可以進行提交。
當所有子事務反饋的結果為“yes”時,TM再發起commit;若其中任何一個子事務反饋的結果為“no”,TM則發起rollback。
如果在prepare階段的反饋結果為yes,而commit的過程中出現宕機等異常時,則在節點服務重啟後,可根據XA recover再次進行commit補償,以保證數據的一致性。
XA recover的作用是列出所有處於PREPARE階段的XA事務。
2PC模型中,在prepare階段需要等待所有參與子事務的反饋,因此可能造成數據庫資源鎖定時間過長,不適合併發高以及子事務生命周長較長的業務場景。
Sharding-Sphere支持基於XA的強一致性事務解決方案,可以通過SPI注入不同的第三方組件作為事務管理器實現XA協議,如Atomikos和Narayana。
在sharding-sphere中默認使用了atomikos,編程時可以直接使用,例如註解方式為:
@ShardingTransactionType(TransactionType.XA)
@Transactional(rollbackFor = Exception.class)
public void testTransactional() {
User user1 = new User(1, "faith", 12);
this.userDao.addOne(user1);
User user2 = new User(2, "belief", 12);
this.userDao.addOne(user2);
this.userDao.addOne(user2); // 這裡會報錯,因為在分佈式事務中,因此user1、user2都會插入失敗
}
如果spring項目中單獨使用atomikos,需要做數據源以及事務管理器等配置,例如:
<!-- 數據源 -->
<bean id="abstractDatasource" abstract="true"
class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
<property name="poolSize" value="${datasource.poolSize}" />
<property name="minPoolSize" value="${datasource.minPoolSize}" />
<property name="maxPoolSize" value="${datasource.maxPoolSize}" />
<!--獲取連接失敗重新獲等待最大時間,在這個時間內如果有可用連接,將返回 -->
<property name="borrowConnectionTimeout" value="60" />
<!--最大獲取數據時間,如果不設置這個值,Atomikos使用默認的5分鐘,那麼在處理大批量數據讀取的時候,一旦超過5分鐘,就會拋出類似 Resultset
is close 的錯誤. -->
<property name="reapTimeout" value="20" />
<!--最大閒置時間,超過最小連接池連接的連接將將關閉 -->
<property name="maxIdleTime" value="${datasource.maxIdleTime}" />
<!--連接回收時間 -->
<property name="maintenanceInterval" value="60" />
<!--java數據庫連接池,最大可等待獲取datasouce的時間 -->
<property name="loginTimeout" value="60" />
<property name="logWriter" value="60" />
<!-- <property name="maxLifetime" value="1800000"/> -->
<property name="testQuery">
<value>select 1</value>
</property>
</bean>
<!-- atomikos事務管理器 -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close">
<description>UserTransactionManager</description>
<property name="forceShutdown">
<value>true</value>
</property>
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300" />
</bean>
<!-- spring 事務管理器,包裝了atomikos事務 -->
<bean id="jtaTransactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager">
<ref bean="atomikosTransactionManager" />
</property>
<property name="userTransaction">
<ref bean="atomikosUserTransaction" />
</property>
</bean>
<!-- spring 事務模板,和spring的jta事務類似功能 -->
<bean id="transactionTemplate"
class="org.springframework.transaction.support.TransactionTemplate">
<property name="transactionManager">
<ref bean="jtaTransactionManager" />
</property>
</bean>