Kafka数据管道

Stella981
• 阅读 570

Kafka数据管道

点击上方「蓝字」关注我们

Kafka数据管道

Kafka数据管道

当我们使用Kafka来构建数据管道的时候,通常有两种主要的场景:1)Kafka是数据的起点或终点,比如从Kafka传输数据到S3或者从MongoDB传输数据到Kafka;2)Kafka作为数据的中间缓冲区,比如构建Twitter到Elasticsearch的数据管道时,Twitter先把数据传输到Kafka,然后Kafka再将数据传输到Elasticsearch。

使用Kafka构建数据管道可以将数据的生产者和消费者进行解耦,并且能够保证高可靠以及高性能。另外在0.9版本,Kafka加入了Kafka Connect这个新的API,使得将Kafka集成到数据管道更加方便。

下面来看下数据管道的一些具体细节。

构建数据管道的考虑因素

时间线


在实际中,有一些系统的数据可能每天进行一次数据处理,有一些系统可能希望数据从产生到消费只有毫秒级延迟,而另外的系统则介于这两个极端之间。一个优秀的数据集成系统应当能满足不同场景的时间线要求,并且能够支持时间线的迁移(因为实际应用中需求是不断变化的)。Kafka具备这样的性质,既支持准实时的数据传输,也支持定时的批量数据传输,并且保证数据可靠存储以及水平扩展。在Kafka中,生产者可以根据需要来决定写入Kafka的时机,而一旦数据到达Kafka,消费者可以立即读取(其实消费者也可以定时批量读取,取决于使用场景)。

在这个场景中,Kafka充当数据的大缓冲区角色,并且解耦了生产者与消费者的时间敏感度要求:生产者可以实时产生数据而消费者定期消费数据,反之亦然。

可靠性


我们需要避免单点故障,并且在发生故障时能够快速的自动恢复。对于核心系统来说,即便是秒级的不可用也有可能造成巨大的损失,因此系统可用性极为重要。另外,数据传输可靠性也非常重要,一些系统能够容忍数据丢失,但更多情况下业务需要的是至少一次(at-least-once)的数据传输保证。至少一次意味着数据一旦生产成功,那么必定会到达终点,但有可能会出现数据重复出现的情况。在某些情况下,我们甚至需要有且仅有一次(exactly-once)的数据传输,这意味着数据一旦生产必须到达终点,而且不允许数据丢失或者重复。

我们讨论过了Kafka的可用性和可靠性。Kafka本身能够提供至少一次的数据传输,而通过与外部系统(具备事务性质或者支持唯一键)结合使用能够保证数据有且仅有一次的语义。值得一提的是,Kafka Connect这个API让外部系统与Kafka结合更为方便,使得实现端到端的有且仅有一次的语义更简单。

高吞吐


数据管道一般需要支持高吞吐,而且更为重要的是在流量激增的情况下仍然能正常运行。通过使用Kafka,我们可以将生产者与消费者的处理能力进行解耦。如果某个时刻生产者的生产速度远超于消费者的消费速度,那么数据会存放在Kafka中直至消费,也就是说Kafka具备流量削峰的特性。另外,我们可以通过增加消费者或者生产者来分别提高两端的处理能力。

总的来说,Kafka是一个高吞吐的分布式系统,在集群情况下每秒处理百兆级别的数据并不是什么难事,我们也不需要担心在数据量增长的情况下系统不能横向扩展。另外,Kafka Connect使得数据处理不仅可以横向扩展,并且可以并行化,后面我们会深入讨论这一点。

数据格式


构建数据管道的一个重要考虑因素是不同数据格式的支持程度。在实际应用中,我们的数据库或者其他存储系统的存储格式通常是多种多样的,比如说可能源数据格式是XML或者关系型的,存储到Kafka中是Avro类型的,最后可能需要转换成JSON格式以便写入Elasticsearch。

Kafka能够满足不同的数据类型要求,在前面系列文章中,我们讨论过生产者和消费者如何使用不同的序列化/反序列化来支持多种数据格式。另外,Kafka Connect的内存数据具有自己的数据类型,但后面我们会进一步看到,我们可以通过增加可插拔的转换器来支持不同的数据格式。

有一点需要注意的是,数据源与数据终点的数据格式通常具有自己的数据结构(Schema),当数据源的数据结构改变时,我们可能需要同时更新数据终点的数据结构。一个经典的例子为,当我们构建MySQL到Hive的数据管道时,如果MySQL增加了一列,那么当我们写入新数据到Hive时需要保证新的列也以某种形式添加到Hive中。

在支持不同数据格式之外,一个通用的数据集成框架应当能支持数据源与数据终点的不同特性。比如,Syslog是一个主动推送数据的数据源,而关系型数据库则要求我们主动拉取它的数据;HDFS只支持数据追加,而其他系统则允许追加和更新。

数据转换


构建数据管道时我们有如下两种数据转换方案:

  • ELT(Extract-Transform-Load):这种方案意味着数据管道负责做数据转换,这样做的好处是可以节省目标系统的转换时间和存储空间。但这种方案也有一个缺点,那就是数据管道的转换与下游的依赖需要时刻保持同步。比如,如果我们构建MongoDB到MySQL的数据管道,并且在数据管道中进行数据过滤并且移除某些域,那么MySQL中只能看到部分数据;如果后续我们需要访问这些缺失的数据域,那么数据管道需要重建并且重新处理历史数据。

  • ELT(Extract-Load-Transform):这种方案意味着数据管道做最少的转换(大部分情况下只是转换数据格式),终点的数据与源数据基本一样,这样做的好处是目标系统拥有极大的处理灵活性(因为能看到几乎原始的数据),并且由于数据处理与过滤只在目标系统上进行,减轻追溯问题的复杂程度。这种方案的缺点是目标系统会消耗较多的存储空间,并且的转换也会消耗CPU资源。

安全性


对于数据管道来说,安全性包含如下几个方面:

  • 经过数据管道的数据是加密的吗?这个问题在跨数据中心时尤其突出。

  • 谁允许对数据管道进行修改?

  • 如果数据管道需要从访问受限的地方读取或写入数据,它是否能正确的进行身份验证?

Kafka支持对数据传输进行加密,以及支持身份验证(通过SASL)和授权。授权能够保证包含隐私数据的主题在未经授权的情况下不能被读取。另外,Kafka还提供授权与非授权的访问记录,并且能够跟踪主题中的事件来源以及谁进行了何种修改。

错误处理


认为数据始终是正确的是一件很危险的事情,我们需要提前考虑错误处理。例如,是否能阻止错误的记录进入管道?是否能从分析失败的记录恢复数据?错误记录是否能被修复以及重新处理?如果不良事件被当做正常事件处理了,但过了几天才发现,这会这么样?

由于Kafka能够在一段时间内保存所有事件,因此在需要的情况下我们可以回溯并且进行错误恢复。

耦合与敏捷


数据管道的一个重要作用就是将数据源与目标系统进行解耦,但在某些情况下如果不加以注意便会发生耦合:

  • 专门定制管道:有一些公司会针对不同的应用专门定制管道,比如使用Logstash转储日志到Elasticsearch,使用Flume转储日志到HDFS,使用GoldenGate从Oracle获取数据并写入HDFS,等等…这样做会将数据管道与特定的终端耦合在一起,并且意味着每一个新系统都需要搭建新的数据管道。

  • 结构元数据缺失:如果数据管道不包含结构元数据而且不允许结构变化,那么其实我们已经将产生数据的源系统与消费数据的目标系统耦合在一起。假如数据从Oracle数据库流向HDFS,DBA在数据库中添加了一列,在数据管道不包含结构元数据而且不允许结构变化的情况下,目标系统要么处理数据失败,要么需要升级应用代码。因此,数据管道应该能支持结构变化,每个独立的团队都可以根据需要来在合适的时刻修改应用逻辑。

  • 过度处理:前面已经提到,一些数据处理会在数据管道中进行,毕竟数据管道负责把数据转移到不同的系统。但如果数据管道进行了过度的处理(比如数据清洗、数据聚合),那么会导致下游使用数据的系统与数据管道耦合在一起。最好的处理方式应该为,数据管道尽可能保留元数据的属性,只是做简单的格式转换,允许下游系统来决定他们需要什么样的数据。

什么时候使用Kafka Connect?


当写入Kafka或者从Kafka读取时,我们可以使用传统的生产者/消费者客户端,或者使用Kafka Connect和connector。那应该怎么选择呢?

生产者/消费者客户端是嵌入到应用中的,换句话说,如果我们能够修改连接应用的代码,那么可以使用生产者/消费者客户端来写入和读取数据。而如果我们需要将Kafka连接到数据存储系统(或者将数据存储系统连接到Kafka),那么我们可以直接使用Connect以及相应的connector即可。如果对于某个数据存储系统,不存在与其匹配的connector,那么我们既可以使用生产者/消费者客户端,也可以使用Connect。但仍然推荐使用Connect,因为它开箱即用,提供了许多有用的功能,比如配置管理、位移存储、并行化、错误处理、不同数据类型支持等等。

Kafka Connect


Kafka Connect是Kafka的一部分,它提供了可扩展的方式将Kafka的数据转移到数据存储系统,或者从数据存储系统转移到Kafka。它提供了相应的API以及运行环境,以便我们开发connector插件。connector插件会被Kafka Connect执行并且用来转移数据。Kafka Connect以集群方式运行,每个节点均安装有connector插件,并且提供REST的API接口来配置和管理connector。数据源的connector只需要负责从源系统读取数据,并且转化为Connect数据对象;而目标系统的connector则负责接收Connect数据对象,以及写入到目标系统。

此外,Kafka Connect包含转换器来支持在Kafka中使用不同的数据格式,比如JSON或者Avro。这里提醒下,Kafka中的数据格式是可以独立于源系统(及其connector)与目标系统(及其connector)的。

Kafka数据管道

5个常用的大数据可视化分析工具

如何做架构设计说明书

架构设计说明书该怎么写?

漫谈从零访问量到每秒千万访问量的架构设计

腾讯计费金融级技术架构演进

Kafka数据管道

点赞鼓励一下

Kafka数据管道

本文分享自微信公众号 - 侠梦的开发笔记(xmdevnote)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Stella981 Stella981
3年前
Spring Boot如何利用AOP巧妙记录操作日志?
!(https://oscimg.oschina.net/oscnet/7f1d6247ad65413fbca3b77b0bb9433c.png)点击上方蓝字关注我们!(https://oscimg.oschina.net/oscnet/3f5a1c2360f9430c93a00b4715527ed9.jpg)本篇
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Stella981 Stella981
3年前
200的大额人民币即将面世?央行:Yes!
点击上方蓝字关注我们!(https://oscimg.oschina.net/oscnet/2a1c2ac00bf54458a78c48a6c2e547d5.png)点击上方“印象python”,选择“星标”公众号重磅干货,第一时间送达!!(
可莉 可莉
3年前
200的大额人民币即将面世?央行:Yes!
点击上方蓝字关注我们!(https://oscimg.oschina.net/oscnet/2a1c2ac00bf54458a78c48a6c2e547d5.png)点击上方“印象python”,选择“星标”公众号重磅干货,第一时间送达!!(
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这