Delta Lake表格底层数据结构初探

本文用实践的方式初步探索Delta LakeGitHub项目地址)表的底层数据结构。测试使用的版本为1.0.0,使用的集群为Docker自建,详见使用Docker搭建Hadoop + Hive + Spark集群(上)。请注意使用Spark 3.1.0及以上版本的YAML搭建集群。

综述

Delta Lake是开源的数据湖方案之一(并且在向Lakehouse架构进行探索),由Databricks开发并维护(同时有商用的闭源版本)。关于它的介绍,以及Lakehouse架构思想的详细介绍不是本文重点,感兴趣的读者可以参考Matei Zaharia的Delta Lake[1]以及Lakehouse架构[2]的论文。

简单来讲,现有的Data Lake + Data Warehouse架构面临着如下问题:

  • 可靠性差:因为下游数据仓库是由数据湖中的一小部分数据进行ETL构建的,因此难以在数据湖与数据仓库之间保持数据一致性。
  • 数据陈旧问题:下游数据仓库的数据依靠数据湖与ETL流水线,因此数据时效性差。
  • 存储成本高:下游数据仓库相当于复制了一部分数据湖的数据,需要付出两倍的存储成本。
  • 对高级分析的支持有限:机器学习无法直接使用数据仓库里的数据,需要将仓库的数据ETL回湖,进一步增加架构的复杂性与可维护性。

而Lakehouse可以解决这些问题,其基于低成本对象存储与开放的数据格式(例如Apache Parquet),并使用Metadata Layer实现对底层数据的SQL操作,而且增添了对ACID事务的支持。其中,Delta Lake带来了以下特性:

  • 时间旅行(Time travel)允许用户查看任意时刻的快照。
  • UPSERT,DELETE与MERGE操作。
  • 高效的流式I/O,可以支持小文件聚合或快速追加数据。
  • 模式进化(Schema evolution)可以在schema改变之后无需重写旧数据。
  • 审计日志(Audit logging),基于事务日志。

建表与插入数据

集群搭建好后,可以直接走官方教程进行建表工作:

1
2
3
4
5
6
7
docker exec -it namenode hdfs dfs -mkdir /delta
docker exec -it spark-master /bin/bash

/spark/bin/spark-shell \
--packages io.delta:delta-core_2.12:1.0.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

该命令会自动下载需要的jar包。你也可以去这里下载对应的jar包上传到/spark/jars目录下自动加载。

运行以下语句:

spark-master
1
2
val data = spark.range(0, 5)
data.write.format("delta").save("hdfs://namenode:9000/delta/table")

另开一个终端进入namenode看一下对应的目录:

1
2
docker exec -it namenode /bin/bash
hdfs dfs -ls -R /delta

目前的结构如下:

1
2
3
4
5
table
|- part-00000-82953bf9-a44e-4a0f-b660-e510e518f78b-c000.snappy.parquet
|- part-00001-70d079a0-60eb-4aae-b146-bb5d8b8a8b28-c000.snappy.parquet
|- _delta_log
|- 00000000000000000000.json

Delta Lake的表格底层文件分为数据文件(即上面的parquet文件),日志文件(即从0开始递增的json文件)与检查点文件(即checkpoint文件,与检查点建立时的日志文件共用ID,为parquet文件)三种,此外还会维护一个_last_checkpoint方便查找最新的检查点。关于文件体系的详细架构与读写模式,详见Matei Zaharia的Delta Lake论文。这篇论文实际上详细给出了Delta Lake的详细思想、架构、设计与性能测试。

看一下目前有的log文件:

00000000000000000000.json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
{
"commitInfo": {
"timestamp": 1625122735997,
"operation": "WRITE",
"operationParameters": {
"mode": "ErrorIfExists",
"partitionBy": "[]"
},
"isBlindAppend": true,
"operationMetrics": {
"numFiles": "2",
"numOutputBytes": "947",
"numOutputRows": "5"
}
}
}
{
"protocol": {
"minReaderVersion": 1,
"minWriterVersion": 2
}
}
{
"metaData": {
"id": "01790ccb-fc22-4df8-b201-4da997bdd3f3",
"format": {
"provider": "parquet",
"options": {}
},
"schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}",
"partitionColumns": [],
"configuration": {},
"createdTime": 1625122732395
}
}
{
"add": {
"path": "part-00000-82953bf9-a44e-4a0f-b660-e510e518f78b-c000.snappy.parquet",
"partitionValues": {},
"size": 471,
"modificationTime": 1625122735877,
"dataChange": true
}
}
{
"add": {
"path": "part-00001-70d079a0-60eb-4aae-b146-bb5d8b8a8b28-c000.snappy.parquet",
"partitionValues": {},
"size": 476,
"modificationTime": 1625122735876,
"dataChange": true
}
}

文件内容很详细,第一部分commitInfo指出了本次操作是WRITE操作(ErrorIfExists),第三部分metaData给出了表的结构,最后两部分则是指出本次操作新增了两个数据文件。我们再尝试插入一些数据:

spark-master
1
spark.range(5, 10).write.format("delta").mode("append").save("hdfs://namenode:9000/delta/table")

插入完毕,目前的目录结构如下:

1
2
3
4
5
6
7
8
table
|- part-00000-82953bf9-a44e-4a0f-b660-e510e518f78b-c000.snappy.parquet
|- part-00001-70d079a0-60eb-4aae-b146-bb5d8b8a8b28-c000.snappy.parquet
+ |- part-00000-2690c4d8-2577-4731-8284-9e80a2d1cdab-c000.snappy.parquet
+ |- part-00001-ef3b4586-0bfe-4830-b000-c51b3d36594f-c000.snappy.parquet
|- _delta_log
|- 00000000000000000000.json
+ |- 00000000000000000001.json

查看一下最新的日志,发现只有三项,第一项commitInfo指出本次依旧是WRITE操作(Append),剩余两项则指向新增的两个数据文件。我们尝试一下overwrite:

spark-master
1
data.write.format("delta").mode("overwrite").save("hdfs://namenode:9000/delta/table")
1
2
3
4
5
6
7
8
9
10
11
table
|- part-00000-82953bf9-a44e-4a0f-b660-e510e518f78b-c000.snappy.parquet
|- part-00001-70d079a0-60eb-4aae-b146-bb5d8b8a8b28-c000.snappy.parquet
|- part-00000-2690c4d8-2577-4731-8284-9e80a2d1cdab-c000.snappy.parquet
|- part-00001-ef3b4586-0bfe-4830-b000-c51b3d36594f-c000.snappy.parquet
+ |- part-00000-25dc4582-9526-4501-8c03-bd2c7bde1a9a-c000.snappy.parquet
+ |- part-00001-4620b02b-6bf8-4736-962a-6e9199e27ec6-c000.snappy.parquet
|- _delta_log
|- 00000000000000000000.json
|- 00000000000000000001.json
+ |- 00000000000000000002.json

新增的日志文件也是非常清晰,一共有七项,第一项commitInfo指出本次是WRITE操作(Overwrite),剩余六项是两项add,四项remove,对应着移除的四个数据文件与新增的两个数据文件。

建表的两种方式

Delta Lake建表有两种形式,一种是我们之前使用的指定文件路径的形式:

spark-master
1
data.write.format("delta").save("hdfs://namenode:9000/delta/table")

还有一种是直接使用metastore:

spark-master
1
data.write.format("delta").saveAsTable("table")

其中后一种方法可以直接使用show tables看到,数据文件存放在hdfs://namenode:9000/user/hive/warehouse下面。而前一种表可以通过delta.`[location]`的方式引用。例如,通过SQL建表的方式如下:

spark-master
1
2
3
4
5
6
7
CREATE IF NOT EXISTS TABLE test (
id BIGINT)
USING DELTA

CREATE OR REPLACE TABLE delta.`hdfs://namenode:9000/delta/test` (
id BIGINT)
USING DELTA

你可以用以下语句查看表的Metadata信息:

spark-master
1
2
3
spark.sql("describe delta.`hdfs://namenode:9000/delta/table`").show
spark.sql("describe detail delta.`hdfs://namenode:9000/delta/table`").show
spark.sql("describe history delta.`hdfs://namenode:9000/delta/table`").show

更新与删除数据

尝试更新表内数据:

spark-master
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import io.delta.tables._
import org.apache.spark.sql.functions._
import spark.implicits._
val deltaTable = DeltaTable.forPath(spark, "hdfs://namenode:9000/delta/table")
// forName: val deltaTable = DeltaTable.forName("table")

deltaTable.update(Map("id" -> expr("id + 1")))
deltaTable.toDF.show

+---+
| id|
+---+
| 3|
| 4|
| 5|
| 1|
| 2|
+---+

可以看到数据已经更新,目录里一样是多了两个数据文件与00000000000000000003.json。该日志文件有五项,表明本次操作为UPDATE,删除了两个数据文件并增加了两个数据文件。我们尝试删除一条数据:

spark-master
1
deltaTable.delete("id > 3")

目录里多了00000000000000000004.json日志与一个数据文件,日志文件表明做了DELETE操作,删除并增加了一个数据文件。接下来试一下upsert:

spark-master
1
2
3
4
5
6
7
8
9
:paste
// Entering paste mode (ctrl-D to finish)

deltaTable.as("table").merge(spark.range(5, 10).toDF.as("update"), "table.id = update.id")
.whenMatched.updateExpr(Map("id" -> "update.id"))
.whenNotMatched.insertExpr(Map("id" -> "update.id"))
.execute

// Exiting paste mode, now interpreting.

本次操作多了6个数据文件和1个log(00000000000000000005.json),该文件的第一部分commitInfo内容如下:

00000000000000000005.json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
"commitInfo": {
"timestamp": 1625141999920,
"operation": "MERGE",
"operationParameters": {
"predicate": "(table.`id` = update.`id`)",
"matchedPredicates": "[{\"actionType\":\"update\"}]",
"notMatchedPredicates": "[{\"actionType\":\"insert\"}]"
},
"readVersion": 4,
"isBlindAppend": false,
"operationMetrics": {
"numTargetRowsCopied": "0",
"numTargetRowsDeleted": "0",
"numTargetFilesAdded": "6",
"executionTimeMs": "7713",
"numTargetRowsInserted": "5",
"scanTimeMs": "4258",
"numTargetRowsUpdated": "0",
"numOutputRows": "5",
"numSourceRows": "5",
"numTargetFilesRemoved": "0",
"rewriteTimeMs": "3424"
}
}
}

这里很详细地列出了本次MERGE操作的参数。剩余的部分则表明增加了6个数据文件(其中1个文件并不含有实际的数据)。

更改Schema

首先尝试直接增加一列:

spark-master
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
spark.sql("alter table delta.`hdfs://namenode:9000/delta/table` add columns (id2 bigint after id)")

import io.delta.implicits._
spark.read.delta("hdfs://namenode:9000/delta/table").show

+---+----+
| id| id2|
+---+----+
| 1|null|
| 2|null|
| 6|null|
| 7|null|
| 3|null|
| 9|null|
| 5|null|
| 8|null|
+---+----+

新增的日志文件有两部分,第一部分指出本次为ADD COLUMNS操作,第二部分则给出了更新的metaData。我们尝试进行reorder:

spark-master
1
spark.sql("alter table delta.`hdfs://namenode:9000/delta/table` change column id2 id2 bigint first")

新增的日志文件一样有两部分,第一部分指出操作为CHANGE COLUMN,第二部分给出新的metaData

需要注意的是,Delta Lake不支持删除列。

时间旅行

时间旅行功能让你可以很轻易地读取表格历史数据。例如,读取最早版本的数据:

spark-master
1
2
3
4
5
6
7
8
9
10
spark.read.format("delta").option("versionAsOf", 0).load("hdfs://namenode:9000/delta/table").show
+---+
| id|
+---+
| 2|
| 3|
| 4|
| 0|
| 1|
+---+

默认情况下,表格历史可以保存30天。你可以用以下方法获取历史信息:

spark-master
1
2
3
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "hdfs://namenode:9000/delta/table")
val historyDF = deltaTable.history()

最后,你也可以用vacuum函数删除不被Delta表格使用的文件。默认的保留期限是7天,你也可以传入参数(单位为小时)。该操作只会删除数据文件,而非日志文件。日志文件会在下一次建立检查点的时候进行自动清理(大于30天)。

在默认情况下,你不可以给vacuum函数传入小于168小时(7天)的时间,除非设置spark.databricks.delta.retentionDurationCheck.enabled = false。我们用Ctrl + D退出并用新设置重启一下spark-shell

spark-master
1
2
3
4
5
6
/spark/bin/spark-shell \
--packages io.delta:delta-core_2.12:1.0.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf "spark.databricks.delta.retentionDurationCheck.
enabled=false"
spark-master
1
2
3
4
5
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "hdfs://namenode:9000/delta/table")
deltaTable.vacuum(0)

Deleted 7 files and directories in a total of 1 directories.

运行完毕,发现当前的数据文件仅剩下了最新的保留有当前数据的8个文件,其余文件全部删除了。日志文件没有减少,但会在下一个检查点进行清理(大于30天)。

检查点

我们最后做一些改动,看一下检查点文件:

spark-master
1
2
3
4
deltaTable.delete("id > 8")
deltaTable.delete("id > 7")
deltaTable.delete("id > 1")
deltaTable.vacuum(0)

此时多了00000000000000000010.checkpoint.parquetlast_checkpoint,后者包含的内容很简单:{"version":10,"size":20}

spark-shell看一下前者的内容:

spark-master
1
2
3
val sc = spark.sqlContext
val cp = sc.read.parquet("hdfs://namenode:9000/delta/table/_delta_log/00000000000000000010.checkpoint.parquet")
cp.printSchema

结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
root
|-- txn: struct (nullable = true)
| |-- appId: string (nullable = true)
| |-- version: long (nullable = true)
| |-- lastUpdated: long (nullable = true)
|-- add: struct (nullable = true)
| |-- path: string (nullable = true)
| |-- partitionValues: map (nullable = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
| |-- size: long (nullable = true)
| |-- modificationTime: long (nullable = true)
| |-- dataChange: boolean (nullable = true)
| |-- tags: map (nullable = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
| |-- stats: string (nullable = true)
|-- remove: struct (nullable = true)
| |-- path: string (nullable = true)
| |-- deletionTimestamp: long (nullable = true)
| |-- dataChange: boolean (nullable = true)
| |-- extendedFileMetadata: boolean (nullable = true)
| |-- partitionValues: map (nullable = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
| |-- size: long (nullable = true)
| |-- tags: map (nullable = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
|-- metaData: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- name: string (nullable = true)
| |-- description: string (nullable = true)
| |-- format: struct (nullable = true)
| | |-- provider: string (nullable = true)
| | |-- options: map (nullable = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)
| |-- schemaString: string (nullable = true)
| |-- partitionColumns: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- configuration: map (nullable = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
| |-- createdTime: long (nullable = true)
|-- protocol: struct (nullable = true)
| |-- minReaderVersion: integer (nullable = true)
| |-- minWriterVersion: integer (nullable = true)

+----+--------------------+--------------------+--------------------+--------+
| txn| add| remove| metaData|protocol|
+----+--------------------+--------------------+--------------------+--------+
|null| null|{part-00001-2af43...| null| null|
|null| null|{part-00045-7cd87...| null| null|
|null|{part-00000-0a9d5...| null| null| null|
|null| null|{part-00011-20467...| null| null|
|null| null|{part-00000-2690c...| null| null|
|null| null|{part-00000-82953...| null| null|
|null|{part-00000-29909...| null| null| null|
|null| null|{part-00000-87315...| null| null|
|null| null|{part-00000-25dc4...| null| null|
|null| null|{part-00049-76d6a...| null| null|
|null| null|{part-00068-6a737...| null| null|
|null| null|{part-00001-70d07...| null| null|
|null| null|{part-00001-ef3b4...| null| null|
|null| null|{part-00000-fd0cc...| null| null|
|null|{part-00000-e098b...| null| null| null|
|null|{part-00000-e086a...| null| null| null|
|null| null| null| null| {1, 2}|
|null| null| null|{01790ccb-fc22-4d...| null|
|null| null|{part-00001-4620b...| null| null|
|null| null|{part-00116-314ea...| null| null|
+----+--------------------+--------------------+--------------------+--------+

可以看到检查点文件就是日志文件的汇总,同时也做了一部分精简。

至此我们简单探索了Delta Lake 1.0.0中表的底层数据结构。更详细的架构请参考文末的两篇论文,并请以官方文档为准。


  1. https://cs.stanford.edu/people/matei/papers/2020/vldb_delta_lake.pdf ↩︎

  2. https://cs.stanford.edu/people/matei/papers/2021/cidr_lakehouse.pdf ↩︎