Flink 与Flink可视化平台StreamPark教程(时间相关 1)

天翼云开发者社区
• 阅读 9

本文分享自天翼云开发者社区《Flink 与Flink可视化平台StreamPark教程(时间相关 1)》,作者:l****n

水位线与窗口

对于流式数据,时间是一个重要的标识。在flink的事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数据的时间戳来驱动的。

但在分布式系统中,这种驱动方式又会有一些问题。因为数据本身在处理转换的过程中会变化,如果遇到窗口聚合这样的操作,其实是要攒一批数据才会输出一个结果,那么下游的数据就会变少,时间进度的控制就不够精细了。

所以我们应该把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。

水位线设置

这里我们将通过mysql-cdc来生成一个水位线,我们在读取数据源的一侧进行设置。

package cn.ctyun.demo.api.watermark;

import cn.ctyun.demo.api.utils.TransformUtil;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

/**
 * @classname: ViewContentStreamWithWaterMark
 * @description: 拥有水位线
 * @author: Liu Xinyuan
 * @create: 2023-04-14 09:50
 **/
public class ViewContentStreamWithWaterMark {

    public static DataStream<JSONObject> getViewContentDataStream(StreamExecutionEnvironment env){
        // 1.创建Flink-MySQL-CDC的Source
        MySqlSource<String> viewContentSouce = MySqlSource.<String>builder()
                .hostname("***")
                .port(3306)
                .username("***")
                .password("***")
                .databaseList("test_cdc_source")
                .tableList("test_cdc_source.user_view")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();

        // 2.使用CDC Source从MySQL读取数据
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                viewContentSouce,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner(
                        new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String extractData, long l) {
                                return JSONObject.parseObject(extractData).getLong("ts_ms");
                            }
                        }
                ),
                "ViewContentStreamWithWatermark Source"
        );
        // 3.转换为指定格式
        return mysqlDataStreamSource.map(TransformUtil::formatResult);

    }
}

我们在cdc传来的数据中获取他的日志自带更新时间戳字段ts_ms时间戳作为我们的事件时间,并生成水位线,此后此数据流将包含水位线进行后续地传递。

窗口设置

在窗口中,有着不同的设置,可以面对不同的场景。我们按照数据不同的分配规则,将窗口的具体实现分为了以下四种,如下所示:

  • 滚动窗口(Tumbling Windows):滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子都是滚动窗口。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口。滚动窗口也是在BI分析中最常用的窗口类型之一。 Flink 与Flink可视化平台StreamPark教程(时间相关 1)
  • 滑动窗口(Sliding Windows ):与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。所以定义滑动窗口的参数有两个:窗口大小(window size)定义了窗口的大小,还有一个“滑动步长”(window slide),代表了窗口计算的频率。 Flink 与Flink可视化平台StreamPark教程(时间相关 1)
  • 会话窗口(Session Windows):会话窗口顾名思义,是基于“会话”(session)来来对数据进行分组的。这里的会话类似Web 应用中 session 的概念,不过并不表示两端的通讯过程,而是借用会话超时失效的机制来 描述窗口。简单来说,就是数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来,那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。一般而言将会给数据设置一个超时时间,如果两个数据间间隔过长并大于超时时间。在这里所有能够控制的就是超时时间(gap),其作为判定新窗口开启的一个重要指标。 Flink 与Flink可视化平台StreamPark教程(时间相关 1)
  • 全局窗口(Session Windows):这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

窗口API

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)

窗口函数MapReduce 在这里,我们首先定义一个MapReduce过程,用来统计目前十秒内的访问统计数量,这里的水位线设定请参考代码ViewContentStreamWithWaterMark(上文中提供的代码),具体的MapReduce如下所示

package cn.ctyun.demo.api;

import cn.ctyun.demo.api.watermark.ViewContentStreamWithWaterMark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @classname: ApiTimeWindow
 * @description: 时间窗的使用
 * @author: Liu Xinyuan
 * @create: 2023-04-17 20:39
 **/
public class ApiTimeWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<JSONObject> viewContentDataStream = ViewContentStreamWithWaterMark.getViewContentDataStream(env);

        viewContentDataStream.filter(new FilterFunction<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                // 不将删除的数据考虑在内
                return !value.getString("op").equals("d");
            }
        }).map(new MapFunction<JSONObject, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(JSONObject value) throws Exception {
                return Tuple2.of(value.getString("user_name"), 1L);
            }
        }).keyBy(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 设定一个累加规则
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }).print();
        env.execute();
    }
}

这里设定了一个时间窗口为10秒,最终的结果为每十秒钟将统计一个登录统计,并输出到控制台。使用时间窗口后和不加的唯一区别是计算的范围变为了时间窗内计算。

点赞
收藏
评论区
推荐文章
inode节点扩容
本文分享自天翼云开发者社区《》,作者:2m1.inode概述在Linux系统中,每个文件和目录都有一个对应的inode节点,用于存储文件或目录的元数据信息,如:文件大小、创建时间、修改时间、权限等。当文件或目录被创建时,系统会为其分配一个inode节点。然
Stella981 Stella981
3年前
Flink的WaterMark,及demo实例
实际生产中,由于各种原因,导致事件创建时间与处理时间不一致,收集的规定对实时推荐有较大的影响。所以一般情况时选取创建时间,然后事先创建flink的时间窗口。但是问题来了,如何保证这个窗口的时间内所有事件都到齐了?这个时候就可以设置水位线(waterMark)。概念:支持基于时间窗口操作,由于事件的时间来源于源头系统,很多时候由于网络延迟、分布式处理,以
Stella981 Stella981
3年前
Flink从入门到真香(13、时间语义的定义)
在watermark之前先说下时间的概念,在https://blog.51cto.com/mapengfei/2554577里面有各种时间窗口,实际生产中那是以哪个时间为准产生的窗口呢?事件发生的时间?进入flink程序的时间?还是flink开始处理的时间Flink提供了一套设计解决方案设置可以在代码中env直接设置
HBase Sync功能导致HBase入库性能下降
本文分享自天翼云开发者社区《》,作者:5m问题背景与现象HBase入库慢,regionserver日志中大量打印slowsync。原因分析1.对比正常写入时间段监控,检查HBase服务整体CPU、内存以及NameNodeRPC在异常时间段是否增加;2.检查
flinkcdc中checkpoint不成功问题排查
本文分享自天翼云开发者社区《》,作者:徐东使用flink1.16和flinkcdc3.0进行数据接入,采用standalone模式。运行一段时间后checkpoint开始失败,但日志中没有报错信息。因savepoint和checkpoint机制一致,使用手
Flink Parallelism、Flink Slot的关系
本文分享自天翼云开发者社区《》,作者:王帅1、Parallelism(并行度)的概念parallelism在Flink中表示每个算子的并行度。举两个例子(1)比如kafka某个topic数据量太大,设置了10个分区,但source端的算子并行度却为1,只有
Flink 与Flink可视化平台StreamPark教程(开篇)
本文分享自天翼云开发者社区《》,作者:ln介绍Flink是一个大数据流处理引擎,可以为不同行业提供实时大数据处理解决方案。随着Flink的快速发展和改进,世界各地的许多公司现在都能看到它的存在。目前,北美、欧洲和金砖国家都是全球Flink应用的热门地区。当
Flink 与Flink可视化平台StreamPark教程(DataStreamApi基本使用)
本文分享自天翼云开发者社区《》,作者:lnDataStreamApidataStreamApi是一切的基础,处于调度flink程序处理任务的起点。Flink有非常灵活的分层API设计,其中的核心层就是DataStream/DataSetAPI。由于新版本已
HPC调度基础:slurm集群的部署
本文分享自天翼云开发者社区@《》,作者:才开始学技术的小白0.引言HPC(HighPerformanceComputing,以下简称HPC)是一个领域,试图在任何时间点和技术上对于相关技术、方法和应用等多种方面实现最大的计算能力;换而言之其目的就是求解一类
chrony时间同步软件介绍
本文分享自天翼云开发者社区《》,作者:刘苏chrony是网络时间协议NTP的通用实现,它可以将系统时钟和NTP服务器同步。它支持在各种条件下包括间歇性的网络连接、严重阻塞的网络、不断变化的温度以及支持不连续的运行并且可以运行于虚机上。本文介绍chrony工
天翼云开发者社区
天翼云开发者社区
Lv1
天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。
文章
929
粉丝
16
获赞
40