详解Apache Hudi Schema Evolution(模式演进)

详解Apache Hudi Schema Evolution(模式演进)

Schema Evolution(模式演进)允许用户轻松更改 Hudi 表的当前模式,以适应随时间变化的数据。从 0.11.0 版本开始,支持 Spark SQL(spark3.1.x 和 spark3.2.1)对 Schema 演进的 DDL 支持并且标志为实验性的。

场景

  • • 可以添加、删除、修改和移动列(包括嵌套列)

  • • 分区列不能演进

  • • 不能对 Array 类型的嵌套列进行添加、删除或操作

SparkSQL模式演进以及语法描述

使用模式演进之前,请先设置spark.sql.extensions,对于spark 3.2.x,需要设置spark.sql.catalog.spark_catalog

# Spark SQL for spark 3.1.x
spark-sql --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:3.1.2 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# Spark SQL for spark 3.2.1
spark-sql --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:3.2.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

启动spark app后,请执行set schema.on.read.enable=true开启模式演进

当前模式演进开启后不能关闭

添加列

语法

-- add columns
ALTER TABLE Table name ADD COLUMNS(col_spec[, col_spec ...])

参数描述

列定义,由五个字段组成,col_name, col_type, able, comment, col_position

参数描述
tableName表名
col_spec

col_name: 新列名,强制必须存在,如果在嵌套类型中添加子列,请指定子列的全路径

示例

  • • 在嵌套类型users struct中添加子列col1,设置字段为users.col1

  • • 在嵌套map类型member map>中添加子列col1, 设置字段为member.value.col1

col_type: 新列的类型

able: 新列是否可为,可为空,当前Hudi中并未使用

comment: 新列的注释,可为空

col_position: 列添加的位置,值可为FIRST或者AFTER 某字段

  • • 如果设置为FIRST,那么新加的列在表的第一列

  • • 如果设置为AFTER 某字段,将在某字段后添加新列

  • • 如果设置为空,只有当新的子列被添加到嵌套列时,才能使用 FIRST。不要在顶级列中使用 FIRST。AFTER 的使用没有限制。

示例

alter table h0 add columns(ext0 string);
alter table h0 add columns(new_col int not comment 'add new column' after col1);
alter table complex_table add columns(col_struct.col_name string comment 'add new column to a struct col' after col_from_col_struct);

修改列

语法

-- alter table ... alter column
ALTER TABLE Table name ALTER [COLUMN] col_old_name TYPE column_type [COMMENT] col_comment[FIRST|AFTER] column_name

参数描述

列名,放置目标列的新位置。例如,AFTER column_name 表示目标列放在 column_name 之后

参数描述
tableName表名
col_old_name待修改的列名
column_type新的列类型
col_comment列comment
column_name

示例

--- Changing the column type
ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint

--- Altering other attributes
ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT

列类型变更兼容表

源列类型\目标列类型longfloatdoublestringdecimaldateint
intYYYYYNY
longYNYYYNN
floatNYYYYNN
doubleNNYYYNN
decimalNNNYYNN
stringNNNYYYN
dateNNNYNYN

删除列

语法

-- alter table ... drop columns
ALTER TABLE tableName DROP COLUMN|COLUMNS cols

示例

ALTER TABLE table1 DROP COLUMN a.b.c
ALTER TABLE table1 DROP COLUMNS a.b.c, x, y

修改列名

语法

-- alter table ... rename column
ALTER TABLE tableName RENAME COLUMN old_columnName TO new_columnName

示例

ALTER TABLE table1 RENAME COLUMN a.b.c TO x

修改表属性

语法

-- alter table ... set|unset
ALTER TABLE Table name SET|UNSET tblproperties

示例

ALTER TABLE table SET TBLPROPERTIES ('table_property' = 'property_value')
ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key')

修改表名

语法

-- alter table ... rename
ALTER TABLE tableName RENAME TO newTableName

示例

ALTER TABLE table1 RENAME TO table2

0.11.0之前的模式演进

模式演进是数据管理的一个非常重要的方面。Hudi 支持开箱即用的常见模式演进场景,例如添加可为空的字段或提升字段的数据类型。此外,演进后的模式可以跨引擎查询,例如 Presto、Hive 和 Spark SQL。下表总结了与不同 Hudi 表类型兼容的Schema变更类型。

Yes意味着具有演进模式的写入成功并且写入之后的读取成功读取整个数据集



如果使用演进模式的写入仅更新了一些基本文件而不是全部,则写入成功但读取失败。目前Hudi 不维护模式注册表,其中包含跨基础文件的更改历史记录。然而如果 upsert 触及所有基本文件,则读取将成功

添加自定义可为空的 Hudi 元列,例如 _hoodie_meta_col


对于其他类型,Hudi 支持与Avro相同 Avro schema resolution[1]





对于复杂类型(map或array的值),将数据类型从 int 提升为 long


对于Spark数据源的MOR表,写入成功但读取失败。作为一种解决方法,您可以使该字段为空




Schema变更COWMOR说明
在最后的根级别添加一个新的可为空列YesYes
向内部结构添加一个新的可为空列(最后)YesYes
添加具有默认值的新复杂类型字段(map和array)YesYes
添加新的可为空列并更改字段的顺序NoNo
YesYes
将根级别字段的数据类型从 int 提升为 longYesYes
.
将嵌套字段的数据类型从 int 提升为 longYesYes
YesYes
在最后的根级别添加一个新的不可为空的列NoNo
向内部结构添加一个新的不可为空的列(最后)NoNo
将嵌套字段的数据类型从 long 更改为 intNoNo
将复杂类型的数据类型从 long 更改为 int(映射或数组的值)NoNo

让我们通过一个示例来演示 Hudi 中的模式演进支持。在下面的示例中,我们将添加一个新的字符串字段并将字段的数据类型从 int 更改为 long。

Welcometo
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/

UsingScalaversion 2.12.10 (OpenJDK64-BitServerVM,Java1.8.0_292)
Typein expressions to have them evaluated.
Type:helpformore information.

scala>importorg.apache.hudi.QuickstartUtils._
importorg.apache.hudi.QuickstartUtils._

scala>importscala.collection.JavaConversions._
importscala.collection.JavaConversions._

scala>importorg.apache.spark.sql.SaveMode._
importorg.apache.spark.sql.SaveMode._

scala>importorg.apache.hudi.DataSourceReadOptions._
importorg.apache.hudi.DataSourceReadOptions._

scala>importorg.apache.hudi.DataSourceWriteOptions._
importorg.apache.hudi.DataSourceWriteOptions._

scala>importorg.apache.hudi.config.HoodieWriteConfig._
importorg.apache.hudi.config.HoodieWriteConfig._

scala>importorg.apache.spark.sql.types._
importorg.apache.spark.sql.types._

scala>importorg.apache.spark.sql.Row
importorg.apache.spark.sql.Row

scala>valtableName = "hudi_trips_cow"
tableName:String= hudi_trips_cow
scala>valbasePath = "file:///tmp/hudi_trips_cow"
basePath:String= file:///tmp/hudi_trips_cow
scala>valschema =StructType(Array(
|StructField("rowId",StringType,true),
|StructField("partitionId",StringType,true),
|StructField("preComb",LongType,true),
|StructField("name",StringType,true),
|StructField("versionId",StringType,true),
|StructField("intToLong",IntegerType,true)
| ))
schema: org.apache.spark.sql.types.StructType=StructType(StructField(rowId,StringType,true),StructField(partitionId,StringType,true),StructField(preComb,LongType,true),StructField(name,StringType,true),StructField(versionId,StringType,true),StructField(intToLong,IntegerType,true))

scala>valdata1 =Seq(Row("row_1", "part_0", 0L, "bob", "v_0", 0),
|Row("row_2", "part_0", 0L, "john", "v_0", 0),
|Row("row_3", "part_0", 0L, "tom", "v_0", 0))
data1:Seq[org.apache.spark.sql.Row] =List([row_1,part_0,0,bob,v_0,0], [row_2,part_0,0,john,v_0,0], [row_3,part_0,0,tom,v_0,0])

scala>vardfFromData1 = spark.createDataFrame(data1, schema)
scala> dfFromData1.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
| option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
| option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
| option("hoodie.index.type","SIMPLE").
| option(TABLE_NAME.key, tableName).
| mode(Overwrite).
| save(basePath)

scala>vartripsSnapshotDF1 = spark.read.format("hudi").load(basePath + "/*/*")
tripsSnapshotDF1: org.apache.spark.sql.DataFrame= [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]

scala> tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")

scala> spark.sql("desc hudi_trips_snapshot").show
+--------------------+---------+-------+
| col_name|data_type|comment|
+--------------------+---------+-------+
| _hoodie_commit_time| string| |
|_hoodie_commit_seqno| string| |
| _hoodie_record_key| string| |
|_hoodie_partition...| string| |
| _hoodie_file_name| string| |
| rowId| string| |
| partitionId| string| |
| preComb| bigint| |
| name| string| |
| versionId| string| |
| intToLong| int| |
+--------------------+---------+-------+

scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong from hudi_trips_snapshot").show
+-----+-----------+-------+----+---------+---------+
|rowId|partitionId|preComb|name|versionId|intToLong|
+-----+-----------+-------+----+---------+---------+
|row_3| part_0| 0| tom| v_0| 0|
|row_2| part_0| 0|john| v_0| 0|
|row_1| part_0| 0| bob| v_0| 0|
+-----+-----------+-------+----+---------+---------+

// In the new schema, we are going to add a String field and
// change the datatype `intToLong` field from int to long.
scala>valnewSchema =StructType(Array(
|StructField("rowId",StringType,true),
|StructField("partitionId",StringType,true),
|StructField("preComb",LongType,true),
|StructField("name",StringType,true),
|StructField("versionId",StringType,true),
|StructField("intToLong",LongType,true),
|StructField("newField",StringType,true)
| ))
newSchema: org.apache.spark.sql.types.StructType=StructType(StructField(rowId,StringType,true),StructField(partitionId,StringType,true),StructField(preComb,LongType,true),StructField(name,StringType,true),StructField(versionId,StringType,true),StructField(intToLong,LongType,true),StructField(newField,StringType,true))

scala>valdata2 =Seq(Row("row_2", "part_0", 5L, "john", "v_3", 3L, "newField_1"),
|Row("row_5", "part_0", 5L, "maroon", "v_2", 2L, "newField_1"),
|Row("row_9", "part_0", 5L, "michael", "v_2", 2L, "newField_1"))
data2:Seq[org.apache.spark.sql.Row] =List([row_2,part_0,5,john,v_3,3,newField_1], [row_5,part_0,5,maroon,v_2,2,newField_1], [row_9,part_0,5,michael,v_2,2,newField_1])

scala>vardfFromData2 = spark.createDataFrame(data2, newSchema)
scala> dfFromData2.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
| option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
| option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
| option("hoodie.index.type","SIMPLE").
| option(TABLE_NAME.key, tableName).
| mode(Append).
| save(basePath)

scala>vartripsSnapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*")
tripsSnapshotDF2: org.apache.spark.sql.DataFrame= [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 10 more fields]

scala> tripsSnapshotDF2.createOrReplaceTempView("hudi_trips_snapshot")

scala> spark.sql("desc hudi_trips_snapshot").show
+--------------------+---------+-------+
| col_name|data_type|comment|
+--------------------+---------+-------+
| _hoodie_commit_time| string| |
|_hoodie_commit_seqno| string| |
| _hoodie_record_key| string| |
|_hoodie_partition...| string| |
| _hoodie_file_name| string| |
| rowId| string| |
| partitionId| string| |
| preComb| bigint| |
| name| string| |
| versionId| string| |
| intToLong| bigint| |
| newField| string| |
+--------------------+---------+-------+


scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong, newField from hudi_trips_snapshot").show
+-----+-----------+-------+-------+---------+---------+----------+
|rowId|partitionId|preComb| name|versionId|intToLong| newField|
+-----+-----------+-------+-------+---------+---------+----------+
|row_3| part_0| 0| tom| v_0| 0| |
|row_2| part_0| 5| john| v_3| 3|newField_1|
|row_1| part_0| 0| bob| v_0| 0| |
|row_5| part_0| 5| maroon| v_2| 2|newField_1|
|row_9| part_0| 5|michael| v_2| 2|newField_1|
+-----+-----------+-------+-------+---------+---------+----------+

引用链接

[1]Avro schema resolution:http://avro.apache.org/docs/current/spec#Schema+Resolution

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章