基本概念
1.Producer:消息生产者,就是向 kafka broker 发消息的客户端
2.Consumer:消息消费者,向 kafka broker 取消息的客户端
3.Consumer Group(CG ):消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据, 一个分区只能由一个 组内 消费者消费; 消费者组之间互不影响。 所有的消费者都属于某个消费者组,即 消费者组是逻辑上的一个订阅者。
4.Broker:一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
5.Topic:可以理解为一个队列, 生产者和消费者面向的都是一个 topic
6.Partition:为了实现扩展性, 一个非常大的 topic 可以分布到多个 broker (即服务器) 上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
7.Replica: 副本, 为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作, kafka 提供了副本机制, 一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
8.leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
9.follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。
采用伪集群的方式部署Kafka集群
# 下载软件
cd /usr/local/src
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xvf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0
# 解压缩后进入目录中
# 方法1:使用同一个程序,但是采用不同的配置文件
# 方法2:复制两份目录出来,使用不同的目录区分
# 此处采用的是方法1,在后续步骤采用的是方法2
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
# log.dir表示的是队列数据存储的路径,根据实际情况修改
config/server.properties:
broker.id=0
listeners=PLAINTEXT://:9092
log.dir=/data/kafka-logs-0
delete.topic.enable=true #删除 topic 功能
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/data/kafka-logs-1
delete.topic.enable=true #删除 topic 功能
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/data/kafka-logs-2
delete.topic.enable=true
后台启动内置的zookeeper
# 测试可以使用Kafka自带的zookeeper。生产上建议使用单独的zookeeper集群
# 若是搭建集群的话,方法同Kafka
# 此处先启动一个zookeeper,后续采用同Kafka的方法2搭建集群使用
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
关闭zookeeper
bin/zookeeper-server-stop.sh stop
开启kafk集群(伪集群),shell脚本
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
关闭kafk集群(伪集群),shell脚本
bin/zookeeper-server-stop.sh stop
查看当前服务器中的所有 topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
创建 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic first
删除 topic(需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除)
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic first
发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first
消费消息(默认当前,不是从头开始)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
消费消息,从头开始
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic second --from-beginning
查看某个 Topic 的详情
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic first
修改分区数
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic first --partitions 6
自带压测命令
bin/kafka-producer-perf-test.sh --topic first --num-records 1000 --record-size 100 --throughput 1000 --producer-props bootstrap.servers=localhost:9092
安装Kafka Eagle监控Kafka集群
1.修改 kafka-server-start.sh 命令 因为使用的是伪集群,所以,需要把目录复制三份,每个目录下该文件的JMX_PORT值都不一样才行
正好利用这种情况,每个目录下都可以启动自带的zookeeper,修改端口号,从而构建一个zookeeper伪集群
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
2.下载Kafka Eagle软件
cd /usr/local/src/
wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.3.9.tar.gz
tar -zxvf v1.3.9.tar.gz
cd kafka-eagle-bin-1.3.9/
cp kafka-eagle-web-1.3.9-bin.tar.gz ../
tar -zxvf kafka-eagle-web-1.3.9-bin.tar.gz
cd kafka-eagle-web-1.3.9/conf
vim system-config.properties
3.设置 若是使用zookeeper伪集群,做相应的增加
vim system-config.properties
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
cluster1.kafka.eagle.offset.storage=kafka
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false
# 使用sqlite数据库,注意保存路径,没有的话需要新建,可以换成MySQL
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org
4.安装maven
yum install maven
5.配置环境变量
vim /etc/profile.d/ke.sh
export KE_HOME=/usr/local/src/kafka-eagle-web-1.3.9
export PATH=$PATH:$KE_HOME/bin
vim /etc/profile.d/java.sh
export JAVA_HOME=//usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64
export JRE_HOME=$JAVA_HOME/jre
export CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
source /etc/profile.d/ke.sh
soucer /etc/profile.d/ke.sh
6.启动
cd bin
bash ke.sh start.sh
*******************************************************************
* Kafka Eagle system monitor port successful...
*******************************************************************
[2019-09-10 16:41:46] INFO: Status Code[0]
[2019-09-10 16:41:46] INFO: [Job done!]
Welcome to
__ __ ___ ____ __ __ ___ ______ ___ ______ __ ______
/ //_/ / | / __/ / //_/ / | / ____/ / | / ____/ / / / ____/
/ ,< / /| | / /_ / ,< / /| | / __/ / /| | / / __ / / / __/
/ /| | / ___ | / __/ / /| | / ___ | / /___ / ___ |/ /_/ / / /___ / /___
/_/ |_| /_/ |_|/_/ /_/ |_| /_/ |_| /_____/ /_/ |_|\____/ /_____//_____/
Version 1.3.9
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://127.0.0.1:8048/ke'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
提示的访问地址是http://127.0.0.1:8048/ke,但是可以用http://192.168.0.145/ke进行访问