Flink和StreamPark自定义UDF函数的使用

天翼云开发者社区
• 阅读 12

本文分享自天翼云开发者社区《Flink和StreamPark自定义UDF函数的使用》,作者:王****帅

1、什么是函数

在 SQL 中,我们可以把一些数据的转换操作包装起来,嵌入到 SQL 查询中统一调用,这就是“函数”(functions)。Flink 的 Table API 和 SQL 同样提供了函数的功能。两者在调用时略有不同:Table API 中的函数是通过数据对象的方法调用来实现的;而 SQL 则是直接引用函数名称,传入数据作为参数。例如,要把一个字符串 str 转换成全大写的形式,Table API 的写法是调用 str 这个 String对象的 upperCase()方法:

str.upperCase();

而 SQL 中的写法就是直接引用 UPPER()函数,将 str 作为参数传入:

UPPER(str)

由于 Table API 是内嵌在 Java 语言中的,很多方法需要在类中额外添加,因此扩展功能比较麻烦,目前支持的函数比较少;而且 Table API 也不如 SQL 的通用性强,所以一般情况下较少使用。下面我们主要介绍 Flink SQL 中函数的使用。Flink SQL 中的函数可以分为两类:一类是 SQL 中内置的系统函数,直接通过函数名调用就可以,能够实现一些常用的转换操作,比如之前我们用到的 COUNT()、CHAR_LENGTH()、UPPER()等等;而另一类函数则是用户自定义的函数(UDF),需要在表环境中注册才能使用。

2、什么是自定义UDF函数

系统函数尽管庞大,也不可能涵盖所有的功能;如果有系统函数不支持的需求,我们就需要用自定义函数(User Defined Functions,UDF)来实现了。事实上,系统内置函数仍然在不断扩充,如果我们认为自己实现的自定义函数足够通用、应用非常广泛,也可以在项目跟踪工具 JIRA 上向 Flink 开发团队提出“议题”(issue),请求将新的函数添加到系统函数中。

2.1 编写自定义UDF函数

自定义一个ScalarFunction,传入一个String类型的参数,输出这个参数的hashCode
public class HashScalarFunction extends ScalarFunction {
    public String eval(String str){
        return String.valueOf(str.hashCode());
    }
}

2.2 在代码中以SQL方式使用UDF函数

2.2.1 读取mysql数据使用UDF函数转换并输出到控制台

package cn.ctyun.demo.flinksql;

import cn.ctyun.demo.flinksql.udf.HashScalarFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Date 2023/4/14 14:38
 * @Description 读取mysql数据使用UDF函数转换并输出到控制台
 */
public class FlinkSqlUdfMysql2Print {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 1. 创建读取表,使用mysql进行
        String source_ddl = "CREATE TABLE UserSource (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT " +
                ") WITH (" +
                " 'connector.type' = 'jdbc', " +
                " 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_source?useSSL=false', " +
                " 'connector.table' = 'test_user_table', " +
                " 'connector.username' = 'root', " +
                " 'connector.password' = '******'" +
                ")";
        tableEnv.executeSql(source_ddl);
        // 3. 注册自定义标量函数
        tableEnv.createTemporarySystemFunction("MyHash", HashScalarFunction.class);
        // 4. 调用UDF查询转换
        Table resultTable = tableEnv.sqlQuery("select id, name, phone, sex, MyHash(name) as name_hash from UserSource");

        // 5. 输出到控制台
        tableEnv.executeSql("create table output (" +
                "id INT, " +
                "name STRING, " +
                "phone STRING, " +
                "sex INT, " +
                "name_hash STRING ) " +
                "WITH (" +
                "'connector' = 'print')");
        resultTable.executeInsert("output");
    }
}

2.2.2 读取mysql数据使用UDF函数转换并输出到mysql

package cn.ctyun.demo.flinksql;

import cn.ctyun.demo.flinksql.udf.HashScalarFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Date 2023/4/14 14:50
 * @Description 读取mysql数据使用UDF函数转换并输出到mysql
 */
public class FlinkSqlUdfMysql2Mysql {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 1. 创建读取表,使用mysql进行
        String source_ddl = "CREATE TABLE UserSource (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT " +
                ") WITH (" +
                " 'connector.type' = 'jdbc', " +
                " 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_source?useSSL=false', " +
                " 'connector.table' = 'test_user_table', " +
                " 'connector.username' = 'root', " +
                " 'connector.password' = '*******'" +
                ")";
        tableEnv.executeSql(source_ddl);

        //  2. 创建写出表,使用mysql进行
        String sink_ddl = "CREATE TABLE UserSink (" +
                "id INT, " +
                "name STRING, " +
                "phone STRING, " +
                "sex INT, " +
                "name_hash STRING " +
                ") WITH (" +
                " 'connector.type' = 'jdbc', " +
                " 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_sink?useSSL=false', " +
                " 'connector.table' = 'test_user_table_udf', " +
                " 'connector.username' = 'root', " +
                " 'connector.password' = '********'" +
                ")";
        tableEnv.executeSql(sink_ddl);

        // 3. 注册自定义标量函数
        tableEnv.createTemporarySystemFunction("MyHash", HashScalarFunction.class);

        // 4. 使用insert语句进行数据输出,在这里进行UDF查询转换
        String insertSql = "INSERT INTO UserSink select id, name, phone, sex, MyHash(name) as name_hash from UserSource";

        tableEnv.executeSql(insertSql);
    }
}

2.3 在StreamPark中以SQL方式使用UDF函数

在StreamPark创建作业,导入作业依赖:

flink-connector-jdbc_2.12-1.14.3.jar

flink-demo-jar-job-1.0-SNAPSHOT.jar

mysql-connector-java-8.0.21.jar

FlinkSQL为:

CREATE FUNCTION MyHash AS 'cn.ctyun.demo.flinksql.udf.HashScalarFunction';
CREATE TABLE UserSource ( 
id INT, 
name VARCHAR,
phone VARCHAR, 
sex INT 
) WITH (
'connector.type' = 'jdbc', 
'connector.url' = 'jdbc:mysql://********:3306/flink_test_source?useSSL=false', 
'connector.table' = 'test_user_table', 
'connector.username' = 'root', 
'connector.password' = '*********'
);
CREATE TABLE UserSink (
id INT, 
name STRING, 
phone STRING, 
sex INT, 
name_hash STRING 
) WITH (
'connector.type' = 'jdbc', 
'connector.url' = 'jdbc:mysql://*******:3306/flink_test_sink?useSSL=false', 
'connector.table' = 'test_user_table_udf', 
'connector.username' = 'root', 
'connector.password' = '**********'
);
INSERT INTO UserSink select id, name, phone, sex, MyHash(name) as name_hash from UserSource;

运行作业后mysql可正常插入数据

点赞
收藏
评论区
推荐文章
常用内核架构
本文分享自天翼云开发者社区《》,作者:JackW宏内核应用程序调用内存分配的API(应用程序接口)函数。处理器切换到特权模式,开始运行内核代码。内核里的内存管理代码按照特定的算法,分配一块内存。把分配的内存块的首地址,返回给内存分配的API函数。内存分配的
艾木酱 艾木酱
3年前
PostgreSQL的函数和存储过程--MemFireDB
简介PostgreSQL是最流行的对象关系型数据库系统。它是一个强大的、高性能的数据库系统。在这篇文章中,我们将讨论如何使用函数和存储过程来执行操作,如插入、删除、更新和查询。感兴趣的同学可以通过memfiredb.com提供的免费云数据库一边操作一边阅读。函数一般来说,函数是一组进行任何操作的SQL语句,如选择、插入、删除和更新。在PostgreSQ
Stella981 Stella981
3年前
HIVE之UDF函数开发
1为什么要写UDF函数    有时候hive自带的函数不能满足当前需要,需要自定义函数来解决问题2UDF,UDAF,UDTF的比较UDF操作作用于单个数据行,并且产生一个数据行作为输出。大多数函数都属于这一类(比如数学函数和字符串函数)。
Wesley13 Wesley13
3年前
CAST()函数
6.4 转换函数数据类型转换可以通过CAST()和CONVERT()函数来实现。大多数情况下,这两个函数是重叠的,它们反映了SQL语言的演化历史。这两个函数的功能相似,不过它们的语法不同。虽然并非所有类型的值都能转变为其他数据类型,但总的来说,任何可以转换的值都可以用简单的函数实现转换。6.4.1 CAST()函数CAST()函数
Stella981 Stella981
3年前
Python学习笔记(五)函数和代码复用
  函数能提高应用的模块性,和代码的重复利用率。在很多高级语言中,都可以使用函数实现多种功能。在之前的学习中,相信你已经知道Python提供了许多内建函数,比如print()。同样,你也可以自己创建函数,这被叫做用户自定义函数,来实现定制的功能。一、函数的基本使用1.函数的定义  函数是一段具有特定功能的、可重用的
spark-sql优化简述
本文分享自天翼云开发者社区《》,作者:徐东1、自适应中reduce参数控制spark.sql.adaptive.shuffle.targetPostShuffleInputSize用于控制任务Shuffle后的目标输入大小(以字节为单位)。spark.sq
Flink Parallelism、Flink Slot的关系
本文分享自天翼云开发者社区《》,作者:王帅1、Parallelism(并行度)的概念parallelism在Flink中表示每个算子的并行度。举两个例子(1)比如kafka某个topic数据量太大,设置了10个分区,但source端的算子并行度却为1,只有
Flink 与Flink可视化平台StreamPark教程(开篇)
本文分享自天翼云开发者社区《》,作者:ln介绍Flink是一个大数据流处理引擎,可以为不同行业提供实时大数据处理解决方案。随着Flink的快速发展和改进,世界各地的许多公司现在都能看到它的存在。目前,北美、欧洲和金砖国家都是全球Flink应用的热门地区。当
flinkcdc中checkpoint不成功问题排查
本文分享自天翼云开发者社区《》,作者:徐东使用flink1.16和flinkcdc3.0进行数据接入,采用standalone模式。运行一段时间后checkpoint开始失败,但日志中没有报错信息。因savepoint和checkpoint机制一致,使用手
聊聊Docker镜像
本文分享自天翼云开发者社区@《​​​​​​​​​》,作者:AE86上山了。前言回顾前面:为什么需要Docker?Docker入门为什么可以这么简单?在上篇也同样留下一个问题:我们知道Tomcat运行起来需要Java的支持,那么我们在DockerHub拉取下
天翼云开发者社区
天翼云开发者社区
Lv1
天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。
文章
917
粉丝
16
获赞
40