Flink 专题

Stella981
• 阅读 797

CheckPoint

1. checkpoint 保留策略

默认情况下,checkpoint 不会被保留,取消程序时即会删除他们,但是可以通过配置保留定期检查点,根据配置 当作业失败或者取消的时候 ,不会自动清除这些保留的检查点 。
java :

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

ExternalizedCheckpointCleanup 可选项如下:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作业时保留检查点。请注意,在这种情况下,您必须在取消后手动清理检查点状态。
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作业时删除检查点。只有在作业失败时,检查点状态才可用。

2. Checkpoint 配置

与SavePoint 类似 ,checkpoint 保留的是元数据文件和一些数据文件
默认情况下checkpoint 只保留 一份最新数据,如果需要进行checkpoint数据恢复 ,可以通过全局设置的方式设置该集群默认的checkpoint 保留数,以保证后期可以从checkpoint 点进行恢复 。 同时为了 及时保存checkpoint状态 还需要在服务级别设置 checkpoint 检查点的 备份速度 。 全局配置:
flink-conf.yaml

// 设置 checkpoint全局设置保存点  
state.checkpoints.dir: hdfs:///checkpoints/
// 设置checkpoint 默认保留 数量  
state.checkpoints.num-retained: 20

注意 如果将 checkpoint保存在hdfs 系统中 , 需要设置 hdfs 元数据信息 : fs.default-scheme:
服务级别设置:
java:

// 设置 checkpoint 保存目录  
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/");
// 设置checkpoint 检查点间隔时间  
env.enableCheckpointing(5000);

提交任务之后 job 界面 和 hdfs 界面

  • 通过页面可以看出 checkpoint 备份方式是每5秒执行一次 ,保存当前所有task 状态元信息 和状态信息 。
  • hdfs 信息 保存 jobId 为 0171897fa809692093b4a9b223cb35e4 最新的 20次 checkpoint 信息 Flink 专题 Flink 专题 Flink 专题

3. Checkpoint 状态点恢复

因为 flink checkpoint 目录 分别对应的是 jobId , 每通过 flink run 方式 / 页面提交方式 都会重新生成 jobId ,那么如何通过checkpoint 恢复 失败任务或者重新执行保留时间点的 任务?

flink 提供了 在启动 之时 通过设置 -s 参数指定checkpoint 目录 , 让新的jobId 读取该checkpoint 元文件信息和状态信息 ,从而达到指定时间节点启动 job 。启动方式如下 :

./bin/flink -s /flink/checkpoints/0171897fa809692093b4a9b223cb35e4/chk-50/_metadata  -p  @Parallelism -c @Mainclass @jar  

Savepoint

Savepoint 介绍

Savepoint是通过Flink的检查点机制创建的流作业执行状态的一致图像。您可以使用Savepoints来停止和恢复,分叉或更新Flink作业。保存点由两部分组成:稳定存储(例如HDFS,S3,...)上的(通常是大的)二进制文件和(相对较小的)元数据文件的目录。稳定存储上的文件表示作业执行状态图像的净数据。Savepoint的元数据文件以(绝对路径)的形式包含(主要)指向作为Savepoint一部分的稳定存储上的所有文件的指针。

savepoint 和 checkpoint 区别

从概念上讲,Flink的Savepoints与Checkpoints的不同之处在于备份与传统数据库系统中的恢复日志不同。检查点的主要目的是在意外的作业失败时提供恢复机制。Checkpoint的生命周期由Flink管理,即Flink创建,拥有和发布Checkpoint - 无需用户交互。作为一种恢复和定期触发的方法,Checkpoint实现的两个主要设计目标是:i)being as lightweight to create (轻量级),ii)fast restore (快速恢复) 。针对这些目标的优化可以利用某些属性,例如,JobCode在执行尝试之间不会改变。

与此相反,Savepoints由用户创建,拥有和删除。他们的用例是planned (计划) 的,manual backup( 手动备份 ) 和 resume(恢复) 。例如,这可能是您的Flink版本的更新,更改您的Job graph ,更改 parallelism ,分配第二个作业,如红色/蓝色部署,等等。当然,Savepoints必须在终止工作后继续存在。从概念上讲,保存点的生成和恢复成本可能更高,并且更多地关注可移植性和对前面提到的作业更改的支持。

Assigning Operator IDs ( 分配 operator ids)

为了能够在将来升级你的程序在本节中描述。主要的必要更改是通过该uid(String)方法手动指定操作员ID 。这些ID用于确定每个运算符的状态。
java:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

如果您未手动指定ID,则会自动生成这些ID。只要这些ID不变,您就可以从保存点自动恢复。生成的ID取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些ID。

Savepoint State

触发保存点时,会创建一个新的保存点目录,其中将存储数据和元数据。可以通过配置默认目标目录或使用触发器命令指定自定义目标目录来控制此目录的位置

保存Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]
这将触发具有ID的作业的保存点:jobId,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。

在yarn 集群中保存Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
这将触发具有ID :jobId和YARN应用程序ID 的作业的保存点:yarnAppId,并返回创建的保存点的路径。

使用 Savepoint 取消job

$ bin/flink cancel -s [:targetDirectory] :jobId
这将以原子方式触发具有ID的作业的保存点:jobid并取消作业。此外,您可以指定目标文件系统目录以存储保存点。该目录需要可由JobManager和TaskManager访问。

Resuming Savepoint

$ bin/flink run -s :savepointPath [:runArgs]
这将提交作业并指定要从中恢复的保存点。您可以指定保存点目录或_metadata文件的路径。

允许未恢复状态启动

$ bin/flink run -s :savepointPath -n [:runArgs]
默认情况下,resume操作将尝试将保存点的所有状态映射回要恢复的程序。如果删除了运算符,则可以通过--allowNonRestoredState(short -n:)选项跳过无法映射到新程序的状态:

全局配置

您可以通过state.savepoints.dir 配置文件设置默认savepoint 位置 。触发保存点时,此目录将用于存储保存点。您可以通过使用触发器命令指定自定义目标目录来覆盖默认值(请参阅:targetDirectory参数)。
flink-conf.yaml

# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints

参考地址:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/state_backends.html https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html http://ju.outofmemory.cn/entry/370841

Flink 专题

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
Java日期时间API系列30
  实际使用中,经常需要使用不同精确度的Date,比如保留到天2020042300:00:00,保留到小时,保留到分钟,保留到秒等,常见的方法是通过格式化到指定精确度(比如:yyyyMMdd),然后再解析为Date。Java8中可以用更多的方法来实现这个需求,下面使用三种方法:使用Format方法、 使用Of方法和使用With方法,性能对比,使用
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这