实际生产中,由于各种原因,导致事件创建时间与处理时间不一致,收集的规定对实时推荐有较大的影响。所以一般情况时选取创建时间,然后事先创建flink的时间窗口。但是问题来了,如何保证这个窗口的时间内所有事件都到齐了?这个时候就可以设置水位线(waterMark)。
概念:支持基于时间窗口操作,由于事件的时间来源于源头系统,很多时候由于网络延迟、分布式处理,以及源头系统等各种原因导致源头数据的事件时间可能乱序。这时可以设定一个时间阈值,或者说水位线(waterMark),其作用定义一个最大乱序时间,比如某条日志时间为2019-01-01 08:00:10,如果乱序最大允许时间为10s,那么就认为2019-01-01 08:00:00之前产生的所有事件都到齐了,可以进行计算。
时间窗口:指定一个固定时间间隔的窗口
一、滑动窗口
1、SlidingEventTimeWindows.of(Time.second(4), Time.seconds(3)):表示滑动窗口大小为4秒,滑动步长是3 秒,同时,每3秒才滑动一次;
2、每条数据存活的时间为滑动窗口的大小;
3、如果滑动窗口超过之前的窗口,那么后面来的属于前面窗口的数据会丢失;
4、来了一条数据,边移动边计算滑动窗口的数据(一个窗口停留,计算一次,不移动,不计算 ),直至窗口到达指定位置。
计算某位置时间的公式:
//n:时间戳;size窗口大小;slide:滑动长度
//根据等差公式推导
an = a1 + (x-1)*s
a1 = size - slide -1
x = [n - (size-slide)]/slide //除数后再乘以slide
s = slide
//当来了一条时间戳为n的事件,就认为指定位置时间之前的所有事件都到齐了
指定位置 = (size-slide-1) + [(n-waterMark) - (size-slide)]/slide * slide
二、翻滚窗口
基于时间窗口,对连续数据进行迭代计算时,不会重叠。翻滚窗口是一个特殊的滑动窗口,当窗口的长度等于滑动的长度时,滑动窗口就是翻滚窗口。
计算某位置时间的公式:
指定位置 = -1 + (n-waterMark)/size * size //除数后再乘以size,size为窗口大小,n为时间戳
三、会话窗口
时间间隔达到一定时间长度时才进行统计计算。
测试代码(需要集群telnet一个producer):
package com.cjs
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
object WaterMarkTest {
/**
*想使用WaterMark,需要3个步骤:
* 1、对数据进行timestamp提取,即调用assignTimestampsAndWatermarks函数,
* 实例化BoundedOutOfOrdernessTimestampExtractor,重写extractTimestamp方法
* 2、设置使用事件时间,因为WaterMark是基于事件时间
* 3、定义时间窗口:翻滚窗口(TumblingEventWindows)、滑动窗口(timeWindow)
* 任意一个没有实现,都会报异常:Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
*/
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val streamAdd = senv.socketTextStream("192.168.112.10",9999)
val stream = streamAdd.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) { //WaterMark设置
//对数据流进行处理,获取timestamp,对数据流就够不影响
override def extractTimestamp(element: String): Long ={
//定义timestamp怎么从数据中抽取出来
val eventTime = element.split(" ")(0).toLong
print(s"$eventTime \n")
eventTime
}
}) //提取时间戳之后,该数据流是带有时间的,用于事件窗口
.map(x=>(x.split(" ")(1),1L)).keyBy(0)
//设置使用事件时间,因为WaterMark是基于事件时间
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//定义翻滚窗口
// stream.window(TumblingEventTimeWindows.of(Time.seconds(3))).sum(1).print()
// stream.sum(1).print() //直接输出,没有用到事件时间窗口,flink默认是累计统计,来一个,统计一个
//定义滑动窗口
stream.window(SlidingEventTimeWindows.of(Time.seconds(4),Time.seconds(2))).sum(1).print()
senv.execute("watermark")
}
}