POSTGRESQL 事务控制(一) (写着费力,看着费劲系列)

2021-08-26 00:00:00 事务 分配 状态 结构 触发

近发现一个问题, 近写的关于感性的文字如 DBA 职业迷茫何去何从, 和另外一篇都是较高的用户读取量, 而反观到技术性的文字,基本上都不太高, 能到400以上就属于"上帝帮助" 了


原因我是明白的, 大众化的东西受众必然很多, 反而纯技术性的文字实撰写困难,首先自己要理解, 然后在转化. 很难掺杂个人感情,耗费精力大. 纵然如此个人还是喜欢去搞技术性的东西,主要有两点  1 要靠这个吃饭, 2 个人兴趣. 

这边准备一个深入系列,其实这也是费力不讨好, 自己写的费劲, 大家看的也比较费劲, 估计阅读数也不会太高。

___________________________________________________________________________

本期主要从事务入手, 从POSTGRESQL 的事务的原理由浅入深的开始搞搞更深入的东西, 这一个系列注重原理,源码, 少操作.

PostgreSQL 的事务的形成和处理是通过 Transaction Block 块, 这个Transaction Block 块中会包含, 一条SQL ,或者 N 条SQL . 

下面是postgresql 的在事务处理中的事务可能处于的状态, (为后面和代码连接做准备)



事务处理的分为 begin commit  rollback 三个过程, 这里分别有几个函数来代表功能的完成, 在事务处理中, 主要分为上层, 中层, 下层三种函数

上层

1  BeginTransacionBlock

2  EndTransactionBlock

3  UserAbortTransactionBlock

4  DefineSavepoint

5  RollbackToSavepoint

6  ReleaseSavepoint

中层

1 StartTransactionCommand

2 CommitTransactionCommand

3 AbortTransactionCommand

底层

1 StartTransaction

2 CommitTransation

3 AbortTransaction

4 Cleanup Transaction

5 StartSubTransaction

6 CommitSubTransaction

7 AbortSub Transaction

8 CleanupsSub Transaction


下面通过一个事务的实例来看上面的函数和状态如何应用到事务的处理当中


typedef struct TransactionStateData

{

    

    TransactionId transactionId;    

    SubTransactionId subTransactionId;     

    char       *name;        

    int         savepointLevel; 

    TransState  state;         

    TBlockState blockState;     

    int         nestingLevel; 

    int         gucNestLevel;   

   MemoryContext curTransactionContext;   

    ResourceOwner curTransactionOwner;  

    TransactionId *childXids;   

    int         nChildXids;    

    int         maxChildXids;   

    Oid         prevUser;      

    int         prevSecContext; 

    bool        prevXactReadOnly;   

    bool        startedInRecovery; 

    bool        didLogXid;     

    int         parallelModeLevel;   

    struct TransactionStateData *parent;    

} TransactionStateData;


以一个简单得事务,会使用如下的流程和相关的函数.


看上图, 在事务的开始会在 typedef struct TransactionStateData 结构体内修改transState  状态,默认值为trans_default,  在执行了Begin后,会开始获取 transactionID, 然后开始变化结构体的状态, 将状态变为Trans_start, 然后马上执行语句,分配transactionID, 在将事务的状态变为 Trans_inprogress,

在事务运行完毕,并提交是将事务的状态转为 trans_abort.   

在期间调用 heap_insert 函数,将数据插入到数据页面中. 


下图是证明产生事务后,也不见得产生事务ID, 只要整体的事务中没有任何的DML操作, Insert 操作, 则是不会分配事务ID的. 


txid_current_if_assigned()


上面的只是非常简单的事务,而复杂的事务,都会包含 子事务, 以及一些回滚点, 如在事务中加载了save point . 则分配事务 SubTransactionId subTransactionId;  


上面这段代码的就是为事务,以及子事务分配事务号的

if (isSubXact && !TransactionIdIsValid(s->parent->transactionId)

如果是子事务,并且结构体中中没有父的事务ID  则设置初始值

则根据结构体中的 nestingLevel 的级别来分配数组, 

parents = palloc(sizeof(TransactionState) * s->nestingLevel);

然后通过循环得方式, 为每一个子事务来分配事务ID

while (p != NULL && !TransactionIdIsValid(p->transactionId))
{
parents[parentOffset
++] = p;
p
= p->parent;
}

PG 获取事务ID 主要是通过无符号整型事务ID的计数器来分配事务ID ,ID是一个32位的整型递增的趋势,通过

1  结构体

2  缓存计数器

3  分配函数

三个部分组成事务ID的分配的任务.

同时由于FREEZEING的问题, 在分配事务ID的时候还要进行相关的判断,判断当前的分配得事务号,是否已经到了警戒线.


下面是这段分配事务ID 的代码, 及个人理解注解

/*

 

TransactionId

GetNewTransactionId(bool isSubXact)

{

    TransactionId xid;

   

    if (IsInParallelMode())

        elog(ERROR, "cannot assign TransactionIds during a parallel operation");

#如果在并行模式,则不会进行分配事务ID

    

    if (IsBootstrapProcessingMode())

    {

        Assert(!isSubXact);

        MyPgXact->xid = BootstrapTransactionId;

        return BootstrapTransactionId;//--> 1

    }

    #事务初始化之初先进行事务的ID的初始化

   


    if (RecoveryInProgress())

        elog(ERROR, "cannot assign TransactionIds during recovery");

    LWLockAcquire(XidGenLock, LW_EXCLUSIVE);

#如果事务在运行的模式,则不能分配事务ID, 否则获取LW锁,准备进行事务ID的分配

    

    xid = ShmemVariableCache->nextXid;

# 这里通过内存中的共享结构分配一个事务ID

#此时为了防止PG数据库事务ID回卷, 需要开始对分配事务ID 与现存大的事务ID进行比较

#这里有三个设置,

#1 xidVacLimit

#2 xidWarnLimit

#3 xidStopLimit

#三个值分别代表, 触发xidVacLimit ,系统则自动开始进行autovacuum的操作

#xidwarnLimit 则系统开始发出警告

#xidStopLimit 则系统开始停止工作,进入单用户模式

    

    

    if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))

    {

        

        TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;

        TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;

        TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;

        Oid         oldest_datoid = ShmemVariableCache->oldestXidDB;

        LWLockRelease(XidGenLock);

        

#当当前的事务ID 除以65536 余数为0 则触发autovacuum的机制

        if (IsUnderPostmaster && (xid % 65536) == 0)

           

            SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);

#如果触发了xidStgopLimit

        if (IsUnderPostmaster &&

            TransactionIdFollowsOrEquals(xid, xidStopLimit))

        {

            #获取这个数据库的oid 并转换为数据库名, 然后就开始疯狂的发送告警了

            char       *oldest_datname = get_database_name(oldest_datoid);

         

            if (oldest_datname)

                ereport(ERROR,

                        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),

                         errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",

                                oldest_datname),

                         errhint("Stop the postmaster and vacuum that database in single-user mode.\n"

                                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));

            else

                ereport(ERROR,

                        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),

                         errmsg("database is not accepting commands to avoid wraparound data loss in database with OID %u",

                                oldest_datoid),

                         errhint("Stop the postmaster and vacuum that database in single-user mode.\n"

                                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));

        }

        else if (TransactionIdFollowsOrEquals(xid, xidWarnLimit))

        {

          

            char       *oldest_datname = get_database_name(oldest_datoid);

          

            if (oldest_datname)

                ereport(WARNING,

                        (errmsg("database \"%s\" must be vacuumed within %u transactions",

                                oldest_datname,

                                xidWrapLimit - xid),

                         errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"

                                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));

            else

                ereport(WARNING,

                        (errmsg("database with OID %u must be vacuumed within %u transactions",

                                oldest_datoid,

                                xidWrapLimit - xid),

                         errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"

                                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));

        }

        /* Re-acquire lock and start over */

        

        # 在每一次获取事务ID 都会触发一次报警,警告信息在上面完成

LWLockAcquire(XidGenLock, LW_EXCLUSIVE);

        xid = ShmemVariableCache->nextXid;

    }

    #相关的事务ID 需要在CommitTs, WAL LOG中进行更新

    ExtendCLOG(xid);

    ExtendCommitTs(xid);

    ExtendSUBTRANS(xid);

    #然后将xid在缓冲结构中更新为下一次其他事务获取ID做准备

    TransactionIdAdvance(ShmemVariableCache->nextXid);

#同时需要判断是否是子事务

if (!isSubXact)

    #不是将就XID 放入缓冲共享体中,让事务开始对其他事务的可见性起作用

        MyPgXact->xid = xid;    

    else               

#如果不是则就是子事务,那么就需要进行循环,为每一个子事务,在这个事务上+1

 

    {

        int         nxids = MyPgXact->nxids;

        if (nxids < PGPROC_MAX_CACHED_SUBXIDS)

        {

            MyProc->subxids.xids[nxids] = xid;

            pg_write_barrier();

            MyPgXact->nxids = nxids + 1;

        }

        else

            MyPgXact->overflowed = true;

    }

    

    LWLockRelease(XidGenLock);

    return xid;

#此时一个整体的事务分配才完成



以下是 shmemVariableCache 的缓存结构

typedef struct VariableCacheData

{

   

    Oid         nextOid;        

    uint32      oidCount;      

    TransactionId nextXid;     

    TransactionId oldestXid;    #当前小的xid

    TransactionId xidVacLimit;  #存储触发autovacuum的xid预设值

    TransactionId xidWarnLimit; #告警的xid预设值

    TransactionId xidStopLimit; #设置停止工作的xid预设值

   TransactionId xidWrapLimit;  #设置整体系统冻结时的XID预设值

   

    Oid         oldestXidDB;    #当前拥有老的事务的数据库OID

    TransactionId oldestCommitTsXid;  老的commit id

    TransactionId newestCommitTsXid;  新的commit xid

    TransactionId latestCompletedXid;   

    TransactionId oldestClogXid;    /* oldest it's safe to look up in clog */

} VariableCacheData;

typedef VariableCacheData *VariableCache;

VariableCache ShmemVariableCache = NULL;


通过阅读代码可以今天可以了解如下


1  数据库freeze 的触发机制是在分配事务ID时触发的

2  数据库从强制autovacuum 到 FREEZE 是有一个过程的, 相关的的警告信息会不断在这期间进行发送

3  触发事务autovacuum 的机制          if (IsUnderPostmaster && (xid % 65536) == 0)

4   尽量不要建立太多的子事务, 原因从分配事务ID 也可以看出来,save point的功能怎么使用心理的有点数


相关文章