Delta技术原理及其在eBay的实践

本文PPT,在微信公众号「DataFunSummit」,回复「20220326」领取


导读:大家好,我今天主要分享Delta 技术的基本原理以及一些在eBay的改造和实践。首先做个自我介绍,我是朱锋,2021年4月份加入eEay,参与Delta Lake 和Spark的开发和维护工作,之前4年是在腾讯,先后参加了TDW Hive, Spark SQL 和联邦分析引擎SuperSQL的设计和开发。所以个人的技术方向和兴趣主要集中在大数据分析、计算引擎和查询优化。

今天的介绍主要涉及三个部分:

  • 简单回顾Delta Lake的产生背景和基本原理
  • Delta Lake 在 eBay 内部应用的项目背景
  • eBay内部对开源Delta Lake的改造和实践

01

Delta Lake原理

1. Delta Lake技术原理 - Lakehouse架构

在2020年和2021年的VLDB 和CIDR这两个数据库学术会议上,Databricks提出Lake + house 的概念,用来解决数据存储和数据湖中的一些架构问题。第一代平台是传统数仓,用来解决结构化数据分析,经常使用的场景是用户通过ETL工具,例如工具Kettle,把结构化数据进行ETL,然后加载到Data Warehouse,这种架构依赖DW本身的能力来提供BI分析和报表服务。这种架构的优点是方便,但是缺少灵活性,扩展代价昂贵。

随着大数据框架和存储系统演进,Data Lake(数据湖)的角色开始凸显,数据湖除了结构化数据,还有半结构化和非结构化数据,支持各种计算构架,存储系统以及机器学习场景。但是这种数据湖架构依然存在问题,例如:

  • 读写有不可靠一致性问题,不同场景下写同样一份数据,会有脏读脏写等情况。
  • 数据质量不高,一些非结构化和半结构化的数据信息没有被完全抽取出来。
  • 系统与系统直接数据共享问题,因为元数据不是共通的,所以还涉及到ETL的过程。
  • 现有数据湖架构对删除和更新操作不友好。

基于这样一些问题 Lakehouse 架构应运而生。

Lakehouse顾名思义就是Lake+ house,目标是既希望有Data Lake的数据存储能力,又需要传统数仓的管控能力。大家有兴趣可以参考2021年CIDR论文:

Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Advanced Analytics.

Delta Lake的主要思想是通过一层元数据来实现事物的处理、版本控制和跨引擎适配功能,对批处理和流处理都适用。

Delta Lake关键实现通过一层delta log也就是transaction log,来完成ACID的保证,元数据可扩展性和time travel 的数据回溯能力。

实际中,delta 数据和通用数据例如parquet的区别就是多了一层delta log,可以看到delta_log 文件夹下有不同json版本,一个json文件对应一个commit信息,json文件里记载了这次commit的具体操作(添加了哪些文件或者删除了哪些文件或者metadata 的改变)。

有了delta log,我们在读取 delta 表时,会首先在 hdfs 读取 _last_checkpoint 文件,这个文件会记录最新 checkpoint id, 图上最新 checkpoint 文件是 00003.parquet 文件,00003.parquet 记录了前面三次json文件的整合,在 03 checkpoint 基础上找到新的json文件(00004.json和00005.json),然后根据 last Checkpoint 文件 00003.parquet 和 00004.json,00005.json 中 add 和 remove 标记来 reply 出当前 snapshot 中所包含的数据文件,也就是构造 spark 读取 table scan 时对应的 file index。写数据的时候则是反过来,数据写入 delta 表时,首先将数据写入,然后生成相应json文件,如果达到设定 checkpoint 阈值。例如超过10个json,会对10个json文件做一个整合来生成新的checkpoint。

--

02

项目背景

2018年之前eBay使用Teradata,和很多企业一样,eBay面临商用数仓非常昂贵的情形。所以需求就是平滑迁移,兼容原有Teradata功能,迁移需要对用户层是无感知,性能保持一致(MPP);在数据方面,敏感核心财务数据会全部迁移到全新平台上。

图上 eBay 内部 SQL-on-Hadoop 解决方案选型 Spark SQL, 平台命名为 Carmel,2019 年底实现了对 Teradata 的完全替换。和批处理场景不同,Carmel Spark 经过优化做到了 Interactive 效应,80%查询响应时间可以在27s内完成,一些计算任务比较大的在几十分钟。Carmel 对调度模块、资源管理模块等组件做了大量优化,以及大量query的优化。用户可以通过Web页面、BI工具和自己的脚本,使用SQL访问集群。Carmel部署在SSD上,通过共有Hive Metastore来实现元数据管理。

eBay需要使用 delta 的需求很简单:用户有大量 update 和 delete 操作。基于常规表,无论是hive 还是datasource 类型都无法实现。所以面临数据湖技术选型Hudi、Iceberg 或者Delta Lake。

最终为什么选择 delta lake 的原因很简单,Delta Lake和Spark很紧密,提供的功能也更加切合Spark内部的改造。

eBay在 Delta 这块的需求开始于2019年,当时面临的一些挑战包含:

  • Delta Lake 0.4不支持SQL
  • Spark 2.3不支持Delta Lake
  • 升级到Spark 3.x时,发现不支持行级别跨表更新删除
  • Delta Lake的性能不足
  • 运维管理方面,Delta表缺乏自动化优化和管理

总的来说,2019年时Delta Lake 开源版本相对于商业版本,基本无法用于生产环境,很难提供完整功能来满足我们的场景需求。

2019年11月开始需求调研,2020年2月份初步功能上线,包含用户建表简单的增删查改和一些适配功能。全量上线在2020年5月份,增加了语法支持和性能优化。2020年12月,Carmel将Spark升级为3.0,Delta内部也进行了相应的一些升级。2021年11月Delta升级到社区版本0.8,策略上,为了稳定性,eBay Carmel 的Delta 版本一般落后于开源社区1到2个版本左右。

--

03

改造与实践

eBay的应用选型场景相对来说比较简单,主要是增删查改需求,所以对delta lake改造也是主要围绕增删查改更加高效和功能更加完备。我们的工作主要围绕三个部分:

  • 功能增强
  • 性能优化
  • 易用性管理

功能增强主要是语法层面,语法上比如最开始cross table的update和delete操作,delta lake本身支持单表的update和delete操作,但是像MySQL、PostgreSQL中常见的跨表更新,Delta本身并不支持,而我们在做Teradata迁移时,首先面临的需求就是跨表更新和删除。为了满足需求,实现了跨表和多表join 更新和删除。

Delta内实现跨表update和spark内部实现有区别,因为Delta lake内有spark injection插件机制和Sqlbase.g4 文法文件,修改文法文件时是对Sqlbase.g4中额外增加update和delete语法。

经过解析之后,重点是把解析规则和优化规则放到Delta Lake的Analysis中,通过injection插入到spark本身的Analysis中调用。实现的区别是要把update或者delete 涉及到的删除数据和增加数据进行额外处理,并把元数据提取出来。

这个过程中需要过滤掉目标表没有涉及更新的文件还有改动的文件,相当于把这些文件列表给提取出来。基于文件列表信息构建出新的TahoeFileIndex,这不同于Spark 本身Parquet常规表的Index,常规表index是把目录下所有数据文件全部列出来,而此处只是部分列出。所以Delta中语法和常规的的区别是把数据写完之后需要把信息标记出来(标记 RemoveFiles和AddedFiles)然后commit 到transaction log,也就是delta log的json文件。多了这步可以保证如果commit失败,前面标记的 RemoveFiles和AddedFiles就不生效,这些额外文件由Delta Lake Vacuum机制来负责回收。

相对于Parquet,Delta Lake是Datasource V2,有些针对V1的功能, V2是不支持的,例如dynamic, static partition和对性能很有影响的bucket,我们发现delta lake元数据中没有记载数据表的bucket信息,所以针对bucket的优化,例如避免shuffle的bucket join,在delta中没有默认支持。针对这些缺陷,我们做了增强支持。

因为有Update/Delete操作的引入,我们做了基于SQL的权限管控。另外做了小文件合并功能,包含基于整个表的compact和基于单个partition 的compact。

因为Delta提供了版本控制,能够很方便地控制版本回滚,所以在此基础上可以实现一些常规的表无法实现的Constraints。Constraints在关系数据库和数据仓库是比较常用的,例如NOT NULL, UNIQUE, PRIMARY KEY, FOREIGN KEY,CHECK,DEDAULT来控制数据完整性和辅助数据查询优化。

Spark本身是有支持NOT NULL语法,但是你会发现即使定义Parquet或者ORC 表字段为NOT NULL,这个Constraint 并不生效。原因是存储在metadata store中信息不包含NOT NULL,所以这个功能实际没有enforce。

Delta从0.8开始支持CHECK和NOT NULL是enforce的。所以我们在这个基础上把Delta 和Spark本身功能在CHECK和NOT NULL上做了统一,在spark parquet表和delta表都做了constraints enforce。

CHECK和NOT NULL Constraints 实现很简单,就是在plan tree上加了额外的检验节点。

在实际中,我们发现有些用户需求是通过unique 和primary key控制表的数据的独立性,在hive和snowflake中都有这样的功能但状态都是not-enforced。在Delta中可以做到enforce功能,也就是在平台侧可以控制。Delta本身有commit信息可以保证Constraints功能。数据插入时,在最终数据进行commit之前会进行全部数据unique功能的检测,如果检测到unique冲突,此次commit失败。但是会有性能问题,每次都涉及全表扫描,所以实际中一般不开启。除非像用户表的owner自己想控制,比如用户定义了一张表,不希望其他用户打破unique特性,并且不在乎性能影响,这时用户可以开启enforce来控制unique constraints。

其它一些比较细的点,我们增强支持一些特定功能比如where子查询,空表支持convert为delta。主要原因是相对于V1,Delta V2一些原有功能需要相应改动。还有一些metrics信息,像Row level update/delte信息需要收集,暴漏更多的metrics给spark进行优化。

Delta性能优化方面和常规场景相差不大,Delta中Update/Delete时plan生成会涉及大量Join,这样会生成过滤条件和join类型,例如out join, anti-join。在Delta中Merge into默认实现是right outer join一刀切,有些场景下可以优化,例如match情况可以,有些过滤条件可以下推到目标表,生成常规join之后再加filter生成plan,再和原有join做union操作,这样语义是等价的,但执行计划能够大大提高过滤场景,过滤无用数据。效果取决于数据本身,大概能够提高5到10倍效率。但是这个PR还没有merge 到开源社区。

其它一些场景的优化也是围绕plan,skip掉无效的数据和column,下推更多条件。

Delta表每次操作都有一个snapshot生成。线上我们经常遇到一种情景——用户定义一张表为Delta 表,当用户收到提示表空间不足时,用户删除数据,却导致数据越删越多,就是删除本身也会报警空间超出限额。但是用户不了解也不关心背后原理,以及为什么越删除空间占用越多。用户需求就是删除数据,所以我们面临存储空间优化问题。因为Delta增删改都会生成snapshot和commit文件,所以需要定期清理配置时间范围内的snapshot文件。

我们优化时取决于Carmel 部署环境,Carmel存算分离,按queue划分资源,每个queue都启动一个server,每个server共用一套Hive Metastore。我们考虑把Hive Metastore中delta表筛选出来,实现自动vacuum服务,后来发现对Hive metadata 压力比较大。

所以我们额外开发了一套服务,每个Thriftserver利用spark listener捕获事件,将Delta表的元数据异步地写入第三方存储,Carmel内部使用了MySQL,在创建和删除delta表时会把表的元信息写入MySQL里,这样通过MySQL就可以查询哪些表是Delta table, 针对MySQL信息,在列表上再做优化。即便事件存在丢失的可能,保留队列的Thriftserver仍然会使用单独的线程二次检测一张表是否为delta表,这样避免Delta表信息遗漏。同时为了方便展示,ThriftServer Delta表的Vacuum的过程,会定期展示在Spark UI页面上。

在上层,我们提供了一套command语句,通过这些命令,用户可以把表设置多少小时外的文件可以清除。也可以通过命令把Delta表转化为普通表。线程包括table校验,double check 和rename。和Delta相关的一些事件都会被监听。

通过这里的Spark UI,可以看到内部有10K delta格式数据表,UI同时展示多少张表正在做Vacuum,多少张表已经完成,多少张表跳过Vacuum以及跳过的原因。

另一个易用性改动是compact,这个是非常必须的改动,用户不希望自己管表的底层文件,也不在乎小文件,不关心小文件问题,而这是平台侧需要考虑。如果有小文件通知用户手动compact的,这样易用性不好,所以我们提供了透明table compact服务。也就是平台后台侧启动服务,定期compact。同时compact需要对用户无扰动,所以compact优先级是最低的。如果不把compact优先级降低,用户做update/delete操作,受compact影响会失败,这违背我们的初衷了。所以transparent compact最后方案是两阶段检验:首先update、delete、merge into 涉及数据改动的命令开始和结束时都会在特定的HDFS路径上写入一个flag文件,命令结束后把flag文件删除。Flag表示table正在被用。Compact在开始时会检测特定目录下是否有flag文件,如果当前有flag文件就不开始。Compact文件写入之后,commit之前也会检测flag,如果有flag,则 commit失败。所以compact是做了悄无声息的工作。

其它易用性工作,例如describe信息提升,因为是V2表,默认delta table describe像owner信息并没有,只是显示简单信息例如property。另外Analyze命令也不是太友好。在用户实际使用中,并不能像常规表一样获得相同的体验。所以我们在这方面做了一些易用性和可用性的改造。

此外,time travel在spark程序里面在Delta table上很容易,用户在SQL层面想用Time Travel Rollback到某一个version无法做到,所以我们在SQL层面进行支持,用户通过SQL语句rollback到某一个时间点上。

今天分享的内容主要是这些,因为改动和实现都是偏细节,所以有些细节没有具体展开,大家如果有兴趣,欢迎多多交流。


今天的分享就到这里,谢谢大家。

阅读更多技术干货文章、下载讲师PPT,请关注微信公众号“DataFunSummit”


分享嘉宾:朱锋博士 eBay 软件工程师

编辑整理:Will(伟宜)

出品平台:DataFunTalk


01/分享嘉宾


02/报名看直播 免费领PPT



03/关于我们

DataFun:专注于大数据、人工智能技术应用的分享与交流。发起于2017年,在北京、上海、深圳、杭州等城市举办超过100+线下和100+线上沙龙、论坛及峰会,已邀请超过2000位专家和学者参与分享。其公众号 DataFunTalk 累计生产原创文章700+,百万+阅读,14万+精准粉丝。


欢迎转载分享,转载请私信留言。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章