点击上方蓝色字体,选择“设为星标”
回复”资源“获取更多资源
点击右侧关注,大数据开发领域最强公众号!
点击右侧关注,暴走大数据!
1.简介
Apache Hudi(简称:Hudi)使得您能在hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,使得除了经典的批处理之外,还可以在数据湖上进行流处理。这两种原语分别是:
Update/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写操作的事务保证。查询会处理最后一个提交的快照,并基于此输出结果。
变更流:Hudi对获取数据变更提供了一流的支持:可以从给定的时间点获取给定表中已updated/inserted/deleted的所有记录的增量流,并解锁新的查询姿势(类别)。
如果你希望将数据快速提取到HDFS或云存储中,Hudi可以提供帮助。另外,如果你的ETL /hive/spark作业很慢或占用大量资源,那么Hudi可以通过提供一种增量式读取和写入数据的方法来提供帮助。
2. 基本概念
存储类型
我们看一下 Hudi 的两种存储类型:
写时复制(copy on write):仅使用列式文件(parquet)存储数据。在写入/更新数据时,直接同步合并原文件,生成新版本的基文件(需要重写整个列数据文件,即使只有一个字节的新数据被提交)。此存储类型下,写入数据非常昂贵,而读取的成本没有增加,所以适合频繁读的工作负载,因为数据集的最新版本在列式文件中始终可用,以进行高效的查询。
读时合并(merge on read):使用列式(parquet)与行式(avro)文件组合,进行数据存储。在更新记录时,更新到增量文件中(avro),然后进行异步(或同步)的compaction,创建列式文件(parquet)的新版本。此存储类型适合频繁写的工作负载,因为新记录是以appending 的模式写入增量文件中。但是在读取数据集时,需要将增量文件与旧文件进行合并,生成列式文件。
视图
在了解这两种存储类型后,我们再看一下Hudi支持的存储数据的视图(也就是查询模式):
读优化视图(Read Optimized view):直接query 基文件(数据集的最新快照),也就是列式文件(如parquet)。相较于非Hudi列式数据集,有相同的列式查询性能
增量视图(Incremental View):仅query新写入数据集的文件,也就是指定一个commit/compaction,query此之后的新数据。
实时视图(Real-time View):query最新基文件与增量文件。此视图通过将最新的基文件(parquet)与增量文件(avro)进行动态合并,然后进行query。可以提供近实时的数据(会有几分钟的延迟)
在以上3种视图中,“读优化视图”与“增量视图”均可在“写时复制”与“读时合并”的存储类型下使用。而“实时视图“仅能在”读时合并“模式下使用。
时间轴
最后介绍一下 Hudi 的核心 —— 时间轴。Hudi 会维护一个时间轴,在每次执行操作时(如写入、删除、合并等),均会带有一个时间戳。通过时间轴,可以实现在仅查询某个时间点之后成功提交的数据,或是仅查询某个时间点之前的数据。这样可以避免扫描更大的时间范围,并非常高效地只消费更改过的文件(例如在某个时间点提交了更改操作后,仅query某个时间点之前的数据,则仍可以query修改前的数据)。
3.典型应用场景
1.近实时摄取
将数据从外部源如事件日志、数据库提取到Hadoop数据湖中是一个很常见的问题。在大多数Hadoop部署中,一般使用混合提取工具并以零散的方式解决该问题,尽管这些数据对组织是非常有价值的。
对于RDBMS摄取,Hudi通过Upserts提供了更快的负载,而非昂贵且低效的批量负载。例如你可以读取MySQL binlog日志或Sqoop增量导入,并将它们应用在DFS上的Hudi表,这比批量合并作业或复杂的手工合并工作流更快/更高效。
对于像Cassandra / Voldemort / HBase这样的NoSQL数据库,即使规模集群不大也可以存储数十亿行数据,此时进行批量加载则完全不可行,需要采用更有效的方法使得摄取速度与较频繁的更新数据量相匹配。
即使对于像Kafka这样的不可变数据源,Hudi也会强制在DFS上保持最小文件大小,从而解决Hadoop领域中的古老问题以便改善NameNode的运行状况。这对于事件流尤为重要,因为事件流(例如单击流)通常较大,如果管理不善,可能会严重损害Hadoop集群性能。
对于所有数据源,Hudi都提供了通过提交将新数据原子化地发布给消费者,从而避免部分提取失败。
2. 近实时分析
通常实时数据集市由专门的分析存储,如Druid、Memsql甚至OpenTSDB提供支持。这对于需要亚秒级查询响应(例如系统监视或交互式实时分析)的较小规模(相对于安装Hadoop)数据而言是非常完美的选择。但由于Hadoop上的数据令人难以忍受,因此这些系统通常最终会被较少的交互查询所滥用,从而导致利用率不足和硬件/许可证成本的浪费。
另一方面,Hadoop上的交互式SQL解决方案(如Presto和SparkSQL),能在几秒钟内完成的查询。通过将数据的更新时间缩短至几分钟,Hudi提供了一种高效的替代方案,并且还可以对存储在DFS上多个更大的表进行实时分析。此外,Hudi没有外部依赖项(例如专用于实时分析的专用HBase群集),因此可以在不增加运营成本的情况下,对更实时的数据进行更快的分析。
3. 增量处理管道
Hadoop提供的一项基本功能是构建基于表的派生链,并通过DAG表示整个工作流。工作流通常取决于多个上游工作流输出的新数据,传统上新生成的DFS文件夹/Hive分区表示新数据可用。例如上游工作流 U可以每小时创建一个Hive分区,并在每小时的末尾( processing_time)包含该小时( event_time)的数据,从而提供1小时的数据新鲜度。然后下游工作流 D在 U完成后立即开始,并在接下来的一个小时进行处理,从而将延迟增加到2个小时。
上述示例忽略了延迟到达的数据,即 processing_time和 event_time分开的情况。不幸的是在后移动和物联网前的时代,数据延迟到达是非常常见的情况。在这种情况下,保证正确性的唯一方法是每小时重复处理最后几个小时的数据,这会严重损害整个生态系统的效率。想象下在数百个工作流中每小时重新处理TB级别的数据。
Hudi可以很好的解决上述问题,其通过记录粒度(而非文件夹或分区)来消费上游Hudi表 HU中的新数据,下游的Hudi表 HD应用处理逻辑并更新/协调延迟数据,这里 HU和 HD可以以更频繁的时间(例如15分钟)连续进行调度,并在 HD上提供30分钟的端到端延迟。
为了实现这一目标,Hudi从流处理框架如Spark Streaming、发布/订阅系统如Kafka或数据库复制技术如Oracle XStream中引入了类似概念。若感兴趣可以在此处找到有关增量处理(与流处理和批处理相比)更多优势的更详细说明。
4. DFS上数据分发
Hadoop的经典应用是处理数据,然后将其分发到在线存储以供应用程序使用。例如使用Spark Pipeline将Hadoop的数据导入到ElasticSearch供Uber应用程序使用。一种典型的架构是在Hadoop和服务存储之间使用队列进行解耦,以防止压垮目标服务存储,一般会选择Kafka作为队列,该架构会导致相同数据冗余存储在DFS(用于对计算结果进行离线分析)和Kafka(用于分发)上。
Hudi可以通过以下方式再次有效地解决此问题:将Spark Pipeline 插入更新输出到Hudi表,然后对表进行增量读取(就像Kafka主题一样)以获取新数据并写入服务存储中,即使用Hudi统一存储。
4.入门案例
1、编译
github地址: https://github.com/apache/incubator-hudi
`cd incubator-hudi-hoodie-0.4.7`
`mvn clean install -DskipITs -DskipTests -Dhadoop.version=2.6.0-cdh5.13.0 -Dhive.version=1.1.0-cdh5.13.0`
2、快速开始
1、新建项目
新建maven项目,并加入scala框架,然后依次加入spark、hudi依赖
<properties>
2、插入数据
准备数据,在本地文件目录下新建如下文件:
`{"id":1,"name": "aaa","age": 10}`
`{"id":2,"name": "bbb","age": 11}`
`{"id":3,"name": "ccc","age": 12}`
`{"id":4,"name": "ddd","age": 13}`
`{"id":5,"name": "eee","age": 14}`
`{"id":6,"name": "fff","age": 15}`
构建sparksession:
`val spark = SparkSession.builder`
` .master("local")`
` .appName("Demo2")`
` .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")`
` .enableHiveSupport()`
` .getOrCreate`
hudi强制要求使用Kryo的序列化方式,所以初始化的时候需要添加该配置。
读取刚才保存的json文件:
`val jsonData = spark.read.json("file:///Users/apple/Documents/project/study/hudi-study/source_data/insert.json")`
然后通过write方法写入数据:
`import com.uber.hoodie.config.HoodieWriteConfig._`
`val tableName = "test_data"`
`val basePath = "file:///Users/apple/Documents/project/study/hudi-study/hudi_data/" + tableName`
`jsonData.write.format("com.uber.hoodie")`
` .option("hoodie.upsert.shuffle.parallelism", "1")`
` .option(PRECOMBINE_FIELD_OPT_KEY, "id")`
` .option(RECORDKEY_FIELD_OPT_KEY, "id")`
` .option(KEYGENERATOR_CLASS_OPT_KEY, "com.mbp.study.DayKeyGenerator")`
` .option(TABLE_NAME, tableName)`
` .mode(SaveMode.Overwrite)`
` .save(basePath)`
3、查询数据
`val jsonDataDf = spark.read.format("com.uber.hoodie").load(basePath + "/*/*")`
`jsonDataDf.show(false)`
4、更新数据
先创建需要更新的json数据集,数据如下:
`{"id":1,"name": "aaa","age": 20,"address": "a1"}`
`{"id":2,"name": "bbb","age": 21,"address": "a1"}`
`{"id":3,"name": "ccc","age": 22,"address": "a1"}`
然后读取要更新的数据,并执行写入操作:
val updateJsonf = spark.read.json("/Users/apple/Documents/project/study/hudi-study/source_data/update.json")
保存模式需要改为追加,每个写操作都会生成一个新的由时间戳表示的commit。
5、增量查询
Hudi还提供了获取给定提交时间戳以来已更改的记录流的功能。这可以通过使用Hudi的增量视图并提供所需更改的开始时间来实现。如果我们需要给定提交之后的所有更改(这是常见的情况),则无需指定结束时间。
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)
这将提供在开始时间提交之后发生的所有更改,其中包含票价大于20.0的过滤器。关于此功能的独特之处在于,它现在使您可以在批量数据上创作流式管道。
6、特定时间点查询
可以通过将结束时间指向特定的提交时间,将开始时间指向”000”(表示最早的提交时间)来表示特定时间。
val beginTime = "000" // Represents all commits > this time.
7、同步到Hive
要想hive可以查询到该表,需要以下两部分操作:
1、写数据的时候设置同步hive
jsonData.write.format("com.uber.hoodie")
2、在hive集群中上传hudi所需的jar包
`hoodie-hadoop-mr-0.4.7.jar`
`hoodie-common-0.4.7.jar`
然后再hive中直接执行查询即可。
版权声明:
本文为大数据技术与架构整理,原作者独家授权。未经原作者允许转载追究侵权责任。
编辑|冷眼丶
微信公众号|import_bigdata
欢迎点赞+收藏+转发朋友圈素质三连
文章不错?点个【在看】吧!** 👇**
本文分享自微信公众号 - 大数据技术与架构(import_bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。