Background
This article is based on StarRocks 3.1.7.
Conclusion
StarRocks starts a thread that periodically runs Compaction at a 200 ms interval. Compaction takes a table's partitions as its entry point and creates tasks at tablet (i.e., bucket) granularity.
Analysis
The CompactionMgr start method launches a CompactionScheduler, which drives the periodic compaction work.
The period is controlled by the LOOP_INTERVAL_MS parameter and defaults to 200 ms; a minimal sketch of such a loop is shown below.
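As an illustration only, here is a minimal sketch of how a fixed-interval scheduling loop like this can be structured. Only runOneCycle and LOOP_INTERVAL_MS come from the source; the class name and the executor-based setup are assumptions, not the actual StarRocks implementation:

// Illustrative sketch (not the actual StarRocks source): a single-threaded
// executor re-runs one scheduling cycle every LOOP_INTERVAL_MS milliseconds.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicCompactionLoop {
    private static final long LOOP_INTERVAL_MS = 200; // default interval

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // Schedule runOneCycle with a fixed delay between the end of one
        // cycle and the start of the next.
        executor.scheduleWithFixedDelay(this::runOneCycle, 0,
                LOOP_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    protected void runOneCycle() {
        // Clean invalid partitions, then schedule compaction tasks
        // (see the real runOneCycle below).
    }
}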
In the actual source, each cycle calls the runOneCycle method:
protected void runOneCycle() {
    cleanPartition();
    // Schedule compaction tasks only when this is a leader FE and all edit logs have finished replay.
    // In order to ensure that the input rowsets of compaction still exists when doing publishing version, it is
    // necessary to ensure that the compaction task of the same partition is executed serially, that is, the next
    // compaction task can be executed only after the status of the previous compaction task changes to visible or
    // canceled.
    if (stateMgr.isLeader() && stateMgr.isReady() && allCommittedCompactionsBeforeRestartHaveFinished()) {
        schedule();
        history.changeMaxSize(Config.lake_compaction_history_size);
        failHistory.changeMaxSize(Config.lake_compaction_fail_history_size);
    }
}
- cleanPartition removes invalid partitions so that subsequent Compaction only considers valid ones.
- There is an FE leader check (the GlobalStateMgr here holds only this single FE's state); only the leader node may schedule Compaction. The main logic lives in the schedule method:

for (Iterator<Map.Entry<PartitionIdentifier, CompactionJob>> iterator = runningCompactions.entrySet().iterator(); ...) {
    ...
    if (job.isCompleted()) {
        job.getPartition().setMinRetainVersion(0);
        try {
            commitCompaction(partition, job);
            assert job.transactionHasCommitted();
        } catch (Exception e) {
            ...
        }
    } else if (job.isFailed()) {
        job.getPartition().setMinRetainVersion(0);
        errorMsg = Objects.requireNonNull(job.getFailMessage(), "getFailMessage() is null");
        job.abort(); // Abort any executing task, if present.
    }
    if (errorMsg != null) {
        iterator.remove();
        job.finish();
        failHistory.offer(CompactionRecord.build(job, errorMsg));
        compactionManager.enableCompactionAfter(partition, MIN_COMPACTION_INTERVAL_MS_ON_FAILURE);
        abortTransactionIgnoreException(partition.getDbId(), job.getTxnId(), errorMsg);
        continue;
    }
    ...
}

int index = 0;
int compactionLimit = compactionTaskLimit();
int numRunningTasks = runningCompactions.values().stream().mapToInt(CompactionJob::getNumTabletCompactionTasks).sum();
if (numRunningTasks >= compactionLimit) {
    return;
}

List<PartitionIdentifier> partitions = compactionManager.choosePartitionsToCompact(runningCompactions.keySet());
while (numRunningTasks < compactionLimit && index < partitions.size()) {
    PartitionIdentifier partition = partitions.get(index++);
    CompactionJob job = startCompaction(partition);
    if (job == null) {
        continue;
    }
    numRunningTasks += job.getNumTabletCompactionTasks();
    runningCompactions.put(partition, job);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Created new compaction job. partition={} txnId={}", partition, job.getTxnId());
    }
}
- The scheduler iterates over the running Compaction jobs. If a job has finished compacting (every tablet has completed) but its transaction has not yet committed, the compaction transaction is committed; if the job failed, the job is aborted. In either case the job is eventually removed from the running queue.
- A failed job is additionally recorded in failHistory, and the partition's next Compaction is delayed by LOOP_INTERVAL_MS * 10 (with LOOP_INTERVAL_MS being 200 ms, i.e., a 2 s delay).
- Once the Compaction transaction has committed, the job is recorded in history, and the partition's next Compaction is delayed by LOOP_INTERVAL_MS * 2 (i.e., 400 ms). A sketch of this backoff bookkeeping follows.
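To make the delay mechanism concrete, here is a minimal sketch assuming a simple in-memory map keyed by partition id. CompactionBackoff and its members are hypothetical names, not the actual CompactionMgr fields; only the LOOP_INTERVAL_MS multiples come from the analysis above:

// Illustrative sketch of "enableCompactionAfter"-style backoff bookkeeping.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CompactionBackoff {
    private static final long LOOP_INTERVAL_MS = 200;

    // partition id -> earliest epoch millis at which the next compaction may start
    private final Map<Long, Long> nextAllowedMs = new ConcurrentHashMap<>();

    void enableCompactionAfter(long partitionId, long delayMs) {
        nextAllowedMs.put(partitionId, System.currentTimeMillis() + delayMs);
    }

    boolean isCompactionAllowed(long partitionId) {
        return System.currentTimeMillis() >= nextAllowedMs.getOrDefault(partitionId, 0L);
    }
}

// Usage mirroring the scheduler's two cases:
//   failure:  backoff.enableCompactionAfter(pid, LOOP_INTERVAL_MS * 10);  // 2 s
//   success:  backoff.enableCompactionAfter(pid, LOOP_INTERVAL_MS * 2);   // 400 ms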
- After the running Compaction jobs have been handled, new Compaction tasks are built.
- First, the compactionTaskLimit method computes the cap on the number of Compaction tasks: if lake_compaction_max_tasks is greater than or equal to 0, that configured value is used; otherwise the limit is the number of BE and CN nodes in the system multiplied by 16 (see the sketch after this list).
- If the number of running tasks (counted at tablet granularity) has already reached compactionTaskLimit, this cycle ends; otherwise scheduling continues.
- compactionManager.choosePartitionsToCompact then picks candidate partitions from the existing partitions, excluding the partitions already involved in runningCompactions.
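A minimal sketch of the limit computation described above. The parameterized signature is an assumption for illustration; the real FE method reads lake_compaction_max_tasks from Config and the node counts from the cluster's SystemInfoService:

// Illustrative-only sketch of compactionTaskLimit.
final class CompactionTaskLimit {
    static int compactionTaskLimit(int lakeCompactionMaxTasks,
                                   int numBackends, int numComputeNodes) {
        if (lakeCompactionMaxTasks >= 0) {
            // The operator pinned the limit explicitly.
            return lakeCompactionMaxTasks;
        }
        // Default: 16 tablet-level tasks per BE/CN node.
        return (numBackends + numComputeNodes) * 16;
    }
}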
choosePartitionsToCompact involves a Sorter (ScoreSorter by default) and a Selector (ScoreSelector):
- ScoreSelector selects the partitions whose compaction score reaches lake_compaction_score_selector_min_score (default 10) and whose allowed compaction time has arrived.
- ScoreSorter sorts the selected partitions by compactionScore from highest to lowest (see the sketch below).
- For each chosen partition, the startCompaction method is called to build the compaction task.
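The select-then-sort step might look roughly like the following. PartitionStat and its fields are hypothetical stand-ins for the per-partition statistics the FE keeps; only the threshold semantics and the descending sort come from the analysis above:

// Illustrative sketch of ScoreSelector + ScoreSorter behavior.
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class PartitionStat {
    long partitionId;
    double compactionScore;
    long nextCompactionTimeMs; // earliest allowed compaction time
}

class ScoreBasedChooser {
    // lake_compaction_score_selector_min_score defaults to 10
    static final double MIN_SCORE = 10.0;

    List<PartitionStat> choose(List<PartitionStat> candidates, long nowMs) {
        return candidates.stream()
                // ScoreSelector: score above the threshold and backoff expired
                .filter(p -> p.compactionScore >= MIN_SCORE && p.nextCompactionTimeMs <= nowMs)
                // ScoreSorter: highest score first
                .sorted(Comparator.comparingDouble((PartitionStat p) -> p.compactionScore).reversed())
                .collect(Collectors.toList());
    }
}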
startCompaction first calls collectPartitionTablets to pick the tablets to compact and the backend each tablet lives on, then calls createCompactionTasks to create the CompactionTasks; there is one task per backend (a sketch of the grouping follows).
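Grouping tablets by the backend that hosts them could be sketched as below. The helper and the lookupBackendId function are hypothetical, not the actual collectPartitionTablets signature; only the Map<Long, List<Long>> beToTablets shape consumed by createCompactionTasks comes from the source:

// Illustrative-only: bucket tablet ids by the backend hosting them.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongUnaryOperator;

final class TabletGrouping {
    static Map<Long, List<Long>> groupTabletsByBackend(List<Long> tabletIds,
                                                       LongUnaryOperator lookupBackendId) {
        Map<Long, List<Long>> beToTablets = new HashMap<>();
        for (long tabletId : tabletIds) {
            long beId = lookupBackendId.applyAsLong(tabletId); // hypothetical lookup
            beToTablets.computeIfAbsent(beId, k -> new ArrayList<>()).add(tabletId);
        }
        return beToTablets;
    }
}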
Each task carries a Compact request for its backend, sent over RPC (via the bRPC LakeService, as BrpcProxy in the code shows), and the tasks are assembled into a CompactionJob:

List<CompactionTask> tasks = new ArrayList<>();
for (Map.Entry<Long, List<Long>> entry : beToTablets.entrySet()) {
    ComputeNode node = systemInfoService.getBackendOrComputeNode(entry.getKey());
    if (node == null) {
        throw new UserException("Node " + entry.getKey() + " has been dropped");
    }
    LakeService service = BrpcProxy.getLakeService(node.getHost(), node.getBrpcPort());
    CompactRequest request = new CompactRequest();
    request.tabletIds = entry.getValue();
    request.txnId = txnId;
    request.version = currentVersion;
    request.timeoutMs = LakeService.TIMEOUT_COMPACT;
    CompactionTask task = new CompactionTask(node.getId(), service, request);
    tasks.add(task);
}
return tasks;
- Finally, the scheduler adds the new job's tablet-level task count to numRunningTasks, which bounds the concurrency of Compaction, and puts the job into runningCompactions.
Miscellaneous
The FE configuration items mentioned above, such as lake_compaction_max_tasks, are all configurable. They can be changed at runtime with admin set frontend config ("lake_compaction_max_tasks" = "0"); see ADMIN_SET_CONFIG for details.
Note: this command only changes the value of the variable in the current FE's memory. To make the change permanent, add it to fe.conf.