源码学习 —— 列存储(CStore)(四)】"/>
【 OpenGauss源码学习 —— 列存储(CStore)(四)】
列存储(CStore)(四)
- 概述
- CStore::FillVecBatch 函数
- VectorBatch 类
- CStore::FillVector 函数
- ScalarVector 类
- CStore::FillVectorByTids 函数
- CStore::FillVectorLateRead 函数
- CStore::FillVectorByIndex 函数
- CStore::FillSysVecByTid 函数
- CStore::FillSysColVector 函数
- CStore::FillTidForLateRead 函数
- CStore::FillScanBatchLateIfNeed 函数
- 总结
声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 OpenGauss1.1.0 的开源代码和《OpenGauss数据库源码解析》一书以及OpenGauss社区学习文档和一些学习资料
概述
本章我们继续在【 OpenGauss源码学习 —— 列存储(CStore)(三)】基础上进行进一步学习,本文将继续介绍 CStore 类中与填充列存储引擎中的数据结构 ScalarVector 或 VectorBatch有关的部分公有成员函数。
CStore::FillVecBatch 函数
CStore::FillVecBatch 函数执行了填充 VectorBatch 的操作,包括正常列、系统列、常量列以及其他列的填充。根据不同的情况,它会设置各个列向量的行数,并在必要时处理已删除的行。
这个函数 CStore::FillVecBatch 的执行过程可以总结如下:
- 函数开始时,首先会进行一些断言检查,确保传入的 vecBatchOut 不为空。
- 随后,函数会获取当前指定的 idx 值,该值可能用于确定要处理的列的数据。
- 函数初始化了一些变量,包括 deadRows、i 和 hasCtidForLateRead。
- 接下来,函数执行一系列步骤,其中每个步骤都用于填充 VectorBatch 中的不同类型的列数据:
- 步骤1:填充正常列数据:这个步骤涉及遍历正常列,对每个列执行以下操作:
- 检查是否列已被标记为已删除,如果是则抛出错误。
- 根据列的索引,获取对应的标量向量 vec 和 CUDesc 描述信息。
- 获取可能的删除掩码(delete mask),以便进行后续操作。
- 根据是否支持延迟读取(IsLateRead)来决定是直接填充数据,还是处理延迟读取的情况。
- 对于延迟读取列,如果尚未填充 ctid 信息,会先填充 ctid 信息。
- 步骤2:填充系统列数据:这个步骤用于填充系统列数据,遍历系统列并将其填充到 VectorBatch 中。
- 步骤3:填充常量列数据:如果存在常量列,它会仅设置行数,不涉及实际的数据填充。
- 步骤4:填充其他列数据:这个步骤用于填充其他类型的列数据,通常用于已删除的列,将它们设置为 Null。
- 最后,函数返回已删除行数(deadRows)。
CStore::FillVecBatch 函数源码如下所示:(路径:src\gausskernel\storage\cstore\cstore_am.cpp
)
int CStore::FillVecBatch(_out_ VectorBatch* vecBatchOut)
{// 断言:确保传入的vecBatchOut不为空Assert(vecBatchOut);int idx = m_CUDescIdx[m_cursor];int deadRows = 0, i;this->m_cuDescIdx = idx;bool hasCtidForLateRead = false;/* 步骤1:填充正常列数据(如果需要) */for (i = 0; i < m_colNum; ++i) {int colIdx = m_colId[i];// 如果列被标记为已删除,抛出错误if (m_relation->rd_att->attrs[colIdx]->attisdropped) {ereport(PANIC,(errmsg("无法填充表 \"%s\" 的已删除列 \"%s\" 的 VecBatch 数据",RelationGetRelationName(m_relation),NameStr(m_relation->rd_att->attrs[colIdx]->attname)));}if (likely(colIdx >= 0)) {Assert(colIdx < vecBatchOut->m_cols);ScalarVector* vec = vecBatchOut->m_arr + colIdx;CUDesc* cuDescPtr = m_CUDescInfo[i]->cuDescArray + idx;GetCUDeleteMaskIfNeed(cuDescPtr->cu_id, m_snapshot);// 不能进行延迟读取if (!IsLateRead(i)) {int funIdx = m_hasDeadRow ? 1 : 0;deadRows = (this->*m_colFillFunArrary[i].colFillFun[funIdx])(i, cuDescPtr, vec);} else {// 对于延迟读取的列,如果还未填充ctid信息if (!hasCtidForLateRead) {if (!m_hasDeadRow)deadRows = FillTidForLateRead<false>(cuDescPtr, vec);elsedeadRows = FillTidForLateRead<true>(cuDescPtr, vec);hasCtidForLateRead = true;this->m_laterReadCtidColIdx = colIdx;} elsevec->m_rows = vecBatchOut->m_rows;}vecBatchOut->m_rows = vec->m_rows;}}// 步骤2:填充系统列数据(如果需要)for (i = 0; i < m_sysColNum; ++i) {int sysColIdx = m_sysColId[i];ScalarVector* sysVec = vecBatchOut->GetSysVector(sysColIdx);deadRows = FillSysColVector(sysColIdx, m_virtualCUDescInfo->cuDescArray + idx, sysVec);vecBatchOut->m_rows = sysVec->m_rows;}// 步骤3:填充常量列数据(如果需要)if (unlikely(m_onlyConstCol)) {// 仅设置行数CUDesc* cuDescPtr = m_virtualCUDescInfo->cuDescArray + idx;int liveRows = 0, leftSize = cuDescPtr->row_count - m_rowCursorInCU;ScalarVector* vec = vecBatchOut->m_arr;errno_t rc = memset_s(vec->m_flag, sizeof(uint8) * BatchMaxSize, 0, sizeof(uint8) * BatchMaxSize);securec_check(rc, "", "");Assert(deadRows == 0 && leftSize > 0);GetCUDeleteMaskIfNeed(cuDescPtr->cu_id, m_snapshot);for (i = 0; i < leftSize && liveRows < BatchMaxSize; i++) {if (IsDeadRow(cuDescPtr->cu_id, i + m_rowCursorInCU))++deadRows;else++liveRows;}vec->m_rows = liveRows;vecBatchOut->m_rows = vec->m_rows;}/* 步骤4:填充其他列数据(如果需要),通常用于已删除的列 */for (i = 0; i < vecBatchOut->m_cols; i++) {if (m_relation->rd_att->attrs[i]->attisdropped) {ScalarVector* vec = vecBatchOut->m_arr + i;vec->m_rows = vecBatchOut->m_rows;vec->SetAllNull();}}// 返回已删除行数return deadRows;
}
注解:
延迟读(Late Read)是一种数据处理策略,通常在数据库或数据存储系统中使用。它指的是在需要访问数据时,尽可能地延迟从磁盘或其他存储介质中读取数据,以减少 I/O 开销和提高性能。延迟读的目标是根据需要只读取所需的数据,而不是预先读取所有数据,从而节省时间和资源。
系统列(System Column)是数据库表中的一种特殊列,用于存储系统级别的信息,通常与数据库管理或表维护相关。这些列不包含用户数据,而包括诸如行标识符(如行号或 CTID)以及其他系统元数据。例如,在数据库中,ctid 列存储了行的物理地址信息。
常量列(Constant Column)指的是表中的一列,其所有值都是常量,即不会发生变化的固定值。常量列通常用于存储一些固定的元数据信息或标识符,而不是存储可变的用户数据。例如,一个包含产品信息的表可能有一个列用于存储产品的制造日期,这个日期通常不会变化,因此可以被视为常量列。
其他列(Other Column)通常是指表中的普通列,包括存储用户数据的列,这些列可以包含各种数据类型,如整数、文本、日期等。这些列用于存储表中的实际数据,而不是系统信息或常量数据。
VectorBatch 类
CStore::FillVecBatch 函数执行了填充 VectorBatch 的操作,那 VectorBatch 是什么呢?起什么作用?
VectorBatch 是一个 C++ 类,这个类用于管理批量数据的处理,提供了各种方法来操作、序列化、反序列化以及复制数据。它在数据处理和数据库系统中的许多场景中都有广泛的应用。该类的函数源码如下所示:(路径:src/include/vecexecutor/vectorbatch.h
)
// A batch of vectorize rows
// 一批矢量行数据class VectorBatch : public BaseObject {
public:// number of rows in the batch.// 批量数据中的行数int m_rows;// number of columns in the batch.// 批量数据中的列数int m_cols;// Shall we check the selection vector.// 是否应检查选择向量bool m_checkSel;// Selection vector;// 选择向量bool* m_sel;// ScalarVector// 标量向量数组ScalarVector* m_arr;// SysColumns// 系统列容器SysColContainer* m_sysColumns;// Compress buffer// 压缩缓冲区StringInfo m_pCompressBuf;public:// Many Constructors// 构造函数,有多个重载版本VectorBatch(MemoryContext cxt, TupleDesc desc);VectorBatch(MemoryContext cxt, VectorBatch* batch);VectorBatch(MemoryContext cxt, ScalarDesc* desc, int ncols);// Deconstructor.// 析构函数~VectorBatch();// Serialize the particular data index of the batch into the buffer.// 将批量数据的特定数据索引序列化到缓冲区中void Serialize(StringInfo buf, int idx);// Deserialze the per-row msg into the batch// 将每行消息反序列化到批量数据中void Deserialize(char* msg);// Serialize the batch into the buffer without compress.// 将批量数据序列化到缓冲区中,不进行压缩void SerializeWithoutCompress(StringInfo buf);// Deserialze the msg into the batch without compress.// 将消息反序列化到批量数据中,不进行解压缩void DeserializeWithoutDecompress(char* msg, size_t msglen);// Serialize the batch into the buffer with lz4 compress.// 将批量数据序列化到缓冲区中,使用LZ4压缩void SerializeWithLZ4Compress(StringInfo buf);// Deserialze the compressed msg into the batch with lz4 compress.// 将压缩的消息反序列化到批量数据中,使用LZ4解压缩void DeserializeWithLZ4Decompress(char* msg, size_t msglen);// Reset// 重置对象状态void Reset(bool resetflag = false);// Reset selection// 重置选择向量void ResetSelection(bool value);// Test the batch is valid or not// 检查批量数据是否有效bool IsValid();// Fix row count// 修正行数void FixRowCount();// Fix row count with given rows// 使用给定的行数修正行数void FixRowCount(int rows);// Pack the batch// 打包批量数据void Pack(const bool* sel);// Optimized Pack function// 优化的打包函数void OptimizePack(const bool* sel, List* CopyVars);// Optimized Pack function for later read. later read cols and ctid col// 用于延迟读取的优化打包函数,用于处理延迟读取的列和ctid列void OptimizePackForLateRead(const bool* sel, List* lateVars, int ctidColIdx);// SysColumns// 系统列操作// Create SysColContainer// 创建系统列容器void CreateSysColContainer(MemoryContext cxt, List* sysVarList);// Get system vector// 获取系统列向量ScalarVector* GetSysVector(int sysColIdx);// Get system column number// 获取系统列数量int GetSysColumnNum();// Copy data between two VectorBatch objects// 在两个VectorBatch对象之间复制数据template <bool deep, bool add>void Copy(VectorBatch* batch, int start_idx = 0, int endIdx = -1);// Copy Nth row data from another VectorBatch object// 从另一个VectorBatch对象中复制第N行的数据void CopyNth(VectorBatch* batchSrc, int Nth);public:/* Pack template function. */// 打包的模板函数template <bool copyMatch, bool hasSysCol>void PackT(_in_ const bool* sel);/* Optimize template function. */// 优化打包的模板函数template <bool copyMatch, bool hasSysCol>void OptimizePackT(_in_ const bool* sel, _in_ List* CopyVars);/* Optimize template function for later read. */// 用于延迟读取的优化打包的模板函数template <bool copyMatch, bool hasSysCol>void OptimizePackTForLateRead(_in_ const bool* sel, _in_ List* lateVars, int ctidColIdx);private:// init the vectorbatch.// 初始化VectorBatch对象void init(MemoryContext cxt, TupleDesc desc);// init the vectorbatch from another VectorBatch object// 从另一个VectorBatch对象初始化VectorBatch对象void init(MemoryContext cxt, VectorBatch* batch);// init the vectorbatch from scalar descriptors and column count// 从标量描述符和列数初始化VectorBatch对象void init(MemoryContext cxt, ScalarDesc* desc, int ncols);
};
这里不对 VectorBatch 类进行详细展开了,在以后的学习中若有机会再进一步学习。
CStore::FillVector 函数
CStore::FillVector 函数是用于在列中填充矢量数据的函数。它根据列的属性和 CU(压缩单元)的描述,将数据转换为矢量形式。根据不同的情况,它可以处理 NULL 值、相同值的 CU,以及普通的 CU 数据。它还支持处理无效行和 CU 数据的转换。这个函数是数据库系统中数据扫描和查询操作的关键部分。函数源码如下所示:(路径:src/gausskernel/storage/cstore/cstore_am.cpp
)
入参解释:
- seq:一个整数,表示列的序号,用于确定要填充的列。
- cuDescPtr:一个指向 CUDesc 结构的指针,包含了与压缩单元 (CU) 相关的描述信息,如行数、最小值、最大值等。
- vec:一个指向 ScalarVector 结构的指针,用于存储填充后的矢量数据。
// 填充列的矢量数据
template <bool hasDeadRow, int attlen>
int CStore::FillVector(_in_ int seq, _in_ CUDesc* cuDescPtr, _out_ ScalarVector* vec)
{// 获取列的索引int colIdx = this->m_colId[seq];int pos = 0; // 初始化行数据的位置int deadRows = 0; // 初始化无效行的计数// 重置标志位errno_t rc = memset_s(vec->m_flag, sizeof(uint8) * BatchMaxSize, 0, sizeof(uint8) * BatchMaxSize);securec_check(rc, "", "");// 步骤 1:计算剩余的行数int leftRows = cuDescPtr->row_count - this->m_rowCursorInCU;Assert(leftRows > 0); // 确保剩余行数大于零// 步骤 2:CU中的所有值都为NULLif (cuDescPtr->IsNullCU()) {for (int i = 0; i < leftRows && pos < BatchMaxSize; ++i) {if (hasDeadRow && this->IsDeadRow(cuDescPtr->cu_id, i + this->m_rowCursorInCU)) {++deadRows; // 记录无效行数continue;}vec->SetNull(pos); // 将当前行标记为NULL++pos;}vec->m_rows = pos; // 设置实际有效行数return deadRows;}// 步骤 3:如果最小和最大值相等,没有存储CUif (cuDescPtr->IsSameValCU()) {for (int i = 0; i < leftRows && pos < BatchMaxSize; ++i) {if (hasDeadRow && this->IsDeadRow(cuDescPtr->cu_id, i + this->m_rowCursorInCU)) {++deadRows; // 记录无效行数continue;}if (attlen > 0 && attlen <= 8) {Datum cuMin = *(Datum*)(cuDescPtr->cu_min);vec->m_vals[pos] = cuMin;} else if (attlen == 12 || attlen == 16) {Datum cuMin = PointerGetDatum(cuDescPtr->cu_min);vec->AddVar(cuMin, pos);} else {Datum cuMin = PointerGetDatum(cuDescPtr->cu_min + 1);Size len = (Size)(unsigned char)cuDescPtr->cu_min[0];Assert(len < MIN_MAX_LEN);// 将字符串转换为varattrib_1b// 这是安全的,因为len < MIN_MAX_LENchar tmpStr[MIN_MAX_LEN + 4];if (attlen == -1) {Size varLen = len + VARHDRSZ_SHORT;SET_VARSIZE_SHORT(tmpStr, varLen);rc = memcpy_s(VARDATA_ANY(tmpStr), sizeof(tmpStr) - VARHDRSZ_SHORT, DatumGetPointer(cuMin), len);securec_check(rc, "", "");cuMin = PointerGetDatum(tmpStr);}vec->AddVar(cuMin, pos);}++pos;}vec->m_rows = pos; // 设置实际有效行数return deadRows;}// 步骤 4:获取CU数据int slotId = CACHE_BLOCK_INVALID_IDX;CSTORESCAN_TRACE_START(GET_CU_DATA);CU* cuPtr = this->GetCUData(cuDescPtr, colIdx, attlen, slotId);CSTORESCAN_TRACE_END(GET_CU_DATA);// 步骤 5:将CU数据转换为矢量数据pos = cuPtr->ToVector<attlen, hasDeadRow>(vec, leftRows, this->m_rowCursorInCU, this->m_scanPosInCU[seq], deadRows, this->m_cuDelMask);if (IsValidCacheSlotID(slotId)) {// CU已固定CUCache->UnPinDataBlock(slotId);} elseAssert(false);vec->m_rows = pos; // 设置实际有效行数return deadRows; // 返回无效行数
}
函数的主要执行过程可总结如下:
- 获取列的索引 colIdx,以确定要填充的列。
- 初始化行数据的位置 pos 和无效行的计数 deadRows。
- 重置标志位,将标志位数组清零。
- 计算剩余的行数 leftRows,这是压缩单元 (CU) 中剩余的未处理行数。
- 如果压缩单元的所有值都为 NULL,那么处理步骤2:CU 中所有值都为 NULL 的情况,将对应行的矢量数据标记为NULL,并更新 deadRows 和 pos。
- 如果压缩单元的最小值和最大值相等,处理步骤3:CU 中最小值和最大值相等的情况。根据列的数据类型长度 attlen,将对应行的矢量数据填充为最小值或使用其他适当的方式填充,同时更新 deadRows 和 pos。
- 如果压缩单元中有实际数据,需要进一步处理。首先,获取 CU 数据,并将其存储在 cuPtr 中。
- 调用 cuPtr->ToVector 函数,将 CU 数据转换为矢量数据,并将其存储在 vec 中。这一步是处理 CU 中实际数据的核心操作,根据数据类型长度和是否包含无效行来转换数据。
- 如果 CU 已固定,解除 CU 数据块的固定。
- 最后,设置矢量数据的行数 vec->m_rows,返回无效行的计数 deadRows。
ScalarVector 类
ScalarVector 类用于表示列的数据。它包含了用于存储和处理标量数据的各种成员变量和方法。这个数据结构在数据库系统中用于处理列数据,支持不同数据类型的存储和操作。该类的函数源码如下所示:(路径:src/include/vecexecutor/vectorbatch.h
)
// 用于表示列的核心数据结构
class ScalarVector : public BaseObject {friend class VectorBatch; // 声明友元类 VectorBatchpublic:int m_rows; // 值的数量ScalarDesc m_desc; // 用于标识标量值的类型描述信息bool m_const; // 标志位,表示标量矢量中的值是否总是相同uint8* m_flag; // 存储值的标志位数组VarBuf* m_buf; // 用于存储非平凡数据类型的公司缓冲区ScalarValue* m_vals; // 值数组public:static Datum Decode(ScalarValue val); // 解码可变长度数据的静态方法// 将 Datum 转换为标量值的静态方法static ScalarValue DatumToScalar(Datum datumVal, Oid datumType, bool isNull);// 根据数据类型的 Oid 将 Datum 转换为标量值的静态模板方法template <Oid datumType>static ScalarValue DatumToScalarT(Datum datumVal, bool isNull);public:ScalarVector(); // 构造函数~ScalarVector(); // 析构函数// 初始化标量矢量void init(MemoryContext cxt, ScalarDesc desc);// 用于时间序列数据库的初始化,以另一个 ScalarVector 对象初始化void init(MemoryContext cxt, ScalarVector* vec, const int batchSize);// 序列化标量矢量void Serialize(StringInfo buf);// 序列化特定索引的标量矢量void Serialize(StringInfo buf, int idx);// 反序列化矢量char* Deserialize(char* msg, size_t len);// 添加可变长度数据,这个变量可以是来自 cstring、固定长度(> 8)数据类型,或包含变长头的传统 PG 数据Datum AddVar(Datum data, int index);// 添加带头的变量Datum AddVarWithHeader(Datum data);// 在特定位置添加没有头的变量。原始变量将连同内容的长度一起传递。在函数内部,将根据数据类型在实际内容之前添加标量值的头。Datum AddBPCharWithoutHeader(const char* data, int maxLen, int len, int aindex);Datum AddVarCharWithoutHeader(const char* data, int len, int aindex);// 在特定位置添加没有头的短十进制数。十进制值将以 int64 格式传递,连同其精度。在函数内部,将添加标量值的头,并将值转换为 PG 格式。这里仅支持可以使用 int64 存储的短十进制数。Datum AddShortNumericWithoutHeader(int64 value, uint8 scale, int aindex);Datum AddBigNumericWithoutHeader(int128 value, uint8 scale, int aindex);// 添加多个变量char* AddVars(const char* src, int length);// 添加常规包含头的变量Datum AddHeaderVar(Datum data, int index);// 添加 cstring 类型的变量Datum AddCStringVar(Datum data, int index);// 添加固定长度变量的模板方法template <Size len>Datum AddFixLenVar(Datum data, int index);// 复制矢量void copy(ScalarVector* vector, int start_idx, int endIdx);void copy(ScalarVector* vector);// 深度复制矢量void copyDeep(ScalarVector* vector, int start_idx, int endIdx);void copyNth(ScalarVector* vector, int Nth);void copy(ScalarVector* vector, const bool* pSel);// 将 cstring 转换为标量值的静态方法static Datum DatumCstringToScalar(Datum data, Size len);// 将固定长度数据类型转换为标量值的静态方法static Datum DatumFixLenToScalar(Datum data, Size len);// 判断是否为空FORCE_INLINEbool IsNull(int i){Assert(i >= 0 && i < m_rows);return ((m_flag[i] & V_NULL_MASK) == V_NULL_MASK);}// 将指定位置标记为空FORCE_INLINEvoid SetNull(int i){Assert(i >= 0 && i < BatchMaxSize);m_flag[i] |= V_NULL_MASK;}// 将所有值标记为空FORCE_INLINEvoid SetAllNull(){for (int i = 0; i < m_rows; i++) {SetNull(i);}}private:// 初始化一些函数指针void BindingFp();Datum (ScalarVector::*m_addVar)(Datum data, int index); // 成员函数指针
};
其中,ScalarVector 类也在【OpenGauss源码学习(CopyOneRowTo)】中进行过简单介绍,这里也不再进行赘述。
CStore::FillVectorByTids 函数
CStore::FillVectorByTids 函数是 CStore 列式存储引擎的一部分,其主要作用是从压缩的列数据中提取特定行的值,并将这些值填充到一个标量向量中,以供查询操作使用。它通过循环遍历 TID(行标识符)并在必要时加载压缩单元(CU)的数据,以有效地执行数据检索操作。函数根据不同的 CU 类型,如 NULL、相同值或普通数据,采用不同的策略来处理数据提取和填充,以确保高性能的数据检索操作。这个函数在列式存储引擎中起到了关键作用,为高效的数据查询提供了基础支持。函数源码如下所示:(路径:src/gausskernel/storage/cstore/cstore_am.cpp
)
template <int attlen>
void CStore::FillVectorByTids(_in_ int colIdx, _in_ ScalarVector* tids, _out_ ScalarVector* vec)
{ScalarValue* tidVals = tids->m_vals; // 获取Tid值的数组ItemPointer tidPtr = NULL; // 初始化Tid指针uint32 curCUId = InValidCUID; // 初始化当前CU(压缩单元)的IDuint32 tmpCuId = InValidCUID; // 初始化临时CU的IDuint32 tmpOffset = 0; // 初始化临时偏移量uint32 firstOffset = 0; // 初始化第一个偏移量uint32 nextOffset = 0; // 初始化下一个偏移量CUDesc cuDesc; // 创建CU描述对象int pos = 0; // 初始化位置变量int contiguous = 0; // 初始化连续数据的计数bool found = false; // 初始化发现标志// 在同一CU内部,我们只会进行一次缓存的固定/释放bool needLoadCu = false; // 初始化是否需要加载CU数据的标志int slot = CACHE_BLOCK_INVALID_IDX; // 初始化缓存槽位CU* lastCU = NULL; // 初始化最后一个CUerrno_t rc = EOK; // 初始化错误码// 主要复制过程:将每个值复制到输出向量中。要小心// 重用以前值的CU和CU描述符。for (int rowCnt = 0; rowCnt < tids->m_rows; ++rowCnt) {// 注意,tidPointer->tmpOffset 从1开始tidPtr = (ItemPointer)(tidVals + rowCnt); // 获取Tid指针tmpCuId = ItemPointerGetBlockNumber(tidPtr); // 获取Tid的CU IDtmpOffset = ItemPointerGetOffsetNumber(tidPtr) - 1; // 获取Tid的偏移量// 步骤1:获取CU描述和删除掩码(如果需要)if (curCUId != tmpCuId) {if (lastCU != NULL) {// 切换到新的CU。因此尽早取消固定// 以前的CU缓存。Assert(slot != CACHE_BLOCK_INVALID_IDX); // 断言确保槽位有效CUCache->UnPinDataBlock(slot); // 释放前一个CU的数据块// 在取消固定动作之后进行重置。lastCU = NULL; // 重置最后一个CUslot = CACHE_BLOCK_INVALID_IDX; // 重置槽位}// 获取CU描述元组和删除位图。curCUId = tmpCuId; // 更新当前CU IDfound = this->GetCUDesc(colIdx, curCUId, &cuDesc, this->m_snapshot); // 获取CU描述if (!found) {if (m_useBtreeIndex) {m_delMaskCUId = InValidCUID; // 无效CU IDcontinue;} else {Assert(false);// 报告严重错误,指示CU描述符未找到ereport(FATAL,(errmsg("未找到压缩单元描述符,表(%s),列(%s),relfilenode(%u/%u/%u),""cuid(%u)。",RelationGetRelationName(this->m_relation), // 获取关系名NameStr(this->m_relation->rd_att->attrs[colIdx]->attname), // 获取列名this->m_relation->rd_node.spcNode, // 获取存储空间节点this->m_relation->rd_node.dbNode, // 获取数据库节点this->m_relation->rd_node.relNode, // 获取关系节点curCUId)); // 当前CU ID}} else {this->GetCUDeleteMaskIfNeed(curCUId, this->m_snapshot); // 获取CU的删除掩码(如果需要)}// 表示在切换到新的CU时需要加载数据。needLoadCu = true; // 设置需要加载CU数据的标志}// 检查当前CU是否有效(可见)if (m_delMaskCUId == InValidCUID) // 如果删除掩码的CU ID为无效,则跳过continue;// 步骤2:计算相同CU内的连续数据数量contiguous = 0; // 重置连续数据计数nextOffset = tmpOffset; // 重置下一个偏移量firstOffset = tmpOffset; // 设置第一个偏移量// 在相同CU内,且偏移量连续且不是死数据的情况下,计算连续数据数量。while (tmpCuId == curCUId // 在相同CU内&& tmpOffset == nextOffset // 连续偏移量&& !this->IsDeadRow(curCUId, tmpOffset)) // 不是死数据{++contiguous; // 增加连续数据计数++nextOffset; // 移动到下一个偏移量if (unlikely(++rowCnt == tids->m_rows)) // 如果已经处理了所有Tidbreak;// 获取并检查下一个数据// 在相同CU内是连续的数据。tidPtr = (ItemPointer)(tidVals + rowCnt); // 获取下一个TidtmpCuId = ItemPointerGetBlockNumber(tidPtr); // 获取下一个Tid的CU IDtmpOffset = ItemPointerGetOffsetNumber(tidPtr) - 1; // 获取下一个Tid的偏移量}if (unlikely(contiguous == 0)) {// 这是一个死数据,所以检查下一个数据。Assert(this->IsDeadRow(curCUId, tmpOffset)); // 断言确保是死数据continue;} else if (tmpCuId != curCUId || !this->IsDeadRow(curCUId, tmpOffset)) {// 如果是下一个CU的第一个数据,// 或者在同一CU内的第一个新偏移量不是死数据,我们必须再次检查它。--rowCnt; // 重新检查下一个数据}/** 步骤3:填充输出向量。* 情况1:它是一个全NULL值*/if (cuDesc.IsNullCU()) {for (int k = 0; k < contiguous; ++k)vec->SetNull(pos++); // 设置NULL值continue;}// 情况2:它是全相同值if (cuDesc.IsSameValCU()) {if (attlen > 0 && attlen <= 8) {Datum cuMin = *(Datum*)(cuDesc.cu_min); // 获取最小值ScalarValue* dest = vec->m_vals + pos; // 获取目标数组的起始位置// 批量分配cuMin给dest[contiguous]。for (uint32 k = 0; k < ((uint32)contiguous >> 2); ++k) {*dest++ = cuMin; // 复制cuMin值*dest++ = cuMin;*dest++ = cuMin;*dest++ = cuMin;}for (int k = 0; k < (contiguous & 0x03); ++k) {*dest++ = cuMin;}pos += contiguous; // 更新位置} else if (attlen == 12 || attlen == 16) {// 注意:要小心AddVar()会在每个值前插入1字节的头部Datum cuMin = PointerGetDatum(cuDesc.cu_min); // 获取最小值for (int k = 0; k < contiguous; ++k)vec->AddVar(cuMin, pos++); // 添加变长值到输出向量} else {Datum cuMin = PointerGetDatum(cuDesc.cu_min + 1); // 获取最小值Size len = (Size)(unsigned char)cuDesc.cu_min[0];Assert(len < MIN_MAX_LEN); // 断言确保长度有效// 将字符串转换为varattrib_1b// 这是安全的,因为len < MIN_MAX_LENchar tmpStr[MIN_MAX_LEN + 4];if (attlen == -1) {SET_VARSIZE_SHORT(tmpStr, len + VARHDRSZ_SHORT); // 设置变长值的长度rc =memcpy_s(tmpStr + VARHDRSZ_SHORT, sizeof(tmpStr) - VARHDRSZ_SHORT, DatumGetPointer(cuMin), len); // 复制数据securec_check(rc, "", "");cuMin = PointerGetDatum(tmpStr); // 获取变长值}for (int k = 0; k < contiguous; ++k)vec->AddVar(cuMin, pos++); // 添加变长值到输出向量}continue;}// 情况3:它是普通的CUCU* cuPtr = lastCU; // 设置当前CU指针if (unlikely(needLoadCu)) {Assert(lastCU == NULL);Assert(slot == CACHE_BLOCK_INVALID_IDX);// 加载新的CU数据,然后重置标志。CSTORESCAN_TRACE_START(GET_CU_DATA_FROM_CACHE); // 启动CU数据获取的跟踪cuPtr = this->GetCUData(&cuDesc, colIdx, attlen, slot); // 获取CU数据CSTORESCAN_TRACE_END(GET_CU_DATA_FROM_CACHE); // 结束CU数据获取的跟踪lastCU = cuPtr; // 更新最后一个CUneedLoadCu = false; // 重置需要加载CU数据的标志}Assert(cuPtr); // 断言确保CU指针有效if (cuPtr->m_nulls == NULL) {Assert(!cuPtr->HasNullValue());switch (attlen) {case sizeof(char):case sizeof(int16):case sizeof(int32): {// 由于源是1/2/4字节长度,目标是8字节,因此在循环中分配每个项目ScalarValue* dest = vec->m_vals + pos; // 获取目标数组的起始位置for (int k = 0; k < contiguous; ++k)*dest++ = cuPtr->GetValue<attlen, false>(firstOffset++); // 复制CU数据pos += contiguous; // 更新位置break;}case sizeof(Datum): {// 因为源和目标都是8字节,所以通过memcpy()一次性复制所有数据。rc = memcpy_s((char*)(vec->m_vals + pos),(size_t)(uint32)contiguous << 3,((uint64*)cuPtr->m_srcData + firstOffset),(size_t)(uint32)contiguous << 3);securec_check(rc, "\0", "\0");pos += contiguous; // 更新位置break;}case -1:case -2: {// 需要复制的总字节数是从偏移表中计算的int32* offset = cuPtr->m_offset + firstOffset;int32 obase = offset[0];if (contiguous > cuDesc.row_count || contiguous >= (int)(cuPtr->m_offsetSize / sizeof(int32))) {ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),errmsg("Tid 和 CUDesc, CUId: %u, colId: %d, contiguous: %d, row count: %d.",curCUId, colIdx, contiguous, cuDesc.row_count),errdetail("please reindex the relation. relation info: name \"%s\", namespace id %u, id %u, relfilenode %u/%u/%u",RelationGetRelationName(this->m_relation), RelationGetNamespace(this->m_relation), RelationGetRelid(this->m_relation),this->m_relation->rd_node.spcNode, this->m_relation->rd_node.dbNode, this->m_relation->rd_node.relNode)));}int totalBytes = offset[contiguous] - obase;char* src = cuPtr->m_srcData + obase;// 我们可以一次性复制所有元素,并通过添加内存基址// 修正指针。这里手动展开循环以提供// 给编译器强烈的提示char* base = vec->AddVars(src, totalBytes) - obase;ScalarValue* dest = vec->m_vals + pos;for (uint32 k = 0; k < ((uint32)contiguous >> 2); ++k) {*dest++ = (ScalarValue)(base + *offset++);*dest++ = (ScalarValue)(base + *offset++);*dest++ = (ScalarValue)(base + *offset++);*dest++ = (ScalarValue)(base + *offset++);}for (int k = 0; k < (contiguous & 0x3); k++)*dest++ = (ScalarValue)(base + *offset++);pos += contiguous; // 更新位置指针break;}case 12:case 16: {// 注意:要小心 AddVar() 会在每个值之前插入 1B 头部for (int k = 0; k < contiguous; ++k) {ScalarValue value = cuPtr->GetValue<attlen, false>(firstOffset++);vec->AddVar(PointerGetDatum(value), pos++); // 添加值到输出向量}break;}default:Assert(0);ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), (errmsg("不支持的数据类型分支"))));break;}} else {Assert(cuPtr->HasNullValue());// 在正常情况下处理空值for (int k = 0; k < contiguous; ++k) {if (unlikely(cuPtr->IsNull(firstOffset)))vec->SetNull(pos); // 设置当前位置的标量为 NULLelse {ScalarValue value = cuPtr->GetValue<attlen, true>(firstOffset);switch (attlen) {case sizeof(char):case sizeof(int16):case sizeof(int32):case sizeof(Datum):vec->m_vals[pos] = value; // 复制数据break;case 12:case 16:case -1:case -2:vec->AddVar(PointerGetDatum(value), pos); // 添加值到输出向量break;default:Assert(0);ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), (errmsg("不支持的数据类型分支")));break;}}++firstOffset; // 更新第一个偏移量++pos; // 更新位置指针}}}if (lastCU != NULL) {// 取消固定最后一个使用的 CU 缓存Assert(slot != CACHE_BLOCK_INVALID_IDX);CUCache->UnPinDataBlock(slot);}vec->m_rows = pos; // 更新输出向量的行数
}
函数执行流程如下:
- 准备好输入参数:函数接收了三个参数,包括列索引(colIdx)、包含TID(行标识符)的输入标量向量(tids)、以及用于存储结果的输出标量向量(vec)。
- 初始化各种变量:函数中声明了一系列变量,如指向输入数据的指针、用于追踪当前处理的压缩单元(CU)的标识符、以及用于记录处理的状态信息。还有一些用于缓存优化的变量,以及用于错误处理的变量。
- 逐行处理TID:循环遍历输入的 TID 标量向量中的每一行。
- 检查当前TID所属的CU:根据 TID 中的块标识符(Block Number),判断当前 TID 属于哪个 CU。如果不是同一个 CU,需要进行一些处理,如清理之前的缓存、获取新 CU 的描述信息等。
- 判断CU是否有效:检查当前 CU 是否有效,如果无效(已被标记为删除),则跳过这个 TID,继续处理下一个 TID。
- 计算连续的TID:在同一个 CU 中,连续的 TID 通常对应相邻的数据。这个步骤用于计算一组连续的 TID。如果 TID 不连续,会在这个步骤中处理。
- 根据CU的类型进行处理:根据当前 CU 的类型,采取不同的处理方式。CU 的类型可能是 NULL、相同值或普通数据。
- 处理NULL类型CU:如果 CU 的类型是 NULL,将在输出标量向量中设置一组连续位置为 NULL。
- 处理相同值类型CU:如果 CU 的类型是相同值,会根据压缩单元中的信息,将相同值复制到输出标量向量的多个位置。
- 处理普通CU:如果 CU 的类型是普通的,需要从 CU 缓存中加载数据,然后将数据逐个复制到输出标量向量中。这一过程包括处理不同的数据类型(如整数、浮点数、文本等)。
- 最后:函数将结果写入输出标量向量,更新位置信息,并在处理完所有 TID 后,返回填充好数据的输出标量向量。
CStore::FillVectorLateRead 函数
CStore::FillVectorLateRead 函数用于在 CStore 扫描的后期读取阶段根据TID(行标识符)填充标量向量。它支持不同的压缩单元类型和不同长度的列数据。具体功能如下:
- 函数的参数包括列索引(colIdx)、TID向量(tids)、CU描述符指针(cuDescPtr),以及用于存储结果的标量向量(vec)。
- 首先,它从 TID 向量中获取 TID 值,并根据 TID 获取对应的CU ID和偏移量。
- 接下来,它根据不同的 CU 类型,执行不同的逻辑:
- Case 1:如果 CU 类型为 NULL,表示该列中的所有值都为空,那么函数将为标量向量中的每个 TID 设置 NULL 值。
- Case 2:如果 CU 类型为相同值 CU,表示该列中的所有值都相同,那么函数将为标量向量中的每个 TID 设置相同的值,根据列的长度类型,值可能是一个标量值、一个定长数据(12或16字节),或一个变长数据(比如字符串)。
- Case 3:如果 CU 类型为普通 CU,表示该列中的值多种多样,那么函数会首先加载相应的 CU 数据,然后根据列的长度类型和是否包含 NULL 值,将对应的数据填充到标量向量中。
- 最后,函数返回填充后的标量向量,以供后续查询操作使用。
如果不是很明白这个函数的执行流程可以看一个案例:
假设有一个大型的数据库表,其中包含了数百万条记录,每条记录有多个列。在某个查询中,你需要检索表中的特定行,而这些行在表中的位置由它们的 TID(行标识符)表示。具体来说,你需要检索以下情况的数据:
- 你需要获取表中前1000行的数据。
- 你已经知道了这些行的TID(行标识符)。
在这种情况下,FillVectorLateRead 函数的作用就变得非常明显。在这个具体场景中,该函数的工作流程如下:
- 你有一个包含前1000行TID的TID向量,这些TID标识了你需要的行。
- 你调用 FillVectorLateRead 函数,传递了列索引(colIdx)、TID向量(tids)、CU 描述符指针(cuDescPtr),以及一个空的标量向量(vec)作为参数。
- FillVectorLateRead 函数根据 TID 向量中的每个 TID,首先确定对应行的 CU(压缩单元),并根据 CU 类型(可能是NULL、相同值、或普通 CU)来从 CU 中提取相应的数据。
- 对于相同值 CU,FillVectorLateRead函数会为所有的 TID 填充相同的值,因为这些行都包含相同的数据。
- 对于普通 CU,FillVectorLateRead 函数会根据行的具体 TID 从 CU 中提取相应的数据。
- 最后,FillVectorLateRead 函数将这些提取出的数据填充到标量向量(vec)中,使你能够方便地访问这些行的数据。
CStore::FillVectorLateRead 函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_am.cpp
)
/** @Description: 通过TID在CStore扫描的最后阶段读取并填充标量向量的函数。* @in colIdx: 列的索引。* @in tids: TID向量。* @in cuDescPtr: 指向CUDesc的指针。* @out vec: 输出的标量向量。* @template attlen: 列的长度。*/
template <int attlen>
void CStore::FillVectorLateRead(_in_ int colIdx, _in_ ScalarVector* tids, _in_ CUDesc* cuDescPtr, _out_ ScalarVector* vec)
{// 获取TID值数组和当前TID指针ScalarValue* tidVals = tids->m_vals;ItemPointer tidPtr = NULL;// 临时存储当前CU和偏移量uint32 tmpCuId = InValidCUID;uint32 tmpOffset = 0;// 初始化标量向量的位置int pos = 0;// 情况1: CU中的数据全为NULL值if (cuDescPtr->IsNullCU()) {// 遍历TID向量for (int rowCnt = 0; rowCnt < tids->m_rows; ++rowCnt) {// 获取当前TID的CU ID和偏移量tidPtr = (ItemPointer)(tidVals + rowCnt);tmpCuId = ItemPointerGetBlockNumber(tidPtr);tmpOffset = ItemPointerGetOffsetNumber(tidPtr) - 1;// 如果当前行对应的数据是无效数据(死行),则跳过if (this->IsDeadRow(cuDescPtr->cu_id, tmpOffset)) {continue;}// 将标量向量中的当前位置标记为NULLvec->SetNull(pos);pos++;}// 设置标量向量的行数vec->m_rows = pos;return;}// 情况2: CU中的数据全为相同值if (cuDescPtr->IsSameValCU()) {// 遍历TID向量for (int rowCnt = 0; rowCnt < tids->m_rows; ++rowCnt) {// 获取当前TID的CU ID和偏移量tidPtr = (ItemPointer)(tidVals + rowCnt);tmpCuId = ItemPointerGetBlockNumber(tidPtr);tmpOffset = ItemPointerGetOffsetNumber(tidPtr) - 1;// 如果当前行对应的数据是无效数据(死行),则跳过if (this->IsDeadRow(cuDescPtr->cu_id, tmpOffset)) {continue;}// 根据数据类型的长度和格式,从CU描述中获取值并填充到标量向量if (attlen > 0 && attlen <= 8) {Datum cuMin = *(Datum*)(cuDescPtr->cu_min);vec->m_vals[pos] = cuMin;} else if (attlen == 12 || attlen == 16) {Datum cuMin = PointerGetDatum(cuDescPtr->cu_min);vec->AddVar(cuMin, pos);} else {Datum cuMin = PointerGetDatum(cuDescPtr->cu_min + 1);Size len = (Size)(unsigned char)cuDescPtr->cu_min[0];// 如果数据是可变长度的字符串,则进行处理char tmpStr[MIN_MAX_LEN + VARHDRSZ];if (attlen == -1) {SET_VARSIZE_SHORT(tmpStr, len + VARHDRSZ_SHORT);errno_t rc =memcpy_s(tmpStr + VARHDRSZ_SHORT, sizeof(tmpStr) - VARHDRSZ_SHORT, DatumGetPointer(cuMin), len);securec_check(rc, "\0", "\0");cuMin = PointerGetDatum(tmpStr);}vec->AddVar(cuMin, pos);}pos++;}// 设置标量向量的行数vec->m_rows = pos;return;}// 情况3: CU中的数据为普通数据int slotId = CACHE_BLOCK_INVALID_IDX;CSTORESCAN_TRACE_START(GET_CU_DATA_LATER_READ);// 从CU缓存中获取CU数据CU* cuPtr = this->GetCUData(cuDescPtr, colIdx, attlen, slotId);CSTORESCAN_TRACE_END(GET_CU_DATA_LATER_READ);// 根据是否包含NULL值来将数据填充到标量向量if (cuPtr->HasNullValue()) {pos = cuPtr->ToVectorLateRead<attlen, true>(tids, vec);} else {pos = cuPtr->ToVectorLateRead<attlen, false>(tids, vec);}// 如果CU被固定在缓存中,则取消固定if (IsValidCacheSlotID(slotId)) {CUCache->UnPinDataBlock(slotId);} else {Assert(false);}// 设置标量向量的行数vec->m_rows = pos;return;
}
FillVectorLateRead 和 FillVectorByTids 是两个用于从 CStore(列存储)表中读取数据的函数,它们有以下区别:
- 适用场景:
- FillVectorLateRead 函数主要用于从 CStore 表中读取特定行的数据,通常是在已知 TID 的情况下进行快速的行读取。这个函数的典型用例是在查询操作中根据已知 TID 值从表中快速检索数据。
- FillVectorByTids 函数用于根据一组 TIDs 批量读取数据,它的输入参数中包含 TID 向量,用于指示需要读取的行,适合用于批量读取多行数据。
- 输入参数差异:
- FillVectorLateRead 接受的输入参数包括列索引(colIdx)、TID向量(tids)、CU 描述符指针(cuDescPtr),以及一个输出的标量向量(vec)。
- FillVectorByTids 接受的输入参数包括序列号(seq)、CU 描述符指针(cuDescPtr)、以及一个输出的标量向量(vec)。
- CU 描述符:
- 在 FillVectorLateRead 中,CU 描述符(cuDescPtr)通常是通过查询得到的,它描述了特定的 CU,用于确定从哪个 CU 中读取数据。
- 在 FillVectorByTids 中,CU 描述符通常是与每个 TID 关联的,它用于确定每个 TID 所在的 CU。
- 数据读取方式:
- FillVectorLateRead 函数通过直接根据已知 TID 从相应的 CU 中提取数据,因为 TID 已知。
- FillVectorByTids 函数需要根据 TID 集合的 CU 描述符找到对应的 CU,然后再从 CU 中提取数据。
- 用例区别:
- FillVectorLateRead 适用于需要根据已知的 TID 快速读取特定行数据的场景,通常用于单个行读取操作。
- FillVectorByTids 适用于需要批量读取多行数据的场景,其中 TID 集合在一起,通常用于批量查询或数据导出等操作。
CStore::FillVectorByIndex 函数
CStore::FillVectorByIndex 函数的作用是根据给定的 TID(行标识符)标量向量,从源标量向量中填充非删除行的数据到目标标量向量中。函数首先检查每个 TID 对应的行是否为删除行,如果不是,则将源标量向量中对应位置的值添加到目标标量向量中。函数源码如下所示:(路径:src/gausskernel/storage/cstore/cstore_am.cpp
)
作用: 根据索引信息填充目标标量向量。
参数: 使用给定的 TID 标量向量,在源标量向量中找到对应的值,填充到目标标量向量中。
典型场景: 用于根据索引信息从源数据中检索特定行的值,例如在数据库查询过程中
void CStore::FillVectorByIndex(_in_ int colIdx, _in_ ScalarVector* tids, _in_ ScalarVector* srcVec, _out_ ScalarVector* destVec)
{// 断言输入参数的有效性Assert(colIdx >= 0 && tids && destVec && srcVec);// 当前CU的ID,初始化为无效CU IDuint32 curCUId = InValidCUID;// 当前TID所在CU的ID、行偏移量uint32 thisCUId, rowOffset;// 目标标量向量和源标量向量的数值指针ScalarValue* destValue = destVec->m_vals;ScalarValue* srcValue = srcVec->m_vals;// TID标量向量的数值指针ScalarValue* tidValue = tids->m_vals;// 遍历TID标量向量的每一行for (int i = 0; i < tids->m_rows; i++) {// 获取当前TID的指针ItemPointer tidPtr = (ItemPointer)&tidValue[i];// 获取当前TID所在CU的IDthisCUId = ItemPointerGetBlockNumber(tidPtr);// 注意:tidPointer->rowOffset 从1开始rowOffset = ItemPointerGetOffsetNumber(tidPtr) - 1;// 步骤1:获取删除掩码(如果需要)if (curCUId != thisCUId) {curCUId = thisCUId;// 获取CU的删除掩码(如果需要)GetCUDeleteMaskIfNeed(curCUId, m_snapshot);}// 步骤2:当前行是活跃行,不是死亡行,需要填充向量if (m_delMaskCUId != InValidCUID && !IsDeadRow(curCUId, rowOffset)) {// 如果源标量向量中的当前值是NULL,则在目标标量向量中设置NULLif (srcVec->IsNull(i))destVec->SetNull(destVec->m_rows++);else// 否则,在目标标量向量中添加源标量向量中的当前值destValue[destVec->m_rows++] = srcValue[i];}}
}
CStore::FillSysVecByTid 函数
CStore::FillSysVecByTid 函数的主要目的是根据给定的 TID(Tuple ID)向量,在列存储表的压缩单元中检索相应的系统列值,并将这些系统列值填充到输出的 ScalarVector 中。函数通过遍历 TID 向量的每个元素,根据 TID 获取所在压缩单元的相关信息,然后根据系统列的不同类型填充相应的值。支持的系统列包括 SelfItemPointerAttributeNumber(包含 TID 本身)、XC_NodeIdAttributeNumber(包含节点 ID)、TableOidAttributeNumber(包含表的 OID)和 MinTransactionIdAttributeNumber(包含最小事务 ID)。函数的模板参数 sysColOid 动态确定要填充的系统列,使其具有通用性和灵活性。函数源码如下所示:(路径:src/gausskernel/storage/cstore/cstore_am.cpp
)
/** @Description: 根据 TID 填充系统列向量。* @in tids: 包含 TID 的输入 ScalarVector。* @out destVec: 用于存储系统列值的输出 ScalarVector。*/
template <int sysColOid>
void CStore::FillSysVecByTid(_in_ ScalarVector* tids, _out_ ScalarVector* destVec)
{// 检查输入参数的有效性Assert(tids && destVec);// 记录当前 CU(Compression Unit) 的 ID 和相关变量uint32 curCUId = InValidCUID, thisCUId, rowOffset;// 获取输出向量的值数组指针ScalarValue* destValue = destVec->m_vals;// 获取输入 TID 向量的值数组指针ScalarValue* tidValue = tids->m_vals;// 初始化输出向量的行数destVec->m_rows = 0;// 事务 ID(Transaction ID) 用于记录系统列 MinTransactionIdAttributeNumber 的值TransactionId xmin = InvalidTransactionId;// 遍历输入 TID 向量的每一行for (int i = 0; i < tids->m_rows; i++) {// 获取当前行对应的 TIDItemPointer tidPtr = (ItemPointer)&tidValue[i];// 获取当前 CU 的 IDthisCUId = ItemPointerGetBlockNumber(tidPtr);// TID 的偏移量,注意 TID 的偏移量从 1 开始rowOffset = ItemPointerGetOffsetNumber(tidPtr) - 1;// Step 1: 获取当前 CU 的 CUDesc(压缩单元描述符)和删除掩码(delmask)如果需要if (curCUId != thisCUId) {curCUId = thisCUId;// 获取当前 CU 的删除掩码this->GetCUDeleteMaskIfNeed(curCUId, m_snapshot);// 如果系统列的 OID 是 MinTransactionIdAttributeNumber,则获取当前 CU 的 xminif (this->m_delMaskCUId != InValidCUID && sysColOid == MinTransactionIdAttributeNumber) {xmin = this->GetCUXmin(curCUId);}}// Step 2: 如果是存活行而非已删除行,则填充输出向量if (this->m_delMaskCUId != InValidCUID && !this->IsDeadRow(curCUId, rowOffset)) {switch (sysColOid) {// 填充 SelfItemPointerAttributeNumber(系统列,包含 TID 本身)case SelfItemPointerAttributeNumber: {destValue[destVec->m_rows++] = *(ScalarValue*)tidPtr;break;}// 填充 XC_NodeIdAttributeNumber(系统列,包含节点 ID)case XC_NodeIdAttributeNumber: {destValue[destVec->m_rows++] = u_sess->pgxc_cxt.PGXCNodeIdentifier;break;}// 填充 TableOidAttributeNumber(系统列,包含表的 OID)case TableOidAttributeNumber: {destValue[destVec->m_rows++] = RelationGetRelid(m_relation);break;}// 填充 MinTransactionIdAttributeNumber(系统列,包含最小事务 ID)case MinTransactionIdAttributeNumber: {destValue[destVec->m_rows++] = xmin;break;}// 处理不支持的系统列 OIDdefault:ereport(ERROR,(errcode(ERRCODE_DATATYPE_MISMATCH),(errmsg("无法填充不支持的列存储表的系统列 %d", sysColOid))));break;}}}
}
CStore::FillSysColVector 函数
该函数用于通过给定的列存储压缩单元描述符(cuDescPtr)和列索引(colIdx),填充系统列数据到输出的 ScalarVector 中(vec)。在压缩单元中迭代处理未读取的行,根据系统列索引选择相应的数据填充方式。系统列包括 SelfItemPointer、XC_NodeId、TableOid 以及 MinTransactionId 等,其中 SelfItemPointer 存储 ItemPointer 数据,XC_NodeId 存储当前节点标识,TableOid 存储表的 OID,而 MinTransactionId 存储当前压缩单元的最小事务 ID。函数返回填充过程中遇到的已删除行数。函数源码如下所示:(路径:src/gausskernel/storage/cstore/cstore_am.cpp
)
/** @Description: 通过给定的列存储压缩单元描述符和列索引,填充系统列数据到输出的 ScalarVector 中。* @in colIdx: 指定的系统列索引。* @in cuDescPtr: 列存储压缩单元描述符。* @out vec: 输出的 ScalarVector。* @return: 返回填充过程中遇到的已删除行数。*/
int CStore::FillSysColVector(_in_ int colIdx, _in_ CUDesc* cuDescPtr, _out_ ScalarVector* vec)
{Assert(cuDescPtr && vec);uint32 cur_cuid = cuDescPtr->cu_id;int leftSize = cuDescPtr->row_count - m_rowCursorInCU;int pos = 0, deadRows = 0;Assert(leftSize > 0);// 初始化标志数组errno_t rc = memset_s(vec->m_flag, sizeof(uint8) * BatchMaxSize, 0, sizeof(uint8) * BatchMaxSize);securec_check(rc, "", "");// 获取压缩单元删除位图GetCUDeleteMaskIfNeed(cuDescPtr->cu_id, m_snapshot);// 迭代剩余的行数,并填充系统列数据到输出 Vector 中for (int i = 0; i < leftSize && pos < BatchMaxSize; i++) {// 如果是已删除行,则记录并跳过if (IsDeadRow(cuDescPtr->cu_id, i + m_rowCursorInCU)) {++deadRows;continue;}// 根据列索引填充系统列数据switch (colIdx) {case SelfItemPointerAttributeNumber: {// 设置 ScalarVector 描述信息vec->m_desc.typeId = INT8OID;// 设置 ItemPointer 数据vec->m_vals[pos] = 0;ItemPointer itemPtr = (ItemPointer)&vec->m_vals[pos];// 注意:itemPtr->offset 从 1 开始ItemPointerSet(itemPtr, cur_cuid, i + m_rowCursorInCU + 1);break;}case XC_NodeIdAttributeNumber: {// 设置 XC_NodeIdAttributeNumber 系统列数据vec->m_vals[pos] = u_sess->pgxc_cxt.PGXCNodeIdentifier;break;}case TableOidAttributeNumber: {// 设置 TableOidAttributeNumber 系统列数据vec->m_vals[pos] = RelationGetRelid(m_relation);break;}case MinTransactionIdAttributeNumber: {// 设置 MinTransactionIdAttributeNumber 系统列数据vec->m_vals[pos] = cuDescPtr->xmin;break;}default:// 不支持的系统列,抛出错误ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), (errmsg("Column store don't support"))));break;}// 移动到下一个位置++pos;}// 设置输出 Vector 的行数vec->m_rows = pos;// 返回已删除行数return deadRows;
}
CStore::FillTidForLateRead 函数
该函数用于在处理压缩单元时,通过给定的列存储压缩单元描述符(cuDescPtr)和是否包含已删除行的标志(hasDeadRow),填充表的 CTID(tuple ID)信息到输出的 ScalarVector 中(vec)。在压缩单元中迭代处理未读取的行,根据是否包含已删除行的标志判断是否需要跳过已删除行,对于未删除的行,将 CTID 信息构建为 ItemPointer 存储在 ScalarVector 中。函数返回填充过程中遇到的已删除行数。此函数的注释还说明,CTID 信息是在完成其他条件过滤(qual)后进行读取的,因此可以推迟到最后阶段进行读取。
// 该函数用于在处理压缩单元时,通过给定的列存储压缩单元描述符(`cuDescPtr`)和是否包含已删除行的标志(`hasDeadRow`),
// 填充表的 CTID(tuple ID)信息到输出的 ScalarVector 中(`vec`)。
// 在压缩单元中迭代处理未读取的行,根据是否包含已删除行的标志判断是否需要跳过已删除行,
// 对于未删除的行,将 CTID 信息构建为 ItemPointer 存储在 ScalarVector 中。
// 函数返回填充过程中遇到的已删除行数。
template <bool hasDeadRow>
int CStore::FillTidForLateRead(_in_ CUDesc* cuDescPtr, _out_ ScalarVector* vec)
{// 断言给定的压缩单元描述符和输出 ScalarVector 不为空Assert(cuDescPtr && vec);// 获取当前压缩单元的IDuint32 cur_cuid = cuDescPtr->cu_id;// 计算未处理的行数(未读取的行)int leftSize = cuDescPtr->row_count - m_rowCursorInCU;// 初始化行位置、已删除行计数int pos = 0, deadRows = 0;// 断言未处理的行数大于0Assert(leftSize > 0);// 遍历未读取的行for (int i = 0; i < leftSize && pos < BatchMaxSize; i++) {// 如果包含已删除行并且当前行是已删除行,则增加已删除行计数if (unlikely(hasDeadRow && IsDeadRow(cuDescPtr->cu_id, i + m_rowCursorInCU))) {++deadRows;} else {// 因为 sizeof(*itemPtr) 不同于 sizeof(vec->m_vals[0]),所以首先将其置为零vec->m_vals[pos] = 0;// 获取当前行的 CTID 信息,ItemPointer 的 offset 从 1 开始ItemPointer itemPtr = (ItemPointer)&vec->m_vals[pos];ItemPointerSet(itemPtr, cur_cuid, i + m_rowCursorInCU + 1);// 增加行位置++pos;}}// 设置输出 ScalarVector 的行数为填充后的行位置vec->m_rows = pos;// 返回已删除行计数return deadRows;
}
CStore::FillScanBatchLateIfNeed 函数
CStore::FillScanBatchLateIfNeed 函数主要用于在进行查询扫描时,根据需要填充延迟读取的列到给定的 VectorBatch 中。首先,它遍历所有列,识别并填充非第一个延迟读取列的相关信息。然后,它检查是否存在第一个延迟读取列,如果是,则记录其索引和列号,并填充该列的信息。整个过程中,函数会获取相应的压缩单元描述符,更新删除行掩码,确保只填充有效的数据到 VectorBatch 中,最终完成了延迟读取列的填充操作。
// 如果需要,填充延迟读取的列到给定的 VectorBatch 中
void CStore::FillScanBatchLateIfNeed(__inout VectorBatch* vecBatch)
{// 用于存储 CTID 信息的 ScalarVectorScalarVector* tidVec = NULL;// 用于记录第一个延迟读取的列的索引,初始化为无效值int ctidId = -1, colIdx;// 步骤 1:填充除第一个延迟读取列外的所有延迟读取列for (int i = 0; i < m_colNum; ++i) {// 获取当前列的索引colIdx = m_colId[i];// 如果是延迟读取列且索引有效if (IsLateRead(i) && colIdx >= 0) {// 断言索引小于 VectorBatch 中的列数Assert(colIdx < vecBatch->m_cols);// 如果 tidVec 不为空,表示已经填充过第一个延迟读取列,直接调用相应的填充函数if (tidVec != NULL) {// 获取当前列的压缩单元描述符CUDesc* cuDescPtr = this->m_CUDescInfo[i]->cuDescArray + this->m_cuDescIdx;// 获取并更新当前压缩单元的删除行掩码this->GetCUDeleteMaskIfNeed(cuDescPtr->cu_id, this->m_snapshot);// 调用相应的延迟读取列填充函数(this->*m_fillVectorLateRead[i])(colIdx, tidVec, cuDescPtr, vecBatch->m_arr + colIdx);} else {// 如果 tidVec 为空,表示当前是第一个延迟读取列,将 tidVec 指向当前列tidVec = vecBatch->m_arr + colIdx;// 记录第一个延迟读取列的索引ctidId = i;}}}// 步骤 2:填充第一个延迟读取列if (ctidId >= 0) {// 获取第一个延迟读取列的索引colIdx = m_colId[ctidId];// 断言为延迟读取列且索引有效Assert(IsLateRead(ctidId) && colIdx >= 0);// 获取当前列的压缩单元描述符CUDesc* cuDescPtr = this->m_CUDescInfo[ctidId]->cuDescArray + this->m_cuDescIdx;// 获取并更新当前压缩单元的删除行掩码this->GetCUDeleteMaskIfNeed(cuDescPtr->cu_id, this->m_snapshot);// 调用相应的延迟读取列填充函数(this->*m_fillVectorLateRead[ctidId])(colIdx, tidVec, cuDescPtr, vecBatch->m_arr + colIdx);}
}
总结
本文所讲的这一系列的函数主要用于==填充 Columnar 存储引擎中的数据结构 ScalarVector 或 VectorBatch。==这些函数涵盖了不同的场景和功能,包括普通列数据的填充、系统列数据的填充、延迟读取列的填充等。
- CStore::FillVecBatch: 用于填充一个 VectorBatch,主要用于查询扫描时读取多列数据。
- CStore::FillVector: 用于填充 ScalarVector,主要用于普通列的数据填充,支持不同数据类型和压缩方式。
- CStore::FillVectorByTids: 根据提供的 TID 列信息填充 ScalarVector,用于支持按 TID 进行批量读取列数据的场景。
- CStore::FillVectorLateRead: 用于填充 ScalarVector,在查询扫描完成后延迟读取列的数据,提高查询性能。
- CStore::FillVectorByIndex: 根据提供的 TID 列信息和源列的数据填充目标列的 ScalarVector,用于索引扫描后的数据填充。
- CStore::FillSysVecByTid: 用于填充 ScalarVector,主要用于系统列数据的填充,例如 ctid、xc_node_id、tableoid 等。
- CStore::FillSysColVector: 用于填充 ScalarVector,主要用于填充系统列的数据,例如 ctid、xc_node_id、tableoid 和 xmin。
- CStore::FillTidForLateRead: 用于填充 ScalarVector,提供 TID 列的信息,用于延迟读取列数据的场景,支持处理死行。
- CStore::FillScanBatchLateIfNeed: 在查询扫描时,根据需要填充延迟读取的列到给定的 VectorBatch 中,支持多列的延迟读取。
这些函数在整个 Columnar 存储引擎中扮演着不同的角色,涵盖了数据的读取、填充、延迟读取、系统列处理等多个方面,以满足查询和扫描的需求。
更多推荐
【 OpenGauss源码学习 —— 列存储(CStore)(四)】
发布评论