Flink的WaterMark,及demo实例

Stella981
• 阅读 1018

实际生产中,由于各种原因,导致事件创建时间与处理时间不一致,收集的规定对实时推荐有较大的影响。所以一般情况时选取创建时间,然后事先创建flink的时间窗口。但是问题来了,如何保证这个窗口的时间内所有事件都到齐了?这个时候就可以设置水位线(waterMark)。

概念:支持基于时间窗口操作,由于事件的时间来源于源头系统,很多时候由于网络延迟、分布式处理,以及源头系统等各种原因导致源头数据的事件时间可能乱序。这时可以设定一个时间阈值,或者说水位线(waterMark),其作用定义一个最大乱序时间,比如某条日志时间为2019-01-01 08:00:10,如果乱序最大允许时间为10s,那么就认为2019-01-01 08:00:00之前产生的所有事件都到齐了,可以进行计算。

时间窗口:指定一个固定时间间隔的窗口

一、滑动窗口

1、SlidingEventTimeWindows.of(Time.second(4), Time.seconds(3)):表示滑动窗口大小为4秒,滑动步长是3 秒,同时,每3秒才滑动一次;

2、每条数据存活的时间为滑动窗口的大小;

3、如果滑动窗口超过之前的窗口,那么后面来的属于前面窗口的数据会丢失;

4、来了一条数据,边移动边计算滑动窗口的数据(一个窗口停留,计算一次,不移动,不计算 ),直至窗口到达指定位置。

计算某位置时间的公式:

//n:时间戳;size窗口大小;slide:滑动长度
//根据等差公式推导
an = a1 + (x-1)*s
a1 = size - slide -1
x = [n - (size-slide)]/slide     //除数后再乘以slide
s = slide
 
//当来了一条时间戳为n的事件,就认为指定位置时间之前的所有事件都到齐了
指定位置 = (size-slide-1) + [(n-waterMark) - (size-slide)]/slide * slide   

二、翻滚窗口

基于时间窗口,对连续数据进行迭代计算时,不会重叠。翻滚窗口是一个特殊的滑动窗口,当窗口的长度等于滑动的长度时,滑动窗口就是翻滚窗口。

计算某位置时间的公式:

指定位置 = -1 + (n-waterMark)/size * size     //除数后再乘以size,size为窗口大小,n为时间戳

三、会话窗口

时间间隔达到一定时间长度时才进行统计计算。

测试代码(需要集群telnet一个producer):

package com.cjs
 
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
 
object WaterMarkTest {
 
    /**
      *想使用WaterMark,需要3个步骤:
      * 1、对数据进行timestamp提取,即调用assignTimestampsAndWatermarks函数,
      *     实例化BoundedOutOfOrdernessTimestampExtractor,重写extractTimestamp方法
      * 2、设置使用事件时间,因为WaterMark是基于事件时间
      * 3、定义时间窗口:翻滚窗口(TumblingEventWindows)、滑动窗口(timeWindow)
      * 任意一个没有实现,都会报异常:Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
      */
    def main(args: Array[String]): Unit = {
        val senv = StreamExecutionEnvironment.getExecutionEnvironment
 
        val streamAdd  = senv.socketTextStream("192.168.112.10",9999)
        val stream = streamAdd.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {  //WaterMark设置
                    //对数据流进行处理,获取timestamp,对数据流就够不影响
                    override def extractTimestamp(element: String): Long ={
                        //定义timestamp怎么从数据中抽取出来
                        val eventTime = element.split(" ")(0).toLong
                        print(s"$eventTime \n")
                        eventTime
                    }
                })  //提取时间戳之后,该数据流是带有时间的,用于事件窗口
            .map(x=>(x.split(" ")(1),1L)).keyBy(0)
 
        //设置使用事件时间,因为WaterMark是基于事件时间
        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        //定义翻滚窗口
//        stream.window(TumblingEventTimeWindows.of(Time.seconds(3))).sum(1).print()
//        stream.sum(1).print()   //直接输出,没有用到事件时间窗口,flink默认是累计统计,来一个,统计一个
        //定义滑动窗口
        stream.window(SlidingEventTimeWindows.of(Time.seconds(4),Time.seconds(2))).sum(1).print()
        senv.execute("watermark")
    }
 
}
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
双十一预售活动分析
2022年双十一促销活动已经开始,大家应该都提前开始关注今年双十一活动的时间表了吧?2022年10月24日晚8:00天猫双11预售时间,第一波销售时间10月31日晚8:0,第二波销售时间11月10日晚8:00;天猫双11的优惠力度是跨店每满30050
Karen110 Karen110
3年前
​一篇文章总结一下Python库中关于时间的常见操作
前言本次来总结一下关于Python时间的相关操作,有一个有趣的问题。如果你的业务用不到时间相关的操作,你的业务基本上会一直用不到。但是如果你的业务一旦用到了时间操作,你就会发现,淦,到处都是时间操作。。。所以思来想去,还是总结一下吧,本次会采用类型注解方式。time包importtime时间戳从1970年1月1日00:00:00标准时区诞生到现在
Wesley13 Wesley13
3年前
Java日期时间API系列31
  时间戳是指格林威治时间1970年01月01日00时00分00秒起至现在的总毫秒数,是所有时间的基础,其他时间可以通过时间戳转换得到。Java中本来已经有相关获取时间戳的方法,Java8后增加新的类Instant等专用于处理时间戳问题。 1获取时间戳的方法和性能对比1.1获取时间戳方法Java8以前
Stella981 Stella981
3年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
HIVE 时间操作函数
日期函数UNIX时间戳转日期函数: from\_unixtime语法:   from\_unixtime(bigint unixtime\, string format\)返回值: string说明: 转化UNIX时间戳(从19700101 00:00:00 UTC到指定时间的秒数)到当前时区的时间格式举例:hive   selec
Stella981 Stella981
3年前
Flink从入门到真香(13、时间语义的定义)
在watermark之前先说下时间的概念,在https://blog.51cto.com/mapengfei/2554577里面有各种时间窗口,实际生产中那是以哪个时间为准产生的窗口呢?事件发生的时间?进入flink程序的时间?还是flink开始处理的时间Flink提供了一套设计解决方案设置可以在代码中env直接设置
Wesley13 Wesley13
3年前
Java日期时间API系列23
  有时候,往往需要统计某个时间区间的销量等问题,这就需要准确的起始时间,获取准确开始时间00:00:00,获取准确结束时间23:59:59。下面增加了一一些方法,获取当天起始时间,昨天起始时间,当前月第一天开始时间,当前月最后一天结束时间,上个月第一天开始时间,上个月最后一天结束时间,某个指定月的起始结束时间等等。其中月份最后一天往往因为月份不同和