上篇讲解了 openGauss CLOG 模块分区优化的原理,本文将从源代码层面讨论其具体实现。原理部分内容见:《openGauss CLOG 模块分区优化–1(原理)》
1 CLOG 轻量级分区锁
/*
 * CLog lwlock partition
 *
 * CBufHashPartition(hashcode)
 *     Maps a hash code to a partition number by taking it modulo
 *     NUM_CLOG_PARTITIONS.
 * CBufMappingPartitionLock(hashcode)
 *     Address of the lock, inside the main LWLock array, of the partition
 *     that hashcode maps to.
 * CBufMappingPartitionLockByIndex(i)
 *     Address of the lock of partition number i.  The argument is
 *     parenthesized so that expression arguments (e.g. "a << 1") are added
 *     to FirstCBufMappingLock as a whole.
 */
#define CBufHashPartition(hashcode) \
    ((hashcode) % NUM_CLOG_PARTITIONS)
#define CBufMappingPartitionLock(hashcode) \
    (&t_thrd.shemem_ptr_cxt.mainLWLockArray[FirstCBufMappingLock + CBufHashPartition(hashcode)].lock)
#define CBufMappingPartitionLockByIndex(i) \
    (&t_thrd.shemem_ptr_cxt.mainLWLockArray[FirstCBufMappingLock + (i)].lock)
2 CLOG 轻量级分区锁初始化
for (id = 0; id < NUM_CLOG_PARTITIONS; id++, lock++) {LWLockInitialize(&lock->lock, LWTRANCHE_CLOG_BUFMAPPING);}
3 CLOG共享内存初始化
与原生的 PostgreSQL 相比,新增了对每个分区 CLOG 共享内存的初始化,并用分区锁代替之前的全局大锁。
/*
 * Initialize the shared-memory SLRU state of every CLOG partition.
 *
 * Partition number N is registered under the name "CLOG Ctl<N>" and is
 * handed its own partition lock (CBufMappingPartitionLockByIndex(N)); all
 * partitions use the "pg_clog" directory.
 */
void CLOGShmemInit(void)
{
    char name[SLRU_MAX_NAME_LENGTH];

    for (int partition = 0; partition < NUM_CLOG_PARTITIONS; partition++) {
        /* Build the per-partition SLRU name: prefix followed by the index. */
        int rc = sprintf_s(name, SLRU_MAX_NAME_LENGTH, "%s%d", "CLOG Ctl", partition);
        securec_check_ss(rc, "\0", "\0");

        SimpleLruInit(ClogCtl(partition),
                      name,
                      LWTRANCHE_CLOG_CTL,
                      CLOGShmemBuffers(),
                      CLOG_LSNS_PER_PAGE,
                      CBufMappingPartitionLockByIndex(partition),
                      "pg_clog");
    }
}
4 CLOG模块的Bootstrap执行逻辑
/** This func must be called ONCE on system install. It creates* the initial CLOG segment. (The CLOG directory is assumed to* have been created by initdb, and CLOGShmemInit must have been* called already.)*/
void BootStrapCLOG(void)
{int slotno;int64 pageno;// 引导阶段批量初始化 32个clog页,需要进行写入刷盘操作for (pageno = 0; pageno < CLOG_BATCH_SIZE; pageno++) {(void)LWLockAcquire(ClogCtl(pageno)->shared->control_lock, LW_EXCLUSIVE);slotno = ZeroCLOGPage(pageno, false);SimpleLruWritePage(ClogCtl(pageno), slotno);Assert(!ClogCtl(pageno)->shared->page_dirty[slotno]);LWLockRelease(ClogCtl(pageno)->shared->control_lock);}pageno = TransactionIdToPage(t_thrd.xact_cxt.ShmemVariableCache->nextXid);(void)LWLockAcquire(ClogCtl(pageno)->shared->control_lock, LW_EXCLUSIVE);if (pageno >= CLOG_BATCH_SIZE) {/* Create and zero the first page of the commit log */slotno = ZeroCLOGPage(pageno, false);/* Make sure it's written out */SimpleLruWritePage(ClogCtl(pageno), slotno);Assert(!ClogCtl(pageno)->shared->page_dirty[slotno]);}LWLockRelease(ClogCtl(pageno)->shared->control_lock);
}
5 ShutdownCLOG
该函数负责关闭 CLOG 缓冲区,并将各个分区的脏数据刷盘。
/*
 * Shutdown hook for the CLOG module: to be invoked exactly ONCE while the
 * postmaster or a standalone backend is shutting down.
 *
 * Calls SimpleLruFlush on every CLOG partition, bracketed by the CLOG
 * checkpoint trace probes (both passed "false").
 */
void ShutdownCLOG(void)
{
    TRACE_POSTGRESQL_CLOG_CHECKPOINT_START(false);

    /* Flush dirty CLOG pages to disk, partition by partition */
    int partition = 0;
    while (partition < NUM_CLOG_PARTITIONS) {
        (void)SimpleLruFlush(ClogCtl(partition), false);
        partition++;
    }

    TRACE_POSTGRESQL_CLOG_CHECKPOINT_DONE(false);
}
6 CheckPointCLOG
该函数的功能是在检查点期间将各个分区的脏数据刷盘。
/*
 * Checkpoint entry point for the CLOG module --- used both for shutdown
 * checkpoints and for on-the-fly ones.
 *
 * Calls SimpleLruFlush on every CLOG partition and adds the sum of the
 * values it returns to g_instance.ckpt_cxt_ctl->ckpt_clog_flush_num.
 */
void CheckPointCLOG(void)
{
    TRACE_POSTGRESQL_CLOG_CHECKPOINT_START(true);

    /* Flush dirty CLOG pages to disk, accumulating the per-partition results */
    int flushTotal = 0;
    for (int partition = 0; partition < NUM_CLOG_PARTITIONS; ++partition) {
        flushTotal = flushTotal + SimpleLruFlush(ClogCtl(partition), true);
    }

    /* Publish the total in a single update of the shared counter */
    g_instance.ckpt_cxt_ctl->ckpt_clog_flush_num += flushTotal;

    TRACE_POSTGRESQL_CLOG_CHECKPOINT_DONE(true);
}
7 ExtendCLOG
/*
 * Make sure that CLOG has room for a newly-allocated XID.
 *
 * NB: this is called while holding XidGenLock.  We want it to be very fast
 * most of the time; even when it's not so fast, no actual I/O need happen
 * unless we're forced to write out a dirty clog or xlog page to make room
 * in shared memory.
 *
 * newestXact: the newly allocated transaction ID.
 * allowXlog:  not consulted by the code shown here; whether an XLOG entry is
 *             made is decided by !t_thrd.xlog_cxt.InRecovery.
 */
void ExtendCLOG(TransactionId newestXact, bool allowXlog)
{
    int64 pageno;

    /*
     * No work except at first XID of a page.  FirstNormalTransactionId is
     * the one XID that is processed even though it is not at page index 0.
     */
    if (TransactionIdToPgIndex(newestXact) != 0 && !TransactionIdEquals(newestXact, FirstNormalTransactionId)) {
        return;
    }

    pageno = TransactionIdToPage(newestXact);

    /* Only the control lock of the partition owning this page is taken. */
    (void)LWLockAcquire(ClogCtl(pageno)->shared->control_lock, LW_EXCLUSIVE);

    /* Zero the page and make an XLOG entry about it */
    ZeroCLOGPage(pageno, !t_thrd.xlog_cxt.InRecovery);

    LWLockRelease(ClogCtl(pageno)->shared->control_lock);
}
8 TruncateCLOG
/*
 * Drop every CLOG segment that precedes the one containing oldestXact.
 *
 * XLOG has to be flushed before any CLOG data disappears, so that recently
 * emitted HEAP_FREEZE records are known to be on disk; otherwise a crash
 * and restart could leave unfrozen tuples pointing at CLOG data that no
 * longer exists.  A dedicated TRUNCATE XLOG record is emitted as well.
 * Replaying the removal is not essential (the files could be deleted
 * later), but it keeps a long-running hot standby from accumulating an
 * unreasonably large CLOG directory.
 *
 * A CLOG segment covers a large number of transactions, so a segment can
 * rarely be removed; the XLOG flush is therefore deferred until a
 * removable segment has actually been found.
 */
void TruncateCLOG(TransactionId oldestXact)
{
    /*
     * The cutoff is the start of the segment holding oldestXact; what gets
     * passed to SimpleLruTruncate is the *page* holding oldestXact.
     */
    int64 cutoffPage = TransactionIdToPage(oldestXact);

    /* Bail out early when the directory scan reports nothing removable */
    if (!SlruScanDirectory(ClogCtl(cutoffPage), SlruScanDirCbReportPresence, &cutoffPage)) {
        return; /* nothing to remove */
    }

    /* Log the truncation and get XLOG flushed to disk first ... */
    WriteTruncateXlogRec(cutoffPage);

    /* ... and only then delete the old CLOG segment(s) */
    SimpleLruTruncate(ClogCtl(cutoffPage), cutoffPage);

    ereport(LOG, (errmsg("Truncate CLOG at xid %lu", oldestXact)));
}
9 clog_redo
/*
 * CLOG resource manager's routines
 *
 * clog_redo replays one CLOG XLOG record.  Two record types are handled,
 * both carrying an int64 page number as their data:
 *   CLOG_ZEROPAGE - zero the page and write it out, under the control lock
 *                   of the partition that owns the page;
 *   CLOG_TRUNCATE - truncate the CLOG at the page.
 * Any other op code raises a PANIC.
 */
void clog_redo(XLogReaderState* record)
{
    uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    errno_t rc = EOK;

    /* Backup blocks are not used in clog records */
    Assert(!XLogRecHasAnyBlockRefs(record));

    if (info == CLOG_ZEROPAGE) {
        int64 pageno;
        int slotno;

        rc = memcpy_s(&pageno, sizeof(int64), XLogRecGetData(record), sizeof(int64));
        securec_check(rc, "", "");

        (void)LWLockAcquire(ClogCtl(pageno)->shared->control_lock, LW_EXCLUSIVE);

        slotno = ZeroCLOGPage(pageno, false);
        SimpleLruWritePage(ClogCtl(pageno), slotno);
        Assert(!ClogCtl(pageno)->shared->page_dirty[slotno]);

        LWLockRelease(ClogCtl(pageno)->shared->control_lock);
    } else if (info == CLOG_TRUNCATE) {
        int64 pageno;

        rc = memcpy_s(&pageno, sizeof(int64), XLogRecGetData(record), sizeof(int64));
        securec_check(rc, "", "");

        /*
         * During XLOG replay, latest_page_number isn't set up yet; insert a
         * suitable value to bypass the sanity test in SimpleLruTruncate.
         */
        ClogCtl(pageno)->shared->latest_page_number = pageno;

        SimpleLruTruncate(ClogCtl(pageno), pageno);
    } else {
        ereport(PANIC, (errmsg("clog_redo: unknown op code %u", (uint32)info)));
    }
}
10 WriteTruncateXlogRec
/*
 * Emit a CLOG_TRUNCATE xlog record whose payload is the given page number.
 *
 * The record is flushed to disk before this function returns --- the
 * reason is explained in the notes in TruncateCLOG().
 */
static void WriteTruncateXlogRec(int64 pageno)
{
    /* Assemble the record: its only data is the int64 page number */
    XLogBeginInsert();
    XLogRegisterData((char*)(&pageno), sizeof(int64));

    /* Insert it, then force XLOG out up to the returned position */
    XLogRecPtr truncateRecPtr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE);
    XLogFlush(truncateRecPtr);
}