Flink JDBC Connector:Flink 与数据库集成最佳实践

Stella981
• 阅读 1345

整理:陈政羽(Flink 社区志愿者)

摘要:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contributor,阿里巴巴高级开发工程师徐榜江(雪尽)分享,主要介绍 Flink 1.11 JDBC Connector 的最佳实践。大纲如下:

  1. JDBC connector

  2. JDBC Catalog

  3. JDBC Dialect

  4. Demo

Tips:点击下方链接可查看作者原版 PPT 及分享视频:
https://flink-learning.org.cn/developers/flink-training-course3/

JDBC-Connector 的重构


JDBC Connector 在 Flink 1.11 版本发生了比较大的变化,我们先从以下几个 Feature 来具体了解一下 Flink 社区在这个版本上对 JDBC 所做的改进。

  • FLINK-15782 :Rework JDBC Sinks[1] (重写 JDBC Sink)

这个 issue 主要为 DataStream API 新增了 JdbcSink,对于使用 DataStream 编程的用户会更加方便地把数据写入到 JDBC;并且规范了一些命名规则,以前命名使用的是 JDBC 加上连接器名称,目前命名规范为 Jdbc+ 连接器名称

  • FLINK-17537:Refactor flink-jdbc connector structure[2] (重构 flink-jdbc 连接器的结构)

这个 issue 将 flink-jdbc 包名重命名为 flink-connector-jdbc,与 Flink 的其他 connector 统一,将所有接口和类从 org.apache.flink.java.io.jdbc(旧包)规范为新包路径 org.apache.flink.connector.jdbc(新包),通过这种重命名用户在对底层源代码的阅读上面会更加容易的理解和统一。

  • FLIP-95: New TableSource and TableSink interfaces[3] (新的 TableSource 和 TableSink 接口)

由于早期数据类型系统并不是很完善,导致了比较多的 Connector 在使用上会经常报数据类型相关的异常,例如 DECIMAL 精度类型,在以往的 Flink 1.10 版本中有可能出现下图问题:

Flink JDBC Connector:Flink 与数据库集成最佳实践

基于 FLIP-95 新的 TableSource 和 TableSink 在精度支持方面做了重构,目前数据精度方面的支持已经很完善了。

  • FLIP-122:New Connector Property Keys for New Factory[4](新的连接器参数)

在 Flink 1.11 版本中,我们对 DDL 的 WITH 参数相对于 1.10 版本做了简化,从用户视角看上就是简化和规范了参数,如表格所示:

Old Key (Flink 1.10)

New Key (Flink 1.11)

connector.type

connector.type

connector.url

url

connector.table

table-name

connector.driver

driver

connector.username

username

connector.password

password

connector.read.partition.column

scan.partition.column

connector.read.partition.num

scan.partition.num

connector.read.partition.lower-bound

scan.partition.lower-bound

connector.read.partition.upper-bound

scan.partition.upper-bound

connector.read.fetch-size

scan.fetch-size

connector.lookup.cache.max-rows

lookup.cache.max-rows

connector.lookup.cache.ttl

lookup.cache.ttl

connector.lookup.max-retries

lookup.max-retries

connector.write.flush.max-rows

sink.buffer-flush.max-rows

connector.write.flush.interval

sink.buffer-flush.interval

connector.write.max-retries

sink.max-retries

大家可以看到表格中有 3 个标红的地方,这个是相对于 1.10 有发生变化比较多的地方。这次 FLIP 希望进一步简化连接器属性,以便使属性更加简洁和可读,并更好地与 FLIP-107 协作。如果需要了解更多的 Connector 参数可以进一步参考官方文档和 FLIP-122 中提到的改变,这样有助于从旧版本迁移到新版本并了解参数的变化。

  • FLIP-87:Primary key Constraints in Table API[5] (Table API 接口中的主键约束问题)

Flink 1.10 存在某些 Query 无法推断出主键导致无法进行 Upsert 更新操作(如下图所示错误)。所以在 FLIP-87 中为 Flink SQL 引入的 Primary Key 约束。Flink 的主键约束遵循 SQL 标准,主键约束分为 PRIMARY KEY NOT ENFORCED 和 PRIMARY KEY ENFORCED, ENFORCED 表示是否对数据进行校验。我们常见数据库的主键约束属于 PRIMARY KEY ENFORCED,会对数据进行校验。因为 Flink 并不持有数据,因此 Flink 支持的主键模式是 PRIMARY KEY NOT ENFORCED,  这意味着 Flink 不会校验数据,而是由用户确保主键的完整性。例如 HBase 里面对应的主键应该是 RowKey,在 MySQL 中对应的主键是在用户数据库的表中对应的主键。

Flink JDBC Connector:Flink 与数据库集成最佳实践

JDBC Catalog


目前 Flink 支持 Catalog 主要有 JDBC Catalog 和 Hive Catalog 。在关系数据库中的表,如果要在 Flink 中使用,用户需要手动写表的 DDL,一旦表的 Schema 发生改变,用户需要手动修改, 这是比较繁琐的事情。JDBC Catalog 提供了接口用于连接到各种关系型数据库,使得 Flink 能够自动检索表,不用用户手动输入和修改。目前 JDBC Catalog 内置目前实现了 Postgres Catalog。Postgres catalog 是一个 read-only (只读)的 Catalog,只支持读取 Postgres 表,支持的功能比较有限。下面代码展示了目前 Postgres catalog 支持的 6 个功能:数据库是否存在、数据库列表、获取数据库、根据数据库名获取表列表、获得表、表是否存在。

// The supported methods by Postgres Catalog.

如果需要支持其他 DB (如 MySQL),需要用户根据 FLIP-93 的 JdbcCatalog 接口实现对应不同的 JDBC Catalog。

JDBC Dialect


什么是 Dialect?

Dialect (方言)对各个数据库来说,Dialect 体现各个数据库的特性,比如语法、数据类型等。如果需要查看详细的差异,可以点击这里[6]查看详细差异。下面通过对比 MySQL 和 Postgres 的一些常见场景举例:

Dialect

MySQL

Postgres

场景描述

Grammar(语法)

LIMIT 0,30

WITH LIMIT 30 OFFSET 0

分页

Data Type (数据类型)

BINARY

BYTEA,ARRAY

字段类型

Command (命令)

show tables

\dt

查看所有表

在数据类型上面,Flink SQL 的数据类型目前映射规则如下:

MySQL type

PostgreSQL type

Flink SQL type

TINYINT

TINYINT

SMALLINT

TINYINT UNSIGNED

SMALLINT

INT2

SMALLSERIAL

SERIAL2

SMALLINT

INT

MEDIUMINT

SMALLINT

UNSIGNED

INTEGER

SERIAL

INT

BIGINT

INT

UNSIGNED

BIGINT

BIGSERIAL

BIGINT

BIGINT

UNSIGNED

DECIMAL(20, 0)

Flink 目前支持三种 Dialect: Derby、MySQL、PostgreSQL,Derby 主要用于测试,更多的类型映射可以点击下方链接前往官方文档查看。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#data-type-mapping

如何保证 Dialect Upsert 的幂等性?

如果定义了主键,JDBC 写入时是能够保证 Upsert 语义的, 如果 DB 不支持 Upsert 语法,则会退化成 DELETE + INSERT 语义。Upsert query 是原子执行的,可以保证幂等性。这个在官方文档中也详细描述了更新失败或者存在故障时候如何做出的处理,下面的表格是不同的 DB 对应不同的 Upsert 语法:

Database

Upsert Grammar

MySQL

INSERT .. ON DUPLICATE KEY UPDATE ..

PostgreSQL

INSERT .. ON CONFLICT .. DO UPDATE SET ..

如何自定义 Dialect?

目前如果要实现自定义 Dialect (比如 SQL Server、Oracle 等), 需要用户自己实现对应 Dialect 修改源码并重新打包 flink-connector-jdbc。社区正在讨论提供一种插件化 dialect 的机制, 让用户可以不用修改源码打包就能实现自定义 Dialect,这个机制需要把 Dialect 接口暴露给用户。目前的 Dialect 接口不够清晰,没有考虑 DataStream API 的使用场景,也没有考虑到 一些复杂的 SQL 场景,所以这个接口目前不太稳定(后续版本会修改) 。

社区目前之所以没有把这个 API 开放给用户,是从用户使用的体验角度考虑,希望把这种顶级 API 设计得尽量稳定、简洁后再开放出来给用户使用,避免用户在后续 Flink 版本的迭代中多次修改代码。目前社区已经有相应的计划去做了,大家可以留意 FLINK-16833[7] 提出的 JDBCDialect 插件化设计。

实践 Demo


大家看完上述 Flink 1.11 在 JDBC 所做的改动后,大家可以尝试下面这个关于商品表 CDC 同步和 ETL 的小案例,有助于理解 JDBC Catalog 和 CDC 的同步机制。

环境与版本:Flink 1.11.1、Docker、Kafka 1.11.1、MySQL Driver 5.1.48、PostgreSQL Driver 42.2.14

流程如下:

  1. Flink standalone 环境准备并在提供的地址下载好对应的安装包和 connector jar。

  2. 测试数据准备,通过拉起容器运行已经打包好的镜像。其中 Kafka 中的 changelog 数据是通过 debezium connector 抓取的 MySQL orders表 的 binlog。

  3. 通过 SQL Client 编写 SQL 作业,分别创建 Flink 订单表,维表,用户表,产品表,并创建 Function UDF。从 PG Catalog 获取结果表信息之后,把作业提交至集群执行运行。

  4. 测试 CDC 数据同步和维表 join,通过新增订单、修改订单、删除订单、维表数据更新等一系列操作验证 CDC 在 Flink 上如何运行以及写入结果表。

Flink JDBC Connector:Flink 与数据库集成最佳实践

上图为业务流程整体图,项目 Demo 地址:

https://github.com/leonardBang/flink-sql-etl

问答环节


1.Flink SQL Client 上面执行的 use default,是使用哪个 catlog 呢?

答:Flink 内部有一个内置 Catlog,它把 meta 数据存于内存中。在 SQL Client 上没有显式指定 Hive catlog 或者 jdbc catlog 时会使用内置的 Catalog,刚刚的案例给大家演示的是 Postgres Catalog,里面有结果表。在内置 Catlog 可以看到我们刚刚创建 Kafka 的表,MySQL 的维度表。

2.Flink MySQL DDL 连接 8 小时后就会自动断开的问题是否已经解决?

这个问题会在 1.12 版本解决此问题,目前 master 分支已经合并,具体可以参考以下地址 ,描述了相关问题的讨论和解决办法。

https://issues.apache.org/jira/browse/FLINK-16681

3.什么是 CDC?能大概讲下目前 Flink 支持的 CDC 吗?

通过 Change Data Capture 机制(CDC)来将外部系统的动态数据(如 Mysql BinLog、Kafka Compacted Topic)导入 Flink,以及将 Flink 的 Update/Retract 流写出到外部系统中是用户一直希望的功能。

Flink 1.11 实现了对 CDC 数据读取和写出的支持。目前 Flink 可以支持 Debezium(Demo 中所用的工具) 和 Canal(阿里巴巴开源同步工具) 两种 CDC 格式。Debezium 在国外用得比较多,Canal 在国内用得比较多,两者格式会有所区别,详细可以参考官方使用文档。

总结


本文从 JDBC Connector 的重构、数据精度、主键约束、命名规范等方面详细介绍,分享了社区目前实现的 Postgres Catalog 功能点;介绍了 Flink 如何实现 JDBC Dialect 的统一以及目前社区对 Dialect 做的一些计划;最后的实践 Demo 环节演示了通过 SQL Client 进行维表 JOIN 和 ETL 操作以及解答了大家在实际生产中所遇到的问题,希望对大家进一步了解 Flink CDC 新功能有所帮助。

参考链接:

[1]https://issues.apache.org/jira/browse/FLINK-15782

[2]https://issues.apache.org/jira/browse/FLINK-17537

[3]https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

[4]https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

[5]https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API

[6]https://www.postgresqltutorial.com/postgresql-vs-mysql/

[7]https://issues.apache.org/jira/browse/FLINK-16833


Flink JDBC Connector:Flink 与数据库集成最佳实践   Flink Forward Asia 2020 议题征集中  Flink JDBC Connector:Flink 与数据库集成最佳实践

洞察先机,智见未来, Flink Forward Asia 2020 盛大开启!诚邀开源社区的各方力量与我们一起,探讨新型数字化技术下的未来趋势,共同打造 2020 年大数据领域的这场顶级盛会!

在 Flink Forward Asia 2020,您可与全球开发者分享您的真知灼见,同各技术领域顶级专家面对面交流,探索数字化技术下的未来趋势。如果您对以上任意技术方向有积累与洞察,欢迎投递议题!每位嘉宾最多可以投递三个Topic, 投递日期截止至 10 月 12 日。

Flink JDBC Connector:Flink 与数据库集成最佳实践

(点击可了解更多议题投递详情)

戳我投议题! Flink JDBC Connector:Flink 与数据库集成最佳实践

本文分享自微信公众号 - Flink 中文社区(gh_5efd76d10a8d)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Python3:sqlalchemy对mysql数据库操作,非sql语句
Python3:sqlalchemy对mysql数据库操作,非sql语句python3authorlizmdatetime2018020110:00:00coding:utf8'''
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这