本文用实践的方式初步探索Delta Lake (GitHub项目地址 )表的底层数据结构。测试使用的版本为1.0.0,使用的集群为Docker自建,详见使用Docker搭建Hadoop + Hive + Spark集群(上) 。请注意使用Spark 3.1.0及以上版本的YAML搭建集群。
综述
Delta Lake 是开源的数据湖方案之一(并且在向Lakehouse架构进行探索),由Databricks开发并维护(同时有商用的闭源版本)。关于它的介绍,以及Lakehouse架构思想的详细介绍不是本文重点,感兴趣的读者可以参考Matei Zaharia的Delta Lake以及Lakehouse架构的论文。
简单来讲,现有的Data Lake + Data Warehouse架构面临着如下问题:
可靠性差:因为下游数据仓库是由数据湖中的一小部分数据进行ETL构建的,因此难以在数据湖与数据仓库之间保持数据一致性。
数据陈旧问题:下游数据仓库的数据依靠数据湖与ETL流水线,因此数据时效性差。
存储成本高:下游数据仓库相当于复制了一部分数据湖的数据,需要付出两倍的存储成本。
对高级分析的支持有限:机器学习无法直接使用数据仓库里的数据,需要将仓库的数据ETL回湖,进一步增加架构的复杂性与可维护性。
而Lakehouse可以解决这些问题,其基于低成本对象存储与开放的数据格式(例如Apache Parquet),并使用Metadata Layer实现对底层数据的SQL操作,而且增添了对ACID事务的支持。其中,Delta Lake带来了以下特性:
时间旅行(Time travel)允许用户查看任意时刻的快照。
UPSERT,DELETE与MERGE操作。
高效的流式I/O,可以支持小文件聚合或快速追加数据。
模式进化(Schema evolution)可以在schema改变之后无需重写旧数据。
审计日志(Audit logging),基于事务日志。
建表与插入数据
集群搭建好后,可以直接走官方教程 进行建表工作:
1 2 3 4 5 6 7 docker exec -it namenode hdfs dfs -mkdir /delta docker exec -it spark-master /bin/bash /spark/bin/spark-shell \ --packages io.delta:delta-core_2.12:1.0.0 \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
该命令会自动下载需要的jar包。你也可以去这里 下载对应的jar包上传到/spark/jars
目录下自动加载。
运行以下语句:
spark-master 1 2 val data = spark.range(0 , 5 )data.write.format("delta" ).save("hdfs://namenode:9000/delta/table" )
另开一个终端进入namenode
看一下对应的目录:
1 2 docker exec -it namenode /bin/bash hdfs dfs -ls -R /delta
目前的结构如下:
1 2 3 4 5 table |- part-00000-82953bf9-a44e-4a0f-b660-e510e518f78b-c000.snappy.parquet |- part-00001-70d079a0-60eb-4aae-b146-bb5d8b8a8b28-c000.snappy.parquet |- _delta_log |- 00000000000000000000.json
Delta Lake的表格底层文件分为数据文件(即上面的parquet文件),日志文件(即从0开始递增的json文件)与检查点文件(即checkpoint文件,与检查点建立时的日志文件共用ID,为parquet文件)三种,此外还会维护一个_last_checkpoint
方便查找最新的检查点。关于文件体系的详细架构与读写模式,详见Matei Zaharia的Delta Lake论文。这篇论文实际上详细给出了Delta Lake的详细思想、架构、设计与性能测试。
看一下目前有的log文件:
00000000000000000000.json 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 { "commitInfo" : { "timestamp" : 1625122735997 , "operation" : "WRITE" , "operationParameters" : { "mode" : "ErrorIfExists" , "partitionBy" : "[]" } , "isBlindAppend" : true , "operationMetrics" : { "numFiles" : "2" , "numOutputBytes" : "947" , "numOutputRows" : "5" } } } { "protocol" : { "minReaderVersion" : 1 , "minWriterVersion" : 2 } } { "metaData" : { "id" : "01790ccb-fc22-4df8-b201-4da997bdd3f3" , "format" : { "provider" : "parquet" , "options" : { } } , "schemaString" : "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}" , "partitionColumns" : [ ] , "configuration" : { } , "createdTime" : 1625122732395 } } { "add" : { "path" : "part-00000-82953bf9-a44e-4a0f-b660-e510e518f78b-c000.snappy.parquet" , "partitionValues" : { } , "size" : 471 , "modificationTime" : 1625122735877 , "dataChange" : true } } { "add" : { "path" : "part-00001-70d079a0-60eb-4aae-b146-bb5d8b8a8b28-c000.snappy.parquet" , "partitionValues" : { } , "size" : 476 , "modificationTime" : 1625122735876 , "dataChange" : true } }
文件内容很详细,第一部分commitInfo
指出了本次操作是WRITE操作(ErrorIfExists),第三部分metaData
给出了表的结构,最后两部分则是指出本次操作新增了两个数据文件。我们再尝试插入一些数据:
spark-master 1 spark.range(5 , 10 ).write.format("delta" ).mode("append" ).save("hdfs://namenode:9000/delta/table" )
插入完毕,目前的目录结构如下:
1 2 3 4 5 6 7 8 table |- part-00000-82953bf9-a44e-4a0f-b660-e510e518f78b-c000.snappy.parquet |- part-00001-70d079a0-60eb-4aae-b146-bb5d8b8a8b28-c000.snappy.parquet + |- part-00000-2690c4d8-2577-4731-8284-9e80a2d1cdab-c000.snappy.parquet + |- part-00001-ef3b4586-0bfe-4830-b000-c51b3d36594f-c000.snappy.parquet |- _delta_log |- 00000000000000000000.json + |- 00000000000000000001.json
查看一下最新的日志,发现只有三项,第一项commitInfo
指出本次依旧是WRITE操作(Append),剩余两项则指向新增的两个数据文件。我们尝试一下overwrite:
spark-master 1 data.write.format("delta" ).mode("overwrite" ).save("hdfs://namenode:9000/delta/table" )
1 2 3 4 5 6 7 8 9 10 11 table |- part-00000-82953bf9-a44e-4a0f-b660-e510e518f78b-c000.snappy.parquet |- part-00001-70d079a0-60eb-4aae-b146-bb5d8b8a8b28-c000.snappy.parquet |- part-00000-2690c4d8-2577-4731-8284-9e80a2d1cdab-c000.snappy.parquet |- part-00001-ef3b4586-0bfe-4830-b000-c51b3d36594f-c000.snappy.parquet + |- part-00000-25dc4582-9526-4501-8c03-bd2c7bde1a9a-c000.snappy.parquet + |- part-00001-4620b02b-6bf8-4736-962a-6e9199e27ec6-c000.snappy.parquet |- _delta_log |- 00000000000000000000.json |- 00000000000000000001.json + |- 00000000000000000002.json
新增的日志文件也是非常清晰,一共有七项,第一项commitInfo
指出本次是WRITE操作(Overwrite),剩余六项是两项add,四项remove,对应着移除的四个数据文件与新增的两个数据文件。
建表的两种方式
Delta Lake建表有两种形式,一种是我们之前使用的指定文件路径的形式:
spark-master 1 data.write.format("delta" ).save("hdfs://namenode:9000/delta/table" )
还有一种是直接使用metastore:
spark-master 1 data.write.format("delta" ).saveAsTable("table" )
其中后一种方法可以直接使用show tables
看到,数据文件存放在hdfs://namenode:9000/user/hive/warehouse
下面。而前一种表可以通过delta.`[location]`
的方式引用。例如,通过SQL建表的方式如下:
spark-master 1 2 3 4 5 6 7 CREATE IF NOT EXISTS TABLE test ( id BIGINT ) USING DELTACREATE OR REPLACE TABLE delta.`hdfs:/ / namenode:9000 / delta/ test` ( id BIGINT ) USING DELTA
你可以用以下语句查看表的Metadata信息:
spark-master 1 2 3 spark.sql("describe delta.`hdfs://namenode:9000/delta/table`" ).show spark.sql("describe detail delta.`hdfs://namenode:9000/delta/table`" ).show spark.sql("describe history delta.`hdfs://namenode:9000/delta/table`" ).show
更新与删除数据
尝试更新表内数据:
spark-master 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import io.delta.tables._import org.apache.spark.sql.functions._import spark.implicits._val deltaTable = DeltaTable .forPath(spark, "hdfs://namenode:9000/delta/table" )deltaTable.update(Map ("id" -> expr("id + 1" ))) deltaTable.toDF.show +---+ | id| +---+ | 3 | | 4 | | 5 | | 1 | | 2 | +---+
可以看到数据已经更新,目录里一样是多了两个数据文件与00000000000000000003.json
。该日志文件有五项,表明本次操作为UPDATE,删除了两个数据文件并增加了两个数据文件。我们尝试删除一条数据:
spark-master 1 deltaTable.delete("id > 3" )
目录里多了00000000000000000004.json
日志与一个数据文件,日志文件表明做了DELETE操作,删除并增加了一个数据文件。接下来试一下upsert:
spark-master 1 2 3 4 5 6 7 8 9 :paste deltaTable.as("table" ).merge(spark.range(5 , 10 ).toDF.as("update" ), "table.id = update.id" ) .whenMatched.updateExpr(Map ("id" -> "update.id" )) .whenNotMatched.insertExpr(Map ("id" -> "update.id" )) .execute
本次操作多了6个数据文件和1个log(00000000000000000005.json
),该文件的第一部分commitInfo
内容如下:
00000000000000000005.json 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 { "commitInfo" : { "timestamp" : 1625141999920 , "operation" : "MERGE" , "operationParameters" : { "predicate" : "(table.`id` = update.`id`)" , "matchedPredicates" : "[{\"actionType\":\"update\"}]" , "notMatchedPredicates" : "[{\"actionType\":\"insert\"}]" } , "readVersion" : 4 , "isBlindAppend" : false , "operationMetrics" : { "numTargetRowsCopied" : "0" , "numTargetRowsDeleted" : "0" , "numTargetFilesAdded" : "6" , "executionTimeMs" : "7713" , "numTargetRowsInserted" : "5" , "scanTimeMs" : "4258" , "numTargetRowsUpdated" : "0" , "numOutputRows" : "5" , "numSourceRows" : "5" , "numTargetFilesRemoved" : "0" , "rewriteTimeMs" : "3424" } } }
这里很详细地列出了本次MERGE操作的参数。剩余的部分则表明增加了6个数据文件(其中1个文件并不含有实际的数据)。
更改Schema
首先尝试直接增加一列:
spark-master 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 spark.sql("alter table delta.`hdfs://namenode:9000/delta/table` add columns (id2 bigint after id)" ) import io.delta.implicits._spark.read.delta("hdfs://namenode:9000/delta/table" ).show +---+----+ | id| id2| +---+----+ | 1 |null | | 2 |null | | 6 |null | | 7 |null | | 3 |null | | 9 |null | | 5 |null | | 8 |null | +---+----+
新增的日志文件有两部分,第一部分指出本次为ADD COLUMNS操作,第二部分则给出了更新的metaData
。我们尝试进行reorder:
spark-master 1 spark.sql("alter table delta.`hdfs://namenode:9000/delta/table` change column id2 id2 bigint first" )
新增的日志文件一样有两部分,第一部分指出操作为CHANGE COLUMN,第二部分给出新的metaData
。
需要注意的是,Delta Lake不支持删除列。
时间旅行
时间旅行功能让你可以很轻易地读取表格历史数据。例如,读取最早版本的数据:
spark-master 1 2 3 4 5 6 7 8 9 10 spark.read.format("delta" ).option("versionAsOf" , 0 ).load("hdfs://namenode:9000/delta/table" ).show +---+ | id| +---+ | 2 | | 3 | | 4 | | 0 | | 1 | +---+
默认情况下,表格历史可以保存30天。你可以用以下方法获取历史信息:
spark-master 1 2 3 import io.delta.tables._val deltaTable = DeltaTable .forPath(spark, "hdfs://namenode:9000/delta/table" )val historyDF = deltaTable.history()
最后,你也可以用vacuum
函数删除不被Delta表格使用的文件。默认的保留期限是7天,你也可以传入参数(单位为小时)。该操作只会删除数据文件,而非日志文件。日志文件会在下一次建立检查点的时候进行自动清理(大于30天)。
在默认情况下,你不可以给vacuum
函数传入小于168小时(7天)的时间,除非设置spark.databricks.delta.retentionDurationCheck.enabled = false
。我们用Ctrl + D
退出并用新设置重启一下spark-shell
:
spark-master 1 2 3 4 5 6 /spark/bin/spark-shell \ --packages io.delta:delta-core_2.12:1.0.0 \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \ --conf "spark.databricks.delta.retentionDurationCheck. enabled=false"
spark-master 1 2 3 4 5 import io.delta.tables._val deltaTable = DeltaTable .forPath(spark, "hdfs://namenode:9000/delta/table" )deltaTable.vacuum(0 ) Deleted 7 files and directories in a total of 1 directories.
运行完毕,发现当前的数据文件仅剩下了最新的保留有当前数据的8个文件,其余文件全部删除了。日志文件没有减少,但会在下一个检查点进行清理(大于30天)。
检查点
我们最后做一些改动,看一下检查点文件:
spark-master 1 2 3 4 deltaTable.delete("id > 8" ) deltaTable.delete("id > 7" ) deltaTable.delete("id > 1" ) deltaTable.vacuum(0 )
此时多了00000000000000000010.checkpoint.parquet
与last_checkpoint
,后者包含的内容很简单:{"version":10,"size":20}
。
用spark-shell
看一下前者的内容:
spark-master 1 2 3 val sc = spark.sqlContextval cp = sc.read.parquet("hdfs://namenode:9000/delta/table/_delta_log/00000000000000000010.checkpoint.parquet" )cp.printSchema
结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 root |-- txn: struct (nullable = true) | |-- appId: string (nullable = true) | |-- version: long (nullable = true) | |-- lastUpdated: long (nullable = true) |-- add: struct (nullable = true) | |-- path: string (nullable = true) | |-- partitionValues: map (nullable = true) | | |-- key: string | | |-- value: string (valueContainsNull = true) | |-- size: long (nullable = true) | |-- modificationTime: long (nullable = true) | |-- dataChange: boolean (nullable = true) | |-- tags: map (nullable = true) | | |-- key: string | | |-- value: string (valueContainsNull = true) | |-- stats: string (nullable = true) |-- remove: struct (nullable = true) | |-- path: string (nullable = true) | |-- deletionTimestamp: long (nullable = true) | |-- dataChange: boolean (nullable = true) | |-- extendedFileMetadata: boolean (nullable = true) | |-- partitionValues: map (nullable = true) | | |-- key: string | | |-- value: string (valueContainsNull = true) | |-- size: long (nullable = true) | |-- tags: map (nullable = true) | | |-- key: string | | |-- value: string (valueContainsNull = true) |-- metaData: struct (nullable = true) | |-- id: string (nullable = true) | |-- name: string (nullable = true) | |-- description: string (nullable = true) | |-- format: struct (nullable = true) | | |-- provider: string (nullable = true) | | |-- options: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | |-- schemaString: string (nullable = true) | |-- partitionColumns: array (nullable = true) | | |-- element: string (containsNull = true) | |-- configuration: map (nullable = true) | | |-- key: string | | |-- value: string (valueContainsNull = true) | |-- createdTime: long (nullable = true) |-- protocol: struct (nullable = true) | |-- minReaderVersion: integer (nullable = true) | |-- minWriterVersion: integer (nullable = true) +----+--------------------+--------------------+--------------------+--------+ | txn| add| remove| metaData|protocol| +----+--------------------+--------------------+--------------------+--------+ |null| null|{part-00001-2af43...| null| null| |null| null|{part-00045-7cd87...| null| null| |null|{part-00000-0a9d5...| null| null| null| |null| null|{part-00011-20467...| null| null| |null| null|{part-00000-2690c...| null| null| |null| null|{part-00000-82953...| null| null| |null|{part-00000-29909...| null| null| null| |null| null|{part-00000-87315...| null| null| |null| null|{part-00000-25dc4...| null| null| |null| null|{part-00049-76d6a...| null| null| |null| null|{part-00068-6a737...| null| null| |null| null|{part-00001-70d07...| null| null| |null| null|{part-00001-ef3b4...| null| null| |null| null|{part-00000-fd0cc...| null| null| |null|{part-00000-e098b...| null| null| null| |null|{part-00000-e086a...| null| null| null| |null| null| null| null| {1, 2}| |null| null| null|{01790ccb-fc22-4d...| null| |null| null|{part-00001-4620b...| null| null| |null| null|{part-00116-314ea...| null| null| +----+--------------------+--------------------+--------------------+--------+
可以看到检查点文件就是日志文件的汇总,同时也做了一部分精简。
至此我们简单探索了Delta Lake 1.0.0中表的底层数据结构。更详细的架构请参考文末的两篇论文,并请以官方文档为准。