1.架构
第一步,Flume和Kakfa对接,Flume抓取日志,写到Kafka中
第二部,Spark Streaming读取Kafka中的数据,进行实时分析
本文首先使用Kakfa自带的消息处理(脚本)来获取消息,走通Flume和Kafka的对接
2.安装flume,kafka
flume install: http://my.oschina.net/u/192561/blog/692225
kafka install: http://my.oschina.net/u/192561/blog/692357
3.Flume和Kafka整合
3.1 两者整合优势
Flume更倾向于数据传输本身,Kakfa是典型的消息中间件用于解耦生产者消费者。
具体架构上,Agent并没把数据直接发送到Kafka,在Kafka前面有层由Flume构成的forward。这样做有两个原因:
Kafka的API对非JVM系的语言支持很不友好,forward对外提供更加通用的HTTP接口。forward层可以做路由、Kafka topic和Kafkapartition key等逻辑,进一步减少Agent端的逻辑。
数据有数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计算。本文实时计算采用SparkStreaming做测试。
3.2 Flume和Kafka整合安装
1. 下载Flume和Kafka集成的插件,下载地址:
https://github.com/beyondj2ee/flumeng-kafka- plugin
将package目录中的flumeng-kafka-plugin.jar拷贝到Flume安装目录的lib目录下
2. 将Kakfa安装目录libs目录下的如下jar包拷贝到Flume安装目录的lib目录下
kafka_2.11-0.10.0.0.jar
scala-library-2.11.8.jar
metrics-core-2.2.0.jar
提取插件中的flume-conf.properties文件:修改如下:flume源采用exec
producer.sources.s.type = exec
producer.sources.s.command=tail -F -n+1 /home/eric/bigdata/kafka-logs/a.log
producer.sources.s.channels = c1
修改producer代理的topic为 HappyBirthDayToAnYuan
将配置放到 apache-flume-1.6.0-bin/conf/producer.conf中
完整 producer.conf:
#agentsection producer.sources= s1 producer.channels= c1 producer.sinks= k1 #配置数据源 producer.sources.s1.type=exec #配置需要监控的日志输出文件或目录 producer.sources.s1.command=tail -F -n+1 /home/eric/bigdata/kafka-logs/a.log
#配置数据通道 producer.channels.c1.type=memory producer.channels.c1.capacity=10000 producer.channels.c1.transactionCapacity=100 #配置数据源输出 #设置Kafka接收器,此处最坑,注意版本,此处为Flume 1.6.0的输出槽类型 producer.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink #设置Kafka的broker地址和端口号 producer.sinks.k1.brokerList=localhost:9092 #设置Kafka的Topic producer.sinks.k1.topic=HappyBirthDayToAnYuan #设置序列化方式 producer.sinks.k1.serializer.class=kafka.serializer.StringEncoder #将三者级联 producer.sources.s1.channels=c1 producer.sinks.k1.channel=c1
3.3 启动kafka flume相关服务
启动ZK bin/zookeeper-server-start.sh config/zookeeper.properties
启动Kafka服务 bin/kafka-server-start.sh config/server.properties
创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic HappyBirthDayToAnYuan
查看主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看主题详情
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic HappyBirthDayToAnYuan
删除主题
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
创建消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
启动flume
bin/flume-ng agent -n producer -c conf -f conf/producer.conf -Dflume.root.logger=INFO,console
向flume发送数据:
echo "yuhai" >> a.log
kafka消费数据:
注意:当前文件内容删除,服务器重启,主题需重新创建,但是消费内容有落地文件,当前消费内容不消失.