VecAgg
声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 OpenGauss5.1.0 的开源代码和《OpenGauss数据库源码解析》一书
ExecInitVecAggregation、ExecVecAggregation、ExecEndVecAggregation 和 ExecReScanVecAggregation 四个函数共同构成了向量化聚合操作的执行流程。在查询计划的执行过程中,ExecInitVecAggregation 用于初始化聚合节点的状态,设置必要的参数和数据结构,为后续的聚合操作做好准备;ExecVecAggregation 则负责实际的聚合计算,根据不同的聚合策略(如哈希聚合、排序聚合等)调用相应的聚合运行器进行数据处理;ExecEndVecAggregation 用于清理聚合操作的资源,释放内存和关闭文件,确保不再使用的资源能够得到妥善处理;而 ExecReScanVecAggregation 则负责在需要重新扫描聚合操作的情况下重置聚合节点的状态,确保聚合操作能够根据新的输入数据进行正确的计算。这些函数相互联系,形成了一个完整的向量化聚合执行框架,确保在执行查询时能够高效、准确地处理聚合操作。
ExecInitVecAggregation 函数
ExecInitVecAggregation 函数的作用是为向量化聚合(Vec Aggregation)操作进行初始化工作。它会根据聚合操作的类型(如哈希聚合或排序聚合)和操作符(如SUM、COUNT等),为执行聚合过程准备所需的数据结构和函数,并初始化相关的状态信息。在函数中会根据不同的聚合策略和聚合算子的数量,配置相应的向量化函数,并为矢量化聚合操作分配内存。函数源码如下所示:(路径:src\gausskernel\runtime\vecexecutor\vecnode\vecagg.cpp
)
ExecInitVecAggregation 的完整流程描述如下:
- 参数解析:
- 该函数接受的主要参数是执行状态(AggState * aggstate)和计划节点(Agg *node),这些参数分别表示当前聚合执行的状态和聚合操作计划。
- 初始化上下文和内存分配:
- 函数首先会为当前执行的聚合操作分配所需的内存和数据结构,确保聚合状态能够正确存储并跟踪计算中间状态。它会为各类聚合操作分配足够的空间,如存储聚合的输入、聚合函数的参数、去重列的信息等。
- 处理输入元组的列信息:
- 通过 ExecAssignProjectionInfo(),函数将输入的元组投影到聚合操作需要处理的列上,并在执行聚合计算时使用这些列的信息。
- 此外,如果聚合中有排序列或去重列(DISTINCT),还需要为这些列额外分配内存,并初始化相关的排序算子和相等比较函数。
- 初始化每个聚合算子的状态:
- 该函数接下来会遍历所有的聚合算子,如 SUM、COUNT 等,并为每个算子初始化状态结构。在这里,函数会确定每个算子是否需要排序、是否有去重的要求等。
- 如果需要排序(ORDER BY),会根据传入的排序列进行排序信息的初始化,并确保相关的操作符(sortop)有效。
- 对于 DISTINCT 操作,函数会初始化比较操作符,以便在聚合计算中进行相等性判断。
- 区分聚合策略(哈希或排序):
- 根据传入的聚合策略(Aggstrategy),函数会判断当前聚合是哈希聚合(AGG_HASHED)还是流式聚合(AGG_SORTED)。
- 如果是哈希聚合,则会为哈希表的创建和管理分配内存,并在后续的聚合计算中通过哈希表管理不同组的中间结果。
- 如果是排序聚合,则会为排序列分配相关的内存,并在聚合计算中确保按照排序列进行处理。
- 如果启用了向量化哈希聚合的优化功能(如is_sonichash),函数会进行针对性的设置和初始化。
- 设置矢量化函数:
- 函数根据聚合的上下文,设置适当的向量化函数。对于每个聚合算子,函数会为其选择合适的向量化处理函数,并根据计划执行环境选择是单节点还是多节点执行(例如在分布式场景中可能涉及多个节点)。
- 如果启用了最终聚合函数(finalfn),则会为其初始化,并在聚合结束时使用。
- LLVM代码生成(可选):
- 如果启用了 LLVM 代码生成(条件编译开启 ENABLE_LLVM_COMPILE),函数会尝试为哈希聚合或向量化聚合生成 LLVM IR 代码。这部分代码旨在优化聚合操作的性能,通过 LLVM 进行 JIT 编译加速执行。
- 处理输入/输出的矢量化数据结构:
- 为了提高聚合操作的效率,函数会分配矢量(ScalarVector)以存储输入数据,并通过这些矢量来进行批处理操作。
- 此外,如果聚合操作中涉及布尔类型的数据(如 WHERE 条件),函数也会分配布尔值检查的相关矢量。
- 最终初始化:
- 在所有必要的数据结构和函数被初始化后,函数会为表达式上下文分配内存(ExecAssignVectorForExprEval),确保所有表达式的计算能正常进行。
- 聚合的元数据状态(如聚合算子的数量、是否启用了哈希表等)会在此阶段进行更新。
- 返回聚合状态:
- 函数的最后一步是返回已完成初始化的聚合状态结构,这个结构包含了所有初始化的向量化聚合操作所需的参数和函数。
VecAggState* ExecInitVecAggregation(VecAgg* node, EState* estate, int eflags)
{
VecAggState* aggstate = NULL; // 定义VecAggState指针,初始为空,用于保存聚合节点的状态信息
VecAggStatePerAgg peragg; // 每个聚合操作的状态信息,稍后使用
Plan* outer_plan = NULL; // 外部计划节点的指针,稍后初始化
int numaggs, aggno; // 聚合函数的数量和编号
ListCell* l = NULL; // 用于遍历链表的迭代器
ScalarDesc unknown_desc; // 未知的描述符
bool is_singlenode = false; // 标记是否为单节点聚合
int col_number = 0; // 列编号,用于遍历和操作
int phase; // 当前阶段的索引
Bitmapset* all_grouped_cols = NULL; // 保存所有分组列的集合
int num_grp_sets = 1; // 分组集的数量,默认为1
int num_phases; // 阶段数量
int currentsortno = 0; // 当前排序编号
int i = 0; // 循环变量
int j = 0; // 循环变量
// 检查不支持的标志位,防止使用不支持的执行标志
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
aggstate = makeNode(VecAggState); // 创建一个VecAggState节点
aggstate->ss.ps.plan = (Plan*)node; // 将传入的聚合节点赋给状态节点中的计划字段
aggstate->ss.ps.state = estate; // 赋值执行状态(EState)
aggstate->ss.ps.vectorized = true; // 标记当前执行为矢量化执行
aggstate->aggs = NIL; // 初始化聚合列表为空
aggstate->numaggs = 0; // 聚合函数数量初始化为0
aggstate->eqfunctions = NULL; // 初始化相等函数为空
aggstate->hashfunctions = NULL; // 初始化哈希函数为空
aggstate->pervecagg = NULL; // 每个矢量聚合状态为空
aggstate->agg_done = false; // 聚合完成标志初始化为假
aggstate->pergroup = NULL; // 每个分组的状态初始化为空
aggstate->grp_firstTuple = NULL; // 每个分组的首元组初始化为空
aggstate->aggcontexts = NULL; // 聚合上下文初始化为空
aggstate->maxsets = 0; // 最大分组集数量初始化为0
aggstate->projected_set = -1; // 投影集初始化为-1
aggstate->current_set = 0; // 当前分组集初始化为0
aggstate->input_done = false; // 输入完成标志初始化为假
aggstate->sort_in = NULL; // 排序输入初始化为空
aggstate->sort_out = NULL; // 排序输出初始化为空
is_singlenode = node->single_node || node->is_final; // 判断是否为单节点聚合或最终节点
// 如果是分布式环境的协调器,且不是最终节点,则标记为单节点聚合
if (IS_PGXC_COORDINATOR && node->is_final == false)
is_singlenode = true;
// 如果聚合策略是哈希聚合,设置操作内存的限制
if (node->aggstrategy == AGG_HASHED) {
int64 operator_mem = SET_NODEMEM(((Plan*)node)->operatorMemKB[0], ((Plan*)node)->dop); // 计算操作内存
AllocSetContext* set = (AllocSetContext*)(estate->es_query_cxt); // 获取查询上下文
set->maxSpaceSize = operator_mem * 1024L + SELF_GENRIC_MEMCTX_LIMITATION; // 设置最大空间大小
}
// 创建表达式上下文,处理每个输入元组和输出元组的表达式
ExecAssignExprContext(estate, &aggstate->ss.ps);
aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext; // 将临时上下文设置为当前表达式上下文
ExecAssignExprContext(estate, &aggstate->ss.ps); // 再次分配一个表达式上下文
// 如果存在分组集,进行相关的初始化
if (node->groupingSets) {
Assert(node->aggstrategy != AGG_HASHED); // 如果有分组集,不能使用哈希聚合
num_grp_sets = list_length(node->groupingSets); // 计算分组集的数量
// 遍历聚合链,计算最大的分组集数量
foreach (l, node->chain) {
Agg* agg = (Agg*)lfirst(l);
num_grp_sets = Max(num_grp_sets, list_length(agg->groupingSets));
}
}
aggstate->maxsets = num_grp_sets; // 设置最大分组集数量
aggstate->numphases = num_phases = 1 + list_length(node->chain); // 计算聚合的阶段数量
// 初始化扫描元组槽和结果元组槽
ExecInitScanTupleSlot(estate, &aggstate->ss);
ExecInitResultTupleSlot(estate, &aggstate->ss.ps);
aggstate->sort_slot = ExecInitExtraTupleSlot(estate); // 初始化额外的元组槽用于排序
// 初始化子表达式
aggstate->ss.ps.targetlist = (List*)ExecInitVecExpr((Expr*)node->plan.targetlist, (PlanState*)aggstate); // 初始化目标列表中的表达式
aggstate->ss.ps.qual = (List*)ExecInitVecExpr((Expr*)node->plan.qual, (PlanState*)aggstate); // 初始化条件表达式
// 初始化子节点(外部计划)
eflags &= ~EXEC_FLAG_REWIND; // 禁用子计划的REWIND标志
outer_plan = outerPlan(node); // 获取外部计划
outerPlanState(aggstate) = ExecInitNode(outer_plan, estate, eflags); // 初始化外部计划节点
// 初始化扫描元组的类型
ExecAssignScanTypeFromOuterPlan(&aggstate->ss);
if (node->chain)
ExecSetSlotDescriptor(aggstate->sort_slot, aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor); // 如果有链表,设置排序槽的描述符
// 初始化结果元组类型和投影信息
ExecAssignResultTypeFromTL(&aggstate->ss.ps);
aggstate->ss.ps.ps_ProjInfo = ExecBuildVecProjectionInfo(aggstate->ss.ps.targetlist,
node->plan.qual,
aggstate->ss.ps.ps_ExprContext,
aggstate->ss.ps.ps_ResultTupleSlot,
NULL); // 构建矢量化的投影信息
aggstate->ss.ps.ps_vec_TupFromTlist = false; // 初始化矢量化投影标志为false
// 获取目标列表和条件中的聚合数量
numaggs = aggstate->numaggs;
Assert(numaggs == list_length(aggstate->aggs)); // 检查聚合函数数量是否正确
if (numaggs <= 0) {
numaggs = 1; // 如果没有聚合函数,仍然继续执行,避免内存分配失败
}
// 如果有分组列,预计算相等函数和哈希函数
if (node->numCols > 0) {
if (node->aggstrategy == AGG_HASHED)
execTuplesHashPrepare(node->numCols, node->grpOperators, &aggstate->eqfunctions, &aggstate->hashfunctions); // 准备哈希函数
else
aggstate->eqfunctions = execTuplesMatchPrepare(node->numCols, node->grpOperators); // 准备相等函数
}
// 为每个阶段准备分组集的数据并进行函数查找,同时积累所有分组列
aggstate->phases = (AggStatePerPhaseData*)palloc0(num_phases * sizeof(AggStatePerPhaseData)); // 分配并初始化聚合阶段的数据
for (phase = 0; phase < num_phases; ++phase) { // 遍历每一个聚合阶段
AggStatePerPhase phase_data = &aggstate->phases[phase]; // 获取当前阶段的数据
Agg* agg_node = NULL; // 初始化聚合节点指针
Sort* sort_node = NULL; // 初始化排序节点指针
int num_sets; // 初始化分组集合数量的变量
if (phase > 0) { // 如果不是第一个阶段
agg_node = (Agg*)list_nth(node->chain, phase - 1); // 获取前一阶段的聚合节点
sort_node = (Sort*)agg_node->plan.lefttree; // 获取聚合节点的排序子树
Assert(IsA(sort_node, Sort)); // 确认该节点是一个排序节点
} else { // 如果是第一个阶段
agg_node = node; // 当前节点作为聚合节点
sort_node = NULL; // 没有排序节点
}
phase_data->numsets = num_sets = list_length(agg_node->groupingSets); // 计算当前阶段的分组集合数量
if (num_sets) { // 如果有分组集合
phase_data->gset_lengths = (int*)palloc(num_sets * sizeof(int)); // 分配内存存储每个分组集合的长度
phase_data->grouped_cols = (Bitmapset**)palloc(num_sets * sizeof(Bitmapset*)); // 分配内存存储每个分组集合的位图
i = 0; // 初始化循环计数器
foreach (l, agg_node->groupingSets) { // 遍历每一个分组集合
int current_length = list_length((List*)lfirst(l)); // 获取当前分组集合的列数
Bitmapset* cols = NULL; // 初始化位图集合
/* planner forces this to be correct */
for (j = 0; j < current_length; ++j) // 遍历当前分组集合的每一列
cols = bms_add_member(cols, agg_node->grpColIdx[j]); // 将当前列添加到位图集合
phase_data->grouped_cols[i] = cols; // 将位图集合存储到当前阶段的数据中
phase_data->gset_lengths[i] = current_length; // 记录当前分组集合的列数
++i; // 计数器加一
}
all_grouped_cols = bms_add_members(all_grouped_cols, phase_data->grouped_cols[0]); // 将第一个分组集合的列信息添加到所有分组列集合中
} else { // 如果没有分组集合
Assert(phase == 0); // 确保这是第一个阶段
phase_data->gset_lengths = NULL; // 没有分组集合,设为空
phase_data->grouped_cols = NULL; // 没有分组列,设为空
}
/*
* If we are grouping, precompute fmgr lookup data for inner loop.
*/
if (agg_node->aggstrategy == AGG_SORTED) { // 如果聚合策略是排序
Assert(agg_node->numCols > 0); // 确保分组列数量大于0
phase_data->eqfunctions = execTuplesMatchPrepare(agg_node->numCols, agg_node->grpOperators); // 为分组列预先计算比较函数
}
phase_data->aggnode = agg_node; // 将当前聚合节点存储在阶段数据中
phase_data->sortnode = sort_node; // 将当前排序节点存储在阶段数据中
}
/*
* Convert all_grouped_cols to a descending-order list.
*/
i = -1; // 初始化位图集合索引
while ((i = bms_next_member(all_grouped_cols, i)) >= 0) // 遍历所有分组列的位图集合
aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols); // 将分组列索引添加到聚合状态中的列列表
/*
* Hashing can only appear in the initial phase.
*/
if (node->aggstrategy == AGG_HASHED) { // 如果聚合策略是哈希
execTuplesHashPrepare(
node->numCols, node->grpOperators, &aggstate->phases[0].eqfunctions, &aggstate->hashfunctions); // 准备哈希相关的比较和哈希函数
if (u_sess->attr.attr_sql.enable_fast_numeric) { // 如果启用了快速数字处理
replace_numeric_hash_to_bi(node->numCols, aggstate->hashfunctions); // 将哈希函数替换为优化的数字哈希函数
}
}
/*
* Initialize current phase-dependent values to initial phase
*/
aggstate->current_phase = 0; // 初始化当前阶段为第一个阶段
/*
* Set up aggregate-result storage in the output expr context, and also
* allocate my private per-agg working storage
*/
peragg = (VecAggStatePerAgg)palloc0(sizeof(VecAggStatePerAggData) * numaggs * num_grp_sets); // 为每个聚合操作分配内存空间
aggstate->pervecagg = peragg; // 将内存存储结构保存到聚合状态中
/* Compute the columns we actually need to hash on */
aggstate->hash_needed = find_hash_columns(aggstate); // 计算实际需要哈希的列
if (aggstate->all_grouped_cols) { // 如果存在分组列
Bitmapset* all_need_cols = NULL; // 初始化需要的列集合
ListCell* lc = NULL; // 初始化链表遍历指针
foreach (lc, aggstate->all_grouped_cols) { // 遍历所有分组列
int varNumber = lfirst_int(lc) - 1; // 获取当前列的编号
all_need_cols = bms_add_member(all_need_cols, varNumber); // 将当前列添加到需要的列集合中
}
foreach (lc, aggstate->hash_needed) { // 遍历所有需要哈希的列
int varNumber = lfirst_int(lc) - 1; // 获取当前哈希列的编号
all_need_cols = bms_add_member(all_need_cols, varNumber); // 将哈希列添加到需要的列集合中
}
col_number = bms_num_members(all_need_cols); // 计算需要处理的列总数
bms_free_ext(all_need_cols); // 释放位图集合内存
} else {
col_number = list_length(aggstate->hash_needed); // 如果没有分组列,直接使用哈希列的数量
}
/* Create aggregation list */
aggstate->aggInfo = (VecAggInfo*)palloc0(sizeof(VecAggInfo) * numaggs); // 为聚合信息分配内存
aggno = -1; // 初始化聚合编号
foreach (l, aggstate->aggs) { // 遍历每一个聚合操作
AggrefExprState* aggrefstate = (AggrefExprState*)lfirst(l); // 获取当前聚合表达式状态
Aggref* aggref = (Aggref*)aggrefstate->xprstate.expr; // 获取当前聚合表达式
VecAggStatePerAgg peraggstate; // 初始化每个聚合的状态
Oid input_types[FUNC_MAX_ARGS]; // 初始化输入参数类型数组
int num_arguments; // 初始化参数数量
int num_inputs; // 初始化输入数量
int num_sort_cols; // 初始化排序列数量
int num_dist_cols; // 初始化分布列数量
List* sortlist = NIL; // 初始化排序列列表
HeapTuple agg_tuple; // 初始化聚合元组
Form_pg_aggregate aggform; // 初始化聚合函数表单
Oid aggtranstype; // 初始化聚合转换类型
AclResult aclresult; // 初始化权限检查结果
Oid transfn_oid, finalfn_oid; // 初始化转换函数和最终函数的OID
Expr* transfnexpr = NULL; // 初始化转换函数表达式
Expr* finalfnexpr = NULL; // 初始化最终函数表达式
Datum text_init_val; // 初始化初始值
ListCell* lc = NULL; // 初始化链表指针
/* Planner should have assigned aggregate to correct level */
Assert(aggref->agglevelsup == 0); // 确保聚合分配到正确的层级
/* Nope, so assign a new PerAgg record */
peraggstate = &aggstate->pervecagg[++aggno]; // 获取当前聚合状态
/* 在结果数组中用分配的索引标记聚合状态节点 */
aggrefstate->aggno = aggno;
peraggstate->aggrefstate = aggrefstate; // 保存当前聚合表达式状态
peraggstate->aggref = aggref; // 将当前聚合表达式保存到聚合状态中
num_inputs = list_length(aggref->args); // 计算聚合函数的输入参数数量
peraggstate->numInputs = num_inputs; // 将输入参数数量保存到聚合状态中
peraggstate->sortstate = NULL; // 初始化排序状态为空
peraggstate->sortstates = (Tuplesortstate**)palloc0(sizeof(Tuplesortstate*) * num_grp_sets); // 为每个分组集合分配内存,保存每个分组的排序状态
for (currentsortno = 0; currentsortno < num_grp_sets; currentsortno++) // 遍历每个分组集合
peraggstate->sortstates[currentsortno] = NULL; // 初始化每个分组的排序状态为空
/*
* 获取实际的输入数据类型。输入数据类型可能与聚合函数声明的类型不同,尤其在聚合函数接受ANY或多态类型时。
*/
num_arguments = 0; // 初始化参数计数
foreach (lc, aggref->args) { // 遍历聚合函数的每一个输入参数
TargetEntry* tle = (TargetEntry*)lfirst(lc); // 获取当前输入参数
if (!tle->resjunk) // 如果当前参数不是冗余字段
input_types[num_arguments++] = exprType((Node*)tle->expr); // 获取当前参数的实际数据类型并保存
}
peraggstate->numArguments = num_arguments; // 将参数数量保存到聚合状态中
agg_tuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid)); // 从系统缓存中查找聚合函数的元数据
if (!HeapTupleIsValid(agg_tuple)) // 如果查找失败
ereport(ERROR, // 抛出错误
(errmodule(MOD_VEC_EXECUTOR),
errcode(ERRCODE_CACHE_LOOKUP_FAILED),
errmsg("cache lookup failed for aggregate %u", aggref->aggfnoid))); // 显示聚合函数ID的查找失败信息
aggform = (Form_pg_aggregate)GETSTRUCT(agg_tuple); // 获取聚合函数的元组结构信息
/* 检查调用聚合函数的权限 */
aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(), ACL_EXECUTE); // 检查当前用户是否有执行该聚合函数的权限
if (aclresult != ACLCHECK_OK) // 如果权限检查不通过
aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(aggref->aggfnoid)); // 抛出权限检查错误
transfn_oid = aggform->aggtransfn; // 获取聚合函数的转换函数OID
finalfn_oid = aggform->aggfinalfn; // 获取聚合函数的最终函数OID
#ifndef ENABLE_MULTIPLE_NODES // 如果未启用多节点模式
if (estate->es_is_flt_frame) { // 如果当前执行状态是流框架模式
numeric_aggfn_info_change(aggref->aggfnoid, &transfn_oid, &aggtranstype, &transfn_oid); // 根据聚合函数ID调整数值聚合函数的转换信息
}
#endif
peraggstate->transfn_oid = transfn_oid; // 将转换函数OID保存到聚合状态中
peraggstate->finalfn_oid = finalfn_oid; // 将最终函数OID保存到聚合状态中
#ifdef PGXC
peraggstate->collectfn_oid = aggform->aggcollectfn; // 获取聚合函数的收集函数OID(用于分布式计算场景)
if (aggref->aggstage == 0) // 如果聚合阶段为0
peraggstate->collectfn_oid = InvalidOid; // 将收集函数OID设置为无效
#endif /* PGXC */
/* 检查聚合函数的所有者是否有权限调用相关组件函数 */
{
HeapTuple proc_tuple;
Oid agg_owner;
proc_tuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(aggref->aggfnoid)); // 从系统缓存中获取聚合函数的元数据
if (!HeapTupleIsValid(proc_tuple)) // 如果没有找到有效的元组
ereport(ERROR, // 抛出错误信息
(errmodule(MOD_VEC_EXECUTOR),
errcode(ERRCODE_CACHE_LOOKUP_FAILED),
errmsg("cache lookup failed for function %u", aggref->aggfnoid))); // 显示函数ID的缓存查找失败错误
agg_owner = ((Form_pg_proc)GETSTRUCT(proc_tuple))->proowner; // 获取聚合函数的所有者
ReleaseSysCache(proc_tuple); // 释放系统缓存
aclresult = pg_proc_aclcheck(transfn_oid, agg_owner, ACL_EXECUTE); // 检查所有者是否有权限执行转换函数
if (aclresult != ACLCHECK_OK) // 如果权限检查失败
aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid)); // 抛出权限错误
if (OidIsValid(finalfn_oid)) { // 如果最终函数有效
aclresult = pg_proc_aclcheck(finalfn_oid, agg_owner, ACL_EXECUTE); // 检查所有者是否有权限执行最终函数
if (aclresult != ACLCHECK_OK) // 如果权限检查失败
aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(finalfn_oid)); // 抛出权限错误
}
}
/* 如果转换状态是多态类型,则解析实际的转换状态类型 */
aggtranstype = aggform->aggtranstype; // 获取聚合函数的转换状态类型
if (IsPolymorphicType(aggtranstype)) { // 如果转换状态是多态类型
/* 需要获取聚合函数声明的输入类型 */
Oid* declared_arg_types = NULL;
int agg_nargs;
(void)get_func_signature(aggref->aggfnoid, &declared_arg_types, &agg_nargs); // 获取聚合函数的输入签名
Assert(agg_nargs == num_arguments); // 确保输入参数数量匹配
aggtranstype =
enforce_generic_type_consistency(input_types, declared_arg_types, agg_nargs, aggtranstype, false); // 强制确保输入类型和转换类型的一致性
pfree_ext(declared_arg_types); // 释放声明的参数类型内存
}
/* 使用实际的参数和结果类型构建表达式树 */
build_aggregate_fnexprs(input_types, // 输入参数类型
num_arguments, // 参数数量
aggtranstype, // 转换状态类型
aggref->aggtype, // 聚合函数类型
aggref->inputcollid, // 输入的排序规则
transfn_oid, // 转换函数OID
finalfn_oid, // 最终函数OID
&transfnexpr, // 转换函数表达式
&finalfnexpr); // 最终函数表达式
fmgr_info(transfn_oid, &peraggstate->transfn); // 初始化转换函数的函数管理器信息
fmgr_info_set_expr((Node*)transfnexpr, &peraggstate->transfn); // 设置转换函数的表达式
if (OidIsValid(finalfn_oid)) { // 如果最终函数有效
fmgr_info(finalfn_oid, &peraggstate->finalfn); // 初始化最终函数的函数管理器信息
fmgr_info_set_expr((Node*)finalfnexpr, &peraggstate->finalfn); // 设置最终函数的表达式
}
peraggstate->aggCollation = aggref->inputcollid; // 设置聚合函数的排序规则
get_typlenbyval(aggref->aggtype, &peraggstate->resulttypeLen, &peraggstate->resulttypeByVal); // 获取聚合结果类型的长度和按值传递属性
get_typlenbyval(aggtranstype, &peraggstate->transtypeLen, &peraggstate->transtypeByVal); // 获取转换状态类型的长度和按值传递属性
/*
* 初始值可能为NULL,因此不能直接作为结构字段访问,必须通过SysCacheGetAttr处理。
*/
text_init_val = SysCacheGetAttr(AGGFNOID, agg_tuple, Anum_pg_aggregate_agginitval, &peraggstate->initValueIsNull); // 获取聚合函数的初始值
#ifndef ENABLE_MULTIPLE_NODES
if (estate->es_is_flt_frame) { // 如果执行状态为流框架
peraggstate->initValueIsNull = numeric_agg_trans_initvalisnull(peraggstate->transfn_oid,
peraggstate->initValueIsNull); // 根据转换函数OID调整初始值是否为NULL
}
#endif
if (peraggstate->initValueIsNull) // 如果初始值为NULL
peraggstate->initValue = (Datum)0; // 设置初始值为0
else
peraggstate->initValue = GetAggInitVal(text_init_val, aggtranstype); // 获取聚合函数的实际初始值
/*
* 如果转换函数是严格的并且初始值为NULL,确保输入类型和转换类型是相同的(或至少是二进制兼容的),
* 以确保可以使用第一个输入值作为初始转换值。这应该在聚合定义时进行检查,但以防万一...
*/
if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { // 如果转换函数是严格的并且初始值为NULL
if (num_arguments < 1 || !IsBinaryCoercible(input_types[0], aggtranstype)) // 如果参数数量少于1或类型不兼容
ereport(ERROR, // 抛出错误
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg(
"aggregate %u needs to have compatible input type and transition type", aggref->aggfnoid))); // 聚合函数需要兼容的输入和转换类型
}
/*
* 获取聚合函数的输入参数(包括排序表达式)的tuple描述符。
*/
peraggstate->evaldesc = ExecTypeFromTL(aggref->args, false); // 从目标列表中获取聚合函数的tuple描述符
/* 创建用于参数评估的tuple槽 */
peraggstate->evalslot = ExecInitExtraTupleSlot(estate); // 初始化一个额外的tuple槽
ExecSetSlotDescriptor(peraggstate->evalslot, peraggstate->evaldesc); // 设置tuple槽的描述符
/* 为评估设置投影信息 */
peraggstate->evalproj = NULL; // 初始化投影信息为空
if (ExecTargetListLength(aggrefstate->args) > 0) { // 如果目标列表长度大于0
peraggstate->evalproj = ExecBuildVecProjectionInfo( // 构建向量投影信息
aggrefstate->args, node->plan.qual, aggstate->tmpcontext, peraggstate->evalslot, NULL);
}
/*
* 如果有DISTINCT或ORDER BY,那么我们有一个SortGroupClause节点列表;
* 提取其中的数据并存储到数组中。
* 注意,如果存在DISTINCT子句,那么ORDER BY子句是其前缀(参见transformDistinctClause)。
*/
if (aggref->aggdistinct) { // 如果存在DISTINCT子句
sortlist = aggref->aggdistinct; // 获取DISTINCT排序列表
num_sort_cols = num_dist_cols = list_length(sortlist); // 获取排序列和DISTINCT列的数量
Assert(num_sort_cols >= list_length(aggref->aggorder)); // 确保排序列数量大于或等于ORDER BY列数量
} else {
sortlist = aggref->aggorder; // 获取ORDER BY排序列表
num_sort_cols = list_length(sortlist); // 获取排序列数量
num_dist_cols = 0; // DISTINCT列数量为0
}
// 将排序列数和去重列数赋值给聚合状态
peraggstate->numSortCols = num_sort_cols;
peraggstate->numDistinctCols = num_dist_cols;
// 如果存在排序列
if (num_sort_cols > 0) {
/*
* 尚未实现哈希聚合中的DISTINCT或ORDER BY功能
*/
Assert(node->aggstrategy != AGG_HASHED);
// 如果仅有一个输入,需要获取其长度和是否按值传递的属性
if (num_inputs == 1) {
get_typlenbyval(input_types[0], &peraggstate->inputtypeLen, &peraggstate->inputtypeByVal);
} else if (num_dist_cols > 0) {
// 如果有多个DISTINCT列,需要一个额外的槽来存储之前的值
peraggstate->uniqslot = NULL;
}
// 提取排序信息以供后续使用
peraggstate->sortColIdx = (AttrNumber*)palloc(num_sort_cols * sizeof(AttrNumber));
peraggstate->sortOperators = (Oid*)palloc(num_sort_cols * sizeof(Oid));
peraggstate->sortCollations = (Oid*)palloc(num_sort_cols * sizeof(Oid));
peraggstate->sortNullsFirst = (bool*)palloc(num_sort_cols * sizeof(bool));
// 遍历排序列表,获取排序列的详细信息
i = 0;
foreach (lc, sortlist) {
SortGroupClause* sortcl = (SortGroupClause*)lfirst(lc);
TargetEntry* tle = get_sortgroupclause_tle(sortcl, aggref->args);
// 确保排序操作符是有效的
Assert(OidIsValid(sortcl->sortop));
peraggstate->sortColIdx[i] = tle->resno;
peraggstate->sortOperators[i] = sortcl->sortop;
peraggstate->sortCollations[i] = exprCollation((Node*)tle->expr);
peraggstate->sortNullsFirst[i] = sortcl->nulls_first;
i++;
}
// 确保遍历的列数与实际排序列数一致
Assert(i == num_sort_cols);
}
// 如果存在DISTINCT聚合操作
if (aggref->aggdistinct) {
Assert(num_arguments > 0);
// 如果DISTINCT列多于1列,则抛出错误(矢量聚合不支持多个DISTINCT列)
if (num_dist_cols > 1) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_AGG),
errmsg("Vector aggregation does not support this distinct clause in aggregate function")));
}
/*
* 为每个DISTINCT列比较操作准备相等函数
*/
peraggstate->equalfns = (FmgrInfo*)palloc(num_dist_cols * sizeof(FmgrInfo));
// 遍历DISTINCT列,获取比较操作符
i = 0;
foreach (lc, aggref->aggdistinct) {
SortGroupClause* sortcl = (SortGroupClause*)lfirst(lc);
// 为DISTINCT列的相等比较函数初始化函数信息
fmgr_info(get_opcode(sortcl->eqop), &peraggstate->equalfns[i]);
// 根据列类型为特定类型设置相等函数指针
switch (peraggstate->evaldesc->tdtypeid) {
case TIMETZOID:
peraggstate->equalfns[i].fn_addr = timetz_eq_withhead;
break;
case TINTERVALOID:
peraggstate->equalfns[i].fn_addr = tintervaleq_withhead;
break;
case INTERVALOID:
peraggstate->equalfns[i].fn_addr = interval_eq_withhead;
break;
case NAMEOID:
peraggstate->equalfns[i].fn_addr = nameeq_withhead;
break;
default:
break;
}
i++;
}
// 确保DISTINCT列处理正确
Assert(i == num_dist_cols);
}
// 释放缓存的聚合元组
ReleaseSysCache(agg_tuple);
// 计算聚合索引,并初始化相关状态
int idx = aggstate->numaggs - 1 - aggno;
{
DBG_ASSERT(idx >= 0);
bool use_sonichash = (node->aggstrategy == AGG_HASHED && node->is_sonichash);
// 调用函数进行聚合操作调度
DispatchAggFunction(&aggstate->pervecagg[aggno], &aggstate->aggInfo[idx], use_sonichash);
/* 初始化函数调用参数结构 */
InitFunctionCallInfoData(aggstate->aggInfo[idx].vec_agg_function,
&peraggstate->transfn,
4,
peraggstate->aggCollation,
NULL,
NULL);
/* 在不同节点选择矢量聚合函数 */
if (OidIsValid(peraggstate->collectfn_oid) == false)
aggstate->aggInfo[idx].vec_agg_function.flinfo->vec_fn_addr = aggstate->aggInfo[idx].vec_agg_cache[0];
else
aggstate->aggInfo[idx].vec_agg_function.flinfo->vec_fn_addr = aggstate->aggInfo[idx].vec_agg_cache[1];
#ifdef ENABLE_MULTIPLE_NODES
if (OidIsValid(peraggstate->finalfn_oid)) {
#else
if (OidIsValid(peraggstate->finalfn_oid) && aggstate->aggInfo[idx].vec_agg_cache[0] &&
aggstate->aggInfo[idx].vec_agg_final[0]) {
#endif
// 初始化最终函数调用参数
InitFunctionCallInfoData(aggstate->aggInfo[idx].vec_final_function,
&peraggstate->finalfn,
2,
peraggstate->aggCollation,
NULL,
NULL);
// 选择最终聚合函数
if (OidIsValid(peraggstate->collectfn_oid) == false) {
if (is_singlenode)
aggstate->aggInfo[idx].vec_final_function.flinfo->fn_addr =
aggstate->aggInfo[idx].vec_agg_final[0];
else
aggstate->aggInfo[idx].vec_final_function.flinfo->fn_addr =
aggstate->aggInfo[idx].vec_agg_final[1];
aggstate->aggInfo[idx].vec_agg_function.flinfo->vec_fn_addr =
aggstate->aggInfo[idx].vec_agg_cache[0];
} else {
if (is_singlenode)
aggstate->aggInfo[idx].vec_final_function.flinfo->fn_addr =
aggstate->aggInfo[idx].vec_agg_final[2];
else
aggstate->aggInfo[idx].vec_final_function.flinfo->fn_addr =
aggstate->aggInfo[idx].vec_agg_final[3];
aggstate->aggInfo[idx].vec_agg_function.flinfo->vec_fn_addr =
aggstate->aggInfo[idx].vec_agg_cache[1];
}
}
// 分配矢量参数
ScalarVector* p_vec = New(CurrentMemoryContext) ScalarVector[4];
for (int k = 0; k < 4; k++)
p_vec[k].init(CurrentMemoryContext, unknown_desc);
// 为聚合函数分配矢量
aggstate->aggInfo[idx].vec_agg_function.argVector = p_vec;
// 为布尔值检查分配矢量
if (peraggstate->evalproj != NULL)
ExecAssignVectorForExprEval(peraggstate->evalproj->pi_exprContext);
// 更新哈希表的偏移量
aggrefstate->m_htbOffset = col_number + idx;
}
}
// 为表达式上下文分配矢量
ExecAssignVectorForExprEval(aggstate->ss.ps.ps_ExprContext);
// 更新聚合数量
aggstate->numaggs = aggno + 1;
aggstate->aggRun = NULL;
#ifdef ENABLE_LLVM_COMPILE
/*
* 如果启用了LLVM编译,生成哈希聚合的IR函数
*/
bool consider_codegen =
CodeGenThreadObjectReady() &&
CodeGenPassThreshold(((Plan*)outer_plan)->plan_rows, estate->es_plannedstmt->num_nodes, ((Plan*)outer_plan)->dop);
if (consider_codegen) {
if (node->aggstrategy == AGG_HASHED && node->is_sonichash) {
dorado::VecHashAggCodeGen::SonicHashAggCodeGen(aggstate);
} else if (node->aggstrategy == AGG_HASHED) {
dorado::VecHashAggCodeGen::HashAggCodeGen(aggstate);
}
}
#endif
// 返回聚合状态
return aggstate;
}
ExecVecAggregation 函数
ExecVecAggregation 函数是向量化聚合的入口函数,根据不同的聚合策略,执行不同类型的聚合操作。它支持多种聚合策略,包括:
- AGG_PLAIN:用于简单聚合操作,即不涉及分组的聚合,例如对整个数据集的总和、平均值等操作。
- AGG_HASHED:使用哈希表来处理分组聚合的操作,适用于大量分组的情况。若开启了 SonicHash,则使用优化过的 SonicHashAgg 执行器。
- AGG_SORTED:基于排序的聚合方法,适用于已经按分组键排序的数据集。
操作流程描述
- 判断是否为 dummy 聚合:
- 若当前聚合节点为 dummy 聚合(由远程 SQL 解析),则不执行聚合操作,而是直接返回外部节点的执行结果。
- 根据聚合策略创建执行器:
- 若 aggRun 尚未初始化,根据当前计划的聚合策略创建不同的聚合执行器,包括 PlainAggRunner、HashAggRunner 或 SortAggRunner。对于哈希聚合,若启用了 SonicHash,则使用 SonicHashAgg 执行器。
- 执行聚合操作:
- 如果启用了 SonicHash,则执行 SonicHashAgg 的 Run 方法。
- 否则,调用 DO_AGGEGATION 函数,使用相应的执行器完成聚合操作。
- 返回聚合结果:
- 最终,函数==返回聚合计算后的结果批次 ==(VectorBatch),供后续处理使用。
此函数的主要目的是根据不同的聚合策略,执行对应的聚合操作,并返回向量化的批次结果。函数源码如下所示:(路径:src\gausskernel\runtime\vecexecutor\vecnode\vecagg.cpp
)
/*
* @Description: vec agg 入口函数,根据聚合策略调用不同的函数。
*/
VectorBatch* ExecVecAggregation(VecAggState* node)
{
// 确保没有从目标列表(Tlist)中获取向量化元组。
Assert(!node->ss.ps.ps_vec_TupFromTlist);
/*
* 为协同分析保留,若 is_dummy 为 true 则不执行任何操作。
* is_dummy 为 true 意味着 Agg 节点已经在 ForeignScan 节点中通过远程 SQL 解析。
*/
if (((VecAgg*)node->ss.ps.plan)->is_dummy) {
PlanState* outer_node = outerPlanState(node); // 获取外部计划节点
VectorBatch* result_batch = VectorEngine(outer_node); // 执行外部计划并返回结果批次
return result_batch; // 返回外部计划结果
}
// 获取当前的 VecAgg 计划节点
VecAgg* plan = (VecAgg*)(node->ss.ps.plan);
// 如果 aggRun 尚未初始化,基于聚合策略创建不同的聚合执行器
if (node->aggRun == NULL) {
switch (plan->aggstrategy) {
case AGG_PLAIN: // 简单聚合(无分组聚合)
node->aggRun = New(CurrentMemoryContext) PlainAggRunner(node); // 创建 Plain 聚合执行器
break;
case AGG_HASHED: // 哈希聚合
if (plan->is_sonichash)
node->aggRun = New(CurrentMemoryContext) SonicHashAgg(node, INIT_DATUM_ARRAY_SIZE); // Sonic Hash 聚合
else
node->aggRun = New(CurrentMemoryContext) HashAggRunner(node); // 普通 Hash 聚合
break;
case AGG_SORTED: // 排序聚合
node->aggRun = New(CurrentMemoryContext) SortAggRunner(node); // 创建排序聚合执行器
break;
default:
// 报错:未识别的聚合类型
ereport(ERROR,
(errmodule(MOD_VEC_EXECUTOR),
errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE),
errmsg("unsupported aggregation type"))); // 不支持的聚合类型
break;
}
}
// 如果使用 Sonic Hash 聚合则执行 Sonic Hash 聚合逻辑
if (plan->is_sonichash)
return ((SonicHashAgg*)node->aggRun)->Run(); // 调用 Sonic Hash 聚合的 Run 方法返回批次结果
else
return DO_AGGEGATION(node->aggRun); // 执行普通聚合操作
}
ExecEndVecAggregation 函数
ExecEndVecAggregation 函数用于在向量化聚合操作完成后执行清理工作。它处理不同聚合策略下的资源释放,确保在聚合操作结束时释放内存和其他资源。
操作流程描述
- 获取当前计划节点:
- 首先,从 VecAggState 中获取当前的聚合计划节点。
- 根据聚合策略执行清理:
- 如果聚合策略是 AGG_HASHED(哈希聚合):
- 检查是否使用了 SonicHash,如果是,则调用 freeMemoryContext() 释放 SonicHash 聚合的内存上下文。
- 如果不是,则调用 closeFile() 关闭哈希聚合使用的文件,然后调用 freeMemoryContext() 释放普通哈希聚合的内存上下文。
- 如果聚合策略是 AGG_SORTED(排序聚合),则调用 endSortAgg() 结束排序聚合的处理。
- 如果是 AGG_PLAIN(简单聚合),则调用 endPlainAgg() 结束简单聚合的处理。
- 释放表达式上下文:
- 释放当前的表达式上下文并恢复到临时表达式上下文,确保不再占用不必要的内存。
- 清理元组槽和外部计划:
- 清理扫描元组槽,确保释放元组表的资源。
- 获取外部计划状态,并调用 ExecEndNode() 结束外部计划节点的处理。
此函数的主要目的是在聚合操作完成后,清理和释放与聚合相关的所有资源,以避免内存泄漏和资源浪费。函数源码如下所示:(路径:src\gausskernel\runtime\vecexecutor\vecnode\vecagg.cpp
)
/*
* @Description: 聚合结束处理函数。
*/
void ExecEndVecAggregation(VecAggState* node)
{
PlanState* outer_plan = NULL; // 定义外部计划节点的指针
// 获取当前的 VecAgg 计划节点
VecAgg* plan = (VecAgg*)(node->ss.ps.plan);
// 根据聚合策略处理聚合执行器的结束逻辑
if (plan->aggstrategy == AGG_HASHED) { // 哈希聚合策略
/* 根据当前运行时释放内存上下文 */
if (plan->is_sonichash) { // 使用 SonicHash 聚合
SonicHashAgg* vshashagg = (SonicHashAgg*)node->aggRun;
if (vshashagg != NULL) {
vshashagg->freeMemoryContext(); // 释放 SonicHash 聚合的内存上下文
}
} else { // 普通哈希聚合
HashAggRunner* vechashTbl = (HashAggRunner*)node->aggRun;
if (vechashTbl != NULL) {
vechashTbl->closeFile(); // 关闭哈希聚合使用的文件
vechashTbl->freeMemoryContext(); // 释放哈希聚合的内存上下文
}
}
} else if (plan->aggstrategy == AGG_SORTED) { // 排序聚合策略
SortAggRunner* vecSortAgg = (SortAggRunner*)node->aggRun;
if (vecSortAgg != NULL) {
vecSortAgg->endSortAgg(); // 结束排序聚合的处理
}
} else { // 简单聚合策略
PlainAggRunner* vecPlainAgg = (PlainAggRunner*)node->aggRun;
if (vecPlainAgg != NULL) {
vecPlainAgg->endPlainAgg(); // 结束简单聚合的处理
}
}
/*
* 释放表达式上下文。
*/
ExecFreeExprContext(&node->ss.ps); // 释放当前的表达式上下文
node->ss.ps.ps_ExprContext = node->tmpcontext; // 恢复临时表达式上下文
ExecFreeExprContext(&node->ss.ps); // 释放临时的表达式上下文
/* 清理元组表 */
(void)ExecClearTuple(node->ss.ss_ScanTupleSlot); // 清理扫描元组槽
outer_plan = outerPlanState(node); // 获取外部计划状态
ExecEndNode(outer_plan); // 结束外部计划节点的处理
}
ExecReScanVecAggregation 函数
ExecReScanVecAggregation 函数的设计是为了确保在执行计划的不同阶段中,聚合操作可以适当地重新扫描左侧树。这对于动态查询和状态变化(如参数变化)尤其重要,确保了聚合操作的正确性和一致性。通过重置机制,该函数允许优化器在处理复杂查询时更加灵活地管理聚合过程。
操作流程描述
- 初始化重置标志:
- 创建一个布尔变量 reset_left_tree,用于跟踪是否需要重置聚合操作的左侧树(即左侧子节点)。
- 获取当前聚合计划:
- 从 VecAggState 中获取当前的聚合计划,通过强制转换获取 VecAgg 结构体。
- 判断聚合运行器类型:
- 检查当前聚合运行器的类型。如果使用的是 SonicHashAgg 类型的运行器,则调用其 ResetNecessary 方法来判断是否需要重置左侧树;如果是其他类型的 BaseAggRunner,也进行相同的判断。
- 条件重新扫描左侧树:
- 如果需要重置左侧树,并且左侧树的参数(chgParam)没有发生变化(即没有新参数需要处理),则调用 VecExecReScan 函数重新扫描左侧树。这一部分确保了聚合操作可以在状态改变后继续正常执行。
ExecReScanVecAggregation 函数的设计是为了确保在执行计划的不同阶段中,聚合操作可以适当地重新扫描左侧树。这对于动态查询和状态变化(如参数变化)尤其重要,确保了聚合操作的正确性和一致性。通过重置机制,该函数允许优化器在处理复杂查询时更加灵活地管理聚合过程。函数源码如下所示:(路径:src\gausskernel\runtime\vecexecutor\vecnode\vecagg.cpp
)
/*
* @Description: 聚合操作的重新扫描函数。
*/
void ExecReScanVecAggregation(VecAggState* node)
{
bool reset_left_tree = false; // 用于标记是否需要重置左侧树
VecAgg* plan = (VecAgg*)(node->ss.ps.plan); // 获取当前的聚合计划
// 检查当前聚合运行器的类型并调用相应的重置方法
if (plan->is_sonichash) {
SonicHashAgg* tb = (SonicHashAgg*)node->aggRun; // 强制转换为SonicHashAgg类型
if (tb != NULL) {
// 如果需要重置,则标记reset_left_tree
reset_left_tree = tb->ResetNecessary(node);
}
} else {
BaseAggRunner* tb = (BaseAggRunner*)node->aggRun; // 强制转换为BaseAggRunner类型
if (tb != NULL) {
// 如果需要重置,则标记reset_left_tree
reset_left_tree = tb->ResetNecessary(node);
}
}
// 如果需要重置左侧树,并且左侧树的参数没有变化,则重新扫描左侧树
if (reset_left_tree && node->ss.ps.lefttree->chgParam == NULL) {
VecExecReScan(node->ss.ps.lefttree); // 调用左侧树的重新扫描
}
}
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » 【OpenGauss源码学习 —— (VecAgg)】
发表评论 取消回复