服务粉丝

我们一直在努力
当前位置:首页 > 财经 >

Apache Hudi 流转批 场景实践

日期: 来源:ApacheHudi收集编辑:forwardxu



背景

在某些业务场景下,我们需要一个标志来衡量hudi数据写入的进度,比如:Flink 实时向 Hudi 表写入数据,然后使用这个 Hudi 表来支持批量计算并通过一个 flag 来评估它的分区数据是否完整从而进一步写入分区数据进行分区级别的ETL,这也就是我们通常说的流转批

EventTime计算原理

图中Flink Sink包含了两个算子。第一个writer 算子,它负责把数据写入文件,writer在checkpoint触发时,会把自己写入的最大的一个时间传到commit算子中,然后commit算子从多个上游传过来的时间中选取一个最小值作为这一批提交数据的时间,并写入HUDI表的元数据中。

社区相关工作参考: https://issues.apache.org/jira/browse/HUDI-5095

案例使用

我们的方案是将这个进度值(EventTime)存储为 hudi 提交(版本)元数据的属性里,然后通过访问这个元数据属性获取这个进度值。在下游的批处理任务之前加一个监控任务去监控最新快照元数据。如果它的时间已经超过了当前的分区时间,就认为这个表的数据已经完备了,这个监控任务就会成功触发下游的批处理任务进行计算,这样可以防止在异常场景下数据管道或者批处理任务空跑的情况。

下图是一个flink 1分钟级别入库到HUDI ODS表, 然后通过流转批计算写入HUDI DWD表的一个执行过程。

US调度系统轮询逻辑

如何解决乱序到来问题,  我们可以通过设置spedGapTime来设置允许延迟到来的范围默认是0 不会延迟到来。

Maven pom 依赖

针对此功能特性的Hudi依赖版本如下


<dependencies>
  <dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.13-bundle</artifactId>
    <version>0.12.1</version>
  </dependency>
</dependencies>

<dependencies>
  <dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.15-bundle</artifactId>
    <version>0.12.1</version>
  </dependency>
</dependencies>

如何设置EventTime

能够解析的字段类型及格式如下:

类型示例
TIMESTAMP(3)2012-12-12T12:12:12
TIMESTAMP(3)2012-12-12 12:12:12
DATE2012-12-12
BIGINT100L
INT100

Flink API

用户只需要设置flink conf指定时间字段作为时间推进字段

Map<String, String> options = new HashMap<>();
// 这里省略其他表字段
options.put(FlinkOptions.EVENT_TIME_FIELD.key(), "ts");
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
     .column("id int not null")
     .column("ts string")
     .column("dt string")
     .pk("id")
     .partition("dt")
     .options(options);

Flink SQL

通过设置hoodie.payload.event.time.field指定需要计算的eventtime的字段

create table hudi_cow_01(\n" +
"  uuid varchar(20),\n" +
"  name varchar(10),\n" +
"  age int,\n" +
"  ts timestamp(3),\n" +
"  PRIMARY KEY(uuid) NOT ENFORCED\n" +
")\n" +
" with (\n" +
 // 这里省略其他参数
"  'hoodie.payload.event.time.field' = 'ts'\n"
")

如何读取EventTime

Spark SQL

call show_commit_extra_metadata(table => 'hudi_tauth_test.hudi_cow_01', metadata_key => 'hoodie.payload.event.time.field');

Java API

代码获取片段如下

Option<HoodieCommitMetadata> commitMetadataOption = MetadataConversionUtils.getHoodieCommitMetadata(metaClient, currentInstant);
if (!commitMetadataOption.isPresent()) {
    throw new HoodieException(String.format("Commit %s not found commitMetadata in Commits %s.", currentInstant, timeline));
}
// 获取到当前版本的时间进度
String eventTime = commitMetadataOption.get().getExtraMetadata().get(FlinkOptions.EVENT_TIME_FIELD.key());
System.out.println("current eventTime: " + eventTime);

输出结果如下

current eventTime: 1667971364742

推荐阅读

Apache Hudi 背后商业公司Onehouse宣布2500万美元A轮融资

流利说基于Apache Hudi构建实时数仓的实践

年度合集!Apache Hudi 技术文章一次看个够

基于Apache Hudi 构建Serverless实时分析平台

阿里云ADB基于Hudi构建Lakehouse的实践


相关阅读

  • Apache Hudi 0.13.0版本重磅发布!

  • Apache Hudi 0.13.0 版本引入了许多新功能,包括 Metaserver[1]、变更数据捕获[2]、新的 Record Merge API[3]、Deltastreamer支持新数据源[4]等。虽然此版本不需要表版本升级
  • 数据集成Zero-ETL的未来

  • AWS Re:Invent的Zero-ETL方案消除了对 ETL 需求,Snowflake 通过他们的混合表以及与 Salesforce 的合作伙伴关系也宣布了这一点。对 "零ETL" 的命名有点异议,从表面上看所描述
  • chatgpt解答智慧城市。

  • chatgpt横空出世,风卷全球,这个大咖,如何解答智慧城市呢?chatgpt如何支持智慧城市?中国智慧城市论坛组织了和chatgpt的对话,供业界参考,如何发挥chatgpt的价值,提升智慧城市的效能。
  • chatgpt为公共数据运营献计献策。

  • 公共数据运营,2023年已经得到了普遍的关注,数据要素价值实现,数据交易所的发展,都需要公共数据运营的大力支持。日前,一向走在全国前面的深圳市明确了政数局牵头组织公共数据授权
  • 公共数据授权运营,杭州来啦!

  • 日前杭州市发布了《杭州市公共数据授权运营实施方案(试行)》(以下简称《方案》)(征求意见稿)。这标志着,杭州市加入了公共数据授权运营的城市。杭州市提出2023年的工作是打基
  • 中金|宏观数据建模应用手册

  • 摘要①如何规避宏观数据建模时可能存在的错误、②如何对宏观数据进行必要的清洗和处理、③如何优化宏观数据的建模质量、以及④宏观数据在投资决策中有哪些应用场景,是投资者
  • 巧妙的看空话术

  • 周末社融数据公布后,几乎所有卖方宏观、策略分析师的观点都是:看好成长方向获取超额收益。这话术比较巧妙,赌狗我来翻译一下:成长能不能涨不知道,但应该比大盘价值类跌得少。周末

热门文章

  • “复活”半年后 京东拍拍二手杀入公益事业

  • 京东拍拍二手“复活”半年后,杀入公益事业,试图让企业捐的赠品、家庭闲置品变成实实在在的“爱心”。 把“闲置品”变爱心 6月12日,“益心一益·守护梦想每一步”2018年四

最新文章

  • Apache Hudi 流转批 场景实践

  • 背景在某些业务场景下,我们需要一个标志来衡量hudi数据写入的进度,比如:Flink 实时向 Hudi 表写入数据,然后使用这个 Hudi 表来支持批量计算并通过一个 flag 来评估它的分区数据
  • Apache Hudi 负载类Payload使用案例剖析

  • 在 Hudi 中可以根据业务场景为 Hudi 表配置负载类Payload,它用于在更新期间合并同一记录的两个版本。本文将深入了解有效负载类的用途以及可以使用的所有不同方式。配置:hoodi
  • Flink SQL的行级权限解决方案及源码解读

  • flink-sql-securityFlinkSQL的行级权限解决方案及源码,支持面向用户级别的行级数据访问控制,即特定用户只能访问授权过的行,隐藏未授权的行数据。此方案是实时领域Flink的解决
  • Apache Hudi 0.13.0版本重磅发布!

  • Apache Hudi 0.13.0 版本引入了许多新功能,包括 Metaserver[1]、变更数据捕获[2]、新的 Record Merge API[3]、Deltastreamer支持新数据源[4]等。虽然此版本不需要表版本升级
  • 数据集成Zero-ETL的未来

  • AWS Re:Invent的Zero-ETL方案消除了对 ETL 需求,Snowflake 通过他们的混合表以及与 Salesforce 的合作伙伴关系也宣布了这一点。对 "零ETL" 的命名有点异议,从表面上看所描述