(8)FlinkSQL自定义UDF

(1)定义一个UDF

package com.udf;

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Scalar user-defined function that appends the fixed suffix {@code "_udf"}
 * to its string argument.
 *
 * <p>Created by lj on 2022-07-25.
 */
public class TestUDF extends ScalarFunction {

    /** Marker text appended to every input value. */
    private static final String SUFFIX = "_udf";

    /**
     * Evaluates the function for one input value.
     *
     * @param value the input string; plain Java string concatenation is used,
     *              so a {@code null} argument yields {@code "null_udf"}
     * @return {@code value} followed by {@code "_udf"}
     */
    public String eval(String value) {
        return value + SUFFIX;
    }
}

(2)使用UDF

public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);        DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 9999,"
");        SingleOutputStreamOperator waterDS = streamSource.map(new MapFunction() {            @Override            public WaterSensor map(String s) throws Exception {                String[] split = s.split(",");                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));            }        });        // 将流转化为表        Table table = tableEnv.fromDataStream(waterDS,                $("id"),                $("ts"),                $("vc"),                $("pt").proctime());        tableEnv.createTemporaryView("EventTable", table);/*        // 1. 直接调用自定义udf 函数        //        table.select(call(myFunction.class,$("id"))).execute().print();        // 2. 先注册在使用        tableEnv.createTemporarySystemFunction("MyLength",myFunction.class);        //2.1 在使用注册的自定义函数 名称为MyLength        //        table.select(call("MyLength",$("id"))).execute().print();        // 2.2 采用sql 的方式进行使用自定义函数            tableEnv.sqlQuery("select id, MyLength(id) from "+table).execute().print();* */        tableEnv.createTemporarySystemFunction("MyLength",TestUDF.class);        Table result = tableEnv.sqlQuery(                "SELECT " +                        "id as componentname, " +                //window_start, window_end,                        "COUNT(ts) as componentcount ,SUM(ts) as componentsum, " +                        "MyLength(cast(COUNT(ts) as string)) as testudf " +                        "FROM TABLE( " +                        "TUMBLE( TABLE EventTable , " +                        "DESCRIPTOR(pt), " +                        "INTERVAL '10' SECOND)) " +                        "GROUP BY id , window_start, window_end"        );        tableEnv.toRetractStream(result, Row.class).print("toRetractStream");       //缩进模式        env.execute();    }

(3)应用效果

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章