服务粉丝

我们一直在努力
当前位置:首页 > 财经 >

使用 Bucket Index 加速Apache Hudi 写入

日期: 来源:ApacheHudi收集编辑:Sivabalan


Apache Hudi 在写入路径上使用索引[1]来检测更新与插入,并将更新确定性地路由到同一文件组。Hudi 支持开箱即用的不同索引选项,如 bloom、simple、hbase、bucket、global_bloom、global_simple 等。我们将讨论 Apache Hudi 中的 bucket 索引以及它与其他索引的不同之处。

写入流程

这是一批数据摄取到 Hudi 的关键写入步骤。

关键阶段之一是上述所有阶段中的索引阶段。大多数情况下,索引查找将决定写入延迟,因为每个其他阶段都是合理限制或确定的。但是索引延迟取决于很多因素,例如表中的总数据、正在摄取的数据、分区与非分区、常规索引与全局索引、更新传播、记录关键时间特征等。所以,我们经常看到工程师/开发人员花时间尝试减少索引查找时间。

桶索引(Bucket Index)

与 Hudi 支持的所有其他索引相比,桶索引非常特殊。每个其他索引都有某种索引方式,索引查找涉及查找索引元数据和推断记录位置。然而在桶索引的情况下,它的记录键的哈希值或基于 Hudi 确定记录所在位置的某些列。其实我们可以只命名这个 StaticHashIndex 而不是 BucketIndex。无论如何计算哈希是 O(1) 并没有任何 IO操作,因此节省了写入期间索引所需的时间。

桶索引的唯一缺点是每个分区的桶数必须预先为给定的表定义。例如当启动一个新表时可以为每个分区定义 16 个桶,Hudi 将为表中的每个分区分配 16 个文件组。因此传入的记录通过 16 散列到 mod,然后路由到相应的文件组。每个文件组的写句柄将推断出插入/更新,并将基于此合并记录。

性能对比

进行了一个非常小规模的实验测试 Bloom索引和桶索引的区别。数据集特征:

  • • 总大小为 7GB,约 1300 万条记录,平均分布 10 个分区。

  • • 更新插入:抽取总批次的 50% 并尝试更新插入。

  • • upsert(第二次提交)比较 bloom 索引和桶索引的总写入延迟。

如下是两者的 Spark UI。使用桶索引可以明显地看到索引查找不涉及任何阶段,而对于 Bloom 索引可以看到其中有几个阶段/作业用于索引标记。Bloom索引 Spark UI

桶索引 Spark UI

Bloom索引代码

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_bloom_index"
val basePath = $TARGET_LOCATION
val inputPath = $INPUT_LOCATION // with parquet dataset as input. 

val df = spark.read.format("parquet").load(inputPath)
df.cache

df.write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
option("hoodie.datasource.write.operation","insert").
mode(Overwrite).
save(basePath)

// upsert 50% of same batch. 

df.sample(0.5).write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
mode(Append).
save(basePath)

注意:在 EMR 中 默认使用 Bloom 索引。本地测试默认索引是SIMPLE索引。

桶索引代码

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_bucket_index"
val basePath = $TARGET_LOCATION
val inputPath = $INPUT_LOCATION // with parquet dataset as input. 

val df = spark.read.format("parquet").load(inputPath)
df.cache

df.write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
option("hoodie.index.type","BUCKET").
option("hoodie.index.bucket.engine","SIMPLE").
option("hoodie.storage.layout.partitioner.class","org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").
option("hoodie.bucket.index.num.buckets","12").
option("hoodie.datasource.write.operation","insert").
mode(Overwrite).
save(basePath)

Upsert 50% of records:

df.sample(0.5).write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
option("hoodie.index.type","BUCKET").
option("hoodie.index.bucket.engine","SIMPLE").
option("hoodie.storage.layout.partitioner.class","org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").
option("hoodie.bucket.index.num.buckets","12").
mode(Append).
save(basePath)

如果更喜欢使用桶索引,这些是要设置的配置

option("hoodie.index.type","BUCKET").
option("hoodie.index.bucket.engine","SIMPLE").
option("hoodie.storage.layout.partitioner.class","org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").
option("hoodie.bucket.index.num.buckets","12")

注意:如果更喜欢对表使用桶索引,则必须重新开始。不能变更索引,如从 Bloom 切换到桶索引。

结论

如果用例非常适合存储桶索引,则可以大大加快写入延迟,可以选择任何列进行散列,如果没有,主键将用于散列。

推荐阅读

探索Apache Hudi核心概念 (4) - Clustering

探索Apache Hudi核心概念 (3) - Compaction

探索Apache Hudi核心概念 (2) - File Sizing

干货 I 字节跳动基于 Apache Hudi 的数据湖实战解析

探索Apache Hudi核心概念 (1) - File Layouts

引用链接

[1] 索引: [https://hudi.apache.org/docs/indexing](https://hudi.apache.org/docs/indexing)


相关阅读

  • 世界读书日 | 想推荐一本书,但无法用标题描述

  • 每年的世界读书日都是出版界最大的活动节日,大家都会拿出磨刀霍霍向猪羊的架势推荐自己的宝贝图书,小北也不例外。但今年我们出了一本不止于“读”的书,它还可以用来听、用来玩
  • 隐私计算九问!涉及断直连、ChatGPT、数据开放

  • 作者 | 楚济慈 来源 | 零壹财经脱离数据安全谈数字经济的发展是虚妄的,且不能高效流通数据的数字经济是僵死的。安全与流通,两者的平衡,仅仅通过制度规定还不足以实现。技术发

热门文章

  • “复活”半年后 京东拍拍二手杀入公益事业

  • 京东拍拍二手“复活”半年后,杀入公益事业,试图让企业捐的赠品、家庭闲置品变成实实在在的“爱心”。 把“闲置品”变爱心 6月12日,“益心一益·守护梦想每一步”2018年四
  • 美国对华2000亿关税清单,到底影响有多大?

  • 1 今天A股大跌,上证最大跌幅超过2%。直接导火索是美国证实计划对华2000亿美元产品加征25%关税。 听起来,2000亿美元数目巨大,我们来算笔账。 2000亿美元,按现在人民币汇率

最新文章

  • 美丽的日照等你来|精致“绣绘”城市

  • 我市创新工作思维,下足“绣花”功夫,打造有质感、有颜值、有温度的精致城市。日照西综合客运站是我市重要的交通集散站,也是各地旅客对日照的第一印象。城市管理员费翔每天都在
  • 使用 Bucket Index 加速Apache Hudi 写入

  • Apache Hudi 在写入路径上使用索引[1]来检测更新与插入,并将更新确定性地路由到同一文件组。Hudi 支持开箱即用的不同索引选项,如 bloom、simple、hbase、bucket、global_bloo
  • 看完车展,我终于知道为啥宁德时代能叫宁王了。

  • 还记得上海车展开始前,脖子哥就说过肯定会是一幅神仙打架的景象吗。几天看下来,我还是保守了。不光是新车新技术一个接一个,各种大瓜和热搜也是一环扣一环,真的比看电视剧都有意