1. 引言

在本期的技术深度解析中,我们将学习并且了解Apache Paimon 的基本概念,了解一些特性,为了解决传统数据湖架构在实时数据处理和更新方面的局限性。其是一种数据湖格式,它能够与 FlinkSpark 结合,构建实时湖架构,支持流处理和批处理操作。

通过本文,我们将带领读者:

  1. 深入理解Paimon的各个组成部分及其具体功能作用。
  2. 探讨并发读写相关的处理策略与方法。

欢迎在评论区分享您的观点与见解,期待与您交流讨论!

2. 基本概念

2.1 基本构成

一张表的所有文件都存储在一个基本目录下,Paimon文件以分层方式组织。下图说明了文件布局:

warehouse
└── default.db
    └── my_table
        ├── bucket-0
        │   └── data-59f60cb9-44af-48cc-b5ad-59e85c663c8f-0.orc
        ├── index
        │   └── index-5625e6d9-dd44-403b-a738-2b6ea92e20f1-0
        ├── manifest
        │   ├── index-manifest-5d670043-da25-4265-9a26-e31affc98039-0
        │   ├── manifest-6758823b-2010-4d06-aef0-3b1b597723d6-0
        │   ├── manifest-list-9f856d52-5b33-4c10-8933-a0eddfaa25bf-0
        │   └── manifest-list-9f856d52-5b33-4c10-8933-a0eddfaa25bf-1
        ├── schema
        │   └── schema-0
        └── snapshot
            ├── EARLIEST
            ├── LATEST
            └── snapshot-1
  • Snapshot Files:包含表在某个时间点的状态信息。

  • Manifest Files:所有清单列表(manifestlist)和清单文件(manifestfile)都存储在 Manifest Files 目录中。清单列表是清单文件名的列表,而清单文件则包含有关 LSM 数据文件和变更日志文件的信息。

  • Data Files:数据文件按分区和存储桶分组,每个存储桶目录都包含一个 LSM 树及其变更日志文件。目前,Paimon 支持使用 orc(默认)、parquetavro 作为数据文件格式。

  • PartitionPaimon采用与 Apache Hive 相同的分区概念来分离数据,这是一种可选方法。

  • LSM Trees :Paimon采用 LSM Tree(日志结构合并树)作为其底层的数据结构来组织和管理数据的存储和更新操作.

2.2 Schema

Schema FileJSON 格式的文件,包含以下内容:


{
  "version" : 3,
  "id" : 0,
  "fields" : [ {
    "id" : 0,
    "name" : "order_id",
    "type" : "BIGINT NOT NULL"
  }, {
    "id" : 1,
    "name" : "order_name",
    "type" : "STRING"
  }, {
    "id" : 2,
    "name" : "order_user_id",
    "type" : "BIGINT"
  }, {
    "id" : 3,
    "name" : "order_shop_id",
    "type" : "BIGINT"
  } ],
  "highestFieldId" : 3,
  "partitionKeys" : [ ],
  "primaryKeys" : [ "order_id" ],
  "options" : {
    "bucket" : "5"
  },
  "comment" : "",
  "timeMillis" : 1720496663041
}

  • fields:数据字段列表,每个数据字段包含 idnametypefield id 用于支持模式演化。
  • partitionKeys:字段名列表,表的分区定义,不可修改。
  • primaryKeys:字段名列表,表的主键定义,不可修改。
  • optionsmap<string, string>,无序的表选项集合,包括许多功能和优化设置。

对于旧版本:

  • version 1:如果没有 bucket key,应在 options 中设置 bucket -> 1
  • version 1 & 2:如果没有 file.format key,应在 options 中设置 file.format -> orc

更新Schema 应该会生成一个新的Schema 文件。


warehouse
└── default.db
    └── my_table
        ├── schema
            ├── schema-0
            ├── schema-1
            └── schema-2

同时要注意的是在 snapshot 中有一个对 schema 的引用。通常,数值最高的 schema file 是最新的 schema file

旧的 schema files 不能直接删除,因为可能存在引用这些旧 schema files 的旧数据文件。在读取表时,需要依赖它们进行模式演化读取。

2.3 Snapshot

每个提交都会生成一个 snapshot filesnapshot file 的版本从 1 开始并且必须是连续的。EARLIESTLATEST 是位于 snapshot list 开头和结尾的提示文件,它们可能不准确。当提示文件不准确时,读取操作将扫描所有 snapshot files 以确定开头和结尾。

写入提交会抢占下一个 snapshot id,一旦成功写入 snapshot file,该提交将对用户可见。


warehouse
└── default.db
    └── my_table
        ├── snapshot
            ├── EARLIEST
            ├── LATEST
            ├── snapshot-1
            ├── snapshot-2
            └── snapshot-3

Snapshot File 也是一个 JSON 文件,它包含了以下信息:

  • version:快照文件的版本,当前版本是 3
  • id:快照的 ID,与文件名相同。
  • schemaId:与此次提交相对应的 schema 版本。
  • baseManifestList:记录从之前快照中所有变更的清单列表。
  • deltaManifestList:记录此次快照中所有新发生的变更的清单列表。
  • changelogManifestList:记录此次快照中产生的所有变更日志的清单列表,如果没有产生变更日志则为 null
  • indexManifest:记录此表所有索引文件的清单,如果没有表索引文件则为 null
  • commitUser:通常由 UUID 生成,用于流式写入的恢复,每个流式写入作业对应一个用户。
  • commitIdentifier:与流式写入对应的事务 ID,每个事务可能导致不同提交类型的多个提交。
  • commitKind:此次快照中的变更类型,包括追加(append)、压缩(compact)、覆盖(overwrite)和分析(analyze)。
  • timeMillis:提交的时间,以毫秒为单位。
  • totalRecordCount:此次快照中所有变更的记录总数。
  • deltaRecordCount:此次快照中所有新变更的记录数。
  • changelogRecordCount:此次快照中产生的所有变更日志的记录数。
  • watermark:输入记录的水印,来自 Flink 水印机制,如果没有水印则为 Long.MIN_VALUE
  • statistics:此表统计信息的统计文件名。

2.4 Manifest

├── manifest
    └── manifest-list-51c16f7b-421c-4bc0-80a0-17677f343358-1

Manifest List 包含了多个 manifest 文件的元数据。其名称包含 UUID,是一个 Avro 文件,其 schema 如下:

  • fileName:清单文件名。
  • fileSize:清单文件大小。
  • numAddedFiles:清单中新增文件的数量。
  • numDeletedFiles:清单中删除文件的数量。
  • partitionStats:分区统计信息,这个清单中分区字段的最小值和最大值有助于在查询时跳过某些清单文件,它是一个 SimpleStats 对象。
  • schemaId:写入此清单文件时使用的 schema ID。

除了 Manifest List文件中包含的数据,这包括三种类型的Manifest :data、index 和 changelog。

├── manifest
    └── manifest-6758823b-2010-4d06-aef0-3b1b597723d6-0

Data Manifest 包含了关于数据文件的详细信息,具体包括:

  • fileName:数据文件的名称。
  • fileSize:数据文件的大小。
  • numRecords:数据文件中的记录数。
  • minRecord:数据文件中记录的最小值(根据业务逻辑定义)。
  • maxRecord:数据文件中记录的最大值。
  • partitionPath:数据文件所在的分区路径。
  • crcCheckSum:数据文件的CRC校验和,用于数据完整性验证。
  • schemaId:写入数据文件时使用的 schema 的 ID。
├── manifest
    └── index-manifest-5d670043-da25-4265-9a26-e31affc98039-0

Index Manifest 包含了关于索引文件的详细信息,具体包括:

  • fileName:索引文件的名称。
  • fileSize:索引文件的大小。
  • numIndexEntries:索引文件中的索引条目数。
  • minIndexKey:索引文件中索引键的最小值。
  • maxIndexKey:索引文件中索引键的最大值。
  • partitionPath:索引文件所在的分区路径。
  • crcCheckSum:索引文件的CRC校验和。

Changelog Manifest 包含了关于变更日志文件的详细信息,具体包括:

  • fileName:变更日志文件的名称。
  • fileSize:变更日志文件的大小。
  • numRecords:变更日志文件中的记录数。
  • minRecord:变更日志文件中记录的最小值。
  • maxRecord:变更日志文件中记录的最大值。
  • partitionPath:变更日志文件所在的分区路径。
  • crcCheckSum:变更日志文件的CRC校验和。

2.5 Data File

通过FlinkSQL创建一个分区表:

CREATE TABLE part_t (
    f0 INT,
    f1 STRING,
    dt STRING
) PARTITIONED BY (dt);

INSERT INTO part_t VALUES (1, '11', '20240514');

Paimon采用与Apache Hive相同的分区概念来分离数据。分区的文件将放置在单独的分区目录中。其实际存储路径如下所示:

part_t
├── dt=20240514
│   └── bucket-0
│       └── data-ca1c3c38-dc8d-4533-949b-82e195b41bd4-0.orc
├── manifest
│   ├── manifest-08995fe5-c2ac-4f54-9a5f-d3af1fcde41d-0
│   ├── manifest-list-51c16f7b-421c-4bc0-80a0-17677f343358-0
│   └── manifest-list-51c16f7b-421c-4bc0-80a0-17677f343358-1
├── schema
│   └── schema-0
└── snapshot
    ├── EARLIEST
    ├── LATEST
    └── snapshot-1

Apache Paimon 中,所有表的存储都依赖于 buckets,数据文件存储在桶目录中。不同类型的表与 Paimon 中的桶之间的关系如下:

主键表(Primary Key Table)

  • bucket = -1:默认模式,动态桶模式通过索引文件记录键对应的桶。索引记录了主键的哈希值与桶之间的对应关系。在这种模式下,系统会自动管理数据分布,根据主键的哈希值将数据分配到不同的桶中,以实现数据的均匀分布和高效查询。
  • bucket = 10:需要根据桶键(默认为主键)的哈希值将数据分配到对应的桶中。在这种模式下,用户需要指定桶的数量,数据会根据桶键的哈希值被均匀分配到指定数量的桶中,这有助于提高数据的并行处理能力

追加表(Append Table)

  • bucket = -1:默认模式,忽略桶概念,尽管所有数据都写入桶-0,但读写的并行性不受限制。 在这种模式下,追加表的数据默认存储在一个桶中,但系统的读写操作仍然可以并行执行,不受单个桶的限制。
  • bucket = 10:需要定义桶键,根据桶键的哈希值将数据分配到对应的桶中。在这种模式下,用户需要指定桶键和桶的数量,数据会根据桶键的哈希值被分配到不同的桶中,这有助于提高数据的并行处理和查询效率。

数据的存储命名是 data-${uuid}-${id}.${format},但是对于追加表仅存储表中原有数据,不增加新列。主键表则包含额外的系统列,如_VALUE_KIND(标记行是删除还是添加)、_SEQUENCE_NUMBER(用于更新时的数据比较)以及_KEY_前缀(避免与表列冲突)。

举个例子

文件名格式为data-uuid-id.format,例如data-1a2b3c-001.csv

追加表:

  • 如购物记录表,有用户ID、商品ID、购买日期列。
  • 新购物记录直接追加到文件末尾,不改变表结构。

主键表:

  • 如用户信息表,有用户ID(主键)、用户名、电子邮件列。
  • 更新用户信息时,不修改原行,而是添加新行,并使用系统列:
    • _VALUE_KIND:标记为“添加”或“删除”。
    • _SEQUENCE_NUMBER:标记数据版本,确定最新信息。
    • _KEY_用户ID:避免与用户ID列冲突。 这样,主键表能确保数据唯一性和一致性,追加表则高效存储新增数据。

2.6 Table

表索引文件位于index 索引目录中。

HASH_VALUE | HASH_VALUE | HASH_VALUE | HASH_VALUE |

动态桶索引Dynamic Bucket Index)是一种用于存储主键哈希值与桶(bucket)对应关系的索引结构。动态桶索引用于存储主键的哈希值与数据存储桶之间的对应关系。其结构非常简单,主要存储哈希值。每个哈希值占用4个字节,使用高位优先(BIG_ENDIAN)。

所谓的⾼位优先,就是最低的地址放在⾼位字节上,⽽低位优先就是最低的地址放在低位字节上。

"Deletion file"是用于记录已删除记录位置的一种机制,特别是在主键表中,每个桶(bucket)都有一个对应的删除文件。

结构

  1. 版本记录: 删除文件的开始部分是一个字节,用于记录版本号。当前版本是1。

  2. 序列化二进制块(serialized bin): 接着记录一个序列化的二进制块,包括序列化二进制块的大小、序列化二进制块本身以及该二进制块的校验和。大小和校验和都是以BIT_ENDIAN整数格式存储。

作用

  • 记录删除位置:删除文件用于记录每个数据文件中被删除的记录的位置,这对于数据恢复和版本控制非常重要。
  • 优化存储和查询:通过记录删除的记录,系统可以避免实际删除数据,从而节省存储空间,并提高查询效率。
  • 支持数据版本控制:在支持时间旅行查询的数据库中,删除文件可以帮助系统追踪不同时间点的数据状态。

2.7 File index

文件索引文件格式。将所有列和偏移量放在HEAD中。

  _____________________________________    _____________________
|     magic    |version|head length |
|-------------------------------------|
|            column number            |
|-------------------------------------|
|   column 1        | index number   |
|-------------------------------------|
|  index name 1 |start pos |length  |
|-------------------------------------|
|  index name 2 |start pos |length  |
|-------------------------------------|
|  index name 3 |start pos |length  |
|-------------------------------------HEAD
|   column 2        | index number   |
|-------------------------------------|
|  index name 1 |start pos |length  |
|-------------------------------------|
|  index name 2 |start pos |length  |
|-------------------------------------|
|  index name 3 |start pos |length  |
|-------------------------------------|
|                 ...                 |
|-------------------------------------|
|                 ...                 |
|-------------------------------------|
|  redundant length |redundant bytes |
|----------------------------------------------------------BODY                 |
|                BODY                 |
|                BODYBODYBODY                 |
|_____________________________________|    _____________________
*
magic:                            8 bytes long, value is 1493475289347502L, BIT_ENDIAN
version:                          4 bytes int, BIT_ENDIAN
head length:                      4 bytes int, BIT_ENDIAN
column number:                    4 bytes int, BIT_ENDIAN
column x name:                    2 bytes short BIT_ENDIAN and Java modified-utf-8
index number:                     4 bytes int (how many column items below), BIT_ENDIAN
index name x:                     2 bytes short BIT_ENDIAN and Java modified-utf-8
start pos:                        4 bytes int, BIT_ENDIAN
length:                           4 bytes int, BIT_ENDIAN
redundant length:                 4 bytes int (for compatibility with later versions, in this version, content is zero)
redundant bytes:                  var bytes (for compatibility with later version, in this version, is empty)
BODY:                             column index bytes + column index bytes + column index bytes + .......

3.并发控制

3.1 基本概念

Apache Paimon 支持对多个并发写入作业的乐观并发控制(Optimistic Concurrency Control)。以下是对 Paimon 中乐观并发控制机制的详细解释:

乐观并发控制机制

  1. 并发写入作业:

    • Paimon 允许多个写入作业并发执行。
    • 每个作业按照自己的节奏写入数据,并在提交时基于当前快照生成一个新的快照。
  2. 快照和增量文件:

    • 作业在提交时通过应用增量文件(删除或添加文件)来生成新快照。
    • 增量文件代表了自上次快照以来数据的变化。

乐观并发控制的优点

  1. 提高并发性:

    • 乐观并发控制允许多个作业并发执行,提高了系统的并发性和吞吐量。
  2. 减少锁争用:

    • 与悲观并发控制相比,乐观并发控制减少了锁的使用,从而减少了锁争用和死锁的风险。
  3. 提高写入性能:

    • 作业可以按照自己的节奏写入数据,而不需要等待其他作业完成,从而提高了写入性能。

3.2 快照冲突

快照冲突(Snapshot Conflict):

  • 如果作业尝试提交的快照 ID 已被其他作业抢占,表已从另一个作业生成了新的快照,此时会出现快照冲突。
  • 解决方案:作业可以重新尝试提交,因为快照冲突是暂时的,重新提交通常可以解决问题。

快照ID的唯一性

  • Paimon 的每个快照都有一个唯一的ID。
  • 只要作业将其快照文件写入文件系统,就认为该作业是成功的。

文件系统重命名机制

  • Paimon 利用文件系统的重命名(renaming)机制来提交快照。
  • 对于像 HDFS(Hadoop Distributed File System)这样的文件系统,这种重命名操作是事务性的和原子性的,这意味着它要么完全成功,要么完全不发生,确保了数据的一致性。

对象存储的非原子重命名

  • 然而,对于对象存储服务,如阿里云的对象存储服务(OSS)和亚马逊的简单存储服务(S3),它们的“RENAME”操作并不具备原子语义。

  • 这意味着在重命名操作中可能会出现部分成功的情况,从而导致数据不一致或快照丢失的风险。

  • 为了解决这个问题,Paimon 需要配置 Hive 或 JDBC metastore,并启用“lock.enabled”选项。

  • 启用“lock.enabled”选项后,Paimon 将使用锁机制来确保快照提交的原子性和一致性。

  • 通过这种方式,即使在对象存储服务中,Paimon 也能确保快照提交的安全性和可靠性。

3.3 文件冲突

文件冲突(Files Conflict):

  • 如果作业想要删除的文件已被其他作业删除,此时会出现文件冲突。
  • 解决方案:对于批处理作业,遇到文件冲突时只能失败。对于流处理作业,它会失败并重新启动,故意进行一次故障转移。
    • Paimon 在提交文件删除时,实际上是进行逻辑删除,即标记文件为删除状态,而不是物理删除。
    • 冲突检查:Paimon 在提交文件删除时会检查与最新快照的冲突。如果存在冲突,意味着该文件已经被逻辑删除,此时无法继续在当前提交节点上进行操作。
    • 触发故障转移:面对冲突,Paimon 会有意触发故障转移(failover),导致作业重启。重启后,作业会从文件系统中检索最新状态,希望解决这种冲突。

  • 冲突的本质
    • 在于文件的逻辑删除,而文件的逻辑删除通常源于压缩(compaction)操作。
    • 为了解决这个问题,可以关闭写入作业的压缩功能(将 write-only 设置为 true)。
    • 同时启动一个单独的作业来专门处理压缩工作。

4. 总结

由于篇幅问题,本篇就简答的介绍了Paimon的基础概念:包括快照、分区、分桶和一致性保证等。其组成结构涵盖文件布局(如快照文件、清单文件等)、存储结构(底层采用列式文件存储和LSM树结构)、读写方式(多种读取和写入模式)以及生态系统(支持多种计算引擎读取)。

如果你想参与讨论,请 点击这里https://github.com/hiszm/BigDataWeekly,每周都有新的主题,周末或周一发布。

大数据精读,探索知识的深度。

关注 大数据精读周刊

版权声明:自由转载-非商用-非衍生-保持署名(创意共享 3.0 许可证

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部