一.引言:
Apach Flink 是全新的流处理系统,在Spark Straming的基础上添加了很多特性,主要在于其提供了基于时间和窗口计算的算子,并且支持有状态的存储和 Checkpoint 的重启机制,下面假设有多个温度传感器持续传输当前温度,Flink流处理需要每一段时间提供该时间段内的传感器平均温度。
二.依赖支持
项目是基于maven的scala项目,主要导入flink的scala依赖,如果是java需要另一套依赖:
1.scala
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.7.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.8</scala.version>
<hadoop.version>2.6.0</hadoop.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
2.java
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
三.辅助类
1.基础温度类 SensorReading
采用case class简化后续处理函数的代码
case class SensorReading(id: String, timestamp: Long, temperature: Double)
2.时间戳提取类
这里采用了Flink的特性: EventTime作为数据的时间戳,通过提取生成SensorReading中的时间戳作为温度传感器传递温度的EventTime
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
/**
* Assigns timestamps to SensorReadings based on their internal timestamp and
* emits watermarks with five seconds slack.
* 根据传感器内部时间戳和传感器读取分配时间戳。
*/
class SensorTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(5)) {
/** Extracts timestamp from SensorReading. */
override def extractTimestamp(r: SensorReading): Long = r.timestamp
}
如果是Java,写法稍有不同
public static class SensorTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor<SensorReading> {
public SensorTimeAssigner() {
super(Time.seconds(5));
}
@Override
public long extractTimestamp(SensorReading r) {
return r.timestamp;
}
}
3.自定义source类
SparkStreaming采用的是复写Receiver函数实现自定义数据源,通过receiver的store生成数据;Storm通过覆盖Spout的nextTruple方法,emit生成数据;这里Flink通过集成SoureFunction实现run方法,通过collect方法生成数据,这几种流式处理器在自定义数据流这方面其实大致比较类似,换汤不换药。相关的注释都写在代码里了,这里逻辑比较简单,只是通过Random类,去随机模拟温度函数,如果自己有场景需求需要自定义数据源时,可以把Random看做是自己的Socket,在run方法初始化数据源生成数据即可,这里生成数据可以通过Flink Env设置并行度,并行的接收数据,前提是你的数据源支持并行接收。
import java.util.Calendar
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import scala.util.Random
/**
* Flink 源功能,用于生成具有随机温度值的传感器读取。
*
* 自定义源方法需要实现run方法与cancel方法即可,需要初始化的连接放到run方法之内即可
*
* 源的每个并行实例模拟 10 个传感器,这些传感器发出一个传感器
* 每 100 ms 阅读一次。
*
* 注意:这是一个简单的数据生成源函数,不检查其状态。
* 如果发生故障,源不会重播任何数据。
*/
// 继承时需要定义生成的DStream的类型
class SensorSource extends RichParallelSourceFunction[SensorReading] {
// flag indicating whether source is still running.
// 指示源是否仍在运行。
var running: Boolean = true
/** run() continuously emits SensorReadings by emitting them through the SourceContext. */
override def run(srcCtx: SourceContext[SensorReading]): Unit = {
// SourceContext 通过 collect 方法不断向flink发出数据
// initialize random number generator
val rand = new Random()
// 获取当前parallel subtask的下标
val taskIdx = this.getRuntimeContext.getIndexOfThisSubtask
// initialize sensor ids and temperatures
// 初始化温度转换器 IndexSeq[String,Int] 序列长度为10 华氏度65+
var curFTemp = (1 to 10).map {
i => ("sensor_" + (taskIdx * 10 + i), 65 + (rand.nextGaussian() * 20))
}
// emit data until being canceled
while (running) {
// update temperature
// 更新温度
curFTemp = curFTemp.map( t => (t._1, t._2 + (rand.nextGaussian() * 0.5)) )
// get current time
val curTime = Calendar.getInstance.getTimeInMillis
// emit new SensorReading
// id 区分传感器分区 curTime 标识eventTime temperature 标识温度
curFTemp.foreach( t => srcCtx.collect(SensorReading(t._1, curTime, t._2)))
// wait for 100 ms
Thread.sleep(100)
}
}
/** Cancels this SourceFunction. */
override def cancel(): Unit = {
running = false
}
}
四.主类
主类主要提供三个逻辑:
=> 定义Flink Env 配置相关环境变量,这里定义使用EventTime作为处理时间,其他还有ProcessTime,IngestionTime
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(1000L)
=> 获取数据源 Source 并设置 EventTime 的获取方式,Source获取数据源,assign设置事件时间
val sensorData: DataStream[SensorReading] = env
.addSource(new SensorSource)
.assignTimestampsAndWatermarks(new SensorTimeAssigner)
=> 定义数据处理方式并提交任务,这里采用了时间窗口的处理方式,还有基于数据量的窗口以及基于处理函数的算子,这里先介绍最基础的
val avgTemp: DataStream[SensorReading] = sensorData
.map( r => SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)) )
.keyBy(_.id)
.timeWindow(Time.seconds(1))
.apply(new TemperatureAverage)
avgTemp.print()
env.execute("Compute average sensor temperature")
1.完整主类
通过KeyBy可以将原始DataStream转换为KeyedStream,这样同一个key的数据都会发往一个窗口进行处理,这里1s生成一个时间窗口用于监测平均温度
import com.weibo.ug.push.flink.SensorData.{SensorReading, SensorSource, SensorTimeAssigner}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/** Object that defines the DataStream program in the main() method */
object AverageSensorReadings {
/** main() defines and executes the DataStream program */
def main(args: Array[String])
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// configure watermark interval
// 自动生成水位线
// 也可以通过 assignTimestampsAndWatermarks 函数内的 getCurrentWatermark 获取水位线生成方式
env.getConfig.setAutoWatermarkInterval(1000L)
// ingest sensor stream
// 摄取流数据并绑定 eventTime
val sensorData: DataStream[SensorReading] = env
// SensorSource generates random temperature readings
// 自定义数据源
.addSource(new SensorSource)
// assign timestamps and watermarks which are required for event time
// 分配事件时间所需的时间戳和水印 主要从DStream的数据中获取相关的时间戳 extractTimestamp
// 最好在事件生成时为数据类型绑定 eventTime
.assignTimestampsAndWatermarks(new SensorTimeAssigner)
val avgTemp: DataStream[SensorReading] = sensorData
// convert Fahrenheit to Celsius using an inlined map function
// 通过map函数将华氏度转换为摄氏度 这里也可以调用filter过滤认为不需要的传感器参数
.map( r =>
SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)) )
// organize stream by sensorId
// 将同一传感器的温度统一在一起 KeyedStream
.keyBy(_.id)
// group readings in 1 second windows
// 1s生成1个窗口 将一个窗口的数据传输并处理 类似 spark streaming 的 interval
.timeWindow(Time.seconds(1))
// compute average temperature using a user-defined function
.apply(new TemperatureAverage)
// print result stream to standard out
avgTemp.print()
// execute application
env.execute("Compute average sensor temperature")
}
}
2.窗口处理函数
窗口处理函数这里需要覆盖apply方法,有一个注意的点就是继承WindowFunction函数的参数和apply方法的参数是不完全一致的,相关参数的注解都在代码注释里,可以大致浏览;Collector作为一个数据发射器,将处理好的类型进行下一步传递,这里主类的处理逻辑比较简单,只调用了print,也可以通过addSink方法继续向下游发送数据,常见的落盘HDFS,或者写到Kafka,Flink都有相关的实现API。
/** User-defined WindowFunction to compute the average temperature of SensorReadings */
// WindowFunction 参数分别代表 In Out Key W ,前两个比较好理解 输入输出类型 第三个为key的类型 第四个用于获取当前window参数
class TemperatureAverage extends WindowFunction[SensorReading, SensorReading, String, TimeWindow] {
/** apply() is invoked once for each window */
// apply方法的参数不完全和window保持相同顺序 分别为key 当前window 本次窗口输入的迭代类型 与输出的Collector
override def apply(
sensorId: String,
window: TimeWindow,
values: Iterable[SensorReading],
out: Collector[SensorReading]): Unit = {
// compute the average temperature
// Int Double 代表返回类型 (c,r)中c代表泛型 r代表values中的元素类型
val (cnt, sum) = values.foldLeft((0, 0.0))((c, r) => (c._1 + 1, c._2 + r.temperature))
val avgTemp = sum / cnt
// emit a SensorReading with the average temperature
// collector通过collect方法输出每个窗口对应时间戳的平均温度
out.collect(SensorReading(sensorId, window.getEnd, avgTemp))
}
}