【Seata源码领读】揭秘 @GlobalTransactional 背后 RM 的黑盒操作之一阶段

2023-05-18 17:49:37 操作 源码 阶段 揭秘 领读

上善若水,水善利万物而不争

一、前奏

Seata 从设计层面将事务参与者的角色分为 TC(事务协调器)、RM(资源管理器)、TM(事务管理器) ,传统的 XA 方案中, RM 是在数据库层,即依赖了 DB 的 XA 驱动能力,也会有典型的数据锁定和连接锁定的问题,为了规避 XA 带来的制约,Seata 将 RM 从 DB 层迁移出来,以中间件的形式放在应用层,完全剥离了分布式事务方案对数据库在协议支持上的要求。在 Seata 的 AT 模式中, RM 的能力是在数据源做了一层代理,Seata 在这层代理中干预业务 SQL 执行过程,加入分布式事务所需的逻辑,通过这种方式,Seata 真正做到了对业务代码无侵入,只需要通过简单的配置和声明,业务方就可享受 Seata 所带来的分布式事务能力;而且跟 XA 模式相比,当本地事务执行完可以立即释放本地事务锁定的资源,性能更好。

二、Seata AT 模式的顶层设计

Seata AT 模式下,一个典型的分布式事务过程如下:

  • TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局的 XID。
  • XID 在微服务调用链路的上下文中传播,供 RM 使用。
  • RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖,执行后上报分支事务状态。
  • TM 向 TC 发起针对 XID 的全局提交或回滚决议。
  • TC 驱动 RM 完成 XID 下管辖的全部分支事务的提交或回滚操作。

前文《【Seata 源码领读】揭秘 @GlobalTransactional 背后 TM 的黑盒操作》中描述了 TM 的能力,本篇继续介绍 RM,RM(Resource Manager)叫做资源管理器,控制分支事务,负责 1 阶段的分支注册、状态汇报,并接收事务协调器 TC 的指令,在 2 阶段执行分支(本地)事务的提交和回滚

在传统的 XA 方案中 RM 是放在数据库层的,它依赖了数据库的 XA 驱动程序,如下图所示

理论上分布式事务能力下沉,由 DB 提供是好事,但此种 XA 模式有两个典型的问题:

  1. 一是数据锁定问题,XA 事务过程中,数据是被锁定的。XA 的数据锁定是数据库的内部机制维护的,所以依赖 DBA 干预数据库去解除数据锁定。

  2. 另一个是连接锁定问题,XA 事务过程中,连接也是被锁定的。至少在两阶段提交的 prepare 之前,连接是不能释放的(因为连接断开,这个连接上的 XA 分支就会回滚,整个事务也会被迫回滚)。较之于数据的锁定(数据的锁定对于事务的隔离性是必要的机制),连接的锁定带给整个业务系统的直接影响,限制了并发度。

正式为了规避 XA 方案所带来的制约,Seata 将 RM 从 DB 层迁移出去,以中间件的形式放在应用层,完全剥离了分布式事务方案对数据库在协议支持上的要求。

Seata AT 模式下 RM 的能力概括来说是在数据源做了一层代理,当程序执行到 DAO 层,通过 JdbcTemplate 或 Mybatis 操作 DB 时所使用的数据源实际上用的是 Seata 提供的数据源代理 DataSourceProxy,Seata 在这层代理中干预业务 SQL 执行过程,加入分布式事务所需的逻辑,主要是解析 SQL,把业务数据在更新前后的数据镜像组织成回滚日志,并将 undoLog 日志插入 undo_log 表中,保证每条更新数据的业务 sql 都有对应的回滚日志存在。通过这种方式,Seata 真正做到了对业务代码无侵入,只需要通过简单的配置,业务方就可以轻松享受 Seata 所带来的功能。

另外这样做还有性能好处,当本地事务执行完时立即释放了本地事务锁定的资源,然后向 TC 上报分支状态。当 TM 决议全局事务提交时,就不需要同步调用 RM 做什么处理,而是给 TC 发送提交指令,委托 TC 以异步方式调度各个 RM 分支事务删除对应的 undoLog 日志即可,这个步骤完成的非常快速;但当 TM 决议全局回滚时(发生回滚的概率还是很小的),委托 TC 同步向所有 相关 RM 发送回滚请求,RM 通过 XID 找到对应的 undoLog 回滚日志,然后构建回滚 sql 并执行,以完成回滚操作。

三、Seata AT 模式 RM 的底层实现

3.1 关键类能力简述

1) DataSourceProxy

  1. 构建并向 TC 注册 Resource 信息
  2. 初始化业务表的元数据信息,用于为前后镜像的构建和二阶段回滚提供基础能力。

2)ConnectionProxy

提供增强版的 commit,增加的逻辑分两类:

  1. 若上下文中绑定当前全局事务的 xid,处理分支事务提交
  • 向 TC 注册分支事务、使用本地事务提交业务 SQL 和 undoLog、向 TC 上报本地 commit 结果;
  1. 若上下文中绑定是否需要检测全局锁,处理带@GlobalLock 的本地事务提交
  • 检测全局锁不存在则提交本地事务

若业务层还显式的开启了 JDBC 的事务(AutoCommit 被设置为 false),则提交中还伴有锁冲突后的重试。

3) StatmentProxy

  1. 解析 SQL,根据不同的 SQL 类型委托不同的执行器,构建前后镜像生成 undoLog 放置在上下文中。
  2. 若业务层未显式的开启 JDBC 的事务,则开启重试机制,并在执行完步之后,调用 ConnectionProxy 的增强版提交;
  3. 若业务层显式的开启 JDBC 的事务,则没有第 2 步中的自动提交

3.2 鸟瞰分支事务的 1 阶段处理

Seata AT 模式下,正如下图的源码检索结果所示,分支事务的执行是在 StatementProxyPreparedStatementProxyexecuteexecuteQueryexecuteUpdate 等方法中,而这些方法终都会执行到 ExecuteTemplate#execute 方法

所以StatementProxyPreparedStatementProxy 中是委托ExecuteTemplate完成分支事务的一阶段流程

下边使用伪代码,对照官方原理图,从宏观视角来描述以下分支事务的一阶段逻辑:获取链接,构建Statement,之后执行 SQL 解析、根据 SQL 类型构建执行器,由执行器在业务 SQL 执行前后查询数据快照并组织成 UndoLog;在提交环节有向 TC 注册分支事务、UndoLog 的刷盘随业务 SQL 在本地事务一并 Commit、向 TC 上报分支事务状态等;若遇到异常会执行本地回滚,上抛异常让业务逻辑感知;后释放资源。

image.png
conProxy = mybatis#getConnection()
pareparedStatement = conProxy.PareparedStatement();
pareparedStatementProxy.execute();
    ExecuteTemplate.execute
        解析SQL构建 xxxExecutor,如 update 对应为 UpdateExecutor
        如果autoCommit为true,设置autoCommit为false
        LockRetryPolicy.execute//重试策略
            AbstractDMLBaseExecutor#executeAutoCommitFalse()
                beforImage()//构建前镜像
                execute()//执行业务sql
                afterImage()//构建后镜像
            connectionProxy.commit // 增强版的提交事务
                try
                    doCommit
                        register()//向TC注册分支事务,TC会检测全局锁
                        flushUndoLogs//undoLog刷盘
                        try
                            targetConnection.commit();//使用原始con提交本地事务
                        catch
                            report : PhaseOne_Failed //本地commit失败,向TC上报1阶段失败,抛出异常
                        report PhaseOne_Done //向TC 上报 本地commit成功

                catch //捕获到异常就进行回滚
                    doRollback
                        targetConnection.rollback();// 执行本地回滚
                        report : PhaseOne_Failed //跟TC上报本地commit失败,这里似乎会重复report


pareparedStatement.close()
con.close()

3.3 详解分支事务的 1 阶段处理

1)基于执行器的设计

如果了解过 mybatis 源码,会有印象其中关键类的命名和执行流程是 xxxTemplate -调用-> yyyExecutor;Seata 中的实现很相似,是 ExecuteTemplate -调用-> xxxExecutor

  1. ExecuteTemplate 分析上下文,构建正确的 Executor
  2. Executor 的职责
  • 首先判断若当前上下文与 Seata 无关(当前即不是 AT 模式的分支事务,又不用检测全局锁),直接使用原始的 Statment 执行业务 SQL,避免因引入 Seata 导致非全局事务中的 SQL 执行性能下降。
  • 解析 SQL 识别 SQL 属于增删改查哪种类型,解析结果有缓存,因为有些 SQL 解析会比较耗时,可能会导致在应用启动后刚开始的那段时间里处理全局事务中的 SQL 执行效率降低。
  • 对于 INSERT、UPDATE、DELETE、SELECT..FOR UPDATE 等几类(具体看源码)的 sql 使用对应的Executor进行处理,其它 SQL (设计初衷这里是指普通的 select)直接使用原始的 Statment 执行。
  • 返回执行结果,如有异常则直接抛给上层业务代码进行处理。
image.png

2)解析 sql,构建 sql 执行器

目前 Seata 1.6.1 版本中 根据 sql 的的类型封装了如INSERTUPDATEDELETESELECT_FOR_UPDATEINSERT_ON_DUPLICATE_UPDATEUPDATE_JOIN 这六大类Executor(执行器)。但从事务处理的能力上有分为 3 大类

  1. PlainExecutor

其中 PlainExecutor 是 原生的 JDBC 接口实现,未做任何处理,提供给全局事务中的普通的 select 查询使用

  1. SelectForUpdateExecutor:

Seata 的 AT 模式在本地事务之上默认支持读未提交的隔离级别,但是通过SelectForUpdateExecutor 执行器,可以支持读已提交的隔离级别。

前面的文章我们说过用 select...for update 语句来保证隔离级别为读已提交。SelectForUpdateExecutor 就是用来执行 select...for update 语句的。

先通过 select 检索记录,构建出 lockKeys 发给 TC,请 TC 核实这些记录是否已经被其他事务加锁了,如果被加锁了,则根据重试策略不断重试,如果没被加锁,则正常返回查询的结果。

image.png
  1. DML 类的 Executor

DML 增删改类型的执行器主要在 sql 执行的前后对 sql 语句进行解析,并实现了如下两个抽象接口:

protected abstract TableRecords beforeImage() throws SQLException;

protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;

这两个接口便是 AT 模式下 RM 的核心能力:构建 beforeImage,执行 sql,之后再构建 afterImage,通过beforeImageafterImage 生成了提供回滚操作的 undoLog 日志,不同的执行器这两个接口的实现不同。

类型 构建前镜像 构建后镜像
insert
update
delete

其中构建 updatedelete 这两类前镜像的 sql 语句的是select ... for update,其中for update 是一种非常必要的基于本地事务排它机制的隔离保障。

3)执行器的核心方法execute

上下文中设置关键的标识信息:在ConnectionProxy中设置全局事务的 XID,则标识后续执行分支事务;如果RootContext.requireGlobalLock()true,则标识后续是处理@GlobalLock的全局锁检测+本地事务提交。

public T execute(Object... args) throws Throwable {
    // 从事务上下文中获取xid
    String xid = RootContext.getXID();
    if (xid != null) {
        // 将xid绑定到连接的 ConnectionContext 中,由此分支事务感知其所归属的全局事务的xid
        statementProxy.getConnectionProxy().bind(xid);
    }
    // 从上下文中获取是否需要全局锁的标记,传递给ConnectionProxy
    statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
    // 处理sql
    return doExecute(args);
}

DML 类执行器的核心逻辑在 AbstractDMLBaseExecutor#doExecute 中,这里根据是否有开启 Spring 事务而处理逻辑不通。executeAutoCommitTrue中会自动提交。而executeAutoCommitFalse中不执行提交,而由 Spring 事务发起commit(调用的是ConnectionProxy增强版的commit)

public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        // 如果AutoCommit是true,没有开启spring事务(即没有con.setAutoCommit(false)的调用)
        return executeAutoCommitTrue(args);
    } else {
        // 如果AutoCommit是false
        // 目前已知的情况是由于显示开启了事务,保障多条SQL语句的执行只在后显式的commit提交后,才生效,
        // 如声明式Spring事务@Transactional,其处理过程会由con.setAutoCommit(false);
        // 如果是编程式Spring事务,需要显示调用con.setAutoCommit(false);
        return executeAutoCommitFalse(args);
    }
}

4) executeAutoCommitTrue执行业务 sql,构建undoLog并执行增强版的提交。如果有 Spring 事务开启(AutoCommit设置为false),则不执行这个方法,其中有 3 个关键逻辑

  1. 执行此方法时, Seata 框架将 AutoCommit设置为false,在 2.2 中主动 commit
  • 目的是 2.1 和 2.2 两个步骤中的所有本地 sql 同时提交,简单理解就是 业务 sql 和 Seata 框架的 undoLog 一起提交。
  • 提交过程可能遇到锁冲突,在遇到锁冲突时,会有重试策略,重试逻辑中有 2 个逻辑主体:
    • 2.1. 业务 sql 的执行(构造前后镜像)
    • 2.2. 增强版commit(此时,其内部的重试策略),下述逻辑根据上下文是三选一
      • 直接提交本地事务
      • 申请到全局锁后执行本地提交,这种情况下还需要构造前后镜像嘛?
      • 执行分支事务的提交,向 TC 申请行锁,锁冲突则进入重试逻辑
      • 不冲突执行注册分支事务,提交本地事务,向 TC 上报结果
      • 2.2.1 processGlobalTransactionCommit();
      • 2.2.2 processLocalCommitWithGlobalLocks();
      • 2.2.3 targetConnection.commit();
    1. 无论第 2 步成功还是失败,重置上下文,恢复自动提交

    第 2 步遇冲突则重试的机制在介绍完 2.1 和 2.2 的主体逻辑后,再补充

    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            // AutoCommit设置的false
            // 目的是 2.1 和 2.2 两个步骤中的所有本地sql同时提交,简单理解就是 业务sql 和 Seata 框架的undoLog一起提交。
            connectionProxy.changeAutoCommit();
            // 2. 提交过程可能遇到锁冲突,在遇到锁冲突时,会有重试策略,重试逻辑中有2个逻辑主体:
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                // 2.1 业务sql的执行(构造前后镜像)
                T result = executeAutoCommitFalse(args);
                // 2.2 commit(此时,其内部的重试策略),下述逻辑根据上下文是三选一
                //            2.2.1 processGlobalTransactionCommit();
                //                执行分支事务的提交,向TC申请行锁,锁冲突则进入重试逻辑
                //                不冲突执行注册分支事务,提交本地事务,向TC上报结果
                //            2.2.2 processLocalCommitWithGlobalLocks();
                //                申请到全局锁后执行本地提交,这种情况下还需要构造前后镜像嘛?
                //            2.2.3 targetConnection.commit();
                //                直接提交本地事务
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            // isLockRetryPolicyBranchRollbackOnConflict() 默认是true,冲突时会重试,则不在这里回滚
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            // 重置上下文
            connectionProxy.getContext().reset();
            // 设置为自动提交
            connectionProxy.setAutoCommit(true);
        }
    }

    5)executeAutoCommitFalse 执行业务 sql,生成前后镜像融合成 undoLog,注意此时不提交。

    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        // 构造beforeImage
        TableRecords beforeImage = beforeImage();
        // 使用原始Statement 执行sql
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        // 构建afterImage
        TableRecords afterImage = afterImage(beforeImage);
        // 整合 beforeImage 和 afterImage 构建undoLog
        prepareUndoLog(beforeImage, afterImage);
        // 返回业务sql的执行结果,并未commit
        return result;
    }

    5.1)整合 beforeImageafterImage 构建 undoLog

    1. 根据前后镜像构建 lockKeysundoLog,暂存到connectionProxy的上下文中,在下文commit`方法中才刷盘
    2. 锁 key 的构建有其规则,形如 t_user:1_a,2_b 。其中 t_user 是表名,第 1 条记录的主键是 1a,第 2 条记录的逐渐是 2b;即一条记录的多个主键值之间用*串联 ;记录和记录之间的 key 信息用,串联;表名和主键部分用:串联
    3. 如果是 DELETE 语句,则使用前镜像构建 lockKeys
    protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
            return;
        }
        if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
            if (beforeImage.getRows().size() != afterImage.getRows().size()) {
                throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
            }
        }
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

        // 如果是DELETE 语句,则使用前镜像构建锁key,
        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
        // 一条记录的多个主键值之间用_串联;记录之间的key信息用,串联;
        // 形如 t_user:1_a,2_b,第1条记录的主键是1和a,第2条记录的逐渐是2和b
        String lockKeys = buildLockKey(lockKeyRecords);
        if (null != lockKeys) {
            // lockKeys 暂存到connectionProxy的上下文中,在commit环节,向注册分支事务环节,这些锁key被用于检测全局行锁存储和冲突检测
            connectionProxy.appendLockKey(lockKeys);
            // 整合 beforeImage 和 afterImage 构建undoLog
            SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
            // undoLog暂存到connectionProxy的上下文中,在commit环节,才执行undoLog刷盘,伴随业务SQL在本地事务一起提交
            connectionProxy.appendUndoLog(sqlUndoLog);
        }
    }

    6)connectionProxy.commit()增强版的提交

    6.1)connectionProxy.commit()中的重试

    此处的重试很容易让人迷糊,因为通过对上文的源码梳理可知,重试逻辑在数据源的代理中有两处。需清楚这两个重试是互补的,即同程中只会有其中一个重试逻辑生效,这两个重试之间微妙的关系如下:

    • 如果业务 SQL 的执行上下文中,没有 Spring 的事务,那么程序会执行AbstractDMLBaseExecutor.executeAutoCommitTrue ,则其方法中的重试逻辑生效,则此处connectionProxy.commit()的重试逻辑不启用

    • 如果业务 SQL 的执行上下文中,有 Spring 的事务,那么程序会执行AbstractDMLBaseExecutor#executeAutoCommitFalse ,而不会被执行 AbstractDMLBaseExecutor.executeAutoCommitTrue ,则此处connectionProxy.commit()的重试逻辑生效

    • 另外connectionProxy.commit()AbstractDMLBaseExecutor.executeAutoCommitTrue的重试主体对比的话,此处的connectionProxy.commit()主体只有 doCommit,没有业务 SQL 的执行以及前后镜像的构建,这是重点。为何如此设计笔者理解的尚不够透彻,望读者老师能加 V 加群给与解惑。

    connectionProxy.commit()源码如下:

    public void commit() throws SQLException {
        try {
            // 这里的重试 只有 doCommit,没有业务SQL的执行以及前后镜像的构建
            // 重试策略在数据源的代理中从代码上看是有两处,这两个重试是互补的,也即同程中只会有其中一个重试生效。
            // 首先如果业务SQL的执行上下文中,没有Spring的事务,那么AbstractDMLBaseExecutor.executeAutoCommitTrue 中的重试策略生效,则此处的重试策略不启用
            // 首先如果业务SQL的执行上下文中,有Spring的事务,那么此处的重试策略生效,而 AbstractDMLBaseExecutor.executeAutoCommitTrue 不会被执行
            lockRetryPolicy.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            // 没有自动提交,也没有被Seata调整为非自动提交(没有执行AbstractDMLBaseExecutor.executeAutoCommitTrue)
            // 那么遇到Seata 增强逻辑中抛出的 SQLException 异常时,在此处执行回滚。并且抛出异常
            // 否则,是由上层发起回滚。
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

    LockRetryController#sleep方法中控制 重试次数(--lockRetryTimes) 和 重试间隔(Thread.sleep(lockRetryInterval)),超过次数上抛异常,退出循环。

    public void sleep(Exception e) throws LockWaitTimeoutException {
        // prioritize the rollback of other transactions
        // 重试次数控制
        if (--lockRetryTimes <  || (e instanceof LockConflictException
            && ((LockConflictException)e).getCode() == TransactionExceptionCode.LockKeyConflictFailFast)) {
            throw new LockWaitTimeoutException("Global lock wait timeout", e);
        }

        try {
            // 通过sleep控制重试间隔
            Thread.sleep(lockRetryInterval);
        } catch (InterruptedException ignore) {
        }
    }

    是否重试是有开关的,在启动时读取配置,从 1.6.1 的代码来看,未支持运行期变更,默认值是 true

    // 在冲突时是否重试的开关
    client.rm.lock.retry-policy-branch-rollback-on-conflict=true

    重试间隔和次数可在通过配置中心做运行时变更,默认值如下:

    // 重试间隔 lockRetryInterval 默认值 10 毫秒
    client.rm.lock.retry-interval=10
    // 重试次数 lockRetryTimes 默认值 30
    client.rm.lock.retry-times=30

    6.2)doCommit()中的 3 种选择

    增强版的提交代码中有下述三种提交逻辑,根据上下文只选其一

    1. processGlobalTransactionCommit();
    • 执行分支事务的提交,向 TC 申请行锁,锁冲突则向上反馈后进入上层的重试逻辑
    • 不冲突执行注册分支事务,提交本地事务,向 TC 上报结果
    1. processLocalCommitWithGlobalLocks();
    • 申请到全局锁后执行本地提交,这种情况下还需要构造前后镜像嘛?
    1. targetConnection.commit();
    • 直接提交本地事务

    ConnectionProxy#doCommit源码如下:

    private void doCommit() throws SQLException {
        // xid不为空
        // 如果 BaseTransactionalExecutor.execute 中 通过 statementProxy.getConnectionProxy().bind(xid) 在context绑定了xid
        // 其内部是 context.bind(xid); 那么此处context.inGlobalTransaction() = true
        // 则执行增强版的分支事务提交
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        }
        // 如果开发者使用@GlobalLock,则 BaseTransactionalExecutor.execute 中
        // 通过statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock())
        // 在context绑定了全局锁标识,那么此处context.isGlobalLockRequire() = true
        // 则执行增强版的检测不到全局锁才做本地事务提交
        else if (context.isGlobalLockRequire()) {
            //申请到全局锁后执行本地提交
            processLocalCommitWithGlobalLocks();
        } else {
            // 既不是分支事务,又不是@Globallock,那使用原生的本地事务提交
            targetConnection.commit();
        }
    }

    6.4)processGlobalTransactionCommit处理分支事务的提交

    前文BaseTransactionalExecutor#execute中如果识别出有全局事务的 xid,则给ConnectionProxyConnectionContext上绑定 xid,表明数据源代理层是要做分支事务的处理。

    所以如果那么此处context.inGlobalTransaction()就等于 true,则通过processGlobalTransactionCommit处理分支事务的提交,在这个方法中是分支事务处理核心中的核心:

    1. 注册分支事务,申请全局行锁,如果锁冲突则抛出异常,重试机制识别到冲突的异常后做重试处理
    2. undoLog 刷盘
    3. 执行本地事务提交,会将本地业务 sql 和 undoLog 一起提交
    4. 将本地事务提交的结果(1 阶段的处理结果)上报给 TC,TC 若在二阶段回滚,而分支事务上报的是 1 阶段失败了,则无需通知此分支事务做 2 阶段回滚;否则通知分支事务做 2 阶段回滚
    5. 重置上下文
    private void processGlobalTransactionCommit() throws SQLException {
        try {
            // 1. 注册分支事务,申请全局行锁,如果锁冲突则抛出异常
            // 有没有重复注册的情况呢?
            register();
        } catch (TransactionException e) {
            // 如果异常code是 LockKeyConflict 和 LockKeyConflictFailFast 才重新组织抛出异常 LockConflictException
            // 外部的重试管控,识别出LockConflictException后实施重试。
            // 其他异常此处不处理
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            // 2. 写undoLog
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // 3. 执行本地事务提交,将本地业务sql和undoLog一起提交
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            // 4. 向TC上报异常,并抛出SQLException,告诉TC二阶段若回滚则此分支事务无需回滚,因为1阶段失败了。
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            // 4. 上报事务处理结果,告诉TC二阶段若回滚则此分支事务必须回滚,因为1阶段成功了。
            report(true);
        }
        // 5. 重试上下文
        context.reset();
    }

    6.5)report(false)默认只上报 commit 异常

    1. 默认是只上报 commit 失败的情况,开关是 client.rm.reportSuccessEnable默认值是 false
    2. 上报异常会重试,默认 5 次机会 阈值 REPORT_RETRY_COUNT 其配置为client.rm.reportRetryCount默认值是 5
    3. 提交失败一定要上报 BranchStatus.PhaseOne_Failed 告诉 TC 二阶段若回滚则此分支事务无需回滚,因为 1 阶段失败了。
    // client.rm.reportRetryCount = 5
    // 成员变量,构建新对象时就会读取新到配置中心的新值,也算支持运行期配置变更
    private static final int REPORT_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(
        ConfigurationKeys.CLIENT_REPORT_RETRY_COUNT, DEFAULT_CLIENT_REPORT_RETRY_COUNT);

    // client.rm.reportSuccessEnable = false
    public static final boolean IS_REPORT_SUCCESS_ENABLE = ConfigurationFactory.getInstance().getBoolean(
        ConfigurationKeys.CLIENT_REPORT_SUCCESS_ENABLE, DEFAULT_CLIENT_REPORT_SUCCESS_ENABLE);


    private void report(boolean commitDone) throws SQLException {
        if (context.getBranchId() == null) {
            return;
        }
        int retry = REPORT_RETRY_COUNT;//client.rm.reportRetryCount 默认值是 5,不支持运行期变更
        while (retry > ) {
            try {
                DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                    commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
                return;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                    + commitDone + "] Retry Countdown: " + retry);
                retry--;

                if (retry == ) {
                    throw new SQLException("Failed to report branch status " + commitDone, ex);
                }
            }
        }
    }

    6.6)向 TC 注册分支事务,并申请全局行锁,如果全局行锁申请成功才意味着注册成功,返回分支事务branchId,存储在上下文中。

    private void register() throws TransactionException {
        // 不需要回滚,或不需要全局锁,就不注册
        if (!context.hasUndoLog() || !context.hasLockKey()) {
            return;
        }
        // 向TC发送 BranchRegisterRequest 请求
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), context.getApplicationData(),
            context.buildLockKeys());
        // 将branchId绑定到上下文中,同一时刻,一个con上只有一个分支事务
        context.setBranchId(branchId);
    }

    6.7)若向 TC 注册分支事务时,因行锁冲突导致注册失败,则会抛出锁冲突的异常LockConflictException,前边提到过重试逻辑中会识别此异常后执行重试,这个重试逻辑就在LockRetryPolicy#doRetryOnLockConflict中。

    protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
        LockRetryController lockRetryController = new LockRetryController();
        // 循环
        while (true) {
            try {
                return callable.call();
            } catch (LockConflictException lockConflict) {
                // 冲突的情况下,清空context,执行本地rollback();
                onException(lockConflict);
                // AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is released
                if (connection.getContext().isAutoCommitChanged()
                    && lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
                    // 这个转换,目前还未搞清楚用意
                    lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);
                }
                // sleep方法里 重试 和 间隔控制;
                // 超过次数抛出异常,退出循环
                lockRetryController.sleep(lockConflict);
            } catch (Exception e) {
                // 其他异常情况下,清空context,执行本地rollback();
                onException(e);
                // 上抛异常
                throw e;
            }
        }
    }

    对于分支事务来说,这其中return callable.call();对应的就是下图中红框圈注的内容

    但需特别注意LockRetryPolicy#onException这个方法是空的,但AbstractDMLBaseExecutor.LockRetryPolicy重写了onException方法,在这个方法中会清除上边重试主体执行过程暂存在上下文中的的锁 key 和 undoLog,并通过原始Connection执行回滚。

    protected void onException(Exception e) throws Exception {
        ConnectionContext context = connection.getContext();
        //UndoItems can't use the Set collection class to prevent ABA
        //清除构建undoLog时,暂存在上下文中的的锁key 和  undoLog
        context.removeSavepoint(null);
        // 通过原始con 执行回滚
        connection.getTargetConnection().rollback();
    }

    从重试的管控逻辑分析可知,AbstractDMLBaseExecutor.LockRetryPolicy的逻辑是这样,当遇到锁冲突异常后,会在onException中清除上下文,执行回滚之后重试,当重试次数达到上限后,还是会上抛异常。另外若遇到的并非锁冲突类的异常,会在onException中清除上下文,执行回滚,之后上跑异常。 异常上抛后,业务上层会接收到该异常,至于是给 TM 模块返回成功还是失败,由业务上层实现决定,如果返回失败,则 TM 识别到异常后,会裁决对全局事务进行回滚。

    四、如果跟 GlobalLock 相关

    简单来说 RM 在数据源代理层的逻辑为

    • 向 TC 查询锁是否存在,全局事务的锁还存在就通过抛异常继续重试
    • 如果向 TC 查询锁不存在,则提交本地事务。

    详情下篇介绍

    参考:

    • https://mp.weixin.qq.com/s/EzmZ-DAi-hxJhRkFvFhlJQ
    • https://www.jianshu.com/p/0ed828c2019a
    • https://blog.csdn.net/zzti_erlie/article/details/120939588

      相关文章