Kafka+Log4j实现日志集中管理

Stella981
• 阅读 737

引言

前段时间写的《Spring+Log4j+ActiveMQ实现远程记录日志——实战+分析》得到了许多同学的认可,在认可的同时,也有同学提出可以使用Kafka来集中管理日志,于是今天就来学习一下。

特别说明,由于网络上关于Kafka+Log4j的完整例子并不多,我也是一边学习一边使用,因此如果有解释得不好或者错误的地方,欢迎批评指正,如果你有好的想法,也欢迎留言探讨。

第一部分 搭建Kafka环境


安装Kafka

下载:http://kafka.apache.org/downloads.html

tar zxf kafka-<VERSION>.tgz
cd kafka-<VERSION>

启动Zookeeper

启动Zookeeper前需要配置一下config/zookeeper.properties:‍

Kafka+Log4j实现日志集中管理

接下来启动Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafka Server

启动Kafka Server前需要配置一下config/server.properties。主要配置以下几项,内容就不说了,注释里都很详细:

Kafka+Log4j实现日志集中管理

然后启动Kafka Server:

bin/kafka-server-start.sh config/server.properties

 创建Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看创建的Topic

>bin/kafka-topics.sh --list --zookeeper localhost:2181

启动控制台Producer,向Kafka发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
^C

启动控制台Consumer****,消费刚刚发送的消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

删除Topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

注:只有当delete.topic.enable=true时,该操作才有效

配置Kafka****集群(单台机器上)

首先拷贝server.properties文件为多份(这里演示4****个节点的Kafka集群,因此还需要拷贝3份配置文件):

cp config/server.properties config/server1.properties
cp config/server.properties config/server2.properties
cp config/server.properties config/server3.properties

修改server1.properties的以下内容:

broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

同理修改server2.properties和server3.properties的这些内容,并保持所有配置文件的zookeeper.connect属性都指向运行在本机的zookeeper地址localhost:2181。注意,由于这几个Kafka节点都将运行在同一台机器上,因此需要保证这几个值不同,这里以累加的方式处理。例如在server2.properties上:

broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

把server3.properties也配置好以后,依次启动这些节点:

bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
bin/kafka-server-start.sh config/server3.properties &

Topic & Partition

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。

现在在Kafka集群上创建备份因子为3,分区数为4的Topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic kafka

说明:备份因子replication-factor越大,则说明集群容错性越强,就是当集群down掉后,数据恢复的可能性越大。所有的分区数里的内容共同组成了一份数据,分区数partions越大,则该topic的消息就越分散,集群中的消息分布就越均匀。

然后使用kafka-topics.sh的--describe参数查看一下Topic为kafka的详情:

Kafka+Log4j实现日志集中管理

输出的第一行是所有分区的概要,接下来的每一行是一个分区的描述。可以看到Topic为kafka的消息,PartionCount=4,ReplicationFactor=3正是我们创建时指定的分区数和备份因子。

另外:Leader是指负责这个分区所有读写的节点;Replicas是指这个分区所在的所有节点(不论它是否活着);ISR是Replicas的子集,代表存有这个分区信息而且当前活着的节点。

拿partition:0这个分区来说,该分区的Leader是server0,分布在id为0,1,2这三个节点上,而且这三个节点都活着。

再来看下Kafka集群的日志:

Kafka+Log4j实现日志集中管理

其中kafka-logs-0代表server0的日志,kafka-logs-1代表server1的日志,以此类推。

从上面的配置可知,id为‍0,1,2,3‍的节点分别对应server0, server1, server2, server3。而上例中的partition:0分布在id为0, 1, 2这三个节点上,因此可以在server0, server1, server2这三个节点上看到有kafka-0这个文件夹。这个kafka-0就代表Topic为kafka的partion0。

第二部分 Kafka+Log4j****项目整合

先来看下Maven项目结构图:

Kafka+Log4j实现日志集中管理

作为Demo,文件不多。先看看pom.xml引入了哪些jar包:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>

重要的内容是log4j.properties:

log4j.rootLogger=INFO,console

# for package com.demo.kafka, log would be sent to kafka appender.
log4j.logger.com.demo.kafka=DEBUG,kafka

# appender kafka
log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=kafka
# multiple brokers are separated by comma ",".
log4j.appender.kafka.brokerList=localhost:9092, localhost:9093, localhost:9094, localhost:9095
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=true
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
 
# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

App.java里面就很简单啦,主要是通过log4j输出日志:

package com.demo.kafka;
import org.apache.log4j.Logger;
public class App {
    private static final Logger LOGGER = Logger.getLogger(App.class);
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            LOGGER.info("Info [" + i + "]");
            Thread.sleep(1000);
        }
    }
}

MyConsumer.java用于消费kafka中的信息:

package com.demo.kafka;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class MyConsumer {
    private static final String ZOOKEEPER = "localhost:2181";
    //groupName可以随意给,因为对于kafka里的每条消息,每个group都会完整的处理一遍
    private static final String GROUP_NAME = "test_group";
    private static final String TOPIC_NAME = "kafka";
    private static final int CONSUMER_NUM = 4;
    private static final int PARTITION_NUM = 4;

    public static void main(String[] args) {
        // specify some consumer properties
        Properties props = new Properties();
        props.put("zookeeper.connect", ZOOKEEPER);
        props.put("zookeeper.connectiontimeout.ms", "1000000");
        props.put("group.id", GROUP_NAME);

        // Create the connection to the cluster
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = 
            Consumer.createJavaConsumerConnector(consumerConfig);

        // create 4 partitions of the stream for topic “test”, to allow 4
        // threads to consume
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = 
            consumerConnector.createMessageStreams(
                ImmutableMap.of(TOPIC_NAME, PARTITION_NUM));
        List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(TOPIC_NAME);

        // create list of 4 threads to consume from each of the partitions
        ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_NUM);

        // consume the messages in the threads
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) {
                        // process message (msgAndMetadata.message())
                        System.out.println(new String(msgAndMetadata.message()));
                    }
                }
            });
        }
    }
}

MyProducer.java用于向Kafka发送消息,但不通过log4j的appender发送。此案例中可以不要。但是我还是放在这里:

package com.demo.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MyProducer {
    private static final String TOPIC = "kafka";
    private static final String CONTENT = "This is a single message";
    private static final String BROKER_LIST = "localhost:9092";
    private static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder";
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("serializer.class", SERIALIZER_CLASS);
        props.put("metadata.broker.list", BROKER_LIST);
        
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        //Send one message.
        KeyedMessage<String, String> message = 
            new KeyedMessage<String, String>(TOPIC, CONTENT);
        producer.send(message);
        
        //Send multiple messages.
        List<KeyedMessage<String,String>> messages = 
            new ArrayList<KeyedMessage<String, String>>();
        for (int i = 0; i < 5; i++) {
            messages.add(new KeyedMessage<String, String>
                (TOPIC, "Multiple message at a time. " + i));
        }
        producer.send(messages);
    }
}

到这里,代码就结束了。

第三部分 运行与验证

先运行MyConsumer,使其处于监听状态。同时,还可以启动Kafka自带的ConsoleConsumer来验证是否跟MyConsumer的结果一致。最后运行App.java。

先来看看MyConsumer的输出:

Kafka+Log4j实现日志集中管理

再来看看ConsoleConsumer的输出:

Kafka+Log4j实现日志集中管理

可以看到,尽管发往Kafka的消息去往了不同的地方,但是内容是一样的,而且一条也不少。最后再来看看Kafka的日志。

Kafka+Log4j实现日志集中管理

我们知道,Topic为kafka的消息有4个partion,从之前的截图可知这4个partion均匀分布在4个kafka节点上,于是我对每一个partion随机选取一个节点查看了日志内容。

上图中黄色选中部分依次代表在server0上查看partion0,在server1上查看partion1,以此类推。

红色部分是日志内容,由于在创建Topic时准备将20条日志分成4个区存储,可以很清楚的看到,这20条日志确实是很均匀的存储在了几个partion上。

摘一点Infoq上的话:每个日志文件都是一个log entrie序列,每个log entrie包含一个4字节整型数值(值为N+5),1个字节的"magic value",4个字节的CRC校验码,其后跟N个字节的消息体。每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消息格式如下:

message length : 4 bytes (value: 1+4+n)
"magic" value : 1 byte 
crc : 4 bytes 
payload : n bytes

这里我们看到的日志文件的每一行,就是一个log entrie,每一行前面无法显示的字符(蓝色选中部分),就是(message length + magic value + crc)了。而log entrie的后部分,则是消息体的内容了。

问题:

1. 如果要使用此种方式,有一种场景是提取某天或者某小时的日志,那么如何设计Topic呢?是不是要在Topic上带入日期或者小时数?还有更好的设计方案吗?

2. 假设按每小时设计Topic,那么如何在使用诸如logger.info()这样的方法时,自动根据时间去改变Topic呢?有类似的例子吗?

----欢迎交流,共同进步。

样例下载:百度网盘

链接: http://pan.baidu.com/s/1i400DZv 密码: f25c

参考页面:

http://kafka.apache.org/07/quickstart.html

http://kafka.apache.org/documentation.html#quickstart

http://www.infoq.com/cn/articles/kafka-analysis-part-1

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
Spring Boot日志集成
!(https://oscimg.oschina.net/oscnet/1bde8e8d00e848be8b84e9d1d44c9e5c.jpg)SpringBoot日志框架SpringBoot支持JavaUtilLogging,Log4j2,Lockback作为日志框架,如果你使用star
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这