服务粉丝

我们一直在努力
当前位置:首页 > 财经 >

Apache Hudi 负载类Payload使用案例剖析

日期: 来源:ApacheHudi收集编辑:Siva



在 Hudi 中可以根据业务场景为 Hudi 表配置负载类Payload,它用于在更新期间合并同一记录的两个版本。本文将深入了解有效负载类的用途以及可以使用的所有不同方式。配置:hoodie.datasource.write.payload.class[1]

注意:对于新的记录合并API[2] ,这些可能会发生变化。因此此有效负载类详细信息适用于 Hudi 0.13.0 之前的所有版本。未来的版本可能会弃用这一点。

Payload类

Hudi 有一个有效负载类接口,它将确定如何将同一记录的两个版本合并在一起。核心方法如下:

/**
* This methods lets you write custom merging/combining logic to produce new values as a function of current value on storage and whats contained
* in this object. Implementations can leverage properties if required.
* <p>
* eg:
* 1) You are updating counters, you may want to add counts to currentValue and write back updated counts
* 2) You may be reading DB redo logs, and merge them with current image for a database row on storage
* </p>
*
* @param currentValue Current value in storage, to merge/combine this payload with
* @param schema Schema used for record
* @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage.
* @return new combined/merged value to be written back to storage. EMPTY to skip writing this record.
*/
Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException;

Hudi 在内部将一条记录表示为 HoodieRecord,它由一对 HoodieKey 和 HoodieRecordPayload 组成。正如我们在之前的博客中看到的,HoodieKey 代表一条记录的主键(通常是分区路径和记录键)。HoodieRecordPayload是用户实际传入的数据。

让我们来看一个典型的例子。在 commit1 中摄取了 2 条记录,即 {HK1, payload1_1} 和 {HK2, payload2_1}。在 commit2 中,假设摄取 {HK1, payload1_2} 和 {HK3, payload3_1}。

由于更新了 HK1,Hudi 将合并两个有效载荷(payload1_1 和 payload1_2 以产生 HK1 的最终输出。这就是上面显示的 combineAndGetUpdateValue() 发挥作用的地方。

本质上,HK1.payload1_2.combineAndGetUpdateValue(HK1.payload1_1) 在 commit2 结束时推导出 HK1 的最终值。

在这种情况下,让我们深入研究 Hudi 提供的一些有效负载实现。默认负载类称为 OverwriteWithLatestAvroPayload。

OverwriteWithLatestAvroPayload

正如名称[3]所暗示的那样,当使用此有效负载类时,我们只需使用新的传入值覆盖任何现有值。因此,在上述示例中,一旦 commit2 完成,payload1_2 将成为 HK1 的最终值。这是 Hudi 提供的最简单的有效负载,并且对社区中的大多数用户来说效果很好。

DefaultHoodieRecordPayload

我们还有一个名为 DefaultHoodieRecordPayload[4] 的负载类。与 Hudi 一开始就提供的 OverwriteWithLatestAvroPayload 相比,这个 DefaultHoodieRecordPayload 是在 1.5 年前引入的。让我们深入了解一下这个负载类的特殊之处。

一般来说,Hudi表可以配置preCombine[5]字段。简而言之 preCombine 字段用于解决同一批次中同一记录的两个版本之间的优胜者。例如,如果在写入 Hudi 时在同一批次中摄取 {HK1, payload1_1} 和 {HK1, payload1_2},Hudi 将在内部路由之前对传入记录进行去重。因此在这种情况下,preCombine 字段值将决定多个版本中的获胜者。

例如可以在表schema中选择“updated_at”字段作为 preCombine 字段。因此,如果传入批次中有超过 1 条具有相同 HoodieKey 的记录,则具有较高 preCombine 值的记录将优先。

尽管 OverwriteWithLatestAvroPayload 和 DefaultHoodieRecordPayload 可能看起来很相似,但有一个关键区别。这是 combineAndGetUpdateValue() 的实现方式。DefaultHoodieRecordPayload 在将传入记录与存储中的记录合并时也遵循 preCombine 值,而 OverwriteWithLatestAvroPayload 将盲目地选择传入而不是存储中的任何内容。

让我们添加带有插入记录(HK3,以及 HK1 的更新值)的 commit2。

OverwriteWithLatestAvroPayload 和 DefaultHoodieRecordPayload 都用 payload1_2 更新了 HK1。OverwriteWithLatestAvroPayload 始终选择较新的传入,因此选择了 payload1_2。DefaultHoodieRecordPayload 根据 preCombine 字段值推导。由于 payload1_2 的预组合字段值(20)高于 payload1_1 的预组合字段值(10),DefaultHoodieRecordPayload 也选择 payload1_2 作为 HK1 的最终快照。

现在让我们使用 commit3,它使用较低的 preCombine 值更新 HK1 以模拟迟到的数据。

OverwriteWithLatestAvroPayload 选择新的传入有效负载而不考虑 preCombine 值,因此它选择 payload1_3 作为 HK1 的最终值。但 DefaultHoodieRecordPayload 根据 preCombine 值选择最终获胜者,因此它选择 payload1_2 作为 HK1 的最终快照值。

社区有其他有效负载类供使用,如 OverwriteNonDefaultsWithLatestAvroPayload[6]、AWSDmsAvroPayload[7]、MySqlDebeziumAvroPayload[8]、PostgresDebeziumAvroPayload[9] 等。

还可以自定义合并两个版本的记录的负载类,为 lakehouse 用户提供了极大的灵活性。如果不是 SparkSQL 写入(MERGE INTO),没有多少系统能给你这种灵活性,但 Hudi 用户从一开始就享受它

结论

因为不同用例的场景不同,Hudi 支持Payload方式提供灵活性,有效负载类就是这样一种设计,可以根据自己的需求定义自己的 Payload 类,而不是局限于 Hudi 提供的 Payload。希望这篇博客有助于理解有效负载类的用途、常用的有效负载实现。

推荐阅读

Apache Hudi 流转批 场景实践

流利说基于Apache Hudi构建实时数仓的实践

年度合集!Apache Hudi 技术文章一次看个够

基于Apache Hudi 构建Serverless实时分析平台

阿里云ADB基于Hudi构建Lakehouse的实践

引用链接

[1] hoodie.datasource.write.payload.class: [https://hudi.apache.org/docs/configurations/#hoodiedatasourcewritepayloadclass](https://hudi.apache.org/docs/configurations/#hoodiedatasourcewritepayloadclass)
[2] 新的记录合并API: [https://github.com/apache/hudi/blob/master/rfc/rfc-46/rfc-46.md](https://github.com/apache/hudi/blob/master/rfc/rfc-46/rfc-46.md)
[3] 名称: [https://github.com/apache/hudi/blob/a70355f44571036d7f99b3ca3cb240674bd1cf91/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java](https://github.com/apache/hudi/blob/a70355f44571036d7f99b3ca3cb240674bd1cf91/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java)
[4] DefaultHoodieRecordPayload: [https://github.com/apache/hudi/blob/a70355f44571036d7f99b3ca3cb240674bd1cf91/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java](https://github.com/apache/hudi/blob/a70355f44571036d7f99b3ca3cb240674bd1cf91/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java)
[5] preCombine[https://medium.com/@simpsons/managing-duplicates-with-apache-hudi-569d44b76ab7](https://medium.com/@simpsons/managing-duplicates-with-apache-hudi-569d44b76ab7)
[6] OverwriteNonDefaultsWithLatestAvroPayload: [https://github.com/apache/hudi/blob/a70355f44571036d7f99b3ca3cb240674bd1cf91/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java](https://github.com/apache/hudi/blob/a70355f44571036d7f99b3ca3cb240674bd1cf91/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java)
[7] AWSDmsAvroPayload: [https://github.com/apache/hudi/blob/a70355f44571036d7f99b3ca3cb240674bd1cf91/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java](https://github.com/apache/hudi/blob/a70355f44571036d7f99b3ca3cb240674bd1cf91/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java)
[8] MySqlDebeziumAvroPayload: [https://github.com/apache/hudi/blob/a70355f44571036d7f99b3ca3cb240674bd1cf91/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java](https://github.com/apache/hudi/blob/a70355f44571036d7f99b3ca3cb240674bd1cf91/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java)
[9] PostgresDebeziumAvroPayload: [https://github.com/apache/hudi/blob/a70355f44571036d7f99b3ca3cb240674bd1cf91/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/PostgresDebeziumAvroPayload.java](https://github.com/apache/hudi/blob/a70355f44571036d7f99b3ca3cb240674bd1cf91/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/PostgresDebeziumAvroPayload.java)


相关阅读

  • 写了个工具,CRUD 开发效率直接提升100倍!

  • 最近在做一个项目,需要新建20多张表相信大多数同学应该和我一样,都是比较讨厌创建新表这个工作的,因为每创建一张表,都要去创建实体类、创建增删改查的接口、编写增删改查的SQL
  • 智慧型农业技术对农业种植业的影响

  • 本文收录于《农业信息化》2022年第10期,目次13摘 要:在中国现代农业发展过程中,通过运用智慧型农业技术,能够充分提升农业种植的效果,在提升农产品的产量和质量的同时,还能够进一
  • 聊聊选择字段在 B 端系统的意义

  • 选择对于 B 端系统来说究竟意味着什么?如果将下图两个组件摆在设计师面前,它们唯一的差别便是 一个有着右侧的下拉箭头、一个右侧没有下拉箭头。当听到了这种解释时,我也就只能
  • 黑龙江哈尔滨开展百天巡山清套专项行动

  •   为切实保护野生动物栖息地安全,有效震慑和遏制乱捕滥猎野生动物违法犯罪行为,近期,哈尔滨市林业和草原局精心组织,严密部署,采取有效措施,开展为期100天的巡山清套专项行动。
  • 李沧区召开城市项目建设专题调度会

  • 2月9日下午,李沧区召开城市项目建设专题调度会议,听取重点项目推进情况汇报。区委书记张友玉主持会议并讲话,区委副书记、区长魏瑞雪作工作部署。张友玉强调,项目建设是推动经
  • 关于开展注册消防工程师期满延续注册的通知

  • 关于开展注册消防工程师期满延续注册的通知各单位注册消防工程师:
    根据《注册消防工程师管理规定》(公安部令第143号)和《关于印发注册消防工程师制度暂行规定》(人社部发〔2012

热门文章

  • “复活”半年后 京东拍拍二手杀入公益事业

  • 京东拍拍二手“复活”半年后,杀入公益事业,试图让企业捐的赠品、家庭闲置品变成实实在在的“爱心”。 把“闲置品”变爱心 6月12日,“益心一益·守护梦想每一步”2018年四

最新文章

  • Apache Hudi 负载类Payload使用案例剖析

  • 在 Hudi 中可以根据业务场景为 Hudi 表配置负载类Payload,它用于在更新期间合并同一记录的两个版本。本文将深入了解有效负载类的用途以及可以使用的所有不同方式。配置:hoodi
  • Flink SQL的行级权限解决方案及源码解读

  • flink-sql-securityFlinkSQL的行级权限解决方案及源码,支持面向用户级别的行级数据访问控制,即特定用户只能访问授权过的行,隐藏未授权的行数据。此方案是实时领域Flink的解决
  • Apache Hudi 0.13.0版本重磅发布!

  • Apache Hudi 0.13.0 版本引入了许多新功能,包括 Metaserver[1]、变更数据捕获[2]、新的 Record Merge API[3]、Deltastreamer支持新数据源[4]等。虽然此版本不需要表版本升级
  • 数据集成Zero-ETL的未来

  • AWS Re:Invent的Zero-ETL方案消除了对 ETL 需求,Snowflake 通过他们的混合表以及与 Salesforce 的合作伙伴关系也宣布了这一点。对 "零ETL" 的命名有点异议,从表面上看所描述