消息队列之Kafka详解
1. 什么是Kafka
在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。
- Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。
- Kafka最初是由LinkedIn公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
- Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。
- 无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。新版本的offset维护在本地
2. Kafka架构
- Producer :消息生产者,就是向kafka broker发消息的客户端;
- Consumer :消息消费者,向kafka broker取消息的客户端;
- Topic :可以理解为一个队列;
- Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
- Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;
- Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;
- Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。
- broker和consumer依赖zookeeper,而producer不和zookeeper进行通信
zookeeper 可以连接fllower 更新操作回到leader上去做fllower进行交互.
Zookeeper的作用及背景【数据一致性、高可用】
管理代码中的变量的配置
设置命名服务
提升系统的可用性和安全性
管理Kafaka集群
3. 基本概念
了解代理、生产者、消费者、消费者组等概念
- Broker(代理)
- Producer(生产者)
在Kafka系统中,写入数据的应用一般被称为 “生产者” 。
Kafka生产者可以理解成Kafka系统与外界进行数据交互的应用接口。 - Consumer(消费者)
- Comsumer Group(消费者组)
明白主题、分区、副本、记录在kafka中所代表的含义
- Topic(主题)
- Partition(分区)
- Replication(副本)
- Record(记录)
- Kafka的设计初衷?
高吞吐量、高可用队列、低延时、分布式机制 - Kafka的特性是什么?
高吞吐量、高可用队列、低延时、分布式机制 - Kafka使用于哪些场景?
异步产生数据、偏移量迁移、安全机制、连接器、机架感知、数据流、时间戳、消息语义、日志收集、消息系统、用户轨迹、记录运营监控数据、实现流处理、事件源 - Kafka有哪些元数据信息存储在zookeeper?
控制器选举次数、代理节点和主题、配置、管理员操作、控制器。 - 这些元数据信息是如何分布的?
- 为什么需要消费者组?
水平程序拓展,放置信息堆积
4. 分区存储
4.1. 分区存储数据
分区文件存储
- 一个主题下包含多个分区,每个分区为单独目录
- 分区命名规则为主题+有序序号 从零开始 到分区n-1
片段文件存储
- 由索引文件和数据文件组成 *.index索引文件 .log 数据文件
- Kafka并不是给每条消息记录建立索引,而是采用稀疏索引方式
4.2. Kafka清理过期数据有哪些方法
基于时间和大小的删除策略
#系统默认保存7天
log.retention.hours=168
#系统默认没有设置大小
log.retention.bytes=-1
压缩策略清除
如果使用压缩策略清除过期日志,则需要设置属性
log.cleanup.policy=compact
5. Kafka安全机制
5.1. 了解Kafka安全机制
0.9版本前无安全机制存在 泄露敏感数据,删除主题,修改分区等风险
身份认证
1.客户端和Kafka Broker之间连接认证
2.Broker和Broker之间连接认证
3.Broker和Zookeeper之间连接认证
权限控制
1. 对读写删改主题权限控制
2. 可插拔权限认证,支持与外部授权服务集成
3. 自带简单的授权类kafka.secutity.auth.SimpleAclAuthorizer
4. 部署安全模块是可选的
5.2. 配置ACL
集群操作
倾向于集群内部代理节点之间的管理,例如代理节点升级、主题分区元数据Leader切换、主题分区副本设置等
主题操作
针对具体的访问权限,例如对主题的读取、删除、查看等
#如果没有设置ACL、则除超级用户外其他用户不能访问。默认为false
allow.everyone.if.no.acl.found=true
#设置超级用户
super.users=User.admin
#启用ACL,配置授权
authorizer.class.name=kafka.secutity.auth.SimpleAclAuthorizer
5.3. Kafka启用ACL模式
集群启动
# 文件/**/reader_jaas.conf权限认证信息内容
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_reader="reader"
user_writer="writer";
};
#在 zookeeper-server-start.sh kafka-server-start.sh cat kafka-acls.sh脚本中添加
export KAFKA_OPTS="-Djava.security.auth.login.config=/**/reader_jaas.conf"
# 启动zookeeper
./zookeeper-server-start.sh ../config/zookeeper.properties 1>/dev/null 2>&1 &
# 启动Kafka
nohup ./kafka-server-start.sh ../config/server.properties > kafka-server-start.log 2>&1 &
查看权限
kafka-acls.sh
6. Kafka连接器
连接器核心概念
- 连接器实例
- 任务数
- 事件线程
- 转换器
6.1. 了解连接器使用场景
连接器一般是用来构建数据管道
1.开始和结束的端点 [举例 Kafka数据移出到hbase 或者oracle数据移入到Kafka]
2.数据传输的中间介质[举例 海量数据存储到ES中,作为临时存储]
6.2. 特性和优势
特性
- 通用的框架
- 单机模式和分布式模式
- REST接口
- 自动管理偏移量
- 分布式和可扩展
- 数据流和批量集成
优势
- Source连接器
- Sink连接器
6.3. 操作Kafka连接器
单机模式将数据导入Kafka
第一步:创建要导入的文件
第一步:修改配置文件../config/connect-file-source.properties
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties
分布式模式将数据导入Kafka
./connect-distributed.sh ../config/connect-distributed.properties
#查看版本号
curl http://dns:8083
6.4. 开发一个简易Kafka连接器插件
编写Source连接器
1.SourceConnector类:用来初始化连接器配置和任务数
2.SourceTask类:用来实现标准输入或者文件读取
编写Sink连接器
1.SinkTask类:用来实现标准输出或者文件写入
2.SinkConnector类:用来初始化连接器配置和任务数
参考资料:
Kafka监控系统 — Kafka Eagle
Centos下kafka 单机配置部署详解
kafka安装部署
Kafka安装教程(详细过程)
apache kafka系列之server.properties配置文件参数说明
Kafka监控系统Kafka Eagle剖析
Kafka集群部署(Docker容器的方式)