This article takes a hands-on approach to exploring the underlying data structures of an Apache Iceberg table. Since Apache Iceberg is under active development and changes substantially between versions, the content here may become outdated.
The version tested is 0.11.1, on a self-built Docker cluster; see Building a Hadoop + Hive + Spark Cluster with Docker (Part 1) for details. Note that Spark 3.1.1 conflicts with this Iceberg version, so please use the 3.0.2 version of the YAML to build the cluster.
Overview

Apache Iceberg is one of the open-source data lake solutions, originally developed at Netflix. It positions itself as an open table format for huge analytic datasets.
Apache Iceberg offers the following highlight features:
- Schema evolution: supports adding, dropping, updating, or renaming columns, with no side effects
- Hidden partitioning: prevents user mistakes that silently produce wrong results or extremely slow queries
- Partition layout evolution: the table's layout can be updated as data volume or query patterns change
- Time travel: enables reproducible queries that read exactly the same table snapshot, and makes it easy to inspect changes (a sketch follows this list)
- Version rollback: lets users quickly correct problems by resetting the table to a prior state
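As a taste of what time travel looks like in SQL, here is a minimal sketch. Note that the `TIMESTAMP AS OF` / `VERSION AS OF` syntax only arrived in later Spark/Iceberg releases; in the 0.11.1 setup used in this article, time travel is instead exposed through DataFrame read options such as `snapshot-id` and `as-of-timestamp`, so treat this purely as an illustration of the feature:

```sql
-- Hypothetical sketch: time-travel SQL from newer Spark/Iceberg releases,
-- NOT available in the 0.11.1 environment used in this article.
SELECT * FROM local.db.table TIMESTAMP AS OF '2021-06-29 19:30:00';
SELECT * FROM local.db.table VERSION AS OF 7832020346881217565;
```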
On the reliability and performance side, Apache Iceberg is built for querying huge tables, where a single table can hold tens of petabytes of data; such tables can even be read without a distributed SQL engine. It is also designed to fix correctness problems on eventually-consistent cloud object stores:
- Fast scan planning: no distributed SQL engine is needed to read a table or find its files
- Advanced filtering: data files are pruned using table metadata together with partition and column-level statistics
- Works with any cloud store, and reduces NameNode congestion on HDFS, by avoiding listings and renames
- Serializable isolation: table changes are atomic, and readers never see partial or uncommitted changes
- Concurrent writes use optimistic concurrency control and retry to ensure that compatible updates succeed
As for the storage format, some core Apache Iceberg concepts are listed below (a schematic of how they nest follows the list):
- Partition spec (分区规格): defines how partition information is derived from data files
- Snapshot (快照): the state of a table at some point in time, defined by a manifest list
- Manifest list (清单列表): an Avro file listing manifest files
- Manifest (清单文件): an Avro file listing the data files that make up a snapshot
- Data file (数据文件): a file that stores the actual data
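These pieces form a hierarchy, which we will watch materialize on HDFS throughout this article; schematically:

```
vN.metadata.json              table metadata: schema, partition specs, snapshot list
  └─ snap-*.avro              manifest list: one per snapshot
       └─ *-m*.avro           manifest: lists data files plus per-file statistics
            └─ *.parquet      data files holding the actual rows
```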
This article is only a brief introduction and does not go into every detail; see the official documentation for the full story.
Creating a Table and Inserting Data

Once the cluster is up, you can follow the official quickstart to create a table. I made one change here, moving the warehouse location onto HDFS:
```bash
docker exec -it namenode hdfs dfs -mkdir /warehouse
docker exec -it spark-master /bin/bash
/spark/bin/spark-sql \
    --packages org.apache.iceberg:iceberg-spark3-runtime:0.11.1 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=hdfs://namenode:9000/warehouse
```
This command downloads the required jar automatically. Alternatively, you can download the matching jar from here and upload it to the /spark/jars directory, where it is picked up automatically.
Run the CREATE TABLE statement:
```sql
-- spark-master
CREATE TABLE local.db.table (id bigint, data string) USING iceberg;
```
Open another terminal, enter the namenode container, and inspect the corresponding directory:
```bash
docker exec -it namenode /bin/bash
hdfs dfs -ls -R /warehouse
```
The structure currently looks like this:
```
db/table
  |- metadata
     |- v1.metadata.json
     |- version-hint.text
```
The contents of v1.metadata.json are shown below. Since the table has just been created there is not much useful information yet; the part to focus on is the schema.
```json
{
  "format-version" : 1,
  "table-uuid" : "c3ff2b29-4b09-425c-b4a5-4015d18ab70d",
  "location" : "hdfs://namenode:9000/warehouse/db/table",
  "last-updated-ms" : 1624994861175,
  "last-column-id" : 2,
  "schema" : {
    "type" : "struct",
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "data",
      "required" : false,
      "type" : "string"
    } ]
  },
  "partition-spec" : [ ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "owner" : "root"
  },
  "current-snapshot-id" : -1,
  "snapshots" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
}
```
version-hint.text contains nothing but the number 1. For a Hadoop catalog, readers consult this file first to discover the current metadata version, then open the matching v1.metadata.json. Now let's try inserting data:
```sql
-- spark-master
INSERT INTO local.db.table VALUES (1, 'a');
```
After the insert finishes, the directory structure is as follows (new files marked with +):
```
db/table
+ |- data
+    |- 00000-228-02fc8f05-b942-405c-9402-513eea3c8568-00001.parquet
  |- metadata
+    |- 021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro
+    |- snap-7832020346881217565-1-021ee222-2e0e-4a1c-8e12-9d01985a9788.avro
     |- v1.metadata.json
+    |- v2.metadata.json
     |- version-hint.text
```
The data folder naturally contains the data files; let's focus on the files under metadata. First, v2.metadata.json (only the key parts are shown):
```json
{
  "current-snapshot-id" : 7832020346881217565,
  "snapshots" : [ {
    "snapshot-id" : 7832020346881217565,
    "timestamp-ms" : 1624994912014,
    "summary" : {
      "operation" : "append",
      "spark.app.id" : "app-20210629175050-0005",
      "added-data-files" : "1",
      "added-records" : "1",
      "added-files-size" : "622",
      "changed-partition-count" : "1",
      "total-records" : "1",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://namenode:9000/warehouse/db/table/metadata/snap-7832020346881217565-1-021ee222-2e0e-4a1c-8e12-9d01985a9788.avro"
  } ],
  "snapshot-log" : [ {
    "timestamp-ms" : 1624994912014,
    "snapshot-id" : 7832020346881217565
  } ],
  "metadata-log" : [ {
    "timestamp-ms" : 1624994861175,
    "metadata-file" : "hdfs://namenode:9000/warehouse/db/table/metadata/v1.metadata.json"
  } ]
}
```
The insert clearly added a lot of information: the latest snapshot, plus the snapshot and metadata logs. You can see that the operation was an append, and where the generated manifest-list file lives. Let's inspect that file (using the external tool avro-tools-1.10.2.jar):
```bash
# namenode
curl -O https://repo1.maven.org/maven2/org/apache/avro/avro-tools/1.10.2/avro-tools-1.10.2.jar
java -jar /avro-tools-1.10.2.jar tojson hdfs://namenode:9000/warehouse/db/table/metadata/snap-7832020346881217565-1-021ee222-2e0e-4a1c-8e12-9d01985a9788.avro
```
snap-7832020346881217565-1-021ee222-2e0e-4a1c-8e12-9d01985a9788.avro:

```json
{
  "manifest_path" : "hdfs://namenode:9000/warehouse/db/table/metadata/021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro",
  "manifest_length" : 5567,
  "partition_spec_id" : 0,
  "added_snapshot_id" : { "long" : 7832020346881218000 },
  "added_data_files_count" : { "int" : 1 },
  "existing_data_files_count" : { "int" : 0 },
  "deleted_data_files_count" : { "int" : 0 },
  "partitions" : { "array" : [ ] },
  "added_rows_count" : { "long" : 1 },
  "existing_rows_count" : { "long" : 0 },
  "deleted_rows_count" : { "long" : 0 }
}
```
It clearly shows that this operation added one row, and which manifest file describes it. Let's view that manifest:
```bash
# namenode
java -jar /avro-tools-1.10.2.jar tojson hdfs://namenode:9000/warehouse/db/table/metadata/021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro
```
021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro:

```json
{
  "status" : 1,
  "snapshot_id" : { "long" : 7832020346881218000 },
  "data_file" : {
    "file_path" : "hdfs://namenode:9000/warehouse/db/table/data/00000-228-02fc8f05-b942-405c-9402-513eea3c8568-00001.parquet",
    "file_format" : "PARQUET",
    "partition" : { },
    "record_count" : 1,
    "file_size_in_bytes" : 622,
    "block_size_in_bytes" : 67108864,
    "column_sizes" : { "array" : [ { "key" : 1, "value" : 46 }, { "key" : 2, "value" : 48 } ] },
    "value_counts" : { "array" : [ { "key" : 1, "value" : 1 }, { "key" : 2, "value" : 1 } ] },
    "null_value_counts" : { "array" : [ { "key" : 1, "value" : 0 }, { "key" : 2, "value" : 0 } ] },
    "nan_value_counts" : { "array" : [ ] },
    "lower_bounds" : { "array" : [ { "key" : 1, "value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key" : 2, "value" : "a" } ] },
    "upper_bounds" : { "array" : [ { "key" : 1, "value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key" : 2, "value" : "a" } ] },
    "key_metadata" : null,
    "split_offsets" : { "array" : [ 4 ] }
  }
}
```
Plainly, it records the exact location of the data file along with per-column statistics: sizes, value counts, null counts, and lower/upper bounds. Let's insert another row and see what changes:
```sql
-- spark-master
INSERT INTO local.db.table VALUES (2, 'b');
```
```
db/table
  |- data
     |- 00000-228-02fc8f05-b942-405c-9402-513eea3c8568-00001.parquet
+    |- 00000-229-1a758de0-25a8-46c0-aadd-5fd80726d325-00001.parquet
  |- metadata
     |- 021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro
+    |- 9f74ed0b-48aa-4425-b79c-578838e9ead2-m0.avro
     |- snap-7832020346881217565-1-021ee222-2e0e-4a1c-8e12-9d01985a9788.avro
+    |- snap-7744186654445432565-1-9f74ed0b-48aa-4425-b79c-578838e9ead2.avro
     |- v1.metadata.json
     |- v2.metadata.json
+    |- v3.metadata.json
     |- version-hint.text
```
Very similar to the first insert: the same kinds of files were added. In v3.metadata.json, the snapshots array gained a new latest snapshot that points to its own manifest-list, and each log array grew by one entry. Notably, the new manifest-list (snap-*) file now has two entries, one for this insert and one for the previous insert. The manifest file itself looks much like the one from the first insert.
v3.metadata.json (the new snapshot entry):

```json
{
  "snapshot-id" : 7744186654445432565,
  "parent-snapshot-id" : 7832020346881217565,
  "timestamp-ms" : 1624994985432,
  "summary" : {
    "operation" : "append",
    "spark.app.id" : "app-20210629175050-0005",
    "added-data-files" : "1",
    "added-records" : "1",
    "added-files-size" : "622",
    "changed-partition-count" : "1",
    "total-records" : "2",
    "total-data-files" : "2",
    "total-delete-files" : "0",
    "total-position-deletes" : "0",
    "total-equality-deletes" : "0"
  },
  "manifest-list" : "hdfs://namenode:9000/warehouse/db/table/metadata/snap-7744186654445432565-1-9f74ed0b-48aa-4425-b79c-578838e9ead2.avro"
}
```
v3.metadata.json (the logs):

```json
{
  "snapshot-log" : [ {
    "timestamp-ms" : 1624994912014,
    "snapshot-id" : 7832020346881217565
  }, {
    "timestamp-ms" : 1624994985432,
    "snapshot-id" : 7744186654445432565
  } ],
  "metadata-log" : [ {
    "timestamp-ms" : 1624994861175,
    "metadata-file" : "hdfs://namenode:9000/warehouse/db/table/metadata/v1.metadata.json"
  }, {
    "timestamp-ms" : 1624994912014,
    "metadata-file" : "hdfs://namenode:9000/warehouse/db/table/metadata/v2.metadata.json"
  } ]
}
```
snap-7744186654445432565-1-9f74ed0b-48aa-4425-b79c-578838e9ead2.avro (manifest paths only):

```json
[
  { "manifest_path" : "hdfs://namenode:9000/warehouse/db/table/metadata/9f74ed0b-48aa-4425-b79c-578838e9ead2-m0.avro" },
  { "manifest_path" : "hdfs://namenode:9000/warehouse/db/table/metadata/021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro" }
]
```
Finally, spark-sql also exposes a rich set of metadata tables, demonstrated below:
```sql
-- spark-master
-- content             int              Contents of the file: 0=data, 1=position deletes, 2=equality deletes
-- file_path           string           Location URI with FS scheme
-- file_format         string           File format name: avro, orc, or parquet
-- record_count        bigint           Number of records in the file
-- file_size_in_bytes  bigint           Total file size in bytes
-- column_sizes        map<int,bigint>  Map of column id to total size on disk
-- value_counts        map<int,bigint>  Map of column id to total count, including null and NaN
-- null_value_counts   map<int,bigint>  Map of column id to null value count
-- nan_value_counts    map<int,bigint>  Map of column id to number of NaN values in the column
-- lower_bounds        map<int,binary>  Map of column id to lower bound
-- upper_bounds        map<int,binary>  Map of column id to upper bound
-- key_metadata        binary           Encryption key metadata blob
-- split_offsets       array<bigint>    Splittable offsets
-- equality_ids        array<int>       Equality comparison field IDs
SELECT * FROM local.db.table.files;
0  hdfs://namenode:9000/warehouse/db/table/data/00000-229-1a758de0-25a8-46c0-aadd-5fd80726d325-00001.parquet  PARQUET  1  622  {1:46,2:48}  {1:1,2:1}  {1:0,2:0}  {}  {1:,2:b}  {1:,2:b}  NULL  [4]  NULL
0  hdfs://namenode:9000/warehouse/db/table/data/00000-228-02fc8f05-b942-405c-9402-513eea3c8568-00001.parquet  PARQUET  1  622  {1:46,2:48}  {1:1,2:1}  {1:0,2:0}  {}  {1:,2:a}  {1:,2:a}  NULL  [4]  NULL
```
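The lower_bounds and upper_bounds shown above are the same statistics we saw in the manifest, and they are what power Iceberg's file-level pruning: during scan planning, each file's bounds are compared against the query predicate and files that cannot match are skipped. A minimal illustration, assuming the two-row table built so far:

```sql
-- Only the file whose id range covers 2 needs to be scanned;
-- the file holding (1, 'a') can be pruned from manifest statistics alone.
SELECT * FROM local.db.table WHERE id = 2;
```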
```sql
-- spark-master
-- made_current_at      timestamp
-- snapshot_id          bigint
-- parent_id            bigint
-- is_current_ancestor  boolean
SELECT * FROM local.db.table.history;
2021-06-29 19:28:32.014  7832020346881217565  NULL                 true
2021-06-29 19:29:45.432  7744186654445432565  7832020346881217565  true
```
```sql
-- spark-master
-- committed_at   timestamp
-- snapshot_id    bigint
-- parent_id      bigint
-- operation      string
-- manifest_list  string
-- summary        map<string,string>
SELECT * FROM local.db.table.snapshots;
2021-06-29 19:28:32.014  7832020346881217565  NULL  append  hdfs://namenode:9000/warehouse/db/table/metadata/snap-7832020346881217565-1-021ee222-2e0e-4a1c-8e12-9d01985a9788.avro  {"added-data-files":"1","added-files-size":"622","added-records":"1","changed-partition-count":"1","spark.app.id":"app-20210629175050-0005","total-data-files":"1","total-delete-files":"0","total-equality-deletes":"0","total-position-deletes":"0","total-records":"1"}
2021-06-29 19:29:45.432  7744186654445432565  7832020346881217565  append  hdfs://namenode:9000/warehouse/db/table/metadata/snap-7744186654445432565-1-9f74ed0b-48aa-4425-b79c-578838e9ead2.avro  {"added-data-files":"1","added-files-size":"622","added-records":"1","changed-partition-count":"1","spark.app.id":"app-20210629175050-0005","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-position-deletes":"0","total-records":"2"}
```
```sql
-- spark-master
-- path                       string
-- length                     bigint
-- partition_spec_id          int
-- added_snapshot_id          bigint
-- added_data_files_count     int
-- existing_data_files_count  int
-- deleted_data_files_count   int
-- partition_summaries        array<struct<contains_null:boolean,lower_bound:string,upper_bound:string>>
SELECT * FROM local.db.table.manifests;
hdfs://namenode:9000/warehouse/db/table/metadata/021ee222-2e0e-4a1c-8e12-9d01985a9788-m0.avro  5565  0  7832020346881217565  1  0  []
hdfs://namenode:9000/warehouse/db/table/metadata/9f74ed0b-48aa-4425-b79c-578838e9ead2-m0.avro  5565  0  7744186654445432565  1  0  []
```
DELETE and MERGE INTO

See here for the syntax used in this section. First, test DELETE:
```sql
-- spark-master
DELETE FROM local.db.table WHERE id = 1;
```
At this point no data file has been removed; instead, three new files appear under metadata: v4.metadata.json plus a new snapshot (manifest list) and a new manifest. v4.metadata.json gains a snapshot whose operation is overwrite. The manifest list has two entries, pointing to the manifests from this change and the previous one. The latter manifest still points at the very first data file, but its status is now set to 2 (0: EXISTING, 1: ADDED, 2: DELETED).
```sql
-- spark-master
SELECT * FROM local.db.table.files;
0  hdfs://namenode:9000/warehouse/db/table/data/00000-229-1a758de0-25a8-46c0-aadd-5fd80726d325-00001.parquet  PARQUET  1  622  {1:46,2:48}  {1:1,2:1}  {1:0,2:0}  {}  {1:,2:b}  {1:,2:b}  NULL  [4]  NULL

SELECT * FROM local.db.table.manifests;
hdfs://namenode:9000/warehouse/db/table/metadata/9f74ed0b-48aa-4425-b79c-578838e9ead2-m0.avro  5565  0  7744186654445432565  1  0  []
hdfs://namenode:9000/warehouse/db/table/metadata/1a2d0a26-6fbc-48f2-aa26-ba663bc7d675-m0.avro  5565  0  8526017697441933265  0  0  []
```
Both local.db.table.history and local.db.table.snapshots now hold three records.
Next, test MERGE INTO:
```sql
-- spark-master
CREATE TABLE local.db.table2 (id bigint, data string) USING iceberg;
INSERT INTO local.db.table2 VALUES (2, 'bb');
INSERT INTO local.db.table2 VALUES (3, 'c');
MERGE INTO local.db.table t1
USING (SELECT * FROM local.db.table2) t2
ON t1.id = t2.id
WHEN MATCHED THEN UPDATE SET t1.data = t2.data
WHEN NOT MATCHED THEN INSERT *;
```
After it completes, there is 1 new data file, 1 new metadata file, 2 new manifest files, and 1 new snapshot. The new snapshot is again an overwrite, and its manifest list points only at manifests newly written by this operation. The only live data file is now the newest one:
```sql
-- spark-master
SELECT * FROM local.db.table.snapshots;
...
2021-06-29 19:34:58.439  51770443548099208  8526017697441933265  overwrite  hdfs://namenode:9000/warehouse/db/table/metadata/snap-51770443548099208-1-06ee1df2-35a2-46ac-85e1-8c17ccce2829.avro  {"added-data-files":"1","added-files-size":"641","added-records":"2","changed-partition-count":"1","deleted-data-files":"1","deleted-records":"1","removed-files-size":"622","spark.app.id":"app-20210629175050-0005","total-data-files":"1","total-delete-files":"0","total-equality-deletes":"0","total-position-deletes":"0","total-records":"2"}

SELECT * FROM local.db.table.manifests;
hdfs://namenode:9000/warehouse/db/table/metadata/06ee1df2-35a2-46ac-85e1-8c17ccce2829-m1.avro  5574  0  51770443548099208  1  0  []
hdfs://namenode:9000/warehouse/db/table/metadata/06ee1df2-35a2-46ac-85e1-8c17ccce2829-m0.avro  5565  0  51770443548099208  0  0  []

SELECT * FROM local.db.table.files;
0  hdfs://namenode:9000/warehouse/db/table/data/00000-436-6fd99e71-464a-4dec-a7dd-6c819d46a3de-00001.parquet  PARQUET  2  641  {1:55,2:57}  {1:2,2:2}  {1:0,2:0}  {}  {1:,2:bb}  {1:,2:c}  NULL  [4]  NULL
```
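A quick sanity check of the table contents at this point (my addition; the expected output assumes the exact sequence of operations above, and is consistent with the bounds {1:,2:bb}..{1:,2:c} in the files table):

```sql
-- The MERGE updated (2,'b') to (2,'bb') and inserted (3,'c');
-- row (1,'a') was already deleted earlier.
SELECT * FROM local.db.table;
-- expected: 2  bb
--           3  c
```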
Snapshot Management

Snapshot Rollback

Try rolling the snapshot back to the previous version:
```sql
-- spark-master
CALL local.system.rollback_to_snapshot('db.table', 8526017697441933265);
```
This step only adds v6.metadata.json; no other files are created. This metadata version merely sets current-snapshot-id back to the previous snapshot (and appends to the logs).
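To see the rollback take effect, a quick check (my addition; the expected output assumes the operation sequence above): snapshot 8526017697441933265 is the post-DELETE state, so the MERGE results should be gone.

```sql
-- After rolling back, only the row that survived the DELETE remains;
-- the MERGE results live in a later snapshot and are no longer visible.
SELECT * FROM local.db.table;
-- expected: 2  b
```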
Setting the Current Snapshot

Now set the snapshot back to the latest version:
```sql
-- spark-master
CALL local.system.set_current_snapshot('db.table', 51770443548099208);
```
This adds v7.metadata.json, which again only changes current-snapshot-id (and appends to the logs).
Expiring Old Snapshots

Remove all old snapshots, keeping only the current version:
```sql
-- spark-master
CALL local.system.expire_snapshots('db.table');
2  3  3
```
The three numbers at the end mean the following:
```
deleted_data_files_count      [long]  Number of data files deleted by this operation
deleted_manifest_files_count  [long]  Number of manifest files deleted by this operation
deleted_manifest_lists_count  [long]  Number of manifest list files deleted by this operation
```
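expire_snapshots also accepts optional arguments to control what gets expired. A sketch based on the procedure's documented older_than and retain_last parameters; the timestamp and retain count here are purely illustrative:

```sql
-- Expire snapshots older than the given timestamp,
-- but always keep the 5 most recent ones (values are illustrative).
CALL local.system.expire_snapshots('db.table', TIMESTAMP '2021-06-30 00:00:00.000', 5);
```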
The current file listing:
```
db/table
  |- data
     |- 00000-436-6fd99e71-464a-4dec-a7dd-6c819d46a3de-00001.parquet
  |- metadata
     |- 06ee1df2-35a2-46ac-85e1-8c17ccce2829-m0.avro
     |- 06ee1df2-35a2-46ac-85e1-8c17ccce2829-m1.avro
     |- snap-51770443548099208-1-06ee1df2-35a2-46ac-85e1-8c17ccce2829.avro
     |- [v1-v8].metadata.json
     |- version-hint.text
```
Rewriting Manifests

Run this command to rewrite the manifests:
```sql
-- spark-master
CALL local.system.rewrite_manifests('db.table');
2  1
```
The numbers mean:
```
rewritten_manifests_count  [int]  Number of manifests which were rewritten by this command
added_manifests_count      [int]  Number of new manifest files which were written by this command
```
This produces a new metadata file, snapshot, and manifest. The number of manifest files referenced by the current snapshot drops from two to one, and the new snapshot's operation is replace:
```sql
-- spark-master
SELECT * FROM local.db.table.snapshots;
...
2021-06-29 22:59:18.518  298244000757312851  51770443548099208  replace  hdfs://namenode:9000/warehouse/db/table/metadata/snap-298244000757312851-1-09c08664-8c89-4d75-be54-93b8ba21b215.avro  {"changed-partition-count":"0","entries-processed":"0","manifests-created":"1","manifests-kept":"0","manifests-replaced":"2","total-data-files":"1","total-delete-files":"0","total-equality-deletes":"0","total-position-deletes":"0","total-records":"2"}
```
This concludes our brief exploration of the underlying data structures of Apache Iceberg 0.11.1 tables. The project is updated frequently, many features may require building from source to use, and the 0.12 release will change the metadata somewhat, so always defer to the latest official documentation.