Flink 入门Demo详解

Stella981
• 阅读 990

一.引言:

Apach Flink 是全新的流处理系统,在Spark Straming的基础上添加了很多特性,主要在于其提供了基于时间和窗口计算的算子,并且支持有状态的存储和 Checkpoint 的重启机制,下面假设有多个温度传感器持续传输当前温度,Flink流处理需要每一段时间提供该时间段内的传感器平均温度。

二.依赖支持

项目是基于maven的scala项目,主要导入flink的scala依赖,如果是java需要另一套依赖:

1.scala

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.7.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.8</scala.version>
    <hadoop.version>2.6.0</hadoop.version>
</properties>

<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Scala Library, provided by Flink as well. -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

2.java

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

三.辅助类

1.基础温度类 SensorReading

采用case class简化后续处理函数的代码

case class SensorReading(id: String, timestamp: Long, temperature: Double)

2.时间戳提取类

这里采用了Flink的特性: EventTime作为数据的时间戳,通过提取生成SensorReading中的时间戳作为温度传感器传递温度的EventTime

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Assigns timestamps to SensorReadings based on their internal timestamp and
  * emits watermarks with five seconds slack.
  * 根据传感器内部时间戳和传感器读取分配时间戳。
  */
class SensorTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(5)) {

  /** Extracts timestamp from SensorReading. */
  override def extractTimestamp(r: SensorReading): Long = r.timestamp

}

如果是Java,写法稍有不同

public static class SensorTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor<SensorReading> {

    public SensorTimeAssigner() {
        super(Time.seconds(5));
    }

    @Override
    public long extractTimestamp(SensorReading r) {
        return r.timestamp;
    }
}

3.自定义source类

SparkStreaming采用的是复写Receiver函数实现自定义数据源,通过receiver的store生成数据;Storm通过覆盖Spout的nextTruple方法,emit生成数据;这里Flink通过集成SoureFunction实现run方法,通过collect方法生成数据,这几种流式处理器在自定义数据流这方面其实大致比较类似,换汤不换药。相关的注释都写在代码里了,这里逻辑比较简单,只是通过Random类,去随机模拟温度函数,如果自己有场景需求需要自定义数据源时,可以把Random看做是自己的Socket,在run方法初始化数据源生成数据即可,这里生成数据可以通过Flink Env设置并行度,并行的接收数据,前提是你的数据源支持并行接收。

import java.util.Calendar

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

import scala.util.Random

/**
  * Flink 源功能,用于生成具有随机温度值的传感器读取。
  *
  * 自定义源方法需要实现run方法与cancel方法即可,需要初始化的连接放到run方法之内即可
  *
  * 源的每个并行实例模拟 10 个传感器,这些传感器发出一个传感器
  * 每 100 ms 阅读一次。
  *
  * 注意:这是一个简单的数据生成源函数,不检查其状态。
  * 如果发生故障,源不会重播任何数据。
  */
// 继承时需要定义生成的DStream的类型
class SensorSource extends RichParallelSourceFunction[SensorReading] {

  // flag indicating whether source is still running.
  // 指示源是否仍在运行。
  var running: Boolean = true

  /** run() continuously emits SensorReadings by emitting them through the SourceContext. */
  override def run(srcCtx: SourceContext[SensorReading]): Unit = {
    // SourceContext 通过 collect 方法不断向flink发出数据
    // initialize random number generator
    val rand = new Random()

    // 获取当前parallel subtask的下标
    val taskIdx = this.getRuntimeContext.getIndexOfThisSubtask

    // initialize sensor ids and temperatures
    // 初始化温度转换器 IndexSeq[String,Int] 序列长度为10 华氏度65+
    var curFTemp = (1 to 10).map {
      i => ("sensor_" + (taskIdx * 10 + i), 65 + (rand.nextGaussian() * 20))
    }

    // emit data until being canceled
    while (running) {

      // update temperature
      // 更新温度
      curFTemp = curFTemp.map( t => (t._1, t._2 + (rand.nextGaussian() * 0.5)) )
      // get current time
      val curTime = Calendar.getInstance.getTimeInMillis

      // emit new SensorReading
      // id 区分传感器分区 curTime 标识eventTime temperature 标识温度
      curFTemp.foreach( t => srcCtx.collect(SensorReading(t._1, curTime, t._2)))

      // wait for 100 ms
      Thread.sleep(100)
    }

  }

  /** Cancels this SourceFunction. */
  override def cancel(): Unit = {
    running = false
  }

}

四.主类

主类主要提供三个逻辑:

=> 定义Flink Env 配置相关环境变量,这里定义使用EventTime作为处理时间,其他还有ProcessTime,IngestionTime

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000L)

=> 获取数据源 Source 并设置 EventTime 的获取方式,Source获取数据源,assign设置事件时间

    val sensorData: DataStream[SensorReading] = env
      .addSource(new SensorSource)
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

=> 定义数据处理方式并提交任务,这里采用了时间窗口的处理方式,还有基于数据量的窗口以及基于处理函数的算子,这里先介绍最基础的

    val avgTemp: DataStream[SensorReading] = sensorData
      .map( r => SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)) )
      .keyBy(_.id)
      .timeWindow(Time.seconds(1))
      .apply(new TemperatureAverage)

    avgTemp.print()

    env.execute("Compute average sensor temperature")

1.完整主类

通过KeyBy可以将原始DataStream转换为KeyedStream,这样同一个key的数据都会发往一个窗口进行处理,这里1s生成一个时间窗口用于监测平均温度

import com.weibo.ug.push.flink.SensorData.{SensorReading, SensorSource, SensorTimeAssigner}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/** Object that defines the DataStream program in the main() method */
object AverageSensorReadings {

  /** main() defines and executes the DataStream program */
  def main(args: Array[String]) 

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // use event time for the application
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // configure watermark interval
    // 自动生成水位线
    // 也可以通过 assignTimestampsAndWatermarks 函数内的 getCurrentWatermark 获取水位线生成方式
    env.getConfig.setAutoWatermarkInterval(1000L)

    // ingest sensor stream
    // 摄取流数据并绑定 eventTime
    val sensorData: DataStream[SensorReading] = env
      // SensorSource generates random temperature readings
      // 自定义数据源
      .addSource(new SensorSource)
      // assign timestamps and watermarks which are required for event time
      // 分配事件时间所需的时间戳和水印 主要从DStream的数据中获取相关的时间戳 extractTimestamp
      // 最好在事件生成时为数据类型绑定 eventTime
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

    val avgTemp: DataStream[SensorReading] = sensorData
      // convert Fahrenheit to Celsius using an inlined map function
      // 通过map函数将华氏度转换为摄氏度 这里也可以调用filter过滤认为不需要的传感器参数
      .map( r =>
      SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)) )
      // organize stream by sensorId
      // 将同一传感器的温度统一在一起 KeyedStream
      .keyBy(_.id)
      // group readings in 1 second windows
      // 1s生成1个窗口 将一个窗口的数据传输并处理 类似 spark streaming 的 interval
      .timeWindow(Time.seconds(1))
      // compute average temperature using a user-defined function
      .apply(new TemperatureAverage)

    // print result stream to standard out
    avgTemp.print()

    // execute application
    env.execute("Compute average sensor temperature")
  }
}

2.窗口处理函数

窗口处理函数这里需要覆盖apply方法,有一个注意的点就是继承WindowFunction函数的参数和apply方法的参数是不完全一致的,相关参数的注解都在代码注释里,可以大致浏览;Collector作为一个数据发射器,将处理好的类型进行下一步传递,这里主类的处理逻辑比较简单,只调用了print,也可以通过addSink方法继续向下游发送数据,常见的落盘HDFS,或者写到Kafka,Flink都有相关的实现API。

/** User-defined WindowFunction to compute the average temperature of SensorReadings */
// WindowFunction 参数分别代表 In Out Key W ,前两个比较好理解 输入输出类型 第三个为key的类型 第四个用于获取当前window参数
class TemperatureAverage extends WindowFunction[SensorReading, SensorReading, String, TimeWindow] {

  /** apply() is invoked once for each window */
  // apply方法的参数不完全和window保持相同顺序 分别为key 当前window 本次窗口输入的迭代类型 与输出的Collector
  override def apply(
                      sensorId: String,
                      window: TimeWindow,
                      values: Iterable[SensorReading],
                      out: Collector[SensorReading]): Unit = {

    // compute the average temperature
    // Int Double 代表返回类型 (c,r)中c代表泛型 r代表values中的元素类型
    val (cnt, sum) = values.foldLeft((0, 0.0))((c, r) => (c._1 + 1, c._2 + r.temperature))
    val avgTemp = sum / cnt

    // emit a SensorReading with the average temperature
    // collector通过collect方法输出每个窗口对应时间戳的平均温度
    out.collect(SensorReading(sensorId, window.getEnd, avgTemp))
  }
}
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这