ShardingSphere Integration with Atomikos: Support for XA Distributed Transactions
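Apache ShardingSphere is an ecosystem of open-source distributed database middleware solutions. It consists of three independent products, JDBC, Proxy, and Sidecar (planned), which can also be deployed together and used in combination. All of them provide standardized data sharding, distributed transactions, and database governance, and they suit a wide range of scenarios such as homogeneous Java applications, heterogeneous languages, and cloud native environments.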
ShardingSphere became a top-level project of the Apache Software Foundation on April 16, 2020.
Without further ado, let's pick up where the previous article left off and get straight to the point.
A brief introduction to Atomikos
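Atomikos (https://www.atomikos.com/) is actually the name of a company. It provides an implementation of an XA distributed transaction manager (TM) based on the JTA specification, and this transaction manager is its best-known product. It comes in two editions: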
TransactionEssentials: the open-source, free edition;
ExtremeTransactions: the commercial edition, which requires a paid license.
The relationship between the two editions is shown in the figure below:
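On top of TransactionEssentials, ExtremeTransactions adds the following features (the notable ones):
Support for TCC, a flexible (compensation-based) transaction model;
Transaction propagation over remote procedure call technologies such as RMI, IIOP, and SOAP;
Cloud storage of transaction logs, transaction recovery in the cloud, and a full-featured management console.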
org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager in detail
Let's briefly review org.apache.shardingsphere.transaction.spi.ShardingTransactionManager:
public interface ShardingTransactionManager extends AutoCloseable {
/**
* Initialize sharding transaction manager.
*
* @param databaseType database type
* @param resourceDataSources resource data sources
*/
void init(DatabaseType databaseType, Collection<ResourceDataSource> resourceDataSources);
/**
* Get transaction type.
*
* @return transaction type
*/
TransactionType getTransactionType();
/**
* Judge is in transaction or not.
*
* @return in transaction or not
*/
boolean isInTransaction();
/**
* Get transactional connection.
*
* @param dataSourceName data source name
* @return connection
* @throws SQLException SQL exception
*/
Connection getConnection(String dataSourceName) throws SQLException;
/**
* Begin transaction.
*/
void begin();
/**
* Commit transaction.
*/
void commit();
/**
* Rollback transaction.
*/
void rollback();
}
Let's first focus on the init method. As its name suggests, it initializes the whole framework, so let's see how the initialization works.
private final Map<String, XATransactionDataSource> cachedDataSources = new HashMap<>();
private final XATransactionManager xaTransactionManager = XATransactionManagerLoader.getInstance().getTransactionManager();
@Override
public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) {
for (ResourceDataSource each : resourceDataSources) {
cachedDataSources.put(each.getOriginalName(), new XATransactionDataSource(databaseType, each.getUniqueResourceName(), each.getDataSource(), xaTransactionManager));
}
xaTransactionManager.init();
}
First, the concrete implementation of XATransactionManager is loaded through SPI; here it is org.apache.shardingsphere.transaction.xa.atomikos.manager.AtomikosTransactionManager. Next, look at new XATransactionDataSource(), which enters the constructor of org.apache.shardingsphere.transaction.xa.jta.datasource.XATransactionDataSource:
public XATransactionDataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource, final XATransactionManager xaTransactionManager) {
    this.databaseType = databaseType;
    this.resourceName = resourceName;
    this.dataSource = dataSource;
    if (!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
        // Key point 1: build and return the XADataSource
        xaDataSource = XADataSourceFactory.build(databaseType, dataSource);
        this.xaTransactionManager = xaTransactionManager;
        // Key point 2: register the recovery resource
        xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource);
    }
}
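Both ShardingSphere (for XATransactionManager here, and for the database dialects below) and Atomikos (for OltpLogFactory, discussed later) rely on SPI loading, that is, java.util.ServiceLoader. The sketch below shows the "load at most one provider" pattern that Atomikos' createOltpLogFromClasspath() follows: no provider means "fall back to the built-in default", more than one is a configuration error. LogFactorySpi and SpiLoaderSketch are hypothetical names; a real provider would be declared in a META-INF/services file named after the interface.

```java
import java.util.ServiceLoader;

// Hypothetical SPI interface, standing in for something like Atomikos' OltpLogFactory.
// Providers would be listed in META-INF/services/<fully qualified interface name>.
interface LogFactorySpi {
    String name();
}

final class SpiLoaderSketch {

    private SpiLoaderSketch() {
    }

    /**
     * Returns the single registered provider of {@code spi}, or null if none is
     * registered (the caller then uses its built-in default). More than one provider
     * is treated as a configuration error.
     */
    static <T> T loadAtMostOne(Class<T> spi) {
        T result = null;
        int count = 0;
        for (T provider : ServiceLoader.load(spi, spi.getClassLoader())) {
            result = provider;
            count++;
        }
        if (count > 1) {
            throw new IllegalStateException("More than one " + spi.getName() + " found in classpath");
        }
        return result;
    }
}
```

Without any META-INF/services entry, loadAtMostOne(LogFactorySpi.class) returns null, which corresponds to Atomikos falling back to its default local logging.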
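Let's focus on XADataSourceFactory.build(databaseType, dataSource). As the name suggests, it returns an XADataSource (javax.sql.XADataSource), the data source type that XA transactions under JTA work with. Much of ShardingSphere's functionality can be guessed from the naming alone, which is what elegant code looks like (a little bragging here). Enough said; let's step into the method:
public final class XADataSourceFactory {
    public static XADataSource build(final DatabaseType databaseType, final DataSource dataSource) {
        return new DataSourceSwapper(XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType)).swap(dataSource);
    }
}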
XADataSourceDefinitionFactory is, once again, defined through SPI: it loads a different dialect for each database type. Next, step into the swap method:
public XADataSource swap(final DataSource dataSource) {
    XADataSource result = createXADataSource();
    setProperties(result, getDatabaseAccessConfiguration(dataSource));
    return result;
}
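The two steps of swap() (create the XA data source, then copy the connection settings onto it) can be sketched with plain reflection over JavaBean setters. This is an illustration under assumptions, not ShardingSphere's DataSourceSwapper: in ShardingSphere the class to instantiate comes from the dialect-specific XADataSourceDefinition, and how it is located and configured there may differ. DemoXaDataSource and PropertySwapperSketch are hypothetical.

```java
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical stand-in for a vendor XADataSource class; only its JavaBean setters matter here.
class DemoXaDataSource {
    private String url;
    private String user;
    private String password;

    public void setUrl(String url) { this.url = url; }
    public void setUser(String user) { this.user = user; }
    public void setPassword(String password) { this.password = password; }
    public String getUrl() { return url; }
    public String getUser() { return user; }
    public String getPassword() { return password; }
}

final class PropertySwapperSketch {

    private PropertySwapperSketch() {
    }

    /**
     * Step 1: instantiate the XA data source class reflectively.
     * Step 2: copy each connection property (url, user, password, ...) through its setter.
     */
    static <T> T swap(Class<T> xaDataSourceClass, Map<String, String> properties) {
        try {
            T result = xaDataSourceClass.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                String key = entry.getKey();
                String setterName = "set" + Character.toUpperCase(key.charAt(0)) + key.substring(1);
                Method setter = xaDataSourceClass.getMethod(setterName, String.class);
                setter.invoke(result, entry.getValue());
            }
            return result;
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot build " + xaDataSourceClass.getName(), ex);
        }
    }
}
```

Wrapping the reflective exceptions in an unchecked exception keeps the call site as simple as swap(dataSource) in the real code.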
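It is straightforward: step one creates the XADataSource, step two sets its properties (database connection URL, username, password, and so on), and then it is returned.
Back in the XATransactionDataSource class, note xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource);. As the name suggests, it registers a resource for transaction recovery; we will cover it in detail when we discuss transaction recovery.
Back in XAShardingTransactionManager.init(), the call to focus on is xaTransactionManager.init();, which finally enters AtomikosTransactionManager.init(). The flow chart is as follows: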
Code:
public final class AtomikosTransactionManager implements XATransactionManager {
private final UserTransactionManager transactionManager = new UserTransactionManager();
private final UserTransactionService userTransactionService = new UserTransactionServiceImp();
@Override
public void init() {
userTransactionService.init();
}
}
Step into UserTransactionServiceImp.init(), which leads to its initialize() method:
private void initialize() {
    // Add recovery resources: no need to care about this
    for (RecoverableResource resource : resources_) {
        Configuration.addResource(resource);
    }
    for (LogAdministrator logAdministrator : logAdministrators_) {
        Configuration.addLogAdministrator(logAdministrator);
    }
    // Register plugins: no need to care about this
    for (TransactionServicePlugin nxt : tsListeners_) {
        Configuration.registerTransactionServicePlugin(nxt);
    }
    // Get the configuration properties: this is the key part
    ConfigProperties configProps = Configuration.getConfigProperties();
    configProps.applyUserSpecificProperties(properties_);
    // Perform the initialization
    Configuration.init();
}
Let's focus on how the configuration properties are obtained. This ends up in the com.atomikos.icatch.provider.imp.AssemblerImp.initializeProperties() method:
@Override
public ConfigProperties initializeProperties() {
    // Read the default configuration transactions-defaults.properties from the classpath
    Properties defaults = new Properties();
    loadPropertiesFromClasspath(defaults, DEFAULT_PROPERTIES_FILE_NAME);
    // Read transactions.properties from the classpath; it overrides the same keys in transactions-defaults.properties
    Properties transactionsProperties = new Properties(defaults);
    loadPropertiesFromClasspath(transactionsProperties, TRANSACTIONS_PROPERTIES_FILE_NAME);
    // Read jta.properties from the classpath; it overrides the same keys in transactions-defaults.properties and transactions.properties
    Properties jtaProperties = new Properties(transactionsProperties);
    loadPropertiesFromClasspath(jtaProperties, JTA_PROPERTIES_FILE_NAME);
    // Read the custom configuration file whose path is given via java -Dcom.atomikos.icatch.file; it overrides earlier keys with the same name
    Properties customProperties = new Properties(jtaProperties);
    loadPropertiesFromCustomFilePath(customProperties);
    // Finally build a ConfigProperties object that represents the configuration actually in use
    Properties finalProperties = new Properties(customProperties);
    return new ConfigProperties(finalProperties);
}
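The override order above comes entirely from java.util.Properties' defaults chain: each layer is constructed with the previous one as its defaults. A minimal sketch of the same layering, with a hypothetical helper name; the property keys in the usage are real Atomikos key names, but the values are only illustrative.

```java
import java.util.Properties;

final class LayeredPropertiesSketch {

    private LayeredPropertiesSketch() {
    }

    /**
     * Chains Properties objects the way initializeProperties() does: each new layer is
     * created with the previous layer as its defaults, so a key set in a later layer
     * shadows the same key from every earlier layer, while unset keys fall through.
     * Layers are passed from lowest to highest priority.
     */
    static Properties layer(Properties... lowestPriorityFirst) {
        Properties current = new Properties();
        for (Properties source : lowestPriorityFirst) {
            Properties next = new Properties(current);
            // Stands in for loadPropertiesFromClasspath(next, fileName) in the real code.
            next.putAll(source);
            current = next;
        }
        return current;
    }
}
```

One consequence worth knowing: getProperty() walks the defaults chain, whereas Map methods such as get() only see the top layer's own entries.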
Next, let's focus on Configuration.init(), which performs the initialization:
public static synchronized boolean init() {
    boolean startupInitiated = false;
    if (service_ == null) {
        startupInitiated = true;
        // Load and register plugins via SPI: no need to dig into this
        addAllTransactionServicePluginServicesFromClasspath();
        ConfigProperties configProperties = getConfigProperties();
        // Call each plugin's beforeInit method: no need to dig into this
        notifyBeforeInit(configProperties);
        // Initialize transaction logging and recovery: very important, explained next
        assembleSystemComponents(configProperties);
        // Initialize the system components: moderately important
        initializeSystemComponents(configProperties);
        notifyAfterInit();
        if (configProperties.getForceShutdownOnVmExit()) {
            addShutdownHook(new ForceShutdownHook());
        }
    }
    return startupInitiated;
}
Let's look at assembleSystemComponents(configProperties); first. Stepping into it leads to the com.atomikos.icatch.provider.imp.AssemblerImp.assembleTransactionService() method:
@Override
public TransactionServiceProvider assembleTransactionService(ConfigProperties configProperties) {
    RecoveryLog recoveryLog = null;
    // Log the configuration
    logProperties(configProperties.getCompletedProperties());
    // Get the unique name of the transaction manager
    String tmUniqueName = configProperties.getTmUniqueName();
    long maxTimeout = configProperties.getMaxTimeout();
    int maxActives = configProperties.getMaxActives();
    boolean threaded2pc = configProperties.getThreaded2pc();
    // Load OltpLog via SPI. This is the most important extension point; if the user has not extended it via SPI, it is null
    OltpLog oltpLog = createOltpLogFromClasspath();
    if (oltpLog == null) {
        LOGGER.logInfo("Using default (local) logging and recovery...");
        // Create the repository that stores the transaction logs
        Repository repository = createRepository(configProperties);
        oltpLog = createOltpLog(repository);
        //??? Assemble recoveryLog
        recoveryLog = createRecoveryLog(repository);
    }
    StateRecoveryManagerImp recoveryManager = new StateRecoveryManagerImp();
    recoveryManager.setOltpLog(oltpLog);
    // Create the unique ID generator, used later when generating XIDs
    UniqueIdMgr idMgr = new UniqueIdMgr(tmUniqueName);
    int overflow = idMgr.getMaxIdLengthInBytes() - MAX_TID_LENGTH;
    if (overflow > 0) {
        // see case 73086
        String msg = "Value too long : " + tmUniqueName;
        LOGGER.logFatal(msg);
        throw new SysException(msg);
    }
    return new TransactionServiceImp(tmUniqueName, recoveryManager, idMgr, maxTimeout, maxActives, !threaded2pc, recoveryLog);
}
Let's analyze createOltpLogFromClasspath(). It obtains the OltpLog through SPI loading and returns null by default. What does that mean? When no extension is provided, Atomikos creates its own built-in resources to store the transaction logs.
private OltpLog createOltpLogFromClasspath() {
    OltpLog ret = null;
    ServiceLoader<OltpLogFactory> loader = ServiceLoader.load(OltpLogFactory.class, Configuration.class.getClassLoader());
    int i = 0;
    for (OltpLogFactory l : loader) {
        ret = l.createOltpLog();
        i++;
    }
    if (i > 1) {
        String msg = "More than one OltpLogFactory found in classpath - error in configuration!";
        LOGGER.logFatal(msg);
        throw new SysException(msg);
    }
    return ret;
}
Next, let's follow this call:
Repository repository = createRepository(configProperties);
Inside it, the repository is built by createCoordinatorLogEntryRepository():
private CachedRepository createCoordinatorLogEntryRepository(ConfigProperties configProperties) throws LogException {
    // Create the in-memory repository
    InMemoryRepository inMemoryCoordinatorLogEntryRepository = new InMemoryRepository();
    // Initialize it
    inMemoryCoordinatorLogEntryRepository.init();
    // Create a file-system repository as the backup
    FileSystemRepository backupCoordinatorLogEntryRepository = new FileSystemRepository();
    // Initialize it
    backupCoordinatorLogEntryRepository.init();
    // Combine the in-memory and file repositories
    CachedRepository repository = new CachedRepository(inMemoryCoordinatorLogEntryRepository, backupCoordinatorLogEntryRepository);
    repository.init();
    return repository;
}
This creates a CachedRepository, which contains an InMemoryRepository and a FileSystemRepository.
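The idea of pairing a fast in-memory store with a durable backup can be sketched as a write-through cache that is reloaded from the backup at startup. This is a simplified illustration with hypothetical names, not Atomikos' CachedRepository, which does more than this. To keep the example runnable without disk access, a second in-memory map stands in for the FileSystemRepository.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified repository of coordinator log entries: transaction id -> state.
interface LogRepository {
    void put(String transactionId, String state);

    String get(String transactionId);

    Collection<String> findAllIds();
}

// Plays the role of InMemoryRepository; in this sketch it also stands in for the
// durable FileSystemRepository so that the example runs without touching the disk.
class MapLogRepository implements LogRepository {
    private final Map<String, String> entries = new HashMap<>();

    public void put(String transactionId, String state) { entries.put(transactionId, state); }

    public String get(String transactionId) { return entries.get(transactionId); }

    public Collection<String> findAllIds() { return entries.keySet(); }
}

// Combines a fast cache with a durable backup, in the spirit of CachedRepository.
class CachedLogRepository implements LogRepository {
    private final LogRepository cache;
    private final LogRepository backup;

    CachedLogRepository(LogRepository cache, LogRepository backup) {
        this.cache = cache;
        this.backup = backup;
    }

    /** On startup, reload the cache from the backup so entries survive a restart. */
    void init() {
        for (String id : backup.findAllIds()) {
            cache.put(id, backup.get(id));
        }
    }

    /** Write-through: persist to the backup first, then update the cache. */
    public void put(String transactionId, String state) {
        backup.put(transactionId, state);
        cache.put(transactionId, state);
    }

    public String get(String transactionId) { return cache.get(transactionId); }

    public Collection<String> findAllIds() { return cache.findAllIds(); }
}
```

Reads are served from memory, while the backup is what lets a restarted transaction manager find transactions that were still in progress, which is the input for recovery.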
Back on the main path in com.atomikos.icatch.config.Configuration.init(), let's finally analyze notifyAfterInit();:
private static void notifyAfterInit() {
    // Initialize the plugins
    for (TransactionServicePlugin p : tsListenersList_) {
        p.afterInit();
    }
    for (LogAdministrator a : logAdministrators_) {
        a.registerLogControl(service_.getLogControl());
    }
    // Set the recovery service on each resource, which triggers transaction recovery
    for (RecoverableResource r : resourceList_) {
        r.setRecoveryService(recoveryService_);
    }
}
Plugin initialization enters com.atomikos.icatch.jta.JtaTransactionServicePlugin.afterInit():
public void afterInit() {
    TransactionManagerImp.installTransactionManager(Configuration.getCompositeTransactionManager(), autoRegisterResources);
    // If we provided our own OltpLog extension, this returns null; if it is null, XaResourceRecoveryManager stays null
    RecoveryLog recoveryLog = Configuration.getRecoveryLog();
    long maxTimeout = Configuration.getConfigProperties().getMaxTimeout();
    if (recoveryLog != null) {
        XaResourceRecoveryManager.installXaResourceRecoveryManager(new DefaultXaRecoveryLog(recoveryLog, maxTimeout), Configuration.getConfigProperties().getTmUniqueName());
    }
}
Pay special attention to RecoveryLog recoveryLog = Configuration.getRecoveryLog();. If the user has extended com.atomikos.recovery.OltpLog through SPI, this returns null, and in that case XaResourceRecoveryManager is not initialized. Back in notifyAfterInit(), let's analyze setRecoveryService:
public void setRecoveryService(RecoveryService recoveryService) throws ResourceException {
    if (recoveryService != null) {
        if (LOGGER.isTraceEnabled()) LOGGER.logTrace("Installing recovery service on resource " + getName());
        this.branchIdentifier = recoveryService.getName();
        recover();
    }
}
Step into the recover() method:
public void recover() {
    XaResourceRecoveryManager xaResourceRecoveryManager = XaResourceRecoveryManager.getInstance();
    // null for LogCloud recovery
    if (xaResourceRecoveryManager != null) {
        try {
            xaResourceRecoveryManager.recover(getXAResource());
        } catch (Exception e) {
            refreshXAResource(); // cf case 156968
        }
    }
}
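Did you notice the key comment? If the user has extended com.atomikos.recovery.OltpLog through SPI, XaResourceRecoveryManager is null and recovery is left to the cloud (LogCloud); otherwise local transaction recovery runs. Transaction recovery is quite complex, so we will cover it separately.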
At this point, the basic initialization of Atomikos is complete.
The Atomikos transaction begin flow
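We know that a local transaction always starts with something like transaction.begin, and XA distributed transactions are no exception. Let's switch back to XAShardingTransactionManager.begin(), which calls com.atomikos.icatch.jta.TransactionManagerImp.begin(). The flow chart is as follows: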
Code:
public void begin ( int timeout ) throws NotSupportedException,
SystemException
{
CompositeTransaction ct = null;
ResumePreviousTransactionSubTxAwareParticipant resumeParticipant = null;
ct = compositeTransactionManager.getCompositeTransaction();
if ( ct != null && ct.getProperty ( JTA_PROPERTY_NAME ) == null ) {
LOGGER.logWarning ( "JTA: temporarily suspending incompatible transaction: " + ct.getTid() +
" (will be resumed after JTA transaction ends)" );
ct = compositeTransactionManager.suspend();
resumeParticipant = new ResumePreviousTransactionSubTxAwareParticipant ( ct );
}
try {
// Create the composite transaction
ct = compositeTransactionManager.createCompositeTransaction ( ( ( long ) timeout ) * 1000 );
if ( resumeParticipant != null ) ct.addSubTxAwareParticipant ( resumeParticipant );
if ( ct.isRoot () && getDefaultSerial () )
ct.setSerial ();
ct.setProperty ( JTA_PROPERTY_NAME , "true" );
} catch ( SysException se ) {
String msg = "Error in begin()";
LOGGER.logError( msg , se );
throw new ExtendedSystemException ( msg , se );
}
recreateCompositeTransactionAsJtaTransaction(ct);
}
Here we mainly focus on compositeTransactionManager.createCompositeTransaction():
public CompositeTransaction createCompositeTransaction(long timeout) throws SysException {
    CompositeTransaction ct = null, ret = null;
    ct = getCurrentTx();
    if (ct == null) {
        ret = getTransactionService().createCompositeTransaction(timeout);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.logDebug("createCompositeTransaction ( " + timeout + " ): " + "created new ROOT transaction with id " + ret.getTid());
        }
    } else {
        if (LOGGER.isDebugEnabled()) LOGGER.logDebug("createCompositeTransaction ( " + timeout + " )");
        ret = ct.createSubTransaction();
    }
    Thread thread = Thread.currentThread();
    setThreadMappings(ret, thread);
    return ret;
}
This creates the composite transaction and then stores it in a Map keyed by the current thread. Something to think about: why doesn't it use a ThreadLocal?
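The article leaves the ThreadLocal question open. One possible consideration, offered as our own reading rather than documented Atomikos behavior: a shared map can be queried in both directions, for example to find which thread a given transaction is bound to from another thread, which a ThreadLocal cannot do. The sketch below is hypothetical (ThreadTxRegistry and its methods are not Atomikos API); it keeps a stack per thread because, as shown above, a transaction begun inside an existing one becomes a sub-transaction.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry mapping threads to stacks of transaction ids, plus the reverse mapping. */
final class ThreadTxRegistry {
    private final Map<Thread, Deque<String>> threadToTx = new HashMap<>();
    private final Map<String, Thread> txToThread = new HashMap<>();

    /** Binds a (sub)transaction to a thread; a sub-transaction is pushed on top of its parent. */
    synchronized void bind(String txId, Thread thread) {
        threadToTx.computeIfAbsent(thread, t -> new ArrayDeque<>()).push(txId);
        txToThread.put(txId, thread);
    }

    /** The innermost transaction of the given thread, or null if it has none. */
    synchronized String current(Thread thread) {
        Deque<String> stack = threadToTx.get(thread);
        return stack == null ? null : stack.peek();
    }

    /** Reverse lookup that a ThreadLocal cannot offer: which thread runs this transaction? */
    synchronized Thread ownerOf(String txId) {
        return txToThread.get(txId);
    }

    synchronized void unbind(String txId) {
        Thread thread = txToThread.remove(txId);
        if (thread == null) {
            return;
        }
        Deque<String> stack = threadToTx.get(thread);
        stack.remove(txId);
        if (stack.isEmpty()) {
            threadToTx.remove(thread);  // drop the entry so finished threads are not retained
        }
    }
}
```

The flip side of a shared map is that entries must be removed explicitly (as unbind does) and access must be synchronized, both of which a ThreadLocal would largely avoid.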
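That completes the Atomikos transaction begin flow. You may be puzzled: begin does not seem to do anything, and XA START is never called. Don't worry; the next section explains it.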
The XATransactionDataSource getConnection() flow
To execute SQL we must first obtain a database connection. Let's go back to XAShardingTransactionManager.getConnection(), which eventually calls org.apache.shardingsphere.transaction.xa.jta.datasource.XATransactionDataSource.getConnection(). The flow chart is as follows:
Code:
public Connection getConnection() throws SQLException, SystemException, RollbackException {
// Data sources whose class name is in CONTAINER_DATASOURCE_NAMES are returned as-is, without XA wrapping or enlistment
if (CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
return dataSource.getConnection();
}
// Get a database connection
Connection result = dataSource.getConnection();
// Wrap it as an XAConnection; underneath it is the same physical connection
XAConnection xaConnection = XAConnectionFactory.createXAConnection(databaseType, xaDataSource, result);
// Get the current JTA Transaction
Transaction transaction = xaTransactionManager.getTransactionManager().getTransaction();
if (!enlistedTransactions.get().contains(transaction)) {
// Enlist the resource in the transaction
transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));
transaction.registerSynchronization(new Synchronization() {
@Override
public void beforeCompletion() {
enlistedTransactions.get().remove(transaction);
}
@Override
public void afterCompletion(final int status) {
enlistedTransactions.get().clear();
}
});
enlistedTransactions.get().add(transaction);
}
return result;
}
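This matters a great deal, especially for ShardingSphere: within one transaction, several SQL statements may be routed to the same database, and the resource is enlisted only once per transaction (the enlistedTransactions check). So for the same database the same XAConnection must be used throughout; only then can the XA transaction be committed and rolled back correctly.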
Next, let's look at transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));, which enters com.atomikos.icatch.jta.TransactionImp.enlistResource(). The method is long, so here is an excerpt:
try {
    restx = (XAResourceTransaction) res.getResourceTransaction(this.compositeTransaction);
    // next, we MUST set the xa resource again,
    // because ONLY the instance we got as argument
    // is available for use now !
    // older instances (set in restx from previous sibling)
    // have connections that may be in reuse already
    // ->old xares not valid except for 2pc operations
    restx.setXAResource(xares);
    restx.resume();
} catch (ResourceException re) {
    throw new ExtendedSystemException("Unexpected error during enlist", re);
} catch (RuntimeException e) {
    throw e;
}
addXAResourceTransaction(restx, xares);
Let's go straight to restx.resume();:
public synchronized void resume() throws ResourceException {
    int flag = 0;
    String logFlag = "";
    if (this.state.equals(TxState.LOCALLY_DONE)) { // reused instance
        flag = XAResource.TMJOIN;
        logFlag = "XAResource.TMJOIN";
    } else if (!this.knownInResource) { // new instance
        flag = XAResource.TMNOFLAGS;
        logFlag = "XAResource.TMNOFLAGS";
    } else
        throw new IllegalStateException("Wrong state for resume: " + this.state);
    try {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.logDebug("XAResource.start ( " + this.xidToHexString + " , " + logFlag + " ) on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource);
        }
        this.xaresource.start(this.xid, flag);
    } catch (XAException xaerr) {
        String msg = interpretErrorCode(this.resourcename, "resume", this.xid, xaerr.errorCode);
        LOGGER.logWarning(msg, xaerr);
        throw new ResourceException(msg, xaerr);
    }
    setState(TxState.ACTIVE);
    this.knownInResource = true;
}
Oh my, did you all see it? Did you see this.xaresource.start(this.xid, flag);? Let's step into it, assuming a MySQL database:
public void start(Xid xid, int flags) throws XAException {
    StringBuilder commandBuf = new StringBuilder(300);
    commandBuf.append("XA START ");
    appendXid(commandBuf, xid);
    switch (flags) {
        case 0:
            break;
        case 2097152:
            commandBuf.append(" JOIN");
            break;
        case 134217728:
            commandBuf.append(" RESUME");
            break;
        default:
            throw new XAException(-5);
    }
    this.dispatchCommand(commandBuf.toString());
    this.underlyingConnection.setInGlobalTx(true);
}
It assembles the XA START xid SQL statement and executes it.
To sum up: while obtaining the database connection, we executed XA START xid from the XA protocol.
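The driver excerpt above leaves the Xid formatting to appendXid(), which the article does not show. The sketch below assembles a comparable statement under stated assumptions: the global transaction ID and branch qualifier are written as MySQL hex literals and the format ID as a decimal number, which MySQL's XA START syntax accepts, but Connector/J's exact output may differ. XaStartCommandSketch and DemoXid are hypothetical; XAResource, Xid, and XAException are the standard javax.transaction.xa types, and the flag constants are the values hard-coded in the switch above (TMJOIN is 2097152, TMRESUME is 134217728, XAER_INVAL is -5).

```java
import java.nio.charset.StandardCharsets;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

final class XaStartCommandSketch {

    private XaStartCommandSketch() {
    }

    /** Minimal immutable Xid for the demo (hypothetical helper). */
    static final class DemoXid implements Xid {
        private final int formatId;
        private final byte[] globalId;
        private final byte[] branchId;

        DemoXid(int formatId, String globalId, String branchId) {
            this.formatId = formatId;
            this.globalId = globalId.getBytes(StandardCharsets.US_ASCII);
            this.branchId = branchId.getBytes(StandardCharsets.US_ASCII);
        }

        @Override public int getFormatId() { return formatId; }
        @Override public byte[] getGlobalTransactionId() { return globalId.clone(); }
        @Override public byte[] getBranchQualifier() { return branchId.clone(); }
    }

    /** Renders bytes as a MySQL hex literal such as 0x6731 (assumes a non-empty array). */
    static String hexLiteral(byte[] bytes) {
        StringBuilder sb = new StringBuilder("0x");
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    /** Builds "XA START gtrid,bqual,formatID [JOIN|RESUME]", mapping JTA flags like the driver code above. */
    static String xaStart(Xid xid, int flags) throws XAException {
        StringBuilder command = new StringBuilder("XA START ");
        command.append(hexLiteral(xid.getGlobalTransactionId()))
                .append(',').append(hexLiteral(xid.getBranchQualifier()))
                .append(',').append(xid.getFormatId());
        switch (flags) {
            case XAResource.TMNOFLAGS:
                break;
            case XAResource.TMJOIN:
                command.append(" JOIN");
                break;
            case XAResource.TMRESUME:
                command.append(" RESUME");
                break;
            default:
                throw new XAException(XAException.XAER_INVAL);
        }
        return command.toString();
    }
}
```

For example, a DemoXid with format ID 1, global ID "g1", and branch "b1" started with TMJOIN yields XA START 0x6731,0x6231,1 JOIN, the same JOIN case that resume() selects for a reused branch.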
The Atomikos transaction commit flow
We have started the transaction above; now let's analyze the commit flow. Switching back to XAShardingTransactionManager.commit(), we eventually enter the com.atomikos.icatch.imp.CompositeTransactionImp.commit() method. The flow chart is as follows:
Code:
public void commit () throws HeurRollbackException, HeurMixedException,
HeurHazardException, SysException, SecurityException,
RollbackException
{
// First, update the transaction's state in the log
doCommit ();
setSiblingInfoForIncoming1pcRequestFromRemoteClient();
if ( isRoot () ) {
// The actual commit
coordinator.terminate ( true );
}
}
Let's focus on coordinator.terminate ( true );:
protected void terminate(boolean commit) throws HeurRollbackException, HeurMixedException, SysException, java.lang.SecurityException, HeurCommitException, HeurHazardException, RollbackException, IllegalStateException {
    synchronized (fsm_) {
        if (commit) {
            // Check how many participants there are; if there is only one, commit directly
            if (participants_.size() <= 1) {
                commit(true);
            } else {
                // Otherwise, run the XA two-phase commit: prepare first, then commit
                int prepareResult = prepare();
                // make sure to only do commit if NOT read only
                if (prepareResult != Participant.READ_ONLY)
                    commit(false);
            }
        } else {
            rollback();
        }
    }
}
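It first checks the number of participants, which here can be understood as the number of MySQL databases involved. If there is only one, it degrades to a one-phase commit and commits directly; if there are several, it follows the standard XA two-phase commit flow.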
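The decision in terminate(true) can be sketched as a small coordinator: one participant commits in one phase; otherwise phase one collects votes, any failed vote rolls everything back, and phase two commits only the branches that voted "prepared" (a read-only branch has nothing left to commit). BranchParticipant and TwoPhaseCoordinatorSketch are hypothetical, simplified names; Atomikos' Participant interface and coordinator state machine are considerably richer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified participant; Atomikos' real Participant interface has more to it.
interface BranchParticipant {
    int READ_ONLY = 0;   // vote: the branch changed nothing, no second phase needed
    int PREPARED = 1;    // vote: the branch is ready to commit

    int prepare();       // may throw a RuntimeException to signal a "no" vote

    void commit(boolean onePhase);

    void rollback();
}

final class TwoPhaseCoordinatorSketch {

    private TwoPhaseCoordinatorSketch() {
    }

    /** Returns true if the transaction committed, false if it was rolled back. */
    static boolean commit(List<BranchParticipant> participants) {
        if (participants.size() <= 1) {
            // A single resource has nobody to agree with: commit in one phase.
            for (BranchParticipant p : participants) {
                p.commit(true);
            }
            return true;
        }
        // Phase 1: collect votes; any failure aborts the whole transaction.
        List<BranchParticipant> toCommit = new ArrayList<>();
        for (BranchParticipant p : participants) {
            int vote;
            try {
                vote = p.prepare();
            } catch (RuntimeException noVote) {
                participants.forEach(BranchParticipant::rollback);
                return false;
            }
            if (vote == BranchParticipant.PREPARED) {
                toCommit.add(p);
            }
        }
        // Phase 2: only branches that voted PREPARED need a commit.
        for (BranchParticipant p : toCommit) {
            p.commit(false);
        }
        return true;
    }
}
```

The one-phase shortcut saves a round trip per transaction, which is why routing a transaction to a single database is cheaper even under XA.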
Let's look at the prepare(); flow, which ends up going through com.atomikos.icatch.imp.PrepareMessage.send() ---> com.atomikos.datasource.xa.XAResourceTransaction.prepare():
int ret = 0;
terminateInResource();
if (TxState.ACTIVE == this.state) {
    // tolerate non-delisting apps/servers
    suspend();
}
// duplicate prepares can happen for siblings in serial subtxs!!!
// in that case, the second prepare just returns READONLY
if (this.state == TxState.IN_DOUBT)
    return Participant.READ_ONLY;
else if (!(this.state == TxState.LOCALLY_DONE))
    throw new SysException("Wrong state for prepare: " + this.state);
try {
    // refresh xaresource for MQSeries: seems to close XAResource after
    // suspend???
    testOrRefreshXAResourceFor2PC();
    if (LOGGER.isTraceEnabled()) {
        LOGGER.logTrace("About to call prepare on XAResource instance: " + this.xaresource);
    }
    ret = this.xaresource.prepare(this.xid);
} catch (XAException xaerr) {
    String msg = interpretErrorCode(this.resourcename, "prepare", this.xid, xaerr.errorCode);
    if (XAException.XA_RBBASE <= xaerr.errorCode && xaerr.errorCode <= XAException.XA_RBEND) {
        LOGGER.logWarning(msg, xaerr); // see case 84253
        throw new RollbackException(msg);
    } else {
        LOGGER.logError(msg, xaerr);
        throw new SysException(msg, xaerr);
    }
}
setState(TxState.IN_DOUBT);
if (ret == XAResource.XA_RDONLY) {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.logDebug("XAResource.prepare ( " + this.xidToHexString + " ) returning XAResource.XA_RDONLY " + "on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource);
    }
    return Participant.READ_ONLY;
} else {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.logDebug("XAResource.prepare ( " + this.xidToHexString + " ) returning OK " + "on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource);
    }
    return Participant.READ_ONLY + 1;
}
Finally, we see this line:
ret = this.xaresource.prepare(this.xid);
But wait: in XA, shouldn't XA START xid be followed by XA END xid before the prepare? The answer is inside suspend();:
public synchronized void suspend() throws ResourceException {
    // BugzID: 20545
    // State may be IN_DOUBT or TERMINATED when a connection is closed AFTER
    // commit!
    // In that case, don't call END again, and also don't generate any
    // error!
    // This is required for some hibernate connection release strategies.
    if (this.state.equals(TxState.ACTIVE)) {
        try {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.logDebug("XAResource.end ( " + this.xidToHexString + " , XAResource.TMSUCCESS ) on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource);
            }
            // Execute the XA END statement
            this.xaresource.end(this.xid, XAResource.TMSUCCESS);
        } catch (XAException xaerr) {
            String msg = interpretErrorCode(this.resourcename, "end", this.xid, xaerr.errorCode);
            if (LOGGER.isTraceEnabled())
                LOGGER.logTrace(msg, xaerr);
            // don't throw: fix for case 102827
        }
        setState(TxState.LOCALLY_DONE);
    }
}
At this point we have executed XA START xid -> XA END xid -> XA PREPARE xid; the last step is the commit.
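To see the whole statement sequence of one branch in a single place, here is a hedged sketch that drives a fake javax.transaction.xa.XAResource in the order traced above. RecordingXaResource and BranchLifecycleSketch are hypothetical; the fake only records the MySQL XA statement each call would correspond to and ignores the Xid, so the usage passes null for it.

```java
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

// Fake XAResource that records the MySQL XA statement it would send instead of talking to a database.
class RecordingXaResource implements XAResource {
    final List<String> statements = new ArrayList<>();

    @Override public void start(Xid xid, int flags) { statements.add("XA START"); }
    @Override public void end(Xid xid, int flags) { statements.add("XA END"); }
    @Override public int prepare(Xid xid) { statements.add("XA PREPARE"); return XA_OK; }
    @Override public void commit(Xid xid, boolean onePhase) { statements.add(onePhase ? "XA COMMIT ONE PHASE" : "XA COMMIT"); }
    @Override public void rollback(Xid xid) { statements.add("XA ROLLBACK"); }
    @Override public void forget(Xid xid) { }
    @Override public Xid[] recover(int flag) { return new Xid[0]; }
    @Override public boolean isSameRM(XAResource other) { return other == this; }
    @Override public int getTransactionTimeout() { return 0; }
    @Override public boolean setTransactionTimeout(int seconds) { return false; }
}

final class BranchLifecycleSketch {

    private BranchLifecycleSketch() {
    }

    /** Drives one branch in the order traced above: start, end, prepare, then commit. */
    static List<String> runTwoPhaseBranch(RecordingXaResource resource, Xid xid) {
        resource.start(xid, XAResource.TMNOFLAGS);   // XAResourceTransaction.resume() during getConnection()
        // ... the business SQL would run on the connection here ...
        resource.end(xid, XAResource.TMSUCCESS);     // suspend(), called at the start of prepare()
        if (resource.prepare(xid) == XAResource.XA_OK) {
            resource.commit(xid, false);             // phase two of the two-phase commit
        }
        return resource.statements;
    }
}
```

Running runTwoPhaseBranch(new RecordingXaResource(), null) records XA START, XA END, XA PREPARE, and XA COMMIT, in that order.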
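Back in the terminate(true) method, let's look at the commit(false) flow. It works the same way as the prepare flow and ends up in com.atomikos.datasource.xa.XAResourceTransaction.commit(). Once it finishes, the data is committed. The method contains a lot of boilerplate, so only the core line is shown:
this.xaresource.commit(this.xid, onePhase);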
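Food for thought: the participants are committed one by one in a loop. If earlier participants have already committed and a later participant crashes while committing, the data becomes inconsistent.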
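To make that window concrete, here is a minimal simulation with hypothetical names: phase two walks the prepared branches in order and one of them fails. Earlier commits are already durable and cannot be undone; the failed branch is left prepared (in doubt) in its database, and resolving it is the job of transaction recovery, which the article covers separately. The sketch does not model retries or parallel execution.

```java
import java.util.ArrayList;
import java.util.List;

final class PhaseTwoFailureSketch {

    private PhaseTwoFailureSketch() {
    }

    /** Hypothetical prepared branch; failsOnCommit simulates a database that crashes in phase two. */
    static final class PreparedBranch {
        final String name;
        final boolean failsOnCommit;

        PreparedBranch(String name, boolean failsOnCommit) {
            this.name = name;
            this.failsOnCommit = failsOnCommit;
        }

        void commit() {
            if (failsOnCommit) {
                throw new IllegalStateException(name + " is unreachable");
            }
        }
    }

    /** Commits the prepared branches one by one and returns the names of those that committed. */
    static List<String> commitOneByOne(List<PreparedBranch> branches) {
        List<String> committed = new ArrayList<>();
        for (PreparedBranch branch : branches) {
            try {
                branch.commit();
                committed.add(branch.name);
            } catch (IllegalStateException ex) {
                // Earlier commits cannot be undone; this branch stays prepared (in doubt)
                // in its database until recovery completes it.
            }
        }
        return committed;
    }
}
```

With branches ds_0 (healthy) and ds_1 (failing), only ds_0 ends up committed, so until recovery runs, readers can see ds_0's changes but not ds_1's.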
The Atomikos rollback() flow
Having analyzed the commit flow, we find that the rollback flow is essentially the same. Let's turn back to org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager.rollback(), which eventually executes com.atomikos.icatch.imp.CompositeTransactionImp.rollback():
public void rollback () throws IllegalStateException, SysException
{
// Clean up resources, update the transaction log state, and so on
doRollback ();
if ( isRoot () ) {
try {
coordinator.terminate ( false );
} catch ( Exception e ) {
throw new SysException ( "Unexpected error in rollback: " + e.getMessage (), e );
}
}
}
Focus on coordinator.terminate ( false );. It is the same method as in the commit flow, except that the commit flow passes true:
protected void terminate(boolean commit) throws HeurRollbackException, HeurMixedException, SysException, java.lang.SecurityException, HeurCommitException, HeurHazardException, RollbackException, IllegalStateException {
    synchronized (fsm_) {
        if (commit) {
            if (participants_.size() <= 1) {
                commit(true);
            } else {
                int prepareResult = prepare();
                // make sure to only do commit if NOT read only
                if (prepareResult != Participant.READ_ONLY)
                    commit(false);
            }
        } else {
            // false means rollback
            rollback();
        }
    }
}
Let's focus on rollback(), which ends up in com.atomikos.datasource.xa.XAResourceTransaction.rollback() (excerpt):
public synchronized void rollback() throws HeurCommitException, HeurMixedException, HeurHazardException, SysException {
    terminateInResource();
    if (rollbackShouldDoNothing()) {
        return;
    }
    if (this.state.equals(TxState.TERMINATED)) {
        return;
    }
    if (this.state.equals(TxState.HEUR_MIXED))
        throw new HeurMixedException();
    if (this.state.equals(TxState.HEUR_COMMITTED))
        throw new HeurCommitException();
    if (this.xaresource == null) {
        throw new HeurHazardException("XAResourceTransaction " + getXid() + ": no XAResource to rollback?");
    }
    try {
        if (this.state.equals(TxState.ACTIVE)) {
            // first suspend xid
            suspend();
        }
        // refresh xaresource for MQSeries: seems to close XAResource after
        // suspend???
        testOrRefreshXAResourceFor2PC();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.logDebug("XAResource.rollback ( " + this.xidToHexString + " ) " + "on resource " + this.resourcename + " represented by XAResource instance " + this.xaresource);
        }
        this.xaresource.rollback(this.xid);
        // ... (rest of the method omitted in this excerpt)
If the branch is still active, the suspend() method first executes XA END xid; then this.xaresource.rollback(this.xid); rolls the data back.
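The state checks at the top of XAResourceTransaction.rollback() boil down to a small decision: a terminated branch does nothing, an active branch is ended first, and every other branch goes straight to XA ROLLBACK. A hedged sketch with hypothetical names; the enum covers only a subset of the TxState values seen in the excerpts, and the heuristic states and rollbackShouldDoNothing() are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

final class BranchRollbackSketch {

    private BranchRollbackSketch() {
    }

    /** A subset of the branch states that appear in XAResourceTransaction (TxState). */
    enum BranchState { ACTIVE, LOCALLY_DONE, IN_DOUBT, TERMINATED }

    /** Returns the XA statements a branch in the given state issues when it is rolled back. */
    static List<String> rollbackStatements(BranchState state) {
        List<String> statements = new ArrayList<>();
        if (state == BranchState.TERMINATED) {
            return statements;            // already finished: rollback does nothing
        }
        if (state == BranchState.ACTIVE) {
            statements.add("XA END");     // suspend(): end the branch association first
        }
        statements.add("XA ROLLBACK");    // then xaresource.rollback(xid)
        return statements;
    }
}
```

A branch that was already prepared (IN_DOUBT) skips XA END because suspend() already ran during prepare().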
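This article has grown quite long. We analyzed how ShardingSphere provides an SPI-based solution for XA and integrates Atomikos through it, and we walked through Atomikos' initialization, transaction begin, connection acquisition, commit, and rollback flows. I hope it helps you understand how XA works.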
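About the author: Xiao Yu (肖宇), Apache ShardingSphere Committer, author of the open-source hmily distributed transaction framework and of the open-source soul gateway. He is passionate about open source and about writing elegant code. He currently works at JD Digits (京东数科), contributing to ShardingSphere's open-source development and to distributed database R&D.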