Apache Hudi表格底层数据结构初探

本文用实践的方式初步探索Apache HudiGitHub项目地址)表的底层数据结构。测试使用的版本为0.8.0,使用的集群为Docker自建,详见使用Docker搭建Hadoop + Hive + Spark集群(上)。你也可以使用Apache Hudi官方的Docker Demo

综述

Apache Hudi是开源的数据湖方案之一,最初由Uber开发。它为现有的数据湖架构做出了一定的改进,包括:

  • 快速,可插拔索引的UPSERT支持
  • 原子化更新数据与回滚
  • 数据写入与查询之间的快照隔离
  • 可用于数据恢复的保存点
  • 利用统计信息管理文件大小与布局
  • 行列数据的异步压缩
  • 可跟踪血统的时间轴元数据
  • 通过聚类优化数据湖布局

Apache Hudi支持 Spark 2.4.3+ 以及 3.x 版本。以下根据官网说明做一些简单介绍。

Apache Hudi的核心是维护在不同即时时间(instant)在表格上执行的所有操作的时间轴(timeline),这有助于提供表格的即时视图,同时还有效地支持按到达顺序检索数据。Hudi instant构成的组件如下:

  • Instant action:对表格执行的操作类型
  • Instant time:通常是一个时间戳,按照动作开始时间的顺序单调增加
  • State:当前instant的状态

关键的操作(action)则包括:

  • COMMITS:将数据批量原子写入表
  • CLEANS:清除不再需要的旧文件的后台活动
  • DELTA_COMMIT:将数据批量原子写入 MergeOnRead 表中,部分或所有数据可以仅写入增量日志
  • COMPACTION:协调 Hudi 中差异数据结构的后台活动,例如将更新从基于行的日志文件移动到列格式
  • ROLLBACK:表示COMMITS或DELTA_COMMIT提交失败,进行回滚并清除中间文件
  • SAVEPOINT:将某些文件组标记为已保存,以免被清理程序删除

任何给定的instant可以处于下列状态之一:

  • REQUESTED:表示一个操作已被安排,但尚未启动
  • INFLIGHT:表示当前正在执行操作
  • COMPLETED:表示完成了时间轴上的一个操作

与Apache Iceberg和Delta Lake类似,Apache Hudi的存储也与Hive类似,以partition作为分隔(如有)。在每个partition内,文件会被标识为文件组,并被分配ID。每个文件组包含多个文件切片,其中每个切片包含在某个提交/压缩即时时间(instant)生成的基本列文件(*.parquet),以及记录自生成基本文件以来对基本文件的插入/更新的日志文件集(*.log.*)。此外,Hudi通过索引机制将给定的hoodie key(record key + partition path)映射到文件组,从而提供高效的UPSERT。这种映射一旦将首个版本的记录写入文件后就不会再改变。简而言之,映射的文件组包含一组记录的所有版本。

建表与插入数据

集群搭建好后,可以直接走官方教程进行建表工作:

1
2
3
4
5
6
docker exec -it namenode hdfs dfs -mkdir /hudi
docker exec -it spark-master /bin/bash

/spark/bin/spark-shell \
--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.1.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

该命令会自动下载需要的jar包。你也可以去这里下载对应的jar包上传到/spark/jars目录下自动加载。

运行以下语句:

spark-master
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
:paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import spark.implicits._

val df1 = Seq(
(1, "a"),
(2, "b"),
(3, "c")
).toDF("id", "name")

df1.write.format("hudi")
.option(TABLE_NAME, "table")
.option(PRECOMBINE_FIELD_OPT_KEY, "id")
.option(RECORDKEY_FIELD_OPT_KEY, "id")
.mode(Overwrite)
.save("hdfs://namenode:9000/hudi/table")

// Exiting paste mode, now interpreting.

这其中,PRECOMBINE_FIELD_OPT_KEY代表了数据遇到相同的key时的组合逻辑。RECORDKEY_FIELD_OPT_KEY则代表记录键,即hoodie key中record key部分的值。

另开一个终端进入namenode看一下对应的目录:

1
2
docker exec -it namenode /bin/bash
hdfs dfs -ls -R /hudi/table

目前的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
table
|- .hoodie
|- .aux
|- .bootstrap
|- .fileids
|_ .partitions
|- .temp
|- 20210707004504.commit
|- 20210707004504.commit.requested
|- 20210707004504.inflight
|- archived
|- hoodie.properties
|- default
|- .hoodie_partition_metadata
|- de86200e-81ea-4936-b84c-d9c493e9ddea-0_0-21-12006_20210707004504.parquet

其中除了数据文件外,有内容的文件为.hoodie_partition_metadatahoodie.properties.inflight.commit文件。我们注意查看一下里面的内容:

.hoodie_partition_metadata
1
2
3
4
#partition metadata
#Wed Jul 07 00:47:02 GMT 2021
commitTime=20210707004504
partitionDepth=1

由于我们没有partition,所以该文件的内容极其简单。再看一下properties:

hoodie.properties
1
2
3
4
5
6
7
8
#Properties saved on Wed Jul 07 00:45:05 GMT 2021
#Wed Jul 07 00:45:05 GMT 2021
hoodie.table.precombine.field=id
hoodie.table.name=table
hoodie.archivelog.folder=archived
hoodie.table.type=COPY_ON_WRITE
hoodie.table.version=1
hoodie.timeline.layout.version=1

这里面存了一些与表有关的参数,除了表名、当前版本、log存档位置之外,重要的一点是表明这张表是COPY_ON_WRITE表。

.commit.inflight文件结构非常类似,因为INFLIGHT本来就是表示未执行完毕的操作,相当于在COMMIT之前的中间状态。因此,下面带减号的红色部分表示.inflight文件的内容,带加号的绿色部分表示.commit不同的文件内容:

20210706205902.inflight|.commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
{
"partitionToWriteStats" : {
"default" : [ {
- "fileId" : "",
+ "fileId" : "de86200e-81ea-4936-b84c-d9c493e9ddea-0",
- "path" : null,
+ "path" : "default/de86200e-81ea-4936-b84c-d9c493e9ddea-0_0-21-12006_20210707004504.parquet",
"prevCommit" : "null",
- "numWrites" : 0,
+ "numWrites" : 3,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 3,
- "totalWriteBytes" : 0,
+ "totalWriteBytes" : 434285,
"totalWriteErrors" : 0,
"tempPath" : null,
- "partitionPath" : null,
+ "partitionPath" : "default",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
- "fileSizeInBytes" : 0,
+ "fileSizeInBytes" : 434285,
"minEventTime" : null,
"maxEventTime" : null
} ]
},
"compacted" : false,
- "extraMetadata" : { },
+ "extraMetadata" : {
+ "schema" : "{\"type\":\"record\",\"name\":\"table_record\",\"namespace\":\"hoodie.table\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"string\",\"null\"]}]}"
+ },
"operationType" : "UPSERT",
"fileIdAndRelativePaths" : {
- "" : null
+ "de86200e-81ea-4936-b84c-d9c493e9ddea-0" : "default/de86200e-81ea-4936-b84c-d9c493e9ddea-0_0-21-12006_20210707004504.parquet"
},
"writePartitionPaths" : [ "default" ],
"totalRecordsDeleted" : 0,
"totalLogRecordsCompacted" : 0,
"totalLogFilesCompacted" : 0,
"totalCompactedRecordsUpdated" : 0,
"totalLogFilesSize" : 0,
"totalScanTime" : 0,
- "totalCreateTime" : 0,
+ "totalCreateTime" : 1293,
"totalUpsertTime" : 0,
"minAndMaxEventTime" : {
"Optional.empty" : {
"val" : null,
"present" : false
}
}
}

可以看到COMMIT后多了写入的数据信息、schema信息与文件位置信息等。

我们知道上面的表是COPY_ON_WRITE(COW)类型,我们继续建一个MERGE_ON_READ(MOR)类型的表:

spark-master
1
2
3
4
5
6
7
8
9
10
11
12
:paste
// Entering paste mode (ctrl-D to finish)

df1.write.format("hudi")
.option(TABLE_NAME, "mtable")
.option(TABLE_TYPE_OPT_KEY, MOR_TABLE_TYPE_OPT_VAL)
.option(PRECOMBINE_FIELD_OPT_KEY, "id")
.option(RECORDKEY_FIELD_OPT_KEY, "id")
.mode(Overwrite)
.save("hdfs://namenode:9000/hudi/mtable")

// Exiting paste mode, now interpreting.

该表的目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
mtable
|- .hoodie
|- .aux
|- .bootstrap
|- .fileids
|_ .partitions
|- .temp
|- 20210707005311.deltacommit
|- 20210707005311.deltacommit.inflight
|- 20210707005311.deltacommit.requested
|- archived
|- hoodie.properties
|- default
|- .hoodie_partition_metadata
|- 226f829b-7de5-438a-aca2-db4076f777f4-0_0-52-24017_20210707005311.parquet

hoodie.properties的内容有所不同:

hoodie.properties
1
2
3
4
5
6
7
8
9
#Properties saved on Wed Jul 07 00:53:11 GMT 2021
#Wed Jul 07 00:53:11 GMT 2021
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
hoodie.table.precombine.field=id
hoodie.table.name=mtable
hoodie.archivelog.folder=archived
hoodie.table.type=MERGE_ON_READ
hoodie.table.version=1
hoodie.timeline.layout.version=1

这里显示表的类型为MERGE_ON_READ。最重要的不同在于,COW表里的commit文件在MOR表里为deltacommit。但是由于目前只是刚刚建表并插入数据,所以MOR表的deltacommit文件与COW表的commit文件并无结构上的不同。

至于其余的文件,.aux文件夹内主要存放追踪COMPACTION操作所需的辅助文件夹(包括统计数据和未来的任意元数据),.bootstrap内则主要是存放与引导操作(将已有的表转化为Hudi表)有关的文件。.temp则是完成写入Hudi表所需的临时文件夹。

Copy on Write 与 Merge on Read

Copy on Write(COW)与Merge on Read(MOR)表的区别列举如下[1]

  • 写时复制(COW):数据以列式 (Parquet) 存储,每次更新都会在写入过程中创建一个新版本的文件。COW 是默认存储类型。
  • 读时合并(MOR):使用列式 (Parquet) 和 行式 (Avro) 的组合存储数据。更新会记录到基于行的增量文件中,并根据需要进行压缩以创建新版本的列式文件。

对于 COW 数据集,每次记录更新时,包含该记录的文件都会被重写以反映更新后的值。对于 MOR 数据集,每次有更新时,Hudi 只写入有更改记录的行。 MOR 更适合读取较少,但写入或更新密集的情况。COW 则更适合于数据变化较少,但读取密集的情况。

两种表的权衡项目如下[2]

权衡 COW MOR
数据延迟 更高 更低
更新消耗(I/O) 更高(重写整个Parquet) 更低(追加到增量日志)
Parquet文件大小 更小(更新代价高) 更大(更新代价低)
写放大 更高 更低(取决于压缩策略)

更新数据

我们尝试在两种表里更新数据:

spark-master
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
:paste
// Entering paste mode (ctrl-D to finish)

val df2 = Seq(
(2, "bb"),
(3, "cc")
).toDF("id", "name")

df2.write.format("hudi")
.option(TABLE_NAME, "table")
.option(PRECOMBINE_FIELD_OPT_KEY, "id")
.option(RECORDKEY_FIELD_OPT_KEY, "id")
.mode(Append)
.save("hdfs://namenode:9000/hudi/table")

df2.write.format("hudi")
.option(TABLE_NAME, "mtable")
.option(TABLE_TYPE_OPT_KEY, MOR_TABLE_TYPE_OPT_VAL)
.option(PRECOMBINE_FIELD_OPT_KEY, "id")
.option(RECORDKEY_FIELD_OPT_KEY, "id")
.mode(Append)
.save("hdfs://namenode:9000/hudi/mtable")

// Exiting paste mode, now interpreting.

COW表比较清晰,多了三个与commit相关的文件,数据文件则做了更改。新的commit则显示了numUpdateWrites为2,并且写入了prevCommit的时间戳。MOR表除了新的三个deltacommit文件以外,default文件夹目前的结构如下:

1
2
3
4
mtable/default
|- .hoodie_partition_metadata
|- 226f829b-7de5-438a-aca2-db4076f777f4-0_0-52-24017_20210707005311.parquet
+ |- .226f829b-7de5-438a-aca2-db4076f777f4-0_20210707005311.log.1_0-114-51041

多了一个log文件,并且原数据文件并未被改动。该log文件是Hudi自己编码的Avro文件[3],所以无法通过Avro工具读取,但它包含了commit时间、hoodie key、文件名、新的数据改动等信息。

删除数据

继续尝试在两种表里删除数据:

spark-master
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
:paste
// Entering paste mode (ctrl-D to finish)

val df3 = Seq((3, "cc")).toDF("id", "name")

df3.write.format("hudi")
.option(OPERATION_OPT_KEY,"delete")
.option(TABLE_NAME, "table")
.option(PRECOMBINE_FIELD_OPT_KEY, "id")
.option(RECORDKEY_FIELD_OPT_KEY, "id")
.mode(Append)
.save("hdfs://namenode:9000/hudi/table")

df3.write.format("hudi")
.option(OPERATION_OPT_KEY,"delete")
.option(TABLE_NAME, "mtable")
.option(TABLE_TYPE_OPT_KEY, MOR_TABLE_TYPE_OPT_VAL)
.option(PRECOMBINE_FIELD_OPT_KEY, "id")
.option(RECORDKEY_FIELD_OPT_KEY, "id")
.mode(Append)
.save("hdfs://namenode:9000/hudi/mtable")

// Exiting paste mode, now interpreting.

COW表依旧是多了三个与commit相关的文件,数据文件做了更改。新的commit显示numDeletes为1。MOR表除了新的三个deltacommit文件以外,依旧多了一个log文件,原数据文件没有改动。

查询数据

首先来做一个简单的查询:

spark-master
1
2
3
4
5
6
7
8
9
val s1 = spark.read.format("hudi").load("hdfs://namenode:9000/hudi/table/*/")
s1.show

+-------------------+--------------------+------------------+----------------------+--------------------+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+
| 20210707004504| 20210707004504_0_1| 1| default|de86200e-81ea-493...| 1| a|
| 20210707005708| 20210707005708_0_7| 2| default|de86200e-81ea-493...| 2| bb|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+

可以看到除了基础的数据之外,这个DataFrame里还有commit time、hoodie key等信息。下面做一个增量查询:

spark-master
1
2
3
4
5
6
7
8
9
10
11
s1.createOrReplaceTempView("table_snap")
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from table_snap order by commitTime").map(k => k.getString(0)).collect
val beginTime = commits(commits.length - 1)
val s2 = spark.read.format("hudi").option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).load("hdfs://namenode:9000/hudi/table")
s2.show

+-------------------+--------------------+------------------+----------------------+--------------------+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+
| 20210707005708| 20210707005708_0_7| 2| default|de86200e-81ea-493...| 2| bb|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+

可以看到这样的查询通过指定开始时刻,返回了在该时刻之后的所有更改。也可以增加结束时刻以返回两者之间的更改。接下来试一下特定时间点查询:

spark-master
1
2
3
4
5
6
7
8
9
10
11
12
13
val beginTime = "000"
val endTime = commits(commits.length - 2)
val s3 = spark.read.format("hudi").option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).option(BEGIN_INSTANTT
IME_OPT_KEY, beginTime).option(END_INSTANTTIME_OPT_KEY, endTime).load("hdfs://namenode:9000/hudi/table")
s3.show

+-------------------+--------------------+------------------+----------------------+--------------------+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+
| 20210707004504| 20210707004504_0_1| 1| default|de86200e-81ea-493...| 1| a|
| 20210707004504| 20210707004504_0_2| 2| default|de86200e-81ea-493...| 2| b|
| 20210707004504| 20210707004504_0_3| 3| default|de86200e-81ea-493...| 3| c|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+

该查询会返回开始时刻与结束时刻之间的所有更改,但因为将开始时刻设置为000,实际返回了在结束时刻时表格的状态。

Apache Hudi的功能非常强大,除了上述功能之外,还有Hudi CLI这个可以管理Hudi数据集,以查看有关提交、文件系统、统计信息等内容的工具。很多重要的功能,包括CLEANS、COMPACTION、ROLLBACK、SAVEPOINT等都是可以手动在Hudi CLI里触发的。我自建的Docker集群中Spark container不包含hadoop工具,也缺乏基础的编译工具,因此无法使用Hudi CLI,感兴趣的同学可以使用Apache Hudi官方的Docker Demo建立集群然后运行Hudi CLI。


  1. https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-how-it-works.html ↩︎

  2. https://hudi.apache.org/docs/concepts.html#table-types--queries ↩︎

  3. https://github.com/apache/hudi/pull/162 ↩︎