Structured Streaming 之状态存储解析

Easter79
• 阅读 497

引言

我们知道,持续查询的驱动引擎StreamExecution 会持续不断地驱动每个批次的执行。

对于不需要跨批次的持续查询,如 map(), filter() 等,每个批次之间的执行相互独立,不需要状态支持。而比如类似 count() 的聚合式持续查询,则需要跨批次的状态支持,这样本批次的执行只需依赖上一个批次的结果,而不需要依赖之前所有批次的结果。这也即增量式持续查询,能够将每个批次的执行时间稳定下来,避免越后面的批次执行时间越长的情形。

增量式持续查询的思路和实现:

Structured Streaming 之状态存储解析

而在这里面的 StateStore,即是 Structured Streaming 用于保存跨批次状态结果的模块组件。本文解析 StateStore 模块。

StateStore 模块的总体思路

Structured Streaming 之状态存储解析

StateStore 模块的总体思路:

  • 分布式实现

  • 跑在现有 Spark 的 driver-executors 架构上

  • driver 端是轻量级的 coordinator,只做协调工作

  • executor 端负责状态的实际分片的读写

  • 状态分片

  • 因为一个应用里可能会包含多个需要状态的 operator,而且 operator 本身也是分 partition 执行的,所以状态存储的分片以 operatorId+partitionId 为切分依据

  • 以分片为基本单位进行状态的读入和写出

  • 每个分片里是一个 key-value 的 store,key 和 value 的类型都是 UnsafeRow(可以理解为 SparkSQL 里的 Object 通用类型),可以按 key 查询、或更新

  • 状态分版本

  • 因为 StreamExection 会持续不断地执行批次,因而同一个 operator 同一个 partition 的状态也是随着时间不断更新、产生新版本的数据

  • 状态的版本是与 StreamExecution 的进展一致,比如 StreamExection 的批次 id = 7 完成时,那么所有 version = 7 的状态即已经持久化

  • 批量读入和写出分片

  • 累计当前版本(即 StreamExecution 的当前批次)的多行的状态修改,一次性写出到 HDFS 一个修改的流水 log,流水 log 写完即标志本批次的状态修改完成

  • 同时应用修改到内存中的状态缓存

  • 根据 operator + partition + version, 从 HDFS 读入数据,并缓存在内存里

  • 对于每个分片,读入时

  • 对于每个分片,写出时

关于 StateStore 的 operator, partiton, version 有一个图片可帮助理解:

Structured Streaming 之状态存储解析

StateStore:(a)迁移、(b)更新和查询、(c)维护、(d)故障恢复

Structured Streaming 之状态存储解析

(a) StateStore 在不同的节点之间如何迁移

在 StreamExecution 执行过程中,随时在 operator 实际执行的 executor 节点上唤起一个状态存储分片、并读入前一个版本的数据即可(如果 executor 上已经存在一个分片,那么就直接重用,不用唤起分片、也不用读入数据了)。

我们上节讲过,持久化的状态是在 HDFS 上的。那么如上图所示:

  • executor a, 唤起了 operator = 1, partition = 1 的状态存储分片,从 HDFS 里位于本机的数据副本 load 进来 version = 5 的数据;

  • 一个 executor 节点可以执行多个 operator,那么也就可以在一个 executor 上唤起多个状态存储分片(分别对应不同的 operator + partition),如图示 executor b

  • 在一些情况下,需要从其他节点的 HDFS 数据副本上 load 状态数据,如图中 executor c 需要从 executor b 的硬盘上 load 数据;

  • 另外还有的情况是,同一份数据被同时 load 到不同的 executor 上,如 executor dexecutor a 即是读入了同一份数据 —— 推测执行时就容易产生这种情况 —— 这时也不会产生问题,因为 load 进来的是同一份数据,然后在两个节点上各自修改,最终只会有一个节点能够成功提交对状态的修改。

(b) StateStore 的更新和查询

我们前面也讲过,在一个状态存储分片里,是 key-value 的 store。这个 key-value 的 store 支持如下操作:

/* == CRUD 增删改查 =============================== */  // 查询一条 key-value  def get(key: UnsafeRow): Option[UnsafeRow]      // 新增、或修改一条 key-value  def put(key: UnsafeRow, value: UnsafeRow): Unit      // 删除一条符合条件的 key-value  def remove(condition: UnsafeRow => Boolean): Unit  // 根据 key 删除 key-value  def remove(key: UnsafeRow): Unit    /* == 批量操作相关 =============================== */      // 提交当前执行批次的所有修改,将刷出到 HDFS,成功后版本将自增  def commit(): Long  // 放弃当前执行批次的所有修改  def abort(): Unit      // 当前状态分片、当前版本的所有 key-value 状态  def iterator(): Iterator[(UnsafeRow, UnsafeRow)]      // 当前状态分片、当前版本比上一个版本的所有增量更新  def updates(): Iterator[StoreUpdate]

使用 StateStore 的代码可以这样写(现在都是 Structured Streaming 内部实现在使用 StateStore,上层用户无需面对这些细节):

// 在最开始,获取正确的状态分片(按需重用已有分片或读入新的分片)  val store = StateStore.get(StateStoreId(checkpointLocation, operatorId, partitionId), ..., version, ...)  // 开始进行一些更改  store.put(...)  store.remove(...)      // 更改完成,批量提交缓存在内存里的更改到 HDFS  store.commit()      // 查看当前状态分片的所有 key-value / 刚刚更新了的 key-value  store.iterator()  store.updates()

(c) StateStore 的维护

我们看到,前面 StateStore 在写出状态的更新时,是写出的修改流水 log。

StateStore 本身也带了 maintainess 即维护模块,会周期性的在后台将过去的状态和最近若干版本的流水 log 进行合并,并把合并后的结果重新写回到 HDFS:old_snapshot + delta_a + delta_b + … => lastest_snapshot

这个过程跟 HBase 的 major/minor compact 差不多,但还没有区别到 major/minor 的粒度。

(d) StateStore 的故障恢复

StateStore 的所有状态以 HDFS 为准。如果某个状态分片在更新过程中失败了,那么还没有写出的更新会不可见。

恢复时也是从 HDFS 读入最近可见的状态,并配合 StreamExecution 的执行批次重做。从另一个角度说,就是大家 —— 输入数据、及状态存储 —— 先统一往后会退到本执行批次刚开始时的状态,然后重新计算。当然这里重新计算的粒度是 Spark 的单个 task,即一个 partition 的输入数据 + 一个 partition 的状态存储。

从 HDFS 读入最近可见的状态时,如果有最新的 snapshot,也就用最新的 snapshot,如果没有,就读入稍旧一点的 snapshot 和新的 deltas,先做一下最新状态的合并。

总结

在 Structured Streaming 里,StateStore 模块提供了 分片的分版本的可迁移的高可用  key-value store。

基于这个 StateStore 模块,StreamExecution 实现了 增量的 持续查询、和很好的故障恢复以维护 _end-to-end exactly-once guarantees_。

Structured Streaming 之状态存储解析

Structured Streaming 之状态存储解析

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Wesley13 Wesley13
3年前
Mysql的学习6____事物,索引,备份,视图,触发器
1.Mysql事务:就是将一组的SQL语句放在一个批次去执行,要是一条语句出错,该批次的SQL语句都会取消执行。Mysql事物处理只支持InnoDB和BDB数据表类型。1.1事物的ACID原则:原子性(Atomic):事物中的SQL语句要么全部执行,要么全不执行,不可能停滞在中间的某个状态,若在执行中发生了错误,会进行事物的回滚(Rol
Wesley13 Wesley13
3年前
Java多线程导致的的一个事物性问题
业务场景我们现在有一个类似于文件上传的功能,各个子站点接受业务,业务上传文件,各个子站点的文件需要提交到总站点保存,文件是按批次提交到总站点的,也就是说,一个批次下面约有几百个文件。      考虑到白天提交这么多文件会影响到子站点其他系统带宽,我们将分站点的文件提交到总站点这个操作过程独立出来,放到晚上来做,具体时间是晚上7:00到早上7:00。
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
5
获赞
1.2k