Flink编程练习

Stella981
• 阅读 828

[TOC]

1.wordcount

利用socket作为数据源,对输入的每行数据进行单词计数。计算频率为process time的每10秒一次,结果输出到terminal。

object SocketWindowWordCount {
    def main(args: Array[String]) : Unit = {

        val port: Int = try {
            ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case e: Exception => {
                System.err.println("No port specified. Please run 'xx.jar --port <port>'")
                return
            }
        }

        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        val text = env.socketTextStream("localhost", port, '\n')

        val windowCounts = text
            .flatMap(_.split("\\s"))
            .map(WordWithCount(_,1)) 
            .keyBy(_.word)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
            .sum("count")

        windowCounts.print()

        env.execute("Socket Window WordCount")
    }

    case class WordWithCount(word: String, count: Long)
}

数据格式

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SmokeLevel extends Enumeration {
  type SmokeLevel = SmokeLevel.Value
  val High, Low = Value
}

case class Alert(message: String, timestamp: Long)

2.双流警报EventTime

时间特征为event time,每1s更新一次watermark,watermark由SensorReading内部的timestamp推进,允许5s的延迟(过滤掉迟到数据)。数据源SensorReading并发处理,数据源SmokeLevel并发度为1,但能够被每个并发的SensorReading流访问。假设两个流的数据是源源不断的。当SensorReading的temperature大于100且SmokeLevel为High时触发警报。警报包含当时SensorReading的timestamp。

下面例子,迟到数据,即数据晚于WM依然会被处理

注意:如果某个流在connect前assignTimestampsAndWatermarks,connect后的流是不会更新WM的。

def main(args: Array[String]): Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getConfig.setAutoWatermarkInterval(1000L)

  val sensorInput = env
    .addSource(new SensorSource)
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(5)) {
        override def extractTimestamp(element: SensorReading): Long = {
          element.timestamp
        }
      })
  val smokeLevelInput = env
    .addSource(new SmokeLevelSource)
    .setParallelism(1)

  val res = sensorInput
    .process(new MyKeyedProcessFunction) // 这里的实现省略,其实就是if和collect
    .connect(smokeLevelInput)
    .flatMap(new MyFlatMapFunc)
  
  res.print()

  env.execute("multiple streamss")

}

class MyFlatMapFunc extends CoFlatMapFunction[SensorReading, SmokeLevel, Alert] {
  private private var curSmokeLevel = SmokeLevel.Low

  override def flatMap1(value: SensorReading, out: Collector[Alert]): Unit = {
    if (curSmokeLevel.equals(SmokeLevel.High) && value.temperature > 100) {
      out.collect(Alert("Alert! ", value.timestamp))
    }
  }

  override def flatMap2(value: SmokeLevel, out: Collector[Alert]): Unit = {
    curSmokeLevel = value
  }
}

3.持续计数stateful + timer + SideOutputs

对每个key的数据量进行累加计数,如果1分钟没有新数据,就输出key-count对。对每一个数据进行处理时,sideoutput当前所处理的key的state数据(更新后的)

val realTimeInfo: OutputTag[String] =
  new OutputTag[String]("real-time_info")

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.getConfig.setAutoWatermarkInterval(1000L)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val countStream = env.addSource(new SensorSource)
    .keyBy(_.id)
    .process(new MyProcessFunc)
  
  countStream.getSideOutput(realTimeInfo)
    .print()

  env.execute()
}

case class CountWithTimeStamp(key: String, count: Int, ts: Long)

class MyProcessFunc extends KeyedProcessFunction[String, SensorReading, (String, Int)] {

  lazy val state = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimeStamp]("muState", classOf[CountWithTimeStamp]))

  override def processElement(value: SensorReading,
                              ctx: KeyedProcessFunction[String, SensorReading, (String, Int)]#Context,
                              out: Collector[(String, Int)]): Unit = {
    val current = state.value() match {
      case null =>
        CountWithTimeStamp(value.id, 1, ctx.timestamp())
      case CountWithTimeStamp(key, count, lastModified) =>
        // 删除上一次的timer
        ctx.timerService().deleteEventTimeTimer(lastModified + 6000)
        CountWithTimeStamp(key, count + 1, ctx.timestamp())
    }

    state.update(current)

    ctx.timerService().registerEventTimeTimer(current.ts + 6000)
    
    ctx.output(realTimeInfo, current)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, SensorReading, (String, Int)]#OnTimerContext,
                       out: Collector[(String, Int)]): Unit = {
    state.value() match {
      case CountWithTimeStamp(key, count, lastModified) =>
        if (timestamp == lastModified) out.collect((key, count)) else None
      case _ =>
    }
  }
}

4.一定时间范围内的极值windowfunction + checkpoint

利用tumbling window计算各个sensor在15s内的最大最小值,返回结果包含窗口的结束时间。另外,window只存储极值,不保留原数据。

checkpoint间隔为10s,watermark刷新间隔1s

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.getConfig.setAutoWatermarkInterval(1000)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  env.enableCheckpointing(10 * 1000L)

  env.addSource(new SensorSource)
    .assignTimestampsAndWatermarks(new MyPeriodicAssigner)
    .map(r => (r.id, r.temperature, r.temperature))
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(Time.seconds(15)))
    .reduce(
      (item1: (String, Double, Double), item2: (String, Double, Double)) => {
        (item1._1, item1._2.min(item2._2), item1._3.max(item2._3))
      },
      new MyWindowEndProcessFunction()
    )
}

case class MaxMinTemperature(key: String, min: Double, max: Double, ts: Long)

class MyWindowEndProcessFunction
  extends ProcessWindowFunction[(String, Double, Double), MaxMinTemperature, String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[(String, Double, Double)],
                       out: Collector[MaxMinTemperature]): Unit = {
    out.collect(MaxMinTemperature(key, elements.head._2, elements.head._3, context.window.getEnd))
  }
}
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Stella981 Stella981
3年前
HIVE 时间操作函数
日期函数UNIX时间戳转日期函数: from\_unixtime语法:   from\_unixtime(bigint unixtime\, string format\)返回值: string说明: 转化UNIX时间戳(从19700101 00:00:00 UTC到指定时间的秒数)到当前时区的时间格式举例:hive   selec
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这