1. Introduction
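When some commits turn out to be wrong, you can use Hudi's rollback to roll back a specified commit and avoid producing incorrect results. Rollback is also triggered when a commit fails, which keeps each commit atomic.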
2. Analysis
The entry point for rollback is HoodieWriteClient#rollback, which delegates the actual work to HoodieWriteClient#rollbackInternal. Its core code is as follows:
protected void rollbackInternal(String commitToRollback) {
  // Generate a new instant time for this rollback
  final String startRollbackTime = HoodieActiveTimeline.createNewInstantTime();
  try {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    // Find the first instant whose timestamp equals the commit to roll back
    Option<HoodieInstant> rollbackInstantOpt =
        Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
            .filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitToRollback))
            .findFirst());
    // The instant exists
    if (rollbackInstantOpt.isPresent()) {
      // Perform the rollback
      List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackInstantOpt.get());
      // Finish the rollback (context is defined in code omitted from this excerpt)
      finishRollback(context, stats, Collections.singletonList(commitToRollback), startRollbackTime);
    }
  } catch (IOException e) {
    throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback, e);
  }
}
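It first checks whether the commit/delta_commit timeline contains an instant with the timestamp to be rolled back. If it does, the rollback is performed by doRollbackAndGetStats. That method was already analyzed in the previous article on savepoint; it calls HoodieTable#rollback to do the actual work. The rest of this article focuses on HoodieTable#rollback, which has separate implementations for MOR and COW tables. Each is analyzed below.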
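The lookup step in rollbackInternal can be sketched without Hudi. This is a minimal sketch, assuming the commits timeline contains only "commit" and "deltacommit" actions, as described above; SimpleInstant and RollbackLookup are hypothetical stand-ins, not Hudi classes.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of a timeline instant (not Hudi's HoodieInstant).
final class SimpleInstant {
    final String timestamp;
    final String action; // e.g. "commit", "deltacommit", "clean"

    SimpleInstant(String timestamp, String action) {
        this.timestamp = timestamp;
        this.action = action;
    }
}

final class RollbackLookup {
    // Mirrors the filter(...).findFirst() step: keep only commit/deltacommit
    // instants and return the first whose timestamp equals commitToRollback.
    static Optional<SimpleInstant> findInstantToRollback(List<SimpleInstant> timeline, String commitToRollback) {
        return timeline.stream()
                .filter(i -> i.action.equals("commit") || i.action.equals("deltacommit"))
                .filter(i -> i.timestamp.equals(commitToRollback))
                .findFirst();
    }
}
```

If the lookup returns an empty Optional (for example, the timestamp belongs to a clean action), nothing is rolled back, matching the isPresent() check in the excerpt.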
2.1 HoodieCopyOnWriteTable#rollback
For COW tables, the core rollback code is as follows:
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants)
    throws IOException {
  long startTime = System.currentTimeMillis();
  List<HoodieRollbackStat> stats = new ArrayList<>();
  HoodieActiveTimeline activeTimeline = this.getActiveTimeline();
  if (instant.isCompleted()) { // the instant is completed
    // Revert it to the inflight state
    instant = activeTimeline.revertToInflight(instant);
  }
  if (!instant.isRequested()) { // not in the requested state
    String commit = instant.getTimestamp();
    // Generate the rollback requests
    List<RollbackRequest> rollbackRequests = generateRollbackRequests(instant);
    // Perform the rollback
    stats = new RollbackExecutor(metaClient, config).performRollback(jsc, instant, rollbackRequests);
  }
  // Delete the inflight and requested instants
  deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, instant);
  return stats;
}
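As the code shows, rollback consists of four steps: 1. if the instant is in the completed state, it is first reverted to the inflight state; 2. if the instant is not in the requested state (a compaction, for example, can be in the requested state), rollback requests are generated; 3. for such an instant, the rollback is performed; 4. the instant is deleted.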
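The control flow of these four steps can be traced with a toy state model. This is a minimal sketch: InstantState and RollbackFlow are hypothetical names, and the method only records which step names would run for a given starting state; it does not touch any files.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical three-state model of an instant's lifecycle.
enum InstantState { REQUESTED, INFLIGHT, COMPLETED }

final class RollbackFlow {
    // Returns the steps HoodieTable#rollback would take for an instant in the
    // given state (a simplified trace, not real Hudi code).
    static List<String> plan(InstantState state) {
        List<String> steps = new ArrayList<>();
        if (state == InstantState.COMPLETED) {
            steps.add("revertToInflight");          // step 1
            state = InstantState.INFLIGHT;
        }
        if (state != InstantState.REQUESTED) {
            steps.add("generateRollbackRequests");  // step 2
            steps.add("performRollback");           // step 3
        }
        steps.add("deleteInflightAndRequestedInstant"); // step 4 always runs
        return steps;
    }
}
```

A requested instant (for example, a compaction that never started) therefore skips straight to step 4, because no data has been written for it yet.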
2.1.1 Reverting the instant state
An instant in the completed state is reverted to the inflight state. The core code is as follows:
public HoodieInstant revertToInflight(HoodieInstant instant) {
  // Get the inflight counterpart of the instant
  HoodieInstant inflight = HoodieTimeline.getInflightInstant(instant, metaClient.getTableType());
  // Revert to inflight, i.e. the file name changes to .inflight
  revertCompleteToInflight(instant, inflight);
  return inflight;
}
The state change shows up as a change of the instant file's suffix: the file becomes a .inflight file.
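The suffix change can be illustrated as a pure name mapping. This is a minimal sketch under an assumed naming scheme: a completed COW commit file "<timestamp>.commit" maps to "<timestamp>.inflight", and a completed MOR delta commit "<timestamp>.deltacommit" maps to "<timestamp>.deltacommit.inflight". InstantFileNames is a hypothetical helper; the real revertCompleteToInflight works on the file system rather than on strings.

```java
// Hypothetical helper; the file-name scheme is an assumption for illustration.
final class InstantFileNames {
    // Maps a completed instant file name to its inflight counterpart.
    static String toInflight(String completedFileName) {
        int dot = completedFileName.indexOf('.');
        if (dot < 0) {
            throw new IllegalArgumentException("Not an instant file: " + completedFileName);
        }
        String timestamp = completedFileName.substring(0, dot);
        String action = completedFileName.substring(dot + 1);
        switch (action) {
            case "commit":
                return timestamp + ".inflight";              // assumed COW naming
            case "deltacommit":
                return timestamp + ".deltacommit.inflight";  // assumed MOR naming
            default:
                throw new IllegalArgumentException("Unsupported action: " + action);
        }
    }
}
```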
2.1.2 Generating rollback requests
Rollback requests are generated by generateRollbackRequests. Its core code is as follows:
private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
    throws IOException {
  // Get all partition paths and create a DELETE_DATA_AND_LOG_FILES RollbackRequest for each
  return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
      config.shouldAssumeDatePartitioning()).stream()
      .map(partitionPath -> RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback))
      .collect(Collectors.toList());
}
One RollbackRequest is generated per partition path. For COW, every request is of type DELETE_DATA_AND_LOG_FILES and targets its own partition path.
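The one-request-per-partition mapping can be sketched with plain streams. This is a minimal sketch: RollbackAction and SimpleRollbackRequest are hypothetical stand-ins for Hudi's request type and its action enum, and the partition list is passed in directly instead of being listed from the file system.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for Hudi's RollbackRequest and its action type.
enum RollbackAction { DELETE_DATA_FILES_ONLY, DELETE_DATA_AND_LOG_FILES, APPEND_ROLLBACK_BLOCK }

final class SimpleRollbackRequest {
    final String partitionPath;
    final String instantTime;
    final RollbackAction action;

    SimpleRollbackRequest(String partitionPath, String instantTime, RollbackAction action) {
        this.partitionPath = partitionPath;
        this.instantTime = instantTime;
        this.action = action;
    }
}

final class CowRollbackPlanner {
    // One DELETE_DATA_AND_LOG_FILES request per partition path, as in the COW excerpt.
    static List<SimpleRollbackRequest> generate(List<String> partitionPaths, String instantTime) {
        return partitionPaths.stream()
                .map(p -> new SimpleRollbackRequest(p, instantTime, RollbackAction.DELETE_DATA_AND_LOG_FILES))
                .collect(Collectors.toList());
    }
}
```

Because every partition gets a request, each one can be processed independently in parallel, which is what the Spark parallelize call in performRollback relies on.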
2.1.3 Performing the rollback
The rollback itself is performed by RollbackExecutor#performRollback. Its core code is as follows:
public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback,
    List<RollbackRequest> rollbackRequests) {
  SerializablePathFilter filter = (path) -> {
    if (path.toString().contains(".parquet")) {
      // Commit time of the parquet file
      String fileCommitTime = FSUtils.getCommitTime(path.getName());
      // Does it equal the instant being rolled back?
      return instantToRollback.getTimestamp().equals(fileCommitTime);
    } else if (path.toString().contains(".log")) {
      // Base commit time of the log file
      String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
      // Does it equal the instant being rolled back?
      return instantToRollback.getTimestamp().equals(fileCommitTime);
    }
    return false;
  };

  int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
  return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
    final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
    switch (rollbackRequest.getRollbackAction()) { // rollback type
      case DELETE_DATA_FILES_ONLY: { // delete data files only
        // Delete the matching files under the partition path
        deleteCleanedFiles(metaClient, config, filesToDeletedStatus, instantToRollback.getTimestamp(),
            rollbackRequest.getPartitionPath());
        return new Tuple2<>(rollbackRequest.getPartitionPath(),
            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
                .withDeletedFileResults(filesToDeletedStatus).build());
      }
      case DELETE_DATA_AND_LOG_FILES: { // delete data files and log files
        // Delete the matching files under the partition path
        deleteCleanedFiles(metaClient, config, filesToDeletedStatus, rollbackRequest.getPartitionPath(), filter);
        return new Tuple2<>(rollbackRequest.getPartitionPath(),
            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
                .withDeletedFileResults(filesToDeletedStatus).build());
      }
      case APPEND_ROLLBACK_BLOCK: { // append a rollback block
        Writer writer = null;
        try {
          writer = HoodieLogFormat.newWriterBuilder()
              .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
              .withFileId(rollbackRequest.getFileId().get())
              .overBaseCommit(rollbackRequest.getLatestBaseInstant().get())
              .withFs(metaClient.getFs())
              .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
          // Build the header metadata and write a command block (HoodieCommandBlock)
          Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
          writer = writer.appendBlock(new HoodieCommandBlock(header));
        } catch (IOException | InterruptedException io) {
          throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
        } finally {
          try {
            if (writer != null) {
              writer.close();
            }
          } catch (IOException io) {
            throw new UncheckedIOException(io);
          }
        }
        Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
        filesToNumBlocksRollback.put(metaClient.getFs()
            .getFileStatus(Preconditions.checkNotNull(writer).getLogFile().getPath()), 1L);
        return new Tuple2<>(rollbackRequest.getPartitionPath(),
            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
                .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
      }
      default:
        throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
    }
  }).reduceByKey(this::mergeRollbackStat).map(Tuple2::_2).collect();
}
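The SerializablePathFilter above decides whether a file belongs to the instant being rolled back. The sketch below reproduces that decision on bare file names. It assumes a base file is named "<fileId>_<writeToken>_<instantTime>.parquet" and a log file ".<fileId>_<baseInstantTime>.log.<version>[_<writeToken>]"; both patterns, and the RollbackFileFilter class, are illustrative assumptions rather than Hudi's FSUtils.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class RollbackFileFilter {
    // Assumed log-file pattern; group 2 is the base instant time.
    private static final Pattern LOG_NAME = Pattern.compile("^\\.(.+)_(\\d+)\\.log\\.(\\d+)(_.*)?$");

    // True if the file was written by (parquet) or is based on (log) instantTime.
    static boolean belongsToInstant(String fileName, String instantTime) {
        if (fileName.contains(".parquet")) {
            // Commit time assumed to be the third "_"-separated field, before the extension
            String[] parts = fileName.split("_");
            if (parts.length < 3) {
                return false;
            }
            String commitTime = parts[2].split("\\.")[0];
            return instantTime.equals(commitTime);
        } else if (fileName.contains(".log")) {
            Matcher m = LOG_NAME.matcher(fileName);
            return m.matches() && instantTime.equals(m.group(2));
        }
        return false; // anything else is never deleted by this filter
    }
}
```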
For a DELETE_DATA_FILES_ONLY rollback, deleteCleanedFiles is called to delete the data files. Its core code is as follows:
private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
    Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
  FileSystem fs = metaClient.getFs();
  PathFilter filter = (path) -> {
    if (path.toString().contains(".parquet")) { // data file
      String fileCommitTime = FSUtils.getCommitTime(path.getName());
      // Matches the rollback time
      return commit.equals(fileCommitTime);
    }
    return false;
  };
  // List all parquet files whose commit time equals the rollback time
  FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
  for (FileStatus file : toBeDeleted) { // delete them one by one
    boolean success = fs.delete(file.getPath(), false);
    results.put(file, success);
  }
  return results;
}
It first lists all parquet files in the given partition whose commit time equals the rollback time, then deletes them one by one.
For a DELETE_DATA_AND_LOG_FILES rollback, the overload of deleteCleanedFiles that takes a PathFilter is called to delete the files. Its core code is as follows:
private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
    Map<FileStatus, Boolean> results, String partitionPath, PathFilter filter) throws IOException {
  FileSystem fs = metaClient.getFs();
  // List all parquet and log files whose commit time equals the rollback time
  FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
  for (FileStatus file : toBeDeleted) { // delete them one by one
    boolean success = fs.delete(file.getPath(), false);
    results.put(file, success);
  }
  return results;
}
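It first lists all parquet/log files in the given partition whose commit time equals the rollback time (using the filter defined in performRollback), then deletes them one by one.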
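Both deleteCleanedFiles overloads boil down to "list the partition with a filter, delete each match, record success per file". The sketch below shows that pattern on the local file system with java.nio instead of Hadoop's FileSystem, which is an assumption made so the example runs standalone. PartitionCleaner is a hypothetical class; parquetOfCommit corresponds to the DELETE_DATA_FILES_ONLY filter, and passing a broader predicate corresponds to the PathFilter overload.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class PartitionCleaner {
    // List the partition directory with a filter, delete each accepted file,
    // and record whether each delete succeeded (like the results map above).
    static Map<Path, Boolean> deleteMatching(Path partitionDir, Predicate<String> filter) throws IOException {
        List<Path> toBeDeleted = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(partitionDir)) {
            for (Path entry : entries) {
                if (Files.isRegularFile(entry) && filter.test(entry.getFileName().toString())) {
                    toBeDeleted.add(entry);
                }
            }
        }
        // Delete after listing, so the directory is not modified while being iterated
        Map<Path, Boolean> results = new HashMap<>();
        for (Path file : toBeDeleted) {
            results.put(file, Files.deleteIfExists(file));
        }
        return results;
    }

    // DELETE_DATA_FILES_ONLY-style filter: parquet files of the given commit,
    // assuming the commit time is the third "_"-separated field of the name.
    static Predicate<String> parquetOfCommit(String commit) {
        return name -> {
            if (!name.contains(".parquet")) {
                return false;
            }
            String[] parts = name.split("_");
            return parts.length >= 3 && commit.equals(parts[2].split("\\.")[0]);
        };
    }
}
```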
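For an APPEND_ROLLBACK_BLOCK rollback, a command block (HoodieCommandBlock) is generated and appended to the log file of the given file ID. When the log is read later, the preceding block written by the rolled-back instant is skipped.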
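The effect of the rollback command block on readers can be illustrated with a toy log. This is a minimal sketch, not Hudi's log scanner: LogBlock and ToyLogReader are hypothetical, and the reader assumes that a rollback command block removes the most recently read blocks as long as they were written by the target instant, stopping at the first block from a different instant.

```java
import java.util.ArrayList;
import java.util.List;

// Toy log block: either a data block tagged with its writer instant,
// or a rollback command block naming the instant to roll back.
final class LogBlock {
    final boolean isRollbackCommand;
    final String instantTime;       // set for data blocks
    final String targetInstantTime; // set for rollback command blocks

    private LogBlock(boolean isRollbackCommand, String instantTime, String targetInstantTime) {
        this.isRollbackCommand = isRollbackCommand;
        this.instantTime = instantTime;
        this.targetInstantTime = targetInstantTime;
    }

    static LogBlock data(String instantTime) { return new LogBlock(false, instantTime, null); }

    static LogBlock rollback(String targetInstantTime) { return new LogBlock(true, null, targetInstantTime); }
}

final class ToyLogReader {
    // Returns the writer instants of the data blocks that remain visible.
    static List<String> visibleInstants(List<LogBlock> blocks) {
        List<LogBlock> read = new ArrayList<>();
        for (LogBlock block : blocks) {
            if (block.isRollbackCommand) {
                // Drop the preceding blocks while they belong to the target instant
                while (!read.isEmpty()
                        && read.get(read.size() - 1).instantTime.equals(block.targetInstantTime)) {
                    read.remove(read.size() - 1);
                }
            } else {
                read.add(block);
            }
        }
        List<String> visible = new ArrayList<>();
        for (LogBlock block : read) {
            visible.add(block.instantTime);
        }
        return visible;
    }
}
```

Nothing is physically deleted here: the rolled-back data stays in the log file and is only ignored on read, which is why this action suits MOR log files that are append-only.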
2.1.4 Deleting the instant
After the rollback completes, deleteInflightAndRequestedInstant is called to delete the instant. Its core code is as follows:
protected void deleteInflightAndRequestedInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
    HoodieInstant instantToBeDeleted) {
  // Delete the marker directory of the instant
  deleteMarkerDir(instantToBeDeleted.getTimestamp());
  if (deleteInstant) { // delete the instant
    // Delete the pending instant
    activeTimeline.deletePending(instantToBeDeleted);
    if (instantToBeDeleted.isInflight() && !metaClient.getTimelineLayoutVersion().isNullVersion()) {
      // Also delete the corresponding requested instant
      instantToBeDeleted = new HoodieInstant(State.REQUESTED, instantToBeDeleted.getAction(),
          instantToBeDeleted.getTimestamp());
      activeTimeline.deletePending(instantToBeDeleted);
    }
  }
}
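Besides removing the instant's marker directory, deleting the instant mainly means deleting the inflight and requested instant files in the metadata directory.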
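Which meta files end up deleted depends on the instant's state and the timeline layout version. This is a minimal sketch for a COW commit instant, assuming the file names "<ts>.inflight" (inflight) and "<ts>.commit.requested" (requested); PendingInstantFiles is a hypothetical helper that only computes names, and the marker-directory deletion is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the meta-file names are assumptions for illustration.
final class PendingInstantFiles {
    static List<String> filesToDelete(String timestamp, boolean isInflight,
                                      boolean nullLayoutVersion, boolean deleteInstant) {
        List<String> files = new ArrayList<>();
        if (!deleteInstant) {
            return files; // only the marker directory would be removed
        }
        // deletePending(instant): the pending file for the instant's current state
        files.add(isInflight ? timestamp + ".inflight" : timestamp + ".commit.requested");
        // An inflight instant on a non-null layout also has a requested file to remove
        if (isInflight && !nullLayoutVersion) {
            files.add(timestamp + ".commit.requested");
        }
        return files;
    }
}
```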
2.2 HoodieMergeOnReadTable#rollback
For MOR tables, the core rollback code is as follows:
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant,
    boolean deleteInstants) throws IOException {
  long startTime = System.currentTimeMillis();
  // Rollback time
  String commit = instant.getTimestamp();
  if (instant.isCompleted()) { // the instant is completed
    // Revert it to the inflight state
    instant = this.getActiveTimeline().revertToInflight(instant);
  }
  List<HoodieRollbackStat> allRollbackStats = new ArrayList<>();
  // Not in the requested state
  if (!instant.isRequested()) {
    // Generate the rollback requests
    List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instant);
    // Perform the rollback
    allRollbackStats = new RollbackExecutor(metaClient, config).performRollback(jsc, instant, rollbackRequests);
  }
  // Delete the inflight and requested instants
  deleteInflightAndRequestedInstant(deleteInstants, this.getActiveTimeline(), instant);
  return allRollbackStats;
}
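The overall flow is the same as for COW, so it is not repeated here; the visible difference in this excerpt is that the MOR version of generateRollbackRequests also takes the JavaSparkContext.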
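3. Summary
Rollback consists of four steps: 1. revert the instant state; 2. generate rollback requests; 3. perform the rollback; 4. delete the instant. The rollback itself handles three cases: a DELETE_DATA_FILES_ONLY rollback deletes the data (parquet) files written by the commit, a DELETE_DATA_AND_LOG_FILES rollback deletes both its data files and its log files, and an APPEND_ROLLBACK_BLOCK rollback appends a command block to the log file so that, on read, the preceding block written by the rolled-back instant is skipped.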



This article was shared from the WeChat official account ApacheHudi.

