Flink SQL高效Top

Stella981
• 阅读 879

Flink SQL高效Top

https://www.jianshu.com/p/dea467eb67e0

Top-N

Top-N是我们应用Flink进行业务开发时的常见场景,传统的DataStream API已经有了非常成熟的实现方案,如果换成Flink SQL,又该怎样操作?好在Flink SQL官方文档已经给出了标准答案,我们只需要照抄就行,其语法如下:

SELECT [column_list]FROM (   SELECT [column_list],     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum   FROM table_name)WHERE rownum <= N [AND conditions]

看官可能已经能够在日常工作中熟练应用这种查询风格了。那么,Flink内部是如何将它转化成高效的执行方案的呢?接下来基于最新的Flink 1.12版本稍微探究一下。

Logical Plan

使用EXPLAIN语句观察示例查询的执行计划(部分)如下:

EXPLAIN PLAN FOR SELECT * FROM (  SELECT *,    row_number() OVER(PARTITION BY merchandiseId ORDER BY totalQuantity DESC) AS rownum  FROM (    SELECT merchandiseId, sum(quantity) AS totalQuantity    FROM rtdw_dwd.kafka_order_done_log    GROUP BY merchandiseId  )) WHERE rownum <= 10== Abstract Syntax Tree ==LogicalProject(merchandiseId=[$0], totalQuantity=[$1], rownum=[$2])+- LogicalFilter(condition=[<=($2, 10)])   +- LogicalProject(merchandiseId=[$0], totalQuantity=[$1], rownum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])      +- ...== Optimized Logical Plan ==Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[merchandiseId], orderBy=[totalQuantity DESC], select=[merchandiseId, totalQuantity, w0$o0])+- Exchange(distribution=[hash[merchandiseId]])   +- ...== Physical Execution Plan ==Stage 1 : Data Source    ...    Stage 2 : Operator        ...        Stage 4 : Operator            ...            Stage 6 : Operator                content : Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[merchandiseId], orderBy=[totalQuantity DESC], select=[merchandiseId, totalQuantity, w0$o0])                ship_strategy : HASH

由执行计划可知,row_number() OVER(PARTITION BY ...)子句在逻辑计划阶段被优化成了名为Rank的RelNode(看官可参见Calcite的相关资料了解RelNode),可以用如下的简图说明。

Flink SQL高效Top负责这个优化的RelOptRule在Flink项目中名为FlinkLogicalRankRule。它将符合规则的开窗聚合操作(FlinkLogicalOverAggregate RelNode)和对排名的过滤操作(FlinkLogicalCalc RelNode)合并为FlinkLogicalRank。也就是说,只有严格符合上一节所述语法的查询才能得到优化。

FlinkLogicalRank节点会记录以下主要信息:

  • partitionKey:

    分组键。

  • orderKey:

    排序键与排序规则。

  • rankType:

    排名函数的类型,即ROW_NUMBER、RANK或者DENSE_RANK。

  • rankRange:

    排名区间(即Top-N一词中的N)。

  • strategy:

Top-N结果的更新策略,目前有3种:

  • AppendFast:

    结果只追加,不更新;

  • Retract:

    类似于回撤流,结果会更新,前提是输入数据没有主键,或者主键与partitionKey不同;

  • UpdateFast:

    快速更新,前提是输入数据有主键,且结果单调递增/递减,还要求orderKey的排序规则与结果的单调性相反(例:

    ORDER BY sum(quantity) DESC)。

    可见它的效率最高,但是也最苛刻。

  • outputRankNumber:

    是否输出排名的序号,即在外层查询中是否有SELECT rownum子句。

    显然,如果不输出序号,在排名发生变化时可以大大减少回撤输出的数据量,降低Flink端的压力,具体可参见官方文档"No Ranking Output Optimization"一节。

Physical Plan

在流处理环境下,StreamPhysicalRankRule规则负责将FlinkLogicalRank逻辑节点转换成StreamPhysicalRankRule物理节点,并翻译成物理执行节点StreamExecRank。注意如果是分组Top-N(即有PARTITION BY子句),就会按照partitionKey的hash值分发到各个sub-task,否则会将并行度强制设为1,计算全局Top-N。另外从代码可以读出,Top-N语法目前仅支持ROW_NUMBER,暂时还不支持RANK和DENSE_RANK排名。

根据上文所述更新策略的不同,实际执行时采用的ProcessFunction也不同,如下类图所示。其中CleanupState接口表示支持空闲状态保留时间(idle state retention time)特性。

Flink SQL高效Top

以最常用到的RetractableTopNFunction为例,当有一条累加数据到来时,处理流程可以用如下的简图来说明。

Flink SQL高效Top其中,dataState是MapState<RowData, List>类型的状态,保存partitionKey与该key下面的流数据,用于容错。而treeMap是ValueState<SortedMap<RowData, Long>>类型的状态,顾名思义,它其中维护了一个TreeMap,用于计数及输出Top-N结果。至于这里为什么用了红黑树(TreeMap)而不是传统的最大/最小堆(PriorityQueue),自然是因为红黑树是对数复杂度的,相较于堆来说更适合Flink这种对时间敏感而对空间较不敏感的执行环境。

另外,我们一定要记得启用空闲状态保留时间,这样dataState和treeMap中的数据才不会永远积攒下去。不过空闲状态的清理并非确定性的,所以如果要计算有时间维度的排行榜(如按天、按小时等),需要把时间维度也加入PARTITION BY子句,而不是将保留时间设为对应的长度。

最后,在StreamExecRank中还提供了一个可配置的参数table.exec.topn.cache-size(默认值10000),即Top-N缓存的大小。如果Top-N的规模比较大,适当增加此值可以避免频繁访问状态,提高执行效率。

猜你喜欢

数仓建模分层理论

数据湖是谁?那数据仓库又算什么?

数仓架构发展史

Hive整合Hbase

Hive中的锁的用法场景

数仓建模方法论

Flink SQL高效Top

本文分享自微信公众号 - 数据分析挖掘与算法(ikeguang2)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这