本文用实践的方式初步探索Apache Hudi (GitHub项目地址 )表的底层数据结构。测试使用的版本为0.8.0,使用的集群为Docker自建,详见使用Docker搭建Hadoop + Hive + Spark集群(上) 。你也可以使用Apache Hudi官方的Docker Demo 。
综述
Apache Hudi 是开源的数据湖方案之一,最初由Uber开发。它为现有的数据湖架构做出了一定的改进,包括:
快速,可插拔索引的UPSERT支持
原子化更新数据与回滚
数据写入与查询之间的快照隔离
可用于数据恢复的保存点
利用统计信息管理文件大小与布局
行列数据的异步压缩
可跟踪血统的时间轴元数据
通过聚类优化数据湖布局
Apache Hudi支持 Spark 2.4.3+ 以及 3.x 版本。以下根据官网说明 做一些简单介绍。
Apache Hudi的核心是维护在不同即时时间(instant)在表格上执行的所有操作的时间轴(timeline),这有助于提供表格的即时视图,同时还有效地支持按到达顺序检索数据。Hudi instant构成的组件如下:
Instant action:对表格执行的操作类型
Instant time:通常是一个时间戳,按照动作开始时间的顺序单调增加
State:当前instant的状态
关键的操作(action)则包括:
COMMITS:将数据批量原子写入表
CLEANS:清除不再需要的旧文件的后台活动
DELTA_COMMIT:将数据批量原子写入 MergeOnRead 表中,部分或所有数据可以仅写入增量日志
COMPACTION:协调 Hudi 中差异数据结构的后台活动,例如将更新从基于行的日志文件移动到列格式
ROLLBACK:表示COMMITS或DELTA_COMMIT提交失败,进行回滚并清除中间文件
SAVEPOINT:将某些文件组标记为已保存,以免被清理程序删除
任何给定的instant可以处于下列状态之一:
REQUESTED:表示一个操作已被安排,但尚未启动
INFLIGHT:表示当前正在执行操作
COMPLETED:表示完成了时间轴上的一个操作
与Apache Iceberg和Delta Lake类似,Apache Hudi的存储也与Hive类似,以partition作为分隔(如有)。在每个partition内,文件会被标识为文件组,并被分配ID。每个文件组包含多个文件切片,其中每个切片包含在某个提交/压缩即时时间(instant)生成的基本列文件(*.parquet
),以及记录自生成基本文件以来对基本文件的插入/更新的日志文件集(*.log.*
)。此外,Hudi通过索引机制将给定的hoodie key(record key + partition path)映射到文件组,从而提供高效的UPSERT。这种映射一旦将首个版本的记录写入文件后就不会再改变。简而言之,映射的文件组包含一组记录的所有版本。
建表与插入数据
集群搭建好后,可以直接走官方教程 进行建表工作:
1 2 3 4 5 6 docker exec -it namenode hdfs dfs -mkdir /hudi docker exec -it spark-master /bin/bash /spark/bin/spark-shell \ --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.1.1 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
该命令会自动下载需要的jar包。你也可以去这里 下载对应的jar包上传到/spark/jars
目录下自动加载。
运行以下语句:
spark-master 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 :paste import org.apache.spark.sql.SaveMode ._import org.apache.hudi.DataSourceReadOptions ._import org.apache.hudi.DataSourceWriteOptions ._import org.apache.hudi.config.HoodieWriteConfig ._import spark.implicits._val df1 = Seq ( (1 , "a" ), (2 , "b" ), (3 , "c" ) ).toDF("id" , "name" ) df1.write.format("hudi" ) .option(TABLE_NAME , "table" ) .option(PRECOMBINE_FIELD_OPT_KEY , "id" ) .option(RECORDKEY_FIELD_OPT_KEY , "id" ) .mode(Overwrite ) .save("hdfs://namenode:9000/hudi/table" )
这其中,PRECOMBINE_FIELD_OPT_KEY
代表了数据遇到相同的key时的组合逻辑。RECORDKEY_FIELD_OPT_KEY
则代表记录键,即hoodie key中record key部分的值。
另开一个终端进入namenode
看一下对应的目录:
1 2 docker exec -it namenode /bin/bash hdfs dfs -ls -R /hudi/table
目前的结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 table |- .hoodie |- .aux |- .bootstrap |- .fileids |_ .partitions |- .temp |- 20210707004504.commit |- 20210707004504.commit.requested |- 20210707004504.inflight |- archived |- hoodie.properties |- default |- .hoodie_partition_metadata |- de86200e-81ea-4936-b84c-d9c493e9ddea-0_0-21-12006_20210707004504.parquet
其中除了数据文件外,有内容的文件为.hoodie_partition_metadata
,hoodie.properties
,.inflight
与.commit
文件。我们注意查看一下里面的内容:
.hoodie_partition_metadata 1 2 3 4 #partition metadata #Wed Jul 07 00:47:02 GMT 2021 commitTime=20210707004504 partitionDepth=1
由于我们没有partition,所以该文件的内容极其简单。再看一下properties:
hoodie.properties 1 2 3 4 5 6 7 8 #Properties saved on Wed Jul 07 00:45:05 GMT 2021 #Wed Jul 07 00:45:05 GMT 2021 hoodie.table.precombine.field=id hoodie.table.name=table hoodie.archivelog.folder=archived hoodie.table.type=COPY_ON_WRITE hoodie.table.version=1 hoodie.timeline.layout.version=1
这里面存了一些与表有关的参数,除了表名、当前版本、log存档位置之外,重要的一点是表明这张表是COPY_ON_WRITE
表。
.commit
与.inflight
文件结构非常类似,因为INFLIGHT本来就是表示未执行完毕的操作,相当于在COMMIT之前的中间状态。因此,下面带减号的红色部分表示.inflight
文件的内容,带加号的绿色部分表示.commit
不同的文件内容:
20210706205902.inflight|.commit 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 { "partitionToWriteStats" : { "default" : [ { - "fileId" : "", + "fileId" : "de86200e-81ea-4936-b84c-d9c493e9ddea-0", - "path" : null, + "path" : "default/de86200e-81ea-4936-b84c-d9c493e9ddea-0_0-21-12006_20210707004504.parquet", "prevCommit" : "null", - "numWrites" : 0, + "numWrites" : 3, "numDeletes" : 0, "numUpdateWrites" : 0, "numInserts" : 3, - "totalWriteBytes" : 0, + "totalWriteBytes" : 434285, "totalWriteErrors" : 0, "tempPath" : null, - "partitionPath" : null, + "partitionPath" : "default", "totalLogRecords" : 0, "totalLogFilesCompacted" : 0, "totalLogSizeCompacted" : 0, "totalUpdatedRecordsCompacted" : 0, "totalLogBlocks" : 0, "totalCorruptLogBlock" : 0, "totalRollbackBlocks" : 0, - "fileSizeInBytes" : 0, + "fileSizeInBytes" : 434285, "minEventTime" : null, "maxEventTime" : null } ] }, "compacted" : false, - "extraMetadata" : { }, + "extraMetadata" : { + "schema" : "{\"type\":\"record\",\"name\":\"table_record\",\"namespace\":\"hoodie.table\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"string\",\"null\"]}]}" + }, "operationType" : "UPSERT", "fileIdAndRelativePaths" : { - "" : null + "de86200e-81ea-4936-b84c-d9c493e9ddea-0" : "default/de86200e-81ea-4936-b84c-d9c493e9ddea-0_0-21-12006_20210707004504.parquet" }, "writePartitionPaths" : [ "default" ], "totalRecordsDeleted" : 0, "totalLogRecordsCompacted" : 0, "totalLogFilesCompacted" : 0, "totalCompactedRecordsUpdated" : 0, "totalLogFilesSize" : 0, "totalScanTime" : 0, - "totalCreateTime" : 0, + "totalCreateTime" : 1293, "totalUpsertTime" : 0, "minAndMaxEventTime" : { "Optional.empty" : { "val" : null, "present" : false } } }
可以看到COMMIT后多了写入的数据信息、schema信息与文件位置信息等。
我们知道上面的表是COPY_ON_WRITE
(COW)类型,我们继续建一个MERGE_ON_READ
(MOR)类型的表:
spark-master 1 2 3 4 5 6 7 8 9 10 11 12 :paste df1.write.format("hudi" ) .option(TABLE_NAME , "mtable" ) .option(TABLE_TYPE_OPT_KEY , MOR_TABLE_TYPE_OPT_VAL ) .option(PRECOMBINE_FIELD_OPT_KEY , "id" ) .option(RECORDKEY_FIELD_OPT_KEY , "id" ) .mode(Overwrite ) .save("hdfs://namenode:9000/hudi/mtable" )
该表的目录结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 mtable |- .hoodie |- .aux |- .bootstrap |- .fileids |_ .partitions |- .temp |- 20210707005311.deltacommit |- 20210707005311.deltacommit.inflight |- 20210707005311.deltacommit.requested |- archived |- hoodie.properties |- default |- .hoodie_partition_metadata |- 226f829b-7de5-438a-aca2-db4076f777f4-0_0-52-24017_20210707005311.parquet
hoodie.properties
的内容有所不同:
hoodie.properties 1 2 3 4 5 6 7 8 9 #Properties saved on Wed Jul 07 00:53:11 GMT 2021 #Wed Jul 07 00:53:11 GMT 2021 hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload hoodie.table.precombine.field=id hoodie.table.name=mtable hoodie.archivelog.folder=archived hoodie.table.type=MERGE_ON_READ hoodie.table.version=1 hoodie.timeline.layout.version=1
这里显示表的类型为MERGE_ON_READ
。最重要的不同在于,COW表里的commit文件在MOR表里为deltacommit。但是由于目前只是刚刚建表并插入数据,所以MOR表的deltacommit文件与COW表的commit文件并无结构上的不同。
至于其余的文件,.aux
文件夹内主要存放追踪COMPACTION操作所需的辅助文件夹(包括统计数据和未来的任意元数据),.bootstrap
内则主要是存放与引导操作(将已有的表转化为Hudi表)有关的文件。.temp
则是完成写入Hudi表所需的临时文件夹。
Copy on Write 与 Merge on Read
Copy on Write(COW)与Merge on Read(MOR)表的区别列举如下:
写时复制(COW):数据以列式 (Parquet) 存储,每次更新都会在写入过程中创建一个新版本的文件。COW 是默认存储类型。
读时合并(MOR):使用列式 (Parquet) 和 行式 (Avro) 的组合存储数据。更新会记录到基于行的增量文件中,并根据需要进行压缩以创建新版本的列式文件。
对于 COW 数据集,每次记录更新时,包含该记录的文件都会被重写以反映更新后的值。对于 MOR 数据集,每次有更新时,Hudi 只写入有更改记录的行。 MOR 更适合读取较少,但写入或更新密集的情况。COW 则更适合于数据变化较少,但读取密集的情况。
两种表的权衡项目如下:
权衡
COW
MOR
数据延迟
更高
更低
更新消耗(I/O)
更高(重写整个Parquet)
更低(追加到增量日志)
Parquet文件大小
更小(更新代价高)
更大(更新代价低)
写放大
更高
更低(取决于压缩策略)
更新数据
我们尝试在两种表里更新数据:
spark-master 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 :paste val df2 = Seq ( (2 , "bb" ), (3 , "cc" ) ).toDF("id" , "name" ) df2.write.format("hudi" ) .option(TABLE_NAME , "table" ) .option(PRECOMBINE_FIELD_OPT_KEY , "id" ) .option(RECORDKEY_FIELD_OPT_KEY , "id" ) .mode(Append ) .save("hdfs://namenode:9000/hudi/table" ) df2.write.format("hudi" ) .option(TABLE_NAME , "mtable" ) .option(TABLE_TYPE_OPT_KEY , MOR_TABLE_TYPE_OPT_VAL ) .option(PRECOMBINE_FIELD_OPT_KEY , "id" ) .option(RECORDKEY_FIELD_OPT_KEY , "id" ) .mode(Append ) .save("hdfs://namenode:9000/hudi/mtable" )
COW表比较清晰,多了三个与commit相关的文件,数据文件则做了更改。新的commit则显示了numUpdateWrites
为2,并且写入了prevCommit
的时间戳。MOR表除了新的三个deltacommit文件以外,default
文件夹目前的结构如下:
1 2 3 4 mtable/default |- .hoodie_partition_metadata |- 226f829b-7de5-438a-aca2-db4076f777f4-0_0-52-24017_20210707005311.parquet + |- .226f829b-7de5-438a-aca2-db4076f777f4-0_20210707005311.log.1_0-114-51041
多了一个log文件,并且原数据文件并未被改动。该log文件是Hudi自己编码的Avro文件,所以无法通过Avro工具读取,但它包含了commit时间、hoodie key、文件名、新的数据改动等信息。
删除数据
继续尝试在两种表里删除数据:
spark-master 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 :paste val df3 = Seq ((3 , "cc" )).toDF("id" , "name" )df3.write.format("hudi" ) .option(OPERATION_OPT_KEY ,"delete" ) .option(TABLE_NAME , "table" ) .option(PRECOMBINE_FIELD_OPT_KEY , "id" ) .option(RECORDKEY_FIELD_OPT_KEY , "id" ) .mode(Append ) .save("hdfs://namenode:9000/hudi/table" ) df3.write.format("hudi" ) .option(OPERATION_OPT_KEY ,"delete" ) .option(TABLE_NAME , "mtable" ) .option(TABLE_TYPE_OPT_KEY , MOR_TABLE_TYPE_OPT_VAL ) .option(PRECOMBINE_FIELD_OPT_KEY , "id" ) .option(RECORDKEY_FIELD_OPT_KEY , "id" ) .mode(Append ) .save("hdfs://namenode:9000/hudi/mtable" )
COW表依旧是多了三个与commit相关的文件,数据文件做了更改。新的commit显示numDeletes
为1。MOR表除了新的三个deltacommit文件以外,依旧多了一个log文件,原数据文件没有改动。
查询数据
首先来做一个简单的查询:
spark-master 1 2 3 4 5 6 7 8 9 val s1 = spark.read.format("hudi" ).load("hdfs://namenode:9000/hudi/table/*/" )s1.show +-------------------+--------------------+------------------+----------------------+--------------------+---+----+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name| +-------------------+--------------------+------------------+----------------------+--------------------+---+----+ | 20210707004504 | 20210707004504 _0_1| 1 | default |de86200e-81 ea-493. ..| 1 | a| | 20210707005708 | 20210707005708 _0_7| 2 | default |de86200e-81 ea-493. ..| 2 | bb| +-------------------+--------------------+------------------+----------------------+--------------------+---+----+
可以看到除了基础的数据之外,这个DataFrame里还有commit time、hoodie key等信息。下面做一个增量查询:
spark-master 1 2 3 4 5 6 7 8 9 10 11 s1.createOrReplaceTempView("table_snap" ) val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from table_snap order by commitTime" ).map(k => k.getString(0 )).collectval beginTime = commits(commits.length - 1 )val s2 = spark.read.format("hudi" ).option(VIEW_TYPE_OPT_KEY , VIEW_TYPE_INCREMENTAL_OPT_VAL ).option(BEGIN_INSTANTTIME_OPT_KEY , beginTime).load("hdfs://namenode:9000/hudi/table" )s2.show +-------------------+--------------------+------------------+----------------------+--------------------+---+----+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name| +-------------------+--------------------+------------------+----------------------+--------------------+---+----+ | 20210707005708 | 20210707005708 _0_7| 2 | default |de86200e-81 ea-493. ..| 2 | bb| +-------------------+--------------------+------------------+----------------------+--------------------+---+----+
可以看到这样的查询通过指定开始时刻,返回了在该时刻之后的所有更改。也可以增加结束时刻以返回两者之间的更改。接下来试一下特定时间点查询:
spark-master 1 2 3 4 5 6 7 8 9 10 11 12 13 val beginTime = "000" val endTime = commits(commits.length - 2 )val s3 = spark.read.format("hudi" ).option(VIEW_TYPE_OPT_KEY , VIEW_TYPE_INCREMENTAL_OPT_VAL ).option(BEGIN_INSTANTT IME_OPT_KEY , beginTime).option(END_INSTANTTIME_OPT_KEY , endTime).load("hdfs://namenode:9000/hudi/table" )s3.show +-------------------+--------------------+------------------+----------------------+--------------------+---+----+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name| +-------------------+--------------------+------------------+----------------------+--------------------+---+----+ | 20210707004504 | 20210707004504 _0_1| 1 | default |de86200e-81 ea-493. ..| 1 | a| | 20210707004504 | 20210707004504 _0_2| 2 | default |de86200e-81 ea-493. ..| 2 | b| | 20210707004504 | 20210707004504 _0_3| 3 | default |de86200e-81 ea-493. ..| 3 | c| +-------------------+--------------------+------------------+----------------------+--------------------+---+----+
该查询会返回开始时刻与结束时刻之间的所有更改,但因为将开始时刻设置为000
,实际返回了在结束时刻时表格的状态。
Apache Hudi的功能非常强大,除了上述功能之外,还有Hudi CLI这个可以管理Hudi数据集,以查看有关提交、文件系统、统计信息等内容的工具。很多重要的功能,包括CLEANS、COMPACTION、ROLLBACK、SAVEPOINT等都是可以手动在Hudi CLI里触发的。我自建的Docker集群中Spark container不包含hadoop工具,也缺乏基础的编译工具,因此无法使用Hudi CLI,感兴趣的同学可以使用Apache Hudi官方的Docker Demo 建立集群然后运行Hudi CLI。