Flink集成数据湖之实时数据写入iceberg

Stella981
• 阅读 877
  • 背景

  • iceberg简介

  • flink实时写入

  • 准备sql client环境

  • 创建catalog

  • 创建db

  • 创建table

  • 插入数据

  • 查询

  • 代码版本

  • 总结

背景

随着大数据处理结果的实时性要求越来越高,越来越多的大数据处理从离线转到了实时,其中以flink为主的实时计算在大数据处理中占有重要地位。

Flink消费kafka等实时数据流。然后实时写入hive,在大数据处理方面有着广泛的应用。此外由于列式存储格式如parquet或者orc在查询性能方面有着显著的提高,所以大家都会优先选择列式存储作为我们的存储格式。

传统的这种架构看似不错,但是还是有很多没有解决的问题:

  • 实时写入造成大量小文件,需要单独的程序来进行合并

  • 实时的写入,读取,还有合并小文件在同时进行,那么如何保证事务,读取数据的时候不会出现脏读。

  • Hdfs的数据一般是一次写入。多次读写,但是如果因为程序出错导致数据错了,确实要修改某一条数据改怎么办

  • 消费kafka的数据落地到hive,有一天kafka的数据多了几个字段,如何同步到hive?必须删了重建吗?

  • 订单等业务数据一般存储在传统数据库,如mysql等。如何实时同步这些cdc数据到hive仓库呢,包括ddl和dml

如果你有上面的需求,那么你可以考虑一下数据湖了,目前开源的数据湖技术主要有以下几个:delta、hudi、iceberg,但是侧重点有所不同,我上面说的问题也不完全都能实现,但是这些都是数据湖要做的东西,随着社区的不断发展,这些功能都会有的。

一些介绍可以参考下这个ppt 【基于Flink+Iceberg构建企业级实时数据湖.pdf】

但是目前世面上这些数据湖技术都与spark紧密绑定。而我们目前实时计算主要以flink为主,而且我个人觉得未来实时计算也将以flink为主,所以我选择了iceberg为我们的数据湖,虽然他有一些功能不是很完善,但是有着良好的抽象,并且不强制绑定spark,所以对于iceberg没有的功能,我们可以自己给补全,再回馈给社区,一起成长。

iceberg简介

其实对于iceberg,官方的定义是一种表格式。

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

我们可以简单理解为他是基于计算层(flink , spark)和存储层(orc,parqurt)的一个中间层,我们在hive建立一个iceberg格式的表。用flink或者spark写入iceberg,然后再通过其他方式来读取这个表,比如spark,flink,presto等。

当然数据湖的概念远不止这点,我们今天就先简单的这么理解,后续写一篇文章专门介绍一下iceberg。

flink实时写入

目前官网的flink集成iceberg写入的时候有一个小bug,我改了改,自己重新编译打了一个包。接下来我们使用flink sql client来测试一下如何使用flink将实时的流数据写入iceberg,然后使用presto查询结果。

准备sql client环境

目前官方的测试版本是基于scala 2.12版本的flink。所以我们也用和官方同步的版本来测试下,下载下面的两个jar放到flink的lib下面,然后启动一下flink集群,standalone模式。

  • 下载flink :flink-1.11.2-bin-scala_2.12.tgz

  • 下载 iceberg-flink-runtime.jar 这个包目前版本(0.9.1)没有提供,需要的话需要自己编译一下,我编译好了一个,并且该了创建catalog的bug,可以来这里获取。

https://github.com/zhangjun0x01/bigdata-examples/tree/master/iceberg/libs/iceberg-flink-runtime-0.9.1.jar

  • 下载flink 集成hive的connector,flink-sql-connector-hive-2.3.6_2.12-1.11.2.jar

  • 目前官方的hive测试版本是 2.3.7,其他的版本可能有不兼容

注意要配置flink的checkpoint,因为目前flink提交iceberg的信息是在每次checkpoint的时候提交的。在sql client配置checkpoint的方法如下:

在flink-conf.yaml添加如下配置

execution.checkpointing.interval: 10s   # checkpoint间隔时间execution.checkpointing.tolerable-failed-checkpoints: 10  # checkpoint 失败容忍次数

创建catalog

iceberg在创建catalog的时候有一个小bug,他需要一个warehouse,但是系统没有提供,根据issue的讨论,借鉴flink集成hive,大家更倾向于提供一个hive-site.xml配置,但是如果是配置一个本地路径的话,对于flink application mode提交任务是有问题的,因为这种模式用户程序的加载是在flink的jobmanager端的,可能那个机器是没hive-site.xml配置文件的。所以我自己写了一个方案,提供一个hive-site.xml的配置路径,可以是本地或者hdfs路径,如果是hdfs,则先下载到本地,然后再加载。

相关的issue[1]和pr[2]

官方提供的创建catalog的版本ddl如下:

CREATE CATALOG iceberg WITH (  'type'='iceberg',  'catalog-type'='hive',  'uri'='thrift://localhost:9083');

我修改后的DDL如下:

CREATE CATALOG iceberg WITH (  'type'='iceberg',  'catalog-type'='hive',  'hive-site-path'='/Users/user/work/hive/conf/hive-site.xml1')

执行完之后,显示如下:

Flink SQL> show catalogs;default_catalogiceberg

我自己测试了一下,在flink的多种提供模式下都是没有问题的(sql client、standalnoe、yarn sesson、yarn per job、yarn application)。

创建db

use catalog iceberg;CREATE DATABASE iceberg_db;USE iceberg_db;

创建table

CREATE TABLE iceberg.iceberg_db.iceberg_001 (    id BIGINT COMMENT 'unique id',    data STRING) WITH ('connector'='iceberg','write.format.default'='ORC');

插入数据

我们依然创建一个datagen的connector。

CREATE TABLE sourceTable ( userid int, f_random_str STRING) WITH ( 'connector' = 'datagen', 'rows-per-second'='100', 'fields.userid.kind'='random', 'fields.userid.min'='1', 'fields.userid.max'='100','fields.f_random_str.length'='10')

这时候我们看到有两个表了

Flink SQL> show tables;iceberg_001sourcetable

然后执行insert into插入数据:

insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable

查询

我们这里使用presto来查询

presto的配置iceberg.properties 如下:

connector.name=iceberghive.metastore.uri=thrift://localhost:9083

Flink集成数据湖之实时数据写入iceberg

代码版本

public class Flink2Iceberg{    public static void main(String[] args) throws Exception{        StreamExecutionEnvironment env =                StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        env.enableCheckpointing(10000);        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);        tenv.executeSql("CREATE CATALOG iceberg WITH (\n" +                        "  'type'='iceberg',\n" +                        "  'catalog-type'='hive'," +                        //"  'hive-site-path'='hdfs://localhost/data/flink/conf/hive-site.xml'" +                        "  'hive-site-path'='/Users/user/work/hive/conf/hive-site.xml'" +                        ")");        tenv.useCatalog("iceberg");        tenv.executeSql("CREATE DATABASE iceberg_db");        tenv.useDatabase("iceberg_db");        tenv.executeSql("CREATE TABLE sourceTable (\n" +                        " userid int,\n" +                        " f_random_str STRING\n" +                        ") WITH (\n" +                        " 'connector' = 'datagen',\n" +                        " 'rows-per-second'='100',\n" +                        " 'fields.userid.kind'='random',\n" +                        " 'fields.userid.min'='1',\n" +                        " 'fields.userid.max'='100',\n" +                        "'fields.f_random_str.length'='10'\n" +                        ")");        tenv.executeSql(                "insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable");    }}

具体见:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/iceberg/src/main/java/com/Flink2Iceberg.java

总结

总结一下,iceberg的资料比较少,很多设计或者讨论等需要关注issues,然后再去撸源码,可能对于刚入门的小伙伴来说有点困难。后续我也会多分享一些关于iceberg的文章,欢迎大家关注我公众号【大数据技术与应用实战】。

Flink集成数据湖之实时数据写入iceberg

参考:
[1].https://github.com/apache/iceberg/issues/1437
[2].https://github.com/apache/iceberg/pull/1527

本文分享自微信公众号 - 大数据技术与应用实战(bigdata_bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
2小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(