A First Look at the Underlying Data Structures of Apache Iceberg Tables

This post takes a hands-on approach to exploring the underlying data structures of an Apache Iceberg table. Since the Apache Iceberg project is under active development and changes significantly between versions, the content here may become outdated.

The version tested is 0.11.1, on a self-built Docker cluster; see Building a Hadoop + Hive + Spark Cluster with Docker (Part 1). Note that Spark 3.1.1 conflicts with this Iceberg version, so please use the Spark 3.0.2 version of the YAML to set up the cluster.

Overview

Apache Iceberg is one of the open-source data lake solutions, originally developed at Netflix. It positions itself as an open table format for huge analytic datasets.

Apache Iceberg offers the following notable features:

  • Schema evolution supports adding, dropping, updating, or renaming columns, with no side effects (see the sketch after this list)
  • Hidden partitioning prevents user mistakes that cause silently incorrect results or extremely slow queries
  • Partition layout evolution can update the layout of a table as data volume or query patterns change
  • Time travel enables reproducible queries that use exactly the same table snapshot, and lets users easily examine changes
  • Version rollback allows users to quickly correct problems by resetting the table to a previous state
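
As a concrete example of the first point, schema evolution maps to ordinary DDL in Spark SQL. Here is a minimal sketch against the demo table created later in this post; the category column is purely illustrative, and none of these statements are run in the walkthrough below:

spark-master
-- metadata-only changes: no data files are rewritten, the table simply
-- commits a new schema with fresh column ids
ALTER TABLE local.db.table ADD COLUMNS (category string);
ALTER TABLE local.db.table RENAME COLUMN data TO payload;
ALTER TABLE local.db.table DROP COLUMN category;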

On the reliability and performance side, Apache Iceberg is built for querying huge tables, where a single table can hold tens of petabytes of data; such tables can even be read without a distributed SQL engine. It is also designed to solve correctness problems on eventually-consistent cloud object stores:

  • Fast scan planning: a distributed SQL engine is not needed to read a table or find files
  • Advanced filtering: data files are pruned using table metadata, partition information, and column-level statistics
  • By avoiding listings and renames, it works with any cloud store and reduces NameNode congestion on HDFS
  • Serializable isolation: table changes are atomic, and readers never see partial or uncommitted changes
  • Concurrent writes use optimistic concurrency and retry to ensure that compatible updates succeed

On the storage format side, some Apache Iceberg concepts are listed below (each layer can be inspected via the metadata tables shown after the list):

  • Partition spec – defines how partition values are derived from a row's data fields
  • Snapshot – the state of a table at some point in time, defined by a manifest list
  • Manifest list – an Avro file listing manifest files
  • Manifest – an Avro file listing the data files that make up a snapshot
  • Data file – a file that stores the actual data
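
As a preview, each of these layers can be inspected from Spark SQL through Iceberg's metadata tables, which the rest of this post uses heavily (assuming the local catalog and table set up in the next section):

spark-master
SELECT * FROM local.db.table.snapshots;  -- snapshots, one per manifest list
SELECT * FROM local.db.table.manifests;  -- manifests of the current snapshot
SELECT * FROM local.db.table.files;      -- data files of the current snapshot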

This post is only a brief introduction and does not dig into every detail. For the full picture, see the official site.

Creating a Table and Inserting Data

Once the cluster is up, you can follow the official getting-started guide to create a table. I made one change here, moving the warehouse location onto HDFS:

docker exec -it namenode hdfs dfs -mkdir /warehouse
docker exec -it spark-master /bin/bash

/spark/bin/spark-sql \
--packages org.apache.iceberg:iceberg-spark3-runtime:0.11.1 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf spark.sql.catalog.local.warehouse=hdfs://namenode:9000/warehouse

This command downloads the required jar automatically. Alternatively, you can download the corresponding jar here and upload it to /spark/jars, where it is loaded automatically.

Run the CREATE TABLE statement:

spark-master
CREATE TABLE local.db.table (id bigint, data string) USING iceberg;
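
As an aside, the hidden partitioning described earlier would be declared at creation time with a transform over a source column. This is a sketch only, with a hypothetical local.db.events table that is not used in the rest of this post:

spark-master
-- partition values are derived from ts via the days() transform;
-- queries simply filter on ts and never mention a partition column
CREATE TABLE local.db.events (id bigint, ts timestamp, data string)
USING iceberg
PARTITIONED BY (days(ts));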

Open another terminal, enter namenode, and take a look at the corresponding directory:

docker exec -it namenode /bin/bash
hdfs dfs -ls -R /warehouse

The current structure is as follows:

db/table
|- metadata
   |- v1.metadata.json
   |- version-hint.text

The contents of v1.metadata.json are shown below. Since the table has just been created, there is not much useful information yet; pay particular attention to the schema:

v1.metadata.json
{
  "format-version" : 1,
  "table-uuid" : "c3ff2b29-4b09-425c-b4a5-4015d18ab70d",
  "location" : "hdfs://namenode:9000/warehouse/db/table",
  "last-updated-ms" : 1624994861175,
  "last-column-id" : 2,
  "schema" : {
    "type" : "struct",
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "data",
      "required" : false,
      "type" : "string"
    } ]
  },
  "partition-spec" : [ ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root"
  },
  "current-snapshot-id" : -1,
  "snapshots" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
}

version-hint.text contains only the number 1 (the Hadoop catalog reads this file to locate the current vN.metadata.json). Now let's try inserting a row:

spark-master
INSERT INTO local.db.table VALUES (1, 'a');

After the insert completes, the directory structure looks like this:

db/table
+ |- data
+    |- 00000-228-02fc8f05-b942-405c-9402-513eea3c8568-00001.parquet
  |- metadata
+    |- 021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro
+    |- snap-7832020346881217565-1-021ee222-2e0e-4a1c-8e12-9d01985a9788.avro
     |- v1.metadata.json
+    |- v2.metadata.json
     |- version-hint.text

The file under data is of course the data file; let's focus on the files under metadata. First, v2.metadata.json (only the key parts are shown):

v2.metadata.json
{
  "current-snapshot-id" : 7832020346881217565,
  "snapshots" : [ {
    "snapshot-id" : 7832020346881217565,
    "timestamp-ms" : 1624994912014,
    "summary" : {
      "operation" : "append",
      "spark.app.id" : "app-20210629175050-0005",
      "added-data-files" : "1",
      "added-records" : "1",
      "added-files-size" : "622",
      "changed-partition-count" : "1",
      "total-records" : "1",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://namenode:9000/warehouse/db/table/metadata/snap-7832020346881217565-1-021ee222-2e0e-4a1c-8e12-9d01985a9788.avro"
  } ],
  "snapshot-log" : [ {
    "timestamp-ms" : 1624994912014,
    "snapshot-id" : 7832020346881217565
  } ],
  "metadata-log" : [ {
    "timestamp-ms" : 1624994861175,
    "metadata-file" : "hdfs://namenode:9000/warehouse/db/table/metadata/v1.metadata.json"
  } ]
}

As you can see, this insert added plenty of information, including the latest snapshot plus new entries in the snapshot log and metadata log. It clearly records that the operation was an append, as well as the location of the generated manifest list. Let's inspect that file (using the external tool avro-tools-1.10.2.jar):

namenode
curl -O https://repo1.maven.org/maven2/org/apache/avro/avro-tools/1.10.2/avro-tools-1.10.2.jar
java -jar /avro-tools-1.10.2.jar tojson hdfs://namenode:9000/warehouse/db/table/metadata/snap-7832020346881217565-1-021ee222-2e0e-4a1c-8e12-9d01985a9788.avro
snap-7832020346881217565-1-021ee222-2e0e-4a1c-8e12-9d01985a9788.avro
{
  "manifest_path": "hdfs://namenode:9000/warehouse/db/table/metadata/021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro",
  "manifest_length": 5567,
  "partition_spec_id": 0,
  "added_snapshot_id": {
    "long": 7832020346881218000
  },
  "added_data_files_count": {
    "int": 1
  },
  "existing_data_files_count": {
    "int": 0
  },
  "deleted_data_files_count": {
    "int": 0
  },
  "partitions": {
    "array": []
  },
  "added_rows_count": {
    "long": 1
  },
  "existing_rows_count": {
    "long": 0
  },
  "deleted_rows_count": {
    "long": 0
  }
}

This clearly shows that the operation added one row, and gives the corresponding manifest file. Let's view that file:

namenode
java -jar /avro-tools-1.10.2.jar tojson hdfs://namenode:9000/warehouse/db/table/metadata/021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro
021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro
{
  "status": 1,
  "snapshot_id": {
    "long": 7832020346881218000
  },
  "data_file": {
    "file_path": "hdfs://namenode:9000/warehouse/db/table/data/00000-228-02fc8f05-b942-405c-9402-513eea3c8568-00001.parquet",
    "file_format": "PARQUET",
    "partition": {},
    "record_count": 1,
    "file_size_in_bytes": 622,
    "block_size_in_bytes": 67108864,
    "column_sizes": {
      "array": [
        {
          "key": 1,
          "value": 46
        },
        {
          "key": 2,
          "value": 48
        }
      ]
    },
    "value_counts": {
      "array": [
        {
          "key": 1,
          "value": 1
        },
        {
          "key": 2,
          "value": 1
        }
      ]
    },
    "null_value_counts": {
      "array": [
        {
          "key": 1,
          "value": 0
        },
        {
          "key": 2,
          "value": 0
        }
      ]
    },
    "nan_value_counts": {
      "array": []
    },
    "lower_bounds": {
      "array": [
        {
          "key": 1,
          "value": "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
        },
        {
          "key": 2,
          "value": "a"
        }
      ]
    },
    "upper_bounds": {
      "array": [
        {
          "key": 1,
          "value": "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
        },
        {
          "key": 2,
          "value": "a"
        }
      ]
    },
    "key_metadata": null,
    "split_offsets": {
      "array": [
        4
      ]
    }
  }
}

Unsurprisingly, it records the exact location of the data file along with some statistics. Let's insert one more row and see what changes:

spark-master
INSERT INTO local.db.table VALUES (2, 'b');
db/table
  |- data
     |- 00000-228-02fc8f05-b942-405c-9402-513eea3c8568-00001.parquet
+    |- 00000-229-1a758de0-25a8-46c0-aadd-5fd80726d325-00001.parquet
  |- metadata
     |- 021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro
+    |- 9f74ed0b-48aa-4425-b79c-578838e9ead2-m0.avro
     |- snap-7832020346881217565-1-021ee222-2e0e-4a1c-8e12-9d01985a9788.avro
+    |- snap-7744186654445432565-1-9f74ed0b-48aa-4425-b79c-578838e9ead2.avro
     |- v1.metadata.json
     |- v2.metadata.json
+    |- v3.metadata.json
     |- version-hint.text

The result is very similar to the first insert: the same kinds of files are added. In v3.metadata.json, the snapshots array gains a new latest snapshot pointing to its own manifest list, and each of the log arrays grows by one entry. Notably, the snapshot (manifest list) file now contains two entries, corresponding to this insert and the previous one. The manifest file itself looks much like the one from the first insert.

v3.metadata.json
{
  "snapshot-id" : 7744186654445432565,
  "parent-snapshot-id" : 7832020346881217565,
  "timestamp-ms" : 1624994985432,
  "summary" : {
    "operation" : "append",
    "spark.app.id" : "app-20210629175050-0005",
    "added-data-files" : "1",
    "added-records" : "1",
    "added-files-size" : "622",
    "changed-partition-count" : "1",
    "total-records" : "2",
    "total-data-files" : "2",
    "total-delete-files" : "0",
    "total-position-deletes" : "0",
    "total-equality-deletes" : "0"
  },
  "manifest-list" : "hdfs://namenode:9000/warehouse/db/table/metadata/snap-7744186654445432565-1-9f74ed0b-48aa-4425-b79c-578838e9ead2.avro"
}
v3.metadata.json
{
  "snapshot-log" : [ {
    "timestamp-ms" : 1624994912014,
    "snapshot-id" : 7832020346881217565
  }, {
    "timestamp-ms" : 1624994985432,
    "snapshot-id" : 7744186654445432565
  } ],
  "metadata-log" : [ {
    "timestamp-ms" : 1624994861175,
    "metadata-file" : "hdfs://namenode:9000/warehouse/db/table/metadata/v1.metadata.json"
  }, {
    "timestamp-ms" : 1624994912014,
    "metadata-file" : "hdfs://namenode:9000/warehouse/db/table/metadata/v2.metadata.json"
  } ]
}
snap-7744186654445432565-1-9f74ed0b-48aa-4425-b79c-578838e9ead2.avro
[
  {
    "manifest_path":"hdfs://namenode:9000/warehouse/db/table/metadata/9f74ed0b-48aa-4425-b79c-578838e9ead2-m0.avro"
  }, {
    "manifest_path":"hdfs://namenode:9000/warehouse/db/table/metadata/021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro"
  }
]

Finally, spark-sql also supports quite a few metadata queries, listed below:

spark-master
-- content int     Contents of the file: 0=data, 1=position deletes, 2=equality deletes
-- file_path string Location URI with FS scheme
-- file_format string File format name: avro, orc, or parquet
-- record_count bigint Number of records in the file
-- file_size_in_bytes bigint Total file size in bytes
-- column_sizes map<int,bigint> Map of column id to total size on disk
-- value_counts map<int,bigint> Map of column id to total count, including null and NaN
-- null_value_counts map<int,bigint> Map of column id to null value count
-- nan_value_counts map<int,bigint> Map of column id to number of NaN values in the column
-- lower_bounds map<int,binary> Map of column id to lower bound
-- upper_bounds map<int,binary> Map of column id to upper bound
-- key_metadata binary Encryption key metadata blob
-- split_offsets array<bigint> Splittable offsets
-- equality_ids array<int> Equality comparison field IDs

SELECT * FROM local.db.table.files;

0 hdfs://namenode:9000/warehouse/db/table/data/00000-229-1a758de0-25a8-46c0-aadd-5fd80726d325-00001.parquet PARQUET 1 622 {1:46,2:48} {1:1,2:1} {1:0,2:0} {} {1:,2:b} {1:,2:b} NULL [4] NULL
0 hdfs://namenode:9000/warehouse/db/table/data/00000-228-02fc8f05-b942-405c-9402-513eea3c8568-00001.parquet PARQUET 1 622 {1:46,2:48} {1:1,2:1} {1:0,2:0} {} {1:,2:a} {1:,2:a} NULL [4] NULL
spark-master
-- made_current_at timestamp
-- snapshot_id bigint
-- parent_id bigint
-- is_current_ancestor boolean

SELECT * FROM local.db.table.history;

2021-06-29 19:28:32.014 7832020346881217565 NULL true
2021-06-29 19:29:45.432 7744186654445432565 7832020346881217565 true
spark-master
-- committed_at    timestamp
-- snapshot_id bigint
-- parent_id bigint
-- operation string
-- manifest_list string
-- summary map<string,string>

SELECT * FROM local.db.table.snapshots;

2021-06-29 19:28:32.014 7832020346881217565 NULL append hdfs://namenode:9000/warehouse/db/table/metadata/snap-7832020346881217565-1-021ee222-2e0e-4a1c-8e12-9d01985a9788.avro {"added-data-files":"1","added-files-size":"622","added-records":"1","changed-partition-count":"1","spark.app.id":"app-20210629175050-0005","total-data-files":"1","total-delete-files":"0","total-equality-deletes":"0","total-position-deletes":"0","total-records":"1"}
2021-06-29 19:29:45.432 7744186654445432565 7832020346881217565 append hdfs://namenode:9000/warehouse/db/table/metadata/snap-7744186654445432565-1-9f74ed0b-48aa-4425-b79c-578838e9ead2.avro {"added-data-files":"1","added-files-size":"622","added-records":"1","changed-partition-count":"1","spark.app.id":"app-20210629175050-0005","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-position-deletes":"0","total-records":"2"}
spark-master
-- path    string
-- length bigint
-- partition_spec_id int
-- added_snapshot_id bigint
-- added_data_files_count int
-- existing_data_files_count int
-- deleted_data_files_count int
-- partition_summaries array<struct<contains_null:boolean,lower_bound:string,upper_bound:string>>

SELECT * FROM local.db.table.manifests;

hdfs://namenode:9000/warehouse/db/table/metadata/021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro 5565 0 7832020346881217565 1 0 []
hdfs://namenode:9000/warehouse/db/table/metadata/9f74ed0b-48aa-4425-b79c-578838e9ead2-m0.avro 5565 0 7744186654445432565 1 0 []
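
These metadata tables stay consistent with the table itself. As a quick sanity check at this point, the per-file statistics should agree with an actual scan; given the two rows inserted above, both of the following should return 2:

spark-master
-- sum of record_count across data files vs. a real count
SELECT sum(record_count) FROM local.db.table.files;
SELECT count(*) FROM local.db.table;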

DELETE and MERGE INTO

See here for the syntax used in this section. First, test DELETE:

spark-master
DELETE FROM local.db.table WHERE id=1;

No data file was removed at this point, but three new files appeared under metadata: v4.metadata.json, a new snapshot (manifest list), and a new manifest. v4.metadata.json gains a snapshot whose operation is overwrite. The manifest list has two entries, pointing to the manifests from this change and the previous one; the latter manifest points at the very first data file with its status set to 2 (0: EXISTING, 1: ADDED, 2: DELETED).

spark-master
SELECT * FROM local.db.table.files;

0 hdfs://namenode:9000/warehouse/db/table/data/00000-229-1a758de0-25a8-46c0-aadd-5fd80726d325-00001.parquet PARQUET 1 622 {1:46,2:48} {1:1,2:1} {1:0,2:0} {} {1:,2:b} {1:,2:b} NULL [4] NULL

SELECT * FROM local.db.table.manifests;
hdfs://namenode:9000/warehouse/db/table/metadata/9f74ed0b-48aa-4425-b79c-578838e9ead2-m0.avro 5565 0 7744186654445432565 1 0 []
hdfs://namenode:9000/warehouse/db/table/metadata/1a2d0a26-6fbc-48f2-aa26-ba663bc7d675-m0.avro 5565 0 8526017697441933265 0 0 []

Both local.db.table.history and local.db.table.snapshots now have three records.

Next, test MERGE INTO:

spark-master
CREATE TABLE local.db.table2 (id bigint, data string) USING iceberg;
INSERT INTO local.db.table2 VALUES (2, 'bb');
INSERT INTO local.db.table2 VALUES (3, 'c');

MERGE INTO local.db.table t1 USING (SELECT * FROM local.db.table2) t2 on t1.id=t2.id
WHEN MATCHED THEN UPDATE SET t1.data=t2.data
WHEN NOT MATCHED THEN INSERT *
;

After it finishes, there is one more data file, one more metadata file, two more manifest files, and one more snapshot. The new snapshot's operation is again overwrite, and every manifest referenced by its manifest list was newly written by this operation. The only live data file is now the newest one:

spark-master
SELECT * FROM local.db.table.snapshots;

...
2021-06-29 19:34:58.439 51770443548099208 8526017697441933265 overwrite hdfs://namenode:9000/warehouse/db/table/metadata/snap-51770443548099208-1-06ee1df2-35a2-46ac-85e1-8c17ccce2829.avro {"added-data-files":"1","added-files-size":"641","added-records":"2","changed-partition-count":"1","deleted-data-files":"1","deleted-records":"1","removed-files-size":"622","spark.app.id":"app-20210629175050-0005","total-data-files":"1","total-delete-files":"0","total-equality-deletes":"0","total-position-deletes":"0","total-records":"2"}

SELECT * FROM local.db.table.manifests;

hdfs://namenode:9000/warehouse/db/table/metadata/06ee1df2-35a2-46ac-85e1-8c17ccce2829-m1.avro 5574 0 51770443548099208 1 0 []
hdfs://namenode:9000/warehouse/db/table/metadata/06ee1df2-35a2-46ac-85e1-8c17ccce2829-m0.avro 5565 0 51770443548099208 0 0 []

SELECT * FROM local.db.table.files;

0 hdfs://namenode:9000/warehouse/db/table/data/00000-436-6fd99e71-464a-4dec-a7dd-6c819d46a3de-00001.parquet PARQUET 2 641 {1:55,2:57} {1:2,2:2} {1:0,2:0} {} {1:,2:bb} {1:,2:c} NULL [4] NULL

Snapshot Management

Snapshot Rollback

Try rolling the table back to the previous snapshot:

spark-master
CALL local.system.rollback_to_snapshot('db.table', 8526017697441933265);

This step only adds v6.metadata.json; no other files are created. The new metadata version merely changes current-snapshot-id back to the previous snapshot (and appends to the logs).
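
If this reading is right, querying the table now should show the pre-MERGE contents, i.e. the single row left after the DELETE:

spark-master
SELECT * FROM local.db.table;
-- expected (given the operations above): 2	b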

Setting the Current Snapshot

Now set the current snapshot back to the latest version:

spark-master
CALL local.system.set_current_snapshot('db.table', 51770443548099208);

This adds v7.metadata.json, which again only changes current-snapshot-id (and appends to the logs).
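
Querying the table again should now show the MERGE results once more:

spark-master
SELECT * FROM local.db.table;
-- expected (given the operations above): 2	bb / 3	c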

Expiring Old Snapshots

Expire all old snapshots, keeping only the current version:

spark-master
CALL local.system.expire_snapshots('db.table');

2 3 3

The three numbers on the last line mean the following:

deleted_data_files_count [long] Number of data files deleted by this operation
deleted_manifest_files_count [long] Number of manifest files deleted by this operation
deleted_manifest_lists_count [long] Number of manifest list files deleted by this operation

These numbers check out: the two data files from the first two inserts are gone, as are the three manifests and three manifest lists written before the MERGE. The current file list:

db/table
|- data
   |- 00000-436-6fd99e71-464a-4dec-a7dd-6c819d46a3de-00001.parquet
|- metadata
   |- 06ee1df2-35a2-46ac-85e1-8c17ccce2829-m0.avro
   |- 06ee1df2-35a2-46ac-85e1-8c17ccce2829-m1.avro
   |- snap-51770443548099208-1-06ee1df2-35a2-46ac-85e1-8c17ccce2829.avro
   |- [v1-v8].metadata.json
   |- version-hint.text

Rewriting Manifests

Run this command to rewrite the manifests:

spark-master
CALL local.system.rewrite_manifests('db.table');

2 1

The numbers mean the following:

rewritten_manifests_count [int] Number of manifests which were re-written by this command
added_manifests_count [int] Number of new manifest files which were written by this command

This generates a new metadata file, a new snapshot, and a new manifest. The number of manifests referenced by the current snapshot drops from two to one, and the new snapshot's operation is replace:

spark-master
SELECT * FROM local.db.table.snapshots;

2021-06-29 22:59:18.518 298244000757312851 51770443548099208 replace hdfs://namenode:9000/warehouse/db/table/metadata/snap-298244000757312851-1-09c08664-8c89-4d75-be54-93b8ba21b215.avro {"changed-partition-count":"0","entries-processed":"0","manifests-created":"1","manifests-kept":"0","manifests-replaced":"2","total-data-files":"1","total-delete-files":"0","total-equality-deletes":"0","total-position-deletes":"0","total-records":"2"}

That concludes this brief exploration of the on-disk structures behind an Apache Iceberg 0.11.1 table. The project moves quickly, many features may require building from source to use, and the metadata layout will change somewhat in 0.12, so always defer to the latest official documentation.