Flume Kafka 测试案例,Flume 的配置。
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = master
a1.sources.s1.port = 44444
a1.channels.c1.type = memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = t1 # kafka topic 不需要加 k1.kafka.topic,直接去掉 kafka
a1.sinks.k1.brokerList = master:9092 # 新的使用 brokerList,旧的使用 kafka.bootstrap.servers
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
1. 启动 kafka。
kafka-server-start.sh config/server.properties
2. 创建 kafka topic,flume配置中的 topic 为 t1。
# 这里 --replication-factor 为1,是因为只启动了master上的kafka,从节点上面没有启动kafka,如果设置大于1的,需要将从节点的kafka也启动
# partitions 分区数量保持大于 replication-factor,分区大的话可以缓解数据过大的问题,解决内存不够,但是解决内存本质上还是需要从机器上解决。
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 2 --topic t1
3. 启动 flume。
flume-ng agent -c conf -f conf/kafka_test.conf -n a1 -Dflume.root.logger=INFO,console
4. 启动 kafka 的消费者,来观察看是否成功。
kafka-console-consumer.sh --bootstrap-server master:9092 --topic t1
5. 由于 flume 配置文件中监控的命令是 netcat,启动一个远程,来发送消息。
# 如果没有 telnet, 使用 yum install telnet 进行安装
# localhost 本机
# 端口 44444,是flume配置文件中指定的,flume启动就会启动对应的端口监听
telnet localhost 44444
6. 测试
telnet localhost 44444
> hello
>world
>nice
查看 kafka 的消费者窗口,会发现已经有了对应的内容
# kafka-console-consumer.sh --bootstrap-server master:9092 --topic t1
hello
world
nice
总结:一开始由于 flume 的配置文件没有写对,调试很久才调通,真是不应该。其次,flume启动之后要学会看对应的日志信息,比如启动flume后,就应该可以观察到kafka对应的topic,但是由于没有仔细看,发现前几次调试都是不通的,不论怎么做kafka 的消费者就是拿不到数据。但是最后发现如果 flume 配置文件不正确的话,启动 flume,监听的topic 是默认的 default-topic,所以最后问题出现在 flume 的配置文件上面,把对应的 flume 中关于 sink 部分的配置要注意,由于版本不一样有的配置需要做一点转换才能跑成功。一定要注意检查日志。