Apache Hudi Rollback实现分析

Stella981
• 阅读 844

1. 介绍

在发现有些commit出错时,可使用Hudi提供的rollback回滚至指定的commit,这样可防止出现错误的结果,并且当一次commit失败时,也会进行rollback操作,保证一次commit的原子性。

2. 分析

rollback(回滚)的入口在 HoodieWriteClient#rollback,其依赖 HoodieWriteClient#rollbackInternal方法完成实际的回滚,其核心代码如下

  1. protected void rollbackInternal(String commitToRollback) {

  2. // 生成新的rollback时间

  3. final String startRollbackTime = HoodieActiveTimeline.createNewInstantTime();

  4. try {

  5. HoodieTable<T> table = HoodieTable.getHoodieTable(

  6. createMetaClient(true), config, jsc);

  7. // 找出第一个与rollback commit相等的instant

  8. Option<HoodieInstant> rollbackInstantOpt =

  9. Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()

  10. .filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitToRollback))

  11. .findFirst());

  12. // 存在

  13. if (rollbackInstantOpt.isPresent()) {

  14. // 进行回滚

  15. List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackInstantOpt.get());

  16. // 结束回滚

  17. finishRollback(context, stats, Collections.singletonList(commitToRollback), startRollbackTime);

  18. }

  19. } catch (IOException e) {

  20. throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback,

  21. e);

  22. }

  23. }

首先过滤出commit/delta_commit中是否存在待回滚instant的时间,如果存在,则进行回滚,回滚的核心方法为 doRollbackAndGetStats,该方法在前一篇讲解savepoint时已经分析过,该方法会调用 HoodieTable#rollback完成实际回滚动作,下面着重分析 HoodieTable#rollback方法,对于MOR和COW不同类型有不同实现,下面一一进行分析。

2.1 HoodieCopyOnWriteTable#rollback

对于COW类型而言, rollback核心代码如下

  1. public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants)

  2. throws IOException {

  3. long startTime = System.currentTimeMillis();

  4. List<HoodieRollbackStat> stats = new ArrayList<>();

  5. HoodieActiveTimeline activeTimeline = this.getActiveTimeline();

  6. if (instant.isCompleted()) { // instant状态为completed

  7. // 转变至inflight状态

  8. instant = activeTimeline.revertToInflight(instant);

  9. }

  10. if (!instant.isRequested()) { // 不为requested状态

  11. String commit = instant.getTimestamp();

  12. // 生成回滚的请求

  13. List<RollbackRequest> rollbackRequests = generateRollbackRequests(instant);

  14. // 进行回滚

  15. stats = new RollbackExecutor(metaClient, config).performRollback(jsc, instant, rollbackRequests);

  16. }

  17. // 删除inflight和requested状态的instant

  18. deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, instant);

  19. return stats;

  20. }

可以看到,进行回滚总体分为四步:1. 对于处理completed状态的instant,首先会将其转变至inflight状态,而对于不处于requested状态的instant(compaction会存在requested状态);2. 生成回滚请求;3. 进行回滚;4. 删除instant。

2.1.1 转变instant状态

对于处于completed状态的instant,将其转变至 inflight状态,其核心代码如下

  1. public HoodieInstant revertToInflight(HoodieInstant instant) {

  2. // 获取inflight状态的instant

  3. HoodieInstant inflight = HoodieTimeline.getInflightInstant(instant, metaClient.getTableType());

  4. // 转变至inflight,即文件名会变为.inflight

  5. revertCompleteToInflight(instant, inflight);

  6. return inflight;

  7. }

对于状态转变体现在文件名后缀的变化,即会变为 .inflght状态。

2.1.2 生成回滚请求

回滚请求由 generateRollbackRequests方法生成,其核心代码如下

  1. private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)

  2. throws IOException {

  3. // 获取所有的分区路径,对每个分区路径生成DELETE_DATA_AND_LOG_FILES类型的RollbackRequest

  4. return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),

  5. config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback))

  6. .collect(Collectors.toList());

  7. }

会根据不同的分区路径生成不同的RollbackRequest,该方法会生成会生成DELETEDATAANDLOGFILES类型,指定分区路径的RollbackRequest。

2.1.3 进行回滚

通过 RollbackExecutor#performRollback进行回滚,其核心代码如下

  1. public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback,

  2. List<RollbackRequest> rollbackRequests) {

  3. SerializablePathFilter filter = (path) -> {

  4. if (path.toString().contains(".parquet")) {

  5. // 获取parquet文件提交时间

  6. String fileCommitTime = FSUtils.getCommitTime(path.getName());

  7. // 是否等于指定回滚的时间

  8. return instantToRollback.getTimestamp().equals(fileCommitTime);

  9. } else if (path.toString().contains(".log")) {

  10. // 获取log文件提交时间

  11. String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);

  12. // 是否等于指定回滚的时间

  13. return instantToRollback.getTimestamp().equals(fileCommitTime);

  14. }

  15. return false;

  16. };

  17. int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);

  18. return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {

  19. final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();

  20. switch (rollbackRequest.getRollbackAction()) { // rollback类型

  21. case DELETE_DATA_FILES_ONLY: { // 仅仅删除数据文件

  22. // 根据分区路径来删除该路径下文件

  23. deleteCleanedFiles(metaClient, config, filesToDeletedStatus, instantToRollback.getTimestamp(),

  24. rollbackRequest.getPartitionPath());

  25. return new Tuple2<>(rollbackRequest.getPartitionPath(),

  26. HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())

  27. .withDeletedFileResults(filesToDeletedStatus).build());

  28. }

  29. case DELETE_DATA_AND_LOG_FILES: { // 删除数据文件和日志文件

  30. // 根据分区路径来删除该路径下文件

  31. deleteCleanedFiles(metaClient, config, filesToDeletedStatus, rollbackRequest.getPartitionPath(), filter);

  32. return new Tuple2<>(rollbackRequest.getPartitionPath(),

  33. HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())

  34. .withDeletedFileResults(filesToDeletedStatus).build());

  35. }

  36. case APPEND_ROLLBACK_BLOCK: { // 添加ROLLBACK块

  37. Writer writer = null;

  38. try {

  39. writer = HoodieLogFormat.newWriterBuilder()

  40. .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))

  41. .withFileId(rollbackRequest.getFileId().get())

  42. .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())

  43. .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();

  44. // 生成元数据,如生成控制块(CommandBlock)

  45. Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());

  46. writer = writer.appendBlock(new HoodieCommandBlock(header));

  47. } catch (IOException | InterruptedException io) {

  48. throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);

  49. } finally {

  50. try {

  51. if (writer != null) {

  52. writer.close();

  53. }

  54. } catch (IOException io) {

  55. throw new UncheckedIOException(io);

  56. }

  57. }

  58. Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();

  59. filesToNumBlocksRollback.put(metaClient.getFs().getFileStatus(Preconditions.checkNotNull(writer).getLogFile().getPath()), 1L);

  60. return new Tuple2<>(rollbackRequest.getPartitionPath(),

  61. HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())

  62. .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());

  63. }

  64. default:

  65. throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);

  66. }

  67. }).reduceByKey(this::mergeRollbackStat).map(Tuple2::_2).collect();

  68. }

对于DELETEDATAFILES_ONLY类型的rollback,会调用 deleteCleanedFiles来删除数据文件,其核心代码如下

  1. private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,

  2. Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {

  3. FileSystem fs = metaClient.getFs();

  4. PathFilter filter = (path) -> {

  5. if (path.toString().contains(".parquet")) { // 数据文件

  6. String fileCommitTime = FSUtils.getCommitTime(path.getName());

  7. // 与rollback时间相等

  8. return commit.equals(fileCommitTime);

  9. }

  10. return false;

  11. };

  12. // 过滤出与rollback时间相等的所有parquet文件

  13. FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);

  14. for (FileStatus file : toBeDeleted) { // 逐一删除

  15. boolean success = fs.delete(file.getPath(), false);

  16. results.put(file, success);

  17. }

  18. return results;

  19. }

首先会过滤指定分区下所有与rollback时间相等的parquet文件,然后逐一删除。

对于DELETEDATAANDLOGFILES类型的rollback,会调用同名的 deleteCleanedFiles来删除文件,其核心代码如下

  1. private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,

  2. Map<FileStatus, Boolean> results, String partitionPath, PathFilter filter) throws IOException {

  3. FileSystem fs = metaClient.getFs();

  4. // 过滤出与rollback时间相等的所有parquet和log文件

  5. FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);

  6. for (FileStatus file : toBeDeleted) { // 逐一删除

  7. boolean success = fs.delete(file.getPath(), false);

  8. results.put(file, success);

  9. }

  10. return results;

  11. }

首先会过滤指定分区下所有与rollback时间相等的parquet/log文件,然后逐一删除。

对于APPENDROLLBACKBLOCK类型的rollback,会生成日志文件控制块并写入指定的文件中,在读取时,将不会读取该控制块的前一个块。

2.1.4 删除instant

在完成回滚后,还需要调用 deleteInflightAndRequestedInstant来删除instant,其核心代码如下

  1. protected void deleteInflightAndRequestedInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,

  2. HoodieInstant instantToBeDeleted) {

  3. // 删除marker下的目录

  4. deleteMarkerDir(instantToBeDeleted.getTimestamp());

  5. if (deleteInstant) { // 删除instant

  6. // 删除处于pending状态的instant

  7. activeTimeline.deletePending(instantToBeDeleted);

  8. if (instantToBeDeleted.isInflight() && !metaClient.getTimelineLayoutVersion().isNullVersion()) {

  9. // 删除处于requested状态的instant

  10. instantToBeDeleted = new HoodieInstant(State.REQUESTED, instantToBeDeleted.getAction(),

  11. instantToBeDeleted.getTimestamp());

  12. activeTimeline.deletePending(instantToBeDeleted);

  13. }

  14. }

  15. }

删除instant主要是删除处于inflight和requested状态的在元数据目录下的文件。

2.2 HoodieMergeOnReadTable#rollback

对于MOR而言, rollback核心代码如下

  1. public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant,

  2. boolean deleteInstants) throws IOException {

  3. long startTime = System.currentTimeMillis();

  4. // rollback时间

  5. String commit = instant.getTimestamp();

  6. if (instant.isCompleted()) { // instant状态为completed

  7. // // 转变至inflight状态

  8. instant = this.getActiveTimeline().revertToInflight(instant);

  9. }

  10. List<HoodieRollbackStat> allRollbackStats = new ArrayList<>();

  11. // 不为requested状态

  12. if (!instant.isRequested()) {

  13. // 生成rollback请求

  14. List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instant);

  15. // 进行回滚

  16. allRollbackStats = new RollbackExecutor(metaClient, config).performRollback(jsc, instant, rollbackRequests);

  17. }

  18. // 删除inflight和requested状态的instant

  19. deleteInflightAndRequestedInstant(deleteInstants, this.getActiveTimeline(), instant);

  20. return allRollbackStats;

  21. }

可以看到其流程与COW相同,不再赘述。

3. 总结

对于rollback而言,其主要分为四步:转变instant状态;2. 生成回滚请求;3. 进行回滚;4. 删除instant。而回滚时会分为三种情况,对于 DELETE_DATA_FILES_ONLYDELETE_DATA_AND_LOG_FILES类型的rollback,会直接删除对应commit的数据文件和日志文件,而对于 APPEND_ROLLBACK_BLOCK类型,则会写入控制块至文件中,在读取时不读取其前一个块。

Apache Hudi Rollback实现分析

Apache Hudi Rollback实现分析

Apache Hudi Rollback实现分析

本文分享自微信公众号 - ApacheHudi(ApacheHudi)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
MySQL相关问题题
1.truncate、delete、drop的区别(1)truncate、drop是不可以rollback的,但是delete是可以rollback的。DELETE语句执行删除的过程是每次从表中删除一行,并且同时将该行的删除操作作为事务记录在日志中保存以便进行进行回滚操作。TRUNCATETABLE则一次性地从表中删除所有的数据并不把单独的删
Wesley13 Wesley13
3年前
Mysql技术总结总结
一:mysql数据库引擎MyISAM和InnoDB的区别:1、MyISAM具有检查和修复表格的大多数工具。表格可以被压缩,而且支持全文收索。不支持事物,而且不支持外键。2、innodb这种表是事务安全的。提供了commit(提交)rollback(实务回滚)支持外键,
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这