Postgres-xl GTM(全局事务管理器 Globale Transaction Manage

2022-05-06 00:00:00 事务 协调 后端 快照 全局
由于Postgres-xl是基于Postgres-xc的,故这里使用Postgres-xc的资料对其GTM组件进行分析。GTM 是 Postgres-XC 的一个关键组件,用于提供一致的事务管理和元组可见性控制。 首先,我们将介绍 PostgreSQL 如何管理事务和数据库更新。

在 PostgreSQL 中,每个事务都被赋予的 ID,称为事务 ID(或 XID)。 XID 按升序给出,以确定哪个事务较旧/较新。 当事务尝试读取元组时,每个元组都有一组 XID 来指示创建和删除该元组的事务。 因此,如果目标元组是由活动事务创建的,则不会提交或中止,读取事务应忽略此类元组。 以这种方式(在实践中,这是由 PostgreSQL 核心中的 tqual.c 模块完成的),如果我们在整个系统中(不仅在单个服务器中而且在所有服务器中 )给每个事务一个的事务 Id 并维护快照(哪个事务是活动的),即使服务器接受来自仅在其他服务器上运行的其他事务的新语句,我们也可以保持每个元组的全局一致可见性。

这些信息存储在表的每一行的“xmin”和“xmax”字段中。 当我们插入行时,插入事务的 XID 记录在 xmin 字段中。 当我们更新表的行(使用 UPDATE 或 DELETE 语句)时,PostgreSQL 不会简单地覆盖旧的行。 相反,PostgreSQL 通过将更新事务的 XID 写入 xmax 字段来“标记”旧行为“已删除”。 在 UPDATE(就像 INSERT)的情况下,会创建新行,其 xmin 字段被“标记”为创建事务的 XID。

这些“xmin”和“xmax”用于确定哪个行对事务可见。 为此,PostgreSQL 需要一个数据来指示特定时间正在运行哪些事务。 这称为“快照”。 如果一个事务在快照中,即使它已经完成也被认为是在运行。 你应该明白,这个特定的时间不仅仅是现在。 如果事务的隔离级别是读提交read committed,则事务需要在一段时间内保持一致的可见性,至少在执行 SQL 语句时是这样。 如果 SQL 语句读取在此执行期间提交的单行,则不是可取的。 因此,在读提交隔离级别的情况下,数据库应该在语句执行前获取快照并在整个执行过程中继续使用它。 在可重复读取和可序列化的情况下,事务需要在整个事务执行过程中保持一致的可见性。在这种情况下,事务应该在语句执行之前获取快照,并且应该在整个事务执行过程中继续使用快照,而不是单个语句执行。如果创建行的事务没有运行,则每行的可见性取决于创建事务是提交还是中止。假设一个表的行是由某个事务创建的,但尚未删除。如果创建事务正在运行,则该行对创建该行的事务可见,但对其他事务不可见。如果创建事务未运行且已提交,则行可见。如果事务被中止,则此行不可见。

因此,PostgreSQL 需要两种信息来确定“哪个事务正在运行”和“是否提交或中止了旧事务”。 前者的信息可以作为“快照”获得。 PostgreSQL 将后面的信息维护为“CLOG”。PostgreSQL 使用所有这些信息来确定哪一行对给定事务可见 - 哪个事务正在运行 --> 快照 - 是否提交或中止了旧事务 --> CLOG 事务的元数据。这种日志用于告诉PostgreSQL哪个事务已经完成、哪个还没有完成



事务管理全局化

在 Postgres-XC 中,GTM 为事务管理提供了以下功能: 1. 给事务分配全局XID(全局事务ID GXID,Global Transaction ID)。 使用 GXID,可以全局识别事务。 如果一项事务写入多个节点,我们可以跟踪此类写入。 2. 提供快照。 GTM 收集所有事务的状态(running、commited、aborted 等)以提供全局快照(全局快照)。请注意,全局快照包括给其他服务器的 GXID,如图 1.8 所示。因为一些较旧的事务可能会访问新的服务器,在这种情况下,如果快照中不包含此类事务的 GXID,则该事务可能会被视为“足够老”,并且可能会读取未提交的行。如果此类事务的 GXID 一开始就包含在快照中,这种不一致不会发生。

我们需要全局快照的原因如下: 2PC协议强制每个分布式事务的更新。但是,它并不强制保持分布式事务对其他事务更新的一致可见性。根据每个节点的提交时间,更新可能对读取这些节点的同一事务可见,也可能不可见。为了保持一致的可见性,我们需要一个全局快照,其中包含postgresxc集群中所有正在运行的事务信息(在本例中为GXID、全局事务id),并在PostgreSQL中找到的读取操作的相同上下文中使用它。

为此,postgresxc引入了一个名为GTM(global transaction manager)的专用组件。GTM作为一个单独的组件运行,并为postgresxc服务器上运行的每个事务提供且有序的事务id。我们称之为GXID(全局事务Id),因为这是全局的Id, GTM从事务接收GXID请求并提供GXID。它还跟踪所有事务的开始和结束时间,以生成用于控制每个元组可见性的快照。因为这里的快照也是全局属性,所以称为全局快照。只要每个事务都使用GXID和Global Snapshot运行,它就可以在整个系统中保持一致的可见性,并且在任何服务器上并行运行事务都是安全的。

另一方面,一个由多个语句组成的事务可以使用多个服务器来执行,保持一致的更新和可见性。这一机制的概要如图1.9所示。请注意每个快照中包含的事务是如何根据全局事务进行更改的。

GTM为每个事务提供全局事务Id,并跟踪所有事务的状态,无论事务是正在运行、已提交还是已中止,以计算全局快照以保持元组可见性。 请注意,每个事务在开始和结束时以及在两阶段提交协议中发出PREPARE transaction命令时都会报告。还请注意,GTM提供的全局快照包括在其他组件上运行的其他事务。

每个事务根据PostgreSQL中的事务隔离级别请求快照。如果事务隔离级别为“read committed”,则事务将为每个语句请求一个快照。如果事务隔离级别为“repeatable read”,则事务将在事务开始时请求一个快照,并在整个事务中重用它。 GTM还提供全局值,例如sequence。其他全局属性(如时间戳和通知)将是以下版本中的一个扩展。

关键组件间的交互

如前一节所述,postgresxc有三个主要组件来提供多节点读写的全局一致性,并确定每条语句应该转到哪个datanode和处理该语句。 图1.11给出了postgresxc组件之间的全局事务控制和交互顺序。 如图所示,当协调器开始一个新事务时,它向GTM请求新的事务ID(GXID,global transaction ID)。GTM跟踪这些需求以计算全局快照。


如果事务隔离模式是重复读取,则将获取快照并在整个事务中使用。当协调器coordinator接受来自应用程序的语句并且隔离模式为READ COMMITTED时,将从GTM获取快照。然后分析语句,确定要转到哪个datanode,并在必要时为每个datanode进行转换。 请注意,语句将通过GXID和global snapshot传递到适当的datanode,以维护全局事务标识和表的每行的可见性。每个结果都将被收集并计算到对应用程序的响应中。 在事务结束时,如果事务中的更新涉及多个datanode,协调器将为2PC发出PREPARE transaction,然后发出COMMIT。这些步骤将报告给GTM,并跟踪每个事务状态,以便计算后续全局快照。

GTM的角色

GTM 有四种角色:Master、Slave、Proxy和client。如下是这些角色的配置文件的示例。


GTM Master

图1.11中的顺序,可以如图1.12所示实现。协调器Coordinator后端对应于PostgreSQL的后端进程,它处理来自应用程序的数据库连接并处理每个事务。 结构和算法概述如下: 1. Coordinator后端提供GTM客户端库,获取GXID和快照并报告事务状态。 2. GTM打开一个端口以接受来自每个协调器Coordinator后端的连接。当GTM接受连接时,它会创建一个线程(GTM thread)来处理从连接的协调器后端到GTM的请求。 3. GTM线程接收每个请求,记录并将GXID、快照和其他响应返回给协调器Coordinator后端。 4. 重复上述顺序,直到协调器后端请求断开连接。

这里看一个重要流程(当GTM接受连接时,它会创建一个线程GTM thread来处理从连接的协调器后端到GTM的请求)的代码: postgres-xl-10r1.1 gtm/main/main.c中的main函数第838行的ServerLoop函数,其主要结构是一个for(;;)死循环,用于等待连接的到来,多等待一分钟,保证其他后台有时间处理。这里主要看该死循环中的所进行的步骤: 1. 允许所有信号 PG_SETMASK(&UnBlockSig) 2. 判断是否退出GTM程序,判断标识GTMAbortPending 3. GTM_RWLockRelease,释放锁 Now GTM-Standby can backup current status during this region 4. selres = select(nSockets, &rmask, NULL, NULL, &timeout) 使用select接收连接 5. GTM_RWLockAcquire,获取锁 Prohibit GTM-Standby backup from here. 6. 阻塞所有信号 PG_SETMASK(&BlockSig) 7. 检测select函数调用结果 8. 如果有新连接连接进来,则fork一个GTM thread子进程进行处理

static int ServerLoop(void) {
    fd_set      readmask;
    int         nSockets;
    nSockets = initMasks(&readmask);
    for (;;)
    {
        fd_set      rmask;
        int         selres;
        //MemoryContextStats(TopMostMemoryContext);
        /* Wait for a connection request to arrive. We wait at most one minute, to ensure that the other background
         * tasks handled below get done even when no requests are arriving. */
        memcpy((char *) &rmask, (char *) &readmask, sizeof(fd_set));
        PG_SETMASK(&UnBlockSig);
        if (GTMAbortPending) {
            /* XXX We should do a clean shutdown here. For the time being, just write the next GXID to be issued in the control file and exit gracefully */
            elog(LOG, "GTM shutting down.");
            /* Tell GTM that we are shutting down so that no new GXIDs are issued this point onwards */
            GTM_SetShuttingDown();
            SaveControlInfo();
            exit(1);
        }
        {
            /* must set timeout each time; some OSes change it! */
            struct timeval timeout;
            GTM_ThreadInfo *my_threadinfo = GetMyThreadInfo;
            timeout.tv_sec = 60;
            timeout.tv_usec = 0;
            /* Now GTM-Standby can backup current status during this region */
            GTM_RWLockRelease(&my_threadinfo->thr_lock);
            selres = select(nSockets, &rmask, NULL, NULL, &timeout);
            /* Prohibit GTM-Standby backup from here. */
            GTM_RWLockAcquire(&my_threadinfo->thr_lock, GTM_LOCKMODE_WRITE);
        }

        /* Block all signals until we wait again.  (This makes it safe for our signal handlers to do nontrivial work.) */
        PG_SETMASK(&BlockSig);
        /* Now check the select() result */
        if (selres < 0){
            if (errno != EINTR && errno != EWOULDBLOCK){
                ereport(LOG,(EACCES,errmsg("select() failed in main thread: %m")));
                return STATUS_ERROR;
            }
        }
        /* New connection pending on any of our sockets? If so, fork a child process to deal with it. */
        if (selres > 0){
            int         i;
            for (i = 0; i < MAXLISTEN; i++){
                if (ListenSocket[i] == -1)
                    break;
                if (FD_ISSET(ListenSocket[i], &rmask)){
                    Port       *port;
                    port = ConnCreate(ListenSocket[i]);
                    if (port){
                        GTM_Conn *standby = NULL;
                        standby = gtm_standby_connect_to_standby();
                        if (GTMAddConnection(port, standby) != STATUS_OK){
                            gtm_standby_disconnect_from_standby(standby);
                            StreamClose(port->sock);
                            ConnFree(port);
                        }
                    }
                }
            }
        }
    }
}

相关文章