Spark + Iceberg API

Because of Spark version issues, we create the table through the Iceberg API directly.

IcebergApi.java

package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.types.Types;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class IcebergApi {

    // Build a Hadoop Configuration from the Spark session so the Iceberg
    // HiveCatalog can find the Hive metastore and warehouse directory.
    public static Configuration getProperties() {
        System.out.println("start:-----");
        SparkSession spark = SparkSession.builder()
                .config(new SparkConf().setAppName("IcebergApi"))
                .enableHiveSupport()
                .getOrCreate();
        System.out.println("spark: " + spark);

        Configuration conf = spark.sparkContext().hadoopConfiguration();
        // conf.set("spark.sql.warehouse.dir", "/user/bigdata/hive/warehouse/");
        conf.set("hive.metastore.warehouse.dir", "/user/bigdata/hive/warehouse/");
        return conf;
    }

    // Create the Iceberg table testdb.ice_table2 through the Hive catalog.
    public static Table createTable() {
        Configuration conf = getProperties();
        Catalog catalog = new HiveCatalog(conf);
        System.out.println("catalog: " + catalog);

        TableIdentifier name = TableIdentifier.of("testdb", "ice_table2");
        System.out.println("name: " + name);

        // Iceberg schemas assign an explicit ID to every field.
        Schema schema = new Schema(
                Types.NestedField.required(1, "level", Types.StringType.get()),
                Types.NestedField.required(2, "event_time", Types.StringType.get())
        );
        System.out.println("schema: " + schema);

        Table table = catalog.createTable(name, schema);
        System.out.println("end:-----" + table);
        return table;
    }

    public static void main(String[] args) {
        createTable();
    }
}
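Note that the new HiveCatalog(conf) constructor only exists in older Iceberg releases. In newer versions (roughly 0.12 and later) the constructor is no-arg and the catalog is initialized explicitly; a minimal sketch, assuming one of those versions:

// Newer Iceberg API (version-dependent; verify against your release):
HiveCatalog catalog = new HiveCatalog();
catalog.setConf(conf);  // HiveCatalog implements Hadoop's Configurable
// "hive" is just a catalog name; metastore properties can go in the map
catalog.initialize("hive", java.util.Collections.emptyMap());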

Package the project, copy the jar to the Spark server, and then run:

spark-submit --class org.example.IcebergApi \
    --master yarn \
    --deploy-mode cluster \
    /home/bigdata/mhb/iceberg-api-1.0-SNAPSHOT-jar-with-dependencies.jar
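Because the job runs with --deploy-mode cluster, the System.out output above lands in the YARN container logs rather than on the submitting console. It can be retrieved with the application ID printed by spark-submit:

yarn logs -applicationId <application_id>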

Note: the Maven build must bundle all dependencies into the jar, so add the following plugin:
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.4.1</version>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>
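As written, the assembly goal still has to be invoked explicitly, e.g. mvn clean package assembly:single. To have mvn package produce the fat jar automatically, the single goal can be bound to the package phase; a standard binding (not part of the original snippet), placed inside the plugin element:

<executions>
    <execution>
        <phase>package</phase>
        <goals>
            <goal>single</goal>
        </goals>
    </execution>
</executions>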





The table is created successfully.
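To sanity-check the result, the table can be loaded back through the same HiveCatalog and its metadata printed; a minimal sketch reusing the catalog from createTable():

// Assumes the same catalog/conf setup as in IcebergApi above.
Table loaded = catalog.loadTable(TableIdentifier.of("testdb", "ice_table2"));
System.out.println(loaded.schema());    // field IDs, names, and types
System.out.println(loaded.location());  // warehouse path backing the table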
