Flink 中定时加载外部数据

Stella981
• 阅读 981

社区中有好几个同学问过这样的场景:  

flink 任务中,source 进来的数据,需要连接数据库里面的字段,再做后面的处理

这里假设一个 ETL 的场景,输入数据包含两个字段 “type, userid....” ,需要根据 type,连接一张 mysql 的配置表,关联 type 对应的具体内容。相对于输入数据的数量,type 的值是很少的(这里默认只有10种), 所以对应配置表就只有10条数据,配置是会定时修改的(比如跑批补充数据),配置的修改必须在一定时间内生效。

实时 ETL,需要用里面的一个字段去关联数据库,补充其他数据,进来的数据中关联字段是很单一的(就10个),对应数据库的数据也很少,如果用 异步 IO,感觉会比较傻(浪费资源、性能还不好)。同时数据库的数据是会不定时修改的,所以不能在启动的时候一次性加载。

Flink 现在对应这种场景可以使用  Boradcase state 做,如:基于Broadcast 状态的Flink Etl Demo

这里想说的是另一种更简单的方法: 使用定时器,定时加载数据库的数据 (就是简单的Java定时器)

先说一下代码流程:

1、自定义的 source,输入逗号分隔的两个字段

2、使用 RichMapFunction  转换数据,在 open 中定义定时器,定时触发查询 mysql 的任务,并将结果放到一个 map 中

3、输入数据关联 map 的数据,然后输出

先看下数据库中的数据:

mysql> select * from timer;
+------+------+
| id   | name |
+------+------+
| 0    | 0zOq |
| 1    | 1hKC |
| 2    | 2ibM |
| 3    | 3fCe |
| 4    | 4TaM |
| 5    | 5URU |
| 6    | 6WhP |
| 7    | 7zjn |
| 8    | 8Szl |
| 9    | 9blS |
+------+------+
10 rows in set (0.01 sec)

总共10条数据,id 就是对应的关联字段,需要填充的数据是 name

下面是主要的代码:// 自定义的source,输出 x,xxx 格式随机字符

val input = env.addSource(new TwoStringSource)
    val stream = input.map(new RichMapFunction[String, String] {

      val jdbcUrl = "jdbc:mysql://venn:3306?useSSL=false&allowPublicKeyRetrieval=true"
      val username = "root"
      val password = "123456"
      val driverName = "com.mysql.jdbc.Driver"
      var conn: Connection = null
      var ps: PreparedStatement = null
      val map = new util.HashMap[String, String]()

      override def open(parameters: Configuration): Unit = {
        logger.info("init....")
        query()
        // new Timer
        val timer = new Timer(true)
        // schedule is 10 second 定义了一个10秒的定时器,定时执行查询数据库的方法
        timer.schedule(new TimerTask {
          override def run(): Unit = {
            query()
          }
        }, 10000, 10000)

      }

      override def map(value: String): String = {
        // concat input and mysql data,简单关联输出
        value + "-" + map.get(value.split(",")(0))
      }

      /**
        * query mysql for get new config data
        */
      def query() = {
        logger.info("query mysql")
        try {
          Class.forName(driverName)
          conn = DriverManager.getConnection(jdbcUrl, username, password)
          ps = conn.prepareStatement("select id,name from venn.timer")
          val rs = ps.executeQuery

          while (!rs.isClosed && rs.next) {
            val id = rs.getString(1)
            val name = rs.getString(2)            // 将结果放到 map 中
            map.put(id, name)
          }
          logger.info("get config from db size : {}", map.size())

        } catch {
          case e@(_: ClassNotFoundException | _: SQLException) =>
            e.printStackTrace()
        } finally {
          ps.close()
          conn.close()
        }
      }
    })
//          .print()


    val sink = new FlinkKafkaProducer[String]("timer_out"
      , new MyKafkaSerializationSchema[String]()
      , Common.getProp
      , FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    stream.addSink(sink)

简单的Java定时器:

val timer = new Timer(true)
// schedule is 10 second, 5 second between successive task executions
timer.schedule(new TimerTask {
  override def run(): Unit = {
    query()
  }
}, 10000, 10000)

------------------20200327 改---------------------

之前 博客写的有问题,public void schedule(TimerTask task, long delay, long period) 的第三个参数才是重复执行的时间间隔,0 是不执行,我之前写的时候放上去的案例,调用的 Timer 的构造方法是: public void schedule(TimerTask task, long delay) 只会在 delay 时间后调用一次,并不会重复执行,不需要 调用 : public void schedule(TimerTask task, long delay, long period)   这样的构造方法,才能真正的定时执行。

使用之前的方法执行的,会看到query 方法执行了两次,是 open 中主动调用了一次和 之后调度了一次,定时器就结束了。

感谢社区大佬指出

同时社区还有大佬指出 : ScheduledExecutorService 会比 timer 更好;理由: Timer里边的逻辑失败的话不会抛出任何异常,直接结束,建议用ScheduledExecutorService替换Timer并且捕获下异常看看

------------------------------------

看下输出的数据:

7,N-7zjn
7,C-7zjn
7,U-7zjn
4,T-4TaM
7,J-7zjn
9,R-9blS
4,C-4TaM
9,T-9blS
4,A-4TaM
6,I-6WhP
9,U-9blS

注:“-” 之前是原始数据,后面是关联后的数据

部署到服务器上定时器的调度:

2019-09-28 18:28:13,476 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - query mysql
2019-09-28 18:28:13,480 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - get config from db size : 10
2019-09-28 18:28:18,553 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - FlinkKafkaProducer 0/1 - checkpoint 17 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1569666488499} from checkpoint 17
2019-09-28 18:28:23,476 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - query mysql
2019-09-28 18:28:23,481 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - get config from db size : 10
2019-09-28 18:28:28,549 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - FlinkKafkaProducer 0/1 - checkpoint 18 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1569666498505} from checkpoint 18
2019-09-28 18:28:33,477 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - query mysql
2019-09-28 18:28:33,484 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - get config from db size : 10

十秒调度一次

 欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

 Flink 中定时加载外部数据

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
ThinkPHP 根据关联数据查询 hasWhere 的使用实例
很多时候,模型关联后需要根据关联的模型做查询。场景:广告表(ad),广告类型表(ad\_type),现在需要筛选出广告类型表中id字段为1且广告表中status为1的列表先看关联的设置部分 publicfunctionadType(){return$thisbelongsTo('A
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这