什么是 Flink SQL 解决不了的问题?

京东云开发者
• 阅读 373

简介

在实时数据开发过程中,大家经常会用 Flink SQL 或者 Flink DataStream API 来做数据加工。通常情况下选用2者都能加工出想要的数据,但是总会有 Flink SQL 覆盖不了的问题,但 SQL 的易用性又难以让人释怀。所以有些场景在使用 FLink SQL 开始就与需要额外注意,下面就介绍一种多表关联时存在部分列更新(partial Update)场景,在 DataStream API 和 Flink SQL 开发时都容易忽视的情况而导致的问题。为了简化问题描述,采用了Flink SQL 来阐述此类问题。

场景介绍

多表关联时表 A 关联表 B, 表 A 具有pk1, field1, field2, field3字段,表 B 具有 pk2, field4, field5, field6 字段,表 A 通过 pk1 关联表B pk2。使用 Flink SQL 会如下实现:

CREATE TABLE jdq_source(
pk1 INT,
field1 STIRNG,
field2 STIRNG,
field3 STIRNG,
PRIMARY KEY(pk1) NOT ENFORCED
) WITH(...);

CREATE TABLE sr_sink(
pk1 INT,
field1 STRING,
field2 STRING,
field3 STRING,
field4 STRING,
field5 STRING,
field6 STRING,
PRIMARY KEY(pk2) NOT ENFORCED
) WITH (...);

INSERT INTO C
SELECT A.pk1,A.field1,A.field2,A.field3,B.pk2,B.field4,B.field5,B.field6 FROM jdq_source A
INNER JOIN sr_sink B
ON A.pk1 = B.pk2;

上述实例中有明显特征:使用了Join 关联, 且需要注意的是写入的数据库 sink 是 StarRocks。StarRocks 存在如下特性:当表是主键表时是不支持部分列更新( Partial Update)的,实际上大部分时候大家都用的是主键表。

然后在一个SQL查询数据的接口就遇到了如下问题:每次从接口查询返回的结果都不稳定,同样的查询条件不同时机返回的结果不一样。SQL查询语句如下:

select C.field1,C.field2,C.field3
FROM C group by field1,field2,field3;
为什么SQL查询的结果会不一致呢?起初排查原因发现 group by 返回结果有多条,而在SQL 中也没有使用 order by 对数据进行排序,所以导致了结果不稳定。后又排查为什么会出现多条结果呢?于是怀疑 field1, field2, field3 有不符合预期的数据。如: 

20240530, 2, 3
20240530, 2, null
20240531, 2, 4

其中第2条是多余的,不应该出现。结果发现可能是如下原因导致的:这3个字段 filed1, field2, filed3 在StarRocks数据库中会一直在变化,不停的写入新值。导致 SQL 查询时可以查到 field3 为 null 的数据。
为什么field3为不断变化呢?究其原因是:StarRocks 主键表不支持部分列更新(Partial Update)。当field3 为null时,同样会被写入 StarRocks。我们在通过JDQ读取表A field1, field2, field3 数据给表C写入数据时,当JDQ 消息队列中表A的记录存在乱序场景且field3 字段可能为null时,最终写入StarRocks的field3 字段会出现时而为null,时而不为null。 所以SQL查询接口中 group by的结果会出现不稳定。

总结

  1. 为什么在开发的时候当时没有发现 StarRocks 主键表这个问题呢?原因:1. 大家所关注的部分列更新,多数是关注insert into table_C(field1, field2, field3) 中不包含的字段field4,field5...等被更新为null,而当前场景是会把 field3 为null的值也写入SR数据库中,这不是我们期望的结果。2.表A作为主表,通常不会出现开始field3有值后来又没有值(null)的场景。出现这个现象大概率是因为上游JDQ消息队列中的数据乱序了,导致field3 为null的后出现了。而这种问题又比较难发现。
  2. 什么情况下会出现此类问题呢?写入的数据库不支持部分列更新场景时会出现。如StarRocks, Doris。因为MySQL, ES,ClickHouse的部分表引擎支持部分列更新,所以在MySQL, ES,ClickHouse中不会出现。
  3. 同理在 DataStream API 中如果表 A,表 B 关联后的数据直接写入StarRocks 的话,也会出现此类问题。
    以上这个问题在 Flink SQL 中无法解决,在 Flink DataStream API 中可以模拟部分列更新来避免此类问题。具体方法:在DatStream 任务中增加一个MapState, 用来在新数据到来时从MapState拿出缓存的数据,并和新到来的数据进行合并,来实现部分列更新功能,最后再写入 StarRocks。
    虽然问题不是Flink SQL导致的,但是上面的问题可以通过Flink DataStream API来规避。
点赞
收藏
评论区
推荐文章
Stella981 Stella981
3年前
Demo:基于 Flink SQL 构建流式应用
Flink1.10.0于近期刚发布,释放了许多令人激动的新特性。尤其是FlinkSQL模块,发展速度非常快,因此本文特意从实践的角度出发,带领大家一起探索使用FlinkSQL如何快速构建流式应用。本文将基于Kafka,MySQL,Elasticsearch,Kibana,使用FlinkSQL构建一个电商用户行为的实时分析应用
Stella981 Stella981
3年前
Flink SQL 实战:双流 join 场景应用
本文主要介绍在流式场景中join的实战。大家都知道在使用SQL进行数据分析的过程中,join是经常要使用的操作。在离线场景中,join的数据集是有边界的,可以缓存数据有边界的数据集进行查询,有NestedLoop/HashJoin/SortMergeJoin等多表join;而在实时场景中,join两侧的数据都是无边界的数据流,所以缓
Stella981 Stella981
3年前
Flink SQL 1.11 on Zeppelin 平台化实践
简介: 鉴于有很多企业都无法配备专门的团队来解决FlinkSQL平台化的问题,那么到底有没有一个开源的、开箱即用的、功能相对完善的组件呢?答案就是本文的主角——ApacheZeppelin。作者:LittleMagic大数据领域SQL化开发的风潮方兴未艾(所谓"EverybodyknowsSQL"),Flink自然也不能“免
Stella981 Stella981
3年前
Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL
上周六在深圳分享了《FlinkSQL1.9.0技术内幕和最佳实践》,会后许多小伙伴对最后演示环节的Demo代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码。希望对于FlinkSQL的初学者能有所帮助。完整分享可以观看Meetup视频回顾:https://developer.aliyun.com/live/1416(
Stella981 Stella981
3年前
Flink SQL 核心解密 —— 提升吞吐的利器 MicroBatch
之前我们在FlinkSQL中支持了MiniBatch,在支持高吞吐场景发挥了重要作用。今年我们在FlinkSQL性能优化中一项重要的改进就是升级了微批模型,我们称之为MicroBatch,也叫MiniBatch2.0。在设计和实现Flink的流计算算子时,我们一般会把“面向状态编程”作为第一准则。因为在流计算中,为了保证状态(St
Wesley13 Wesley13
3年前
2014 全国最新省市地区 SQL(国家统计局发布20130831版)
根据国家统计局20130831最新的统计结果生成的最新国家省市,直辖市包含了县和直辖市,但是我用的没有县,所以经过的我的加工。china.txt为处理过的单纯的国家省市代码及结构。s\_region\_1.sql为没有pid的数据结构。s\_region.sql为我加工过的数据结构。百度云盘:http://pan.baidu.com
Wesley13 Wesley13
3年前
mysql学习 索引
  在平时开发过程中写sql时,我们通常都不太关心sql的性能,只有能给查出来数据,sql的执行速度不是太慢就不会去管它了。但是开发时期的数据量往往都不是太大,很多性能问题只有在生产环境中才会发现,如:数据过多、sql关联了太多的表,使用了太多的join、或者建立了索引,但是索引失效的问题。所以要解决这些性能上的难题,就要去研究mysql最为重要的特性索
Stella981 Stella981
3年前
Hibernate纯sql查询结果和该sql在数据库直接查询结果不一致
问题:今天在做一个查询的时候发现一个问题,我先在数据库实现了我需要的sql,然后我在代码中代码:selectdistinctd.id,d.name,COALESCE(c.count_num,0),COALESCE(c.count_fix,0),COALESCE(c
Wesley13 Wesley13
3年前
MySQL 快速创建千万级测试数据
备注:此文章的数据量在100W,如果想要千万级,调大数量即可,但是不要大量使用rand()或者uuid()会导致性能下降背景在进行查询操作的性能测试或者sql优化时,我们经常需要在线下环境构建大量的基础数据供我们测试,模拟线上的真实环境。废话,总不能让我去线上去测试吧,会被DBA砍死的创建测试数据的方式
Flink测试利器之DataGen初探 | 京东云技术团队
什么是FlinksqlFlinkSQL是基于ApacheCalcite的SQL解析器和优化器构建的,支持ANSISQL标准,允许使用标准的SQL语句来处理流式和批处理数据。通过FlinkSQL,可以以声明式的方式描述数据处理逻辑,而无需编写显式的代码。使用