消息队列之Kafka详解

kelly
• 阅读 1586

消息队列之Kafka详解


1. 什么是Kafka

在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。

  1. Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。
  2. Kafka最初是由LinkedIn公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
  3. Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。
  4. 无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。新版本的offset维护在本地

2. Kafka架构

  1. Producer :消息生产者,就是向kafka broker发消息的客户端;
  2. Consumer :消息消费者,向kafka broker取消息的客户端;
  3. Topic :可以理解为一个队列;
  4. Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
  5. Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;
  6. Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;
  7. Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。
  8. broker和consumer依赖zookeeper,而producer不和zookeeper进行通信

zookeeper 可以连接fllower 更新操作回到leader上去做fllower进行交互.

Zookeeper的作用及背景【数据一致性、高可用】

管理代码中的变量的配置
设置命名服务
提升系统的可用性和安全性
管理Kafaka集群 


3. 基本概念

了解代理、生产者、消费者、消费者组等概念

  1. Broker(代理)
  2. Producer(生产者)
    在Kafka系统中,写入数据的应用一般被称为 “生产者” 。
    Kafka生产者可以理解成Kafka系统与外界进行数据交互的应用接口。
  3. Consumer(消费者)
  4. Comsumer Group(消费者组)

明白主题、分区、副本、记录在kafka中所代表的含义

  1. Topic(主题)
  2. Partition(分区)
  3. Replication(副本)
  4. Record(记录)

  1. Kafka的设计初衷?
    高吞吐量、高可用队列、低延时、分布式机制
  2. Kafka的特性是什么?
    高吞吐量、高可用队列、低延时、分布式机制
  3. Kafka使用于哪些场景?
    异步产生数据、偏移量迁移、安全机制、连接器、机架感知、数据流、时间戳、消息语义、日志收集、消息系统、用户轨迹、记录运营监控数据、实现流处理、事件源
  4. Kafka有哪些元数据信息存储在zookeeper?
    控制器选举次数、代理节点和主题、配置、管理员操作、控制器。
  5. 这些元数据信息是如何分布的?
    消息队列之Kafka详解
  6. 为什么需要消费者组?
    水平程序拓展,放置信息堆积

4. 分区存储

4.1. 分区存储数据

分区文件存储

  • 一个主题下包含多个分区,每个分区为单独目录
  • 分区命名规则为主题+有序序号 从零开始 到分区n-1

片段文件存储

  • 由索引文件和数据文件组成 *.index索引文件 .log 数据文件
  • Kafka并不是给每条消息记录建立索引,而是采用稀疏索引方式

4.2. Kafka清理过期数据有哪些方法

基于时间和大小的删除策略

#系统默认保存7天
log.retention.hours=168

#系统默认没有设置大小
log.retention.bytes=-1 

压缩策略清除

如果使用压缩策略清除过期日志,则需要设置属性
log.cleanup.policy=compact 

5. Kafka安全机制

5.1. 了解Kafka安全机制

0.9版本前无安全机制存在 泄露敏感数据,删除主题,修改分区等风险

身份认证

1.客户端和Kafka Broker之间连接认证
2.Broker和Broker之间连接认证
3.Broker和Zookeeper之间连接认证 

权限控制

1. 对读写删改主题权限控制
2. 可插拔权限认证,支持与外部授权服务集成
3. 自带简单的授权类kafka.secutity.auth.SimpleAclAuthorizer
4. 部署安全模块是可选的 

5.2. 配置ACL

集群操作
倾向于集群内部代理节点之间的管理,例如代理节点升级、主题分区元数据Leader切换、主题分区副本设置等

主题操作
针对具体的访问权限,例如对主题的读取、删除、查看等

#如果没有设置ACL、则除超级用户外其他用户不能访问。默认为false
allow.everyone.if.no.acl.found=true
#设置超级用户
super.users=User.admin
#启用ACL,配置授权
authorizer.class.name=kafka.secutity.auth.SimpleAclAuthorizer 

5.3. Kafka启用ACL模式

集群启动

# 文件/**/reader_jaas.conf权限认证信息内容

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin"
  user_reader="reader"
  user_writer="writer";
};

#在 zookeeper-server-start.sh kafka-server-start.sh cat kafka-acls.sh脚本中添加
export KAFKA_OPTS="-Djava.security.auth.login.config=/**/reader_jaas.conf"

# 启动zookeeper
./zookeeper-server-start.sh ../config/zookeeper.properties 1>/dev/null 2>&1 &

# 启动Kafka
nohup ./kafka-server-start.sh ../config/server.properties > kafka-server-start.log 2>&1 & 

查看权限

kafka-acls.sh 

6. Kafka连接器

连接器核心概念

  1. 连接器实例
  2. 任务数
  3. 事件线程
  4. 转换器

6.1. 了解连接器使用场景

连接器一般是用来构建数据管道
1.开始和结束的端点 [举例 Kafka数据移出到hbase 或者oracle数据移入到Kafka]
2.数据传输的中间介质[举例 海量数据存储到ES中,作为临时存储] 

6.2. 特性和优势

特性

  1. 通用的框架
  2. 单机模式和分布式模式
  3. REST接口
  4. 自动管理偏移量
  5. 分布式和可扩展
  6. 数据流和批量集成

优势

  1. Source连接器
  2. Sink连接器

6.3. 操作Kafka连接器

单机模式将数据导入Kafka

第一步:创建要导入的文件
第一步:修改配置文件../config/connect-file-source.properties 

./connect-standalone.sh  ../config/connect-standalone.properties ../config/connect-file-source.properties 

分布式模式将数据导入Kafka

./connect-distributed.sh ../config/connect-distributed.properties 

#查看版本号
curl http://dns:8083 

6.4. 开发一个简易Kafka连接器插件

编写Source连接器

1.SourceConnector类:用来初始化连接器配置和任务数
2.SourceTask类:用来实现标准输入或者文件读取 

编写Sink连接器

1.SinkTask类:用来实现标准输出或者文件写入
2.SinkConnector类:用来初始化连接器配置和任务数 

参考资料:
Kafka监控系统 — Kafka Eagle
Centos下kafka 单机配置部署详解
kafka安装部署
Kafka安装教程(详细过程)
apache kafka系列之server.properties配置文件参数说明
Kafka监控系统Kafka Eagle剖析
Kafka集群部署(Docker容器的方式)

本文转自 https://blog.csdn.net/baidu_41847368/article/details/114764613?utm_medium=distribute.pc_category.none-task-blog-hot-8.nonecase&dist_request_id=1328642.42289.16157317109702379&depth_1-utm_source=distribute.pc_category.none-task-blog-hot-8.nonecase,如有侵权,请联系删除。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
kelly
kelly
Lv1
是你吧,我能从很远很远的地方一眼认出你来
文章
4
粉丝
0
获赞
0