Flink集成iceberg数据湖之合并小文件

Stella981
• 阅读 1200
  • 背景

  • 使用

  • 流式数据入湖

  • 开启压缩程序

  • 快照过期

  • 删除无用文件

  • 数据查询

  • 遇到的坑

  • 最大并发度问题

  • 文件被重复压缩

  • 扫描任务读取文件问题

  • 不读取大文件

  • 优化生成CombinedScanTask

  • 后续问题

  • 数据管理

  • 迁移问题

  • presto查询性能优化

  • 总结

背景

在传统的实时数仓中,由于列式存储相对行式存储有较高的查询性能,我们一般采用orc,parquet数据格式,但是这种列式格式无法追加,流式数据又不能等候太长时间,等到文件够了一个hdfs block块大小再写入,所以不可避免的产生了一个令人头大的问题,即小文件问题,由于使用小文件会增加namenode的压力,并且影响查询性能,所以我们在使用流式数据入库的时候一般会对小文件进行合并处理。

但是传统的流式数据入库的过程中对小文件进行合并会产生很多问题,比如流式数据不断的往hive表进行写入,如果同时有一个合并程序进行小文件的合并,那么这时候对同一份数据进行读写。会不会产生问题。如何保证事务,出错了怎么回滚呢,这些都是很棘手的问题。

我们的流任务以flink为主,查询引擎是presto,所以调研以后,我决定引入iceberg来解决小文件合并的问题。

使用

流式数据入湖

我们主要的数据来源是kafka,flink的任务主要就是消费kafka的数据,然后处理以后发送到iceberg,任务主要是以sql为主,也有部分jar包的任务,提交的方式主要是使用zeppelin来提交,使用zeppelin提交sql任务是使用的其自带的功能,提交jar包是我自己写了一个插件。

开启压缩程序

目前社区提供了一个spark版本的合并小文件的Action,我们的环境以flink为主,所以我参考spark版本把这个压缩程序改了一个flink版本,并经过测试,进行了多处bug修改和优化。目前社区新发布的1.10版本中没有带这个功能,我自己基于master分支打了一个jar,并且里面包含了flink 版本压缩小文件的程序,以及所有的优化,需要的朋友,可以 到这下载一下,https://github.com/zhangjun0x01/bigdata-examples/blob/master/iceberg/libs/iceberg-flink-runtime-0.9.1.jar 。社区版本我觉得应该会在下一个版本发布。

这个压缩程序是单独启动的一个shell任务,逻辑就是先把iceberg表进行一次压缩。然后sleep五分钟。然后再启动压缩,是一个死循环任务。

之所以没有采取定时任务,是因为如果五分钟一个定时任务来压缩,那么如果五分钟之内没有压缩完成,或者压缩程序出现异常,导致本次压缩没完成的时候,下一个定时任务又起来了,就会把上次没有压缩完的数据一起压缩,这样就导致任务量就增大了,以后的任务压缩的文件越积累越多

Table table = ..............RewriteDataFilesActionResult result =    Actions.forTable(env, table)           .rewriteDataFiles()           .maxParallelism(10)           .targetSizeInBytes(128*1024*1024)        //.filter(Expressions.equal("day", day))           .execute();           

快照过期

目前我们的应用场景只需要查询当前数据就可以了,不需要查询历史数据,所以我只保留了最新的快照。在每次压缩程序之后,做了处理,使当前快照时间以前的快照过期。程序会自动删除以前的过期数据文件.

    Snapshot snapshot = table.currentSnapshot();            if (snapshot != null){                long time = snapshot.timestampMillis();                table.expireSnapshots()                     .expireOlderThan(time)                     .commit();            }

删除无用文件

我发现使程序的快照过期的代码并没有删除metadata里面的metadata.json结尾的json文件,所以在这之外,我单独启动了一个spark 任务来删除这些文件,(后续有时间可以改造成flink版本的)。

这个程序默认会删除三天之前的数据,我觉得对我来说可能不需要,我设置了删除一个小时之前的旧数据,但是有一点要强调,就是这个不能像快照过期一样,删除当前快照以前的数据,因为目前有入湖的流式数据,和压缩程序在同时操作一个表,如果该程序在删除无用文件的同时,其他两个程序很有可能正在读取或者写入,这样会导致删除了一些元数据文件,其他两个程序报错。

    long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);    TableIdentifier identifier = TableIdentifier.of("db_name","table_name");    org.apache.iceberg.Table table = catalog.loadTable(identifier);    List<String> list = Actions.forTable(spark, table)                               .removeOrphanFiles()                               .olderThan(olderThanTimestamp)                               .execute();    list.stream().forEach(System.out::println);            

数据查询

我们使用的查询工具是presto,presto查询hive改成查询iceberg很简单,只需要添加一个catalog就可以了。

我们使用的是prestosql 331版本,其他的版本我没有做过测试

遇到的坑

最大并发度问题

目前系统是将扫描任务合并成任务,默认的并发度是合成任务的个数,但是当某一个表的分区数据比较多的时候,那么这个压缩任务的并发度可能会非常大,比如好几百,所以这样就会占用非常大的资源,为了避免这种压缩任务占用过多的资源而影响线上正常的任务,我们给他提供一个最大的并行度的设置,如果没有超过这个并行度就用系统的默认策略,超过了之后,就使用我们提供的最大并行度的参数

文件被重复压缩

比如我们设置压缩的大小128M,这个分区下面如果我们有三个120M大小的文件,那么压缩的时候这三个120M文件会被读取。然后再重新生成三个和原来旧数据一样大小的新文件,我觉得这个是无用且浪费资源的,所以我们也进行了改造,这种文件就不需要压缩了。

扫描任务读取文件问题

这个我在测试的时候发现。采用任务的默认读取大小,也就是读取大小是128M,压缩出来的文件使用hdfs命令查看的时候,发现才十几兆,和实际的128M相差太远,通过debug源码发现,扫描任务在扫描文件的时候读取的128的大小是按照实际读取的数据大小读取的,也就是压缩之前的数据,而我们这个orc文件是经过压缩的。

这个只有orc文件格式会出现这个问题,这个是一个bug,已经修复。https://github.com/apache/iceberg/issues/1666

不读取大文件

比如我们设置的目标大小128M,但是如果有文件超过了128M,那么压缩的时候这种文件就不需要读取了,这块也做了优化。

优化生成CombinedScanTask

默认情况下,系统是依次遍历查询到的数据文件,然后累加,直到达到target file size,比如有如下大小的数据文件,20M, 20M, 20M, 70M, 100M,目标文件大小是128M,系统将会生成三个CombinedScanTask:(20M, 20M, 20M), (70M), (100M),很显然,如果生成两个将会更加合理,(20M, 20M, 70M), (20M, 100M),这个是一个优化点,我还没有来得及做。

https://github.com/apache/iceberg/issues/1667

后续问题

数据管理

压缩完一个分区的数据,我想看看当前快照下面有多少个文件,每个文件大小是多少,是否符合我的预期,但是目前系统没有一个合适的工具,如果直接看data目录的数据文件,不知道哪些文件属于当前快照,我们需要通过api来写代码做查看。

我觉得后续我们需要一个最好是可视化的工具来方便的管理和查询iceberg表。

迁移问题

我们在测试过可以使用iceberg以后,如何将以前的hive表迁移成iceberg表呢,新建一个iceberg table,然后写批任务导入?如果我们有非常多的hive表,或者有的表下面分区比较多,这个时候怎么弄呢?如果写批任务导入将是一个巨大的工程,我现在是自己写了一个工具类,不过我觉得应该把它做成一个管理工具,更方便用户使用。

presto查询性能优化

对于一些相对较大的hive表,迁移到iceberg表之后,使用presto查询的时候,我发现速度变慢了,理论上查询iceberg比hive少了一层list操作,应该会快一些,这个不知道是我配置的问题,还是presto和iceberg集成的问题,需要排查一下。

总结

此外由于一些开源软件的新版或者新功能,很多可能没有经过线上生产环境的复杂多变的多方面的测试,建议不要贸然上线到生产环境,要多做一些测试,也多给社区反馈一下我们每个人遇到的各种问题。新的功能或者版本遇到了问题,网上很难找得到资料,所以我们对于开源软件的使用,有时候需要debug源码来解决。

在开源软件为我们提供便利的同时,我们也最好把我们自己的优化改进等等推回社区,这样众人拾柴火焰高,开源社区才能越来越好,才会让更多的人受益。

在这个过程中,也感谢社区各位大佬的支持,以前自己写代码。一般都要求快速更新迭代,可能对代码质量要求没那么高,我需要第一时间来解决问题,哪怕是一个临时方案,但是对于社区来说,对代码要求很严格,所以以后在参与社区的过程中也需要多多注意。

由于本人水平有限,也难免有错误,希望大家多多指正,更多信息,欢迎关注我的公众号[大数据技术与应用实战].

Flink集成iceberg数据湖之合并小文件

本文分享自微信公众号 - 大数据技术与应用实战(bigdata_bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Java修道之路,问鼎巅峰,我辈代码修仙法力齐天
<center<fontcolor00FF7Fsize5face"黑体"代码尽头谁为峰,一见秃头道成空。</font<center<fontcolor00FF00size5face"黑体"编程修真路破折,一步一劫渡飞升。</font众所周知,编程修真有八大境界:1.Javase练气筑基2.数据库结丹3.web前端元婴4.Jav
Stella981 Stella981
3年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
1小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(