Kafka Consumer端的一些解惑

Stella981
• 阅读 677

最近一直忙着各种设计和文档,终于有时间来更新一点儿关于kafka的东西。之前有一篇文章讲述的是kafka Producer端的程序,也就是日志的生产者,这部分比较容易理解,业务系统将运行日志或者业务日志发送到broker中,由broker代为存储。那讲的是如何收集日志,今天要写的是如何获取日志,然后再做相关的处理。

之前写过kafka是讲日志按照topic的形式存储,一个topic会按照partition存在同一个文件夹下,目录在config/server.properties中指定,具体的存储规则可以查看之前的文章:

# The directory under which to store log files
log.dir=/tmp/kafka-logs

Consumer端的目的就是为了获取log日志,然后做进一步的处理。在这里我们可以将数据的处理按照需求分为两个方向,线上和线下,也可以叫实时和离线。实时处理部分类似于网站里的站短,有消息了马上就推送到前端,这是一种对实时性要求极高的模式,kafka可以做到,当然针对站短这样的功能还有更好的处理方式,我主要将kafka线上消费功能用在了实时统计上,处理一些如实时流量汇总、各系统实时吞吐量汇总等。

这种应用,一般采用一个consumer中的一个group对应一个业务,配合多个producer提供数据,如下图模
式:

Kafka Consumer端的一些解惑

采用这种方式处理很简单,采用官网上给的例子即可解决,只是由于版本的问题,代码稍作更改即可:

package com.a2.kafka.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class CommonConsumer {
    public static void main(String[] args) {
        // specify some consumer properties
        Properties props = new Properties();
        props.put("zk.connect", "192.168.181.128:2181");
        props.put("zk.connectiontimeout.ms", "1000000");
        props.put("groupid", "test_group");

        // Create the connection to the cluster
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> map=new HashMap<String,Integer>();
        map.put("test", 2);
        // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
        Map<String, List<KafkaStream<Message>>> topicMessageStreams = 
            consumerConnector.createMessageStreams(map);
        List<KafkaStream<Message>> streams = topicMessageStreams.get("test");

        // create list of 4 threads to consume from each of the partitions 
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // consume the messages in the threads
        for(final KafkaStream<Message> stream: streams) {
          executor.submit(new Runnable() {
            public void run() {
              for(MessageAndMetadata<Message> msgAndMetadata: stream) {
                // process message (msgAndMetadata.message())
                  System.out.println(msgAndMetadata.message());
              }    
            }
          });
        }
    }
}

这是一个user level的API,还有一个low level的API可以从官网找到,这里就不贴出来了。这个consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据打印出来,这是不是十分符合实时性的要求。

当然这里会产生一个很严重的问题,如果你重启一下上面这个程序,那你连一条数据都抓不到,但是你去log文件中明明可以看到所有数据都好好的存在。换句话说,一旦你消费过这些数据,那你就无法再次用同一个groupid消费同一组数据了。我已经把结论说出来了,要消费同一组数据,你可以采用不同的group。

简单说下产生这个问题的原因,这个问题类似于transaction commit,在消息系统中都会有这样一个问题存在,数据消费状态这个信息到底存哪里。是存在consumer端,还是存在broker端。对于这样的争论,一般会出现三种情况:

  • _At most once_—this handles the first case described. Messages are immediately marked as consumed, so they can't be given out twice, but many failure scenarios may lead to losing messages.
  • _At least once_—this is the second case where we guarantee each message will be delivered at least once, but in failure cases may be delivered twice.
  • _Exactly once_—this is what people actually want, each message is delivered once and only once.

第一种情况是将消费的状态存储在了broker端,一旦消费了就改变状态,但会因为网络原因少消费信息,第二种是存在两端,并且先在broker端将状态记为send,等consumer处理完之后将状态标记为consumed,但也有可能因为在处理消息时产生异常,导致状态标记错误等,并且会产生性能的问题。第三种当然是最好的结果。

Kafka解决这个问题采用high water mark这样的标记,也就是设置offset:

Kafka does two unusual things with respect to metadata. First the stream is partitioned on the brokers into a set of distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifies which partition a message belongs to. Within a partition messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order. This means that rather than store metadata for each message (marking it as consumed, say), we just need to store the "high water mark" for each combination of consumer, topic, and partition. Hence the total metadata required to summarize the state of the consumer is actually quite small. In Kafka we refer to this high-water mark as "the offset" for reasons that will become clear in the implementation section.

所以在每次消费信息时,log4j中都会输出不同的offset:

[FetchRunnable-0] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable-0 start fetching topic: test part: 0 offset: 0 from 192.168.181.128:9092

[FetchRunnable-0] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable-0 start fetching topic: test part: 0 offset: 15 from 192.168.181.128:9092

除了采用不同的groupid去抓取已经消费过的数据,kafka还提供了另一种思路,这种方式更适合线下的操作,镜像。

Kafka Consumer端的一些解惑

通过一些配置,就可以将线上产生的数据同步到镜像中去,然后再由特定的集群区处理大批量的数据,这种方式可以采用low level的API按照不同的partition和offset来抓取数据,以获得更好的并行处理效果。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Easter79 Easter79
3年前
springboot+kafka集成
kafka概念  Topic:消息根据Topic进行归类,可以理解为一个队里。  Producer:消息生产者,就是向kafkabroker发消息的客户端。  Consumer:消息消费者,向kafkabroker取消息的客户端。  broker:每个kafka实例(server),一台kafka服务器就是一个broker,一个集群
Stella981 Stella981
3年前
Spring Boot日志集成
!(https://oscimg.oschina.net/oscnet/1bde8e8d00e848be8b84e9d1d44c9e5c.jpg)SpringBoot日志框架SpringBoot支持JavaUtilLogging,Log4j2,Lockback作为日志框架,如果你使用star
Stella981 Stella981
3年前
Kafka+Zookeeper+Filebeat+ELK 搭建日志收集系统
ELKELK目前主流的一种日志系统,过多的就不多介绍了Filebeat收集日志,将收集的日志输出到kafka,避免网络问题丢失信息kafka接收到日志消息后直接消费到LogstashLogstash将从kafka中的日志发往elasticsearchKibana对elasticsearch中的日志数
Stella981 Stella981
3年前
Kafka 消息存储与索引设计
消息中间件的性能好坏,它的消息存储的机制是衡量该性能的最重要指标之一,而Kafka具有高性能、高吞吐、低延时的特点,动不动可以上到几十上百万TPS,离不开它优秀的消息存储设计。下面我按照自己的理解为大家讲解Kafka消息存储设计的那些事。在Kafka的设计思想中,消息的存储文件被称作日志,我们Java后端绝大部分人谈到日志,一般会联想到
Wesley13 Wesley13
3年前
JavaWeb项目架构之Kafka分布式日志队列
架构、分布式、日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Kafka做消息队列罢了。!(https://images2017.cnblogs.com/blog/109211/201802/109211201802061533201231622886397.png)kafka介绍Kafka是由
Stella981 Stella981
3年前
Kafka副本与ISR设计(I)
在Kafka中一个分区日志其实就是一个备份日志,kafka利用多个相同备份日志来提高系统的可用性。这些备份日志其实就是所谓的副本。Kafka的副本具有leader副本和follower副本之分,leader副本为客户端提供读写请求,follower副本只是用于被动地从leader副本中同步数据,对外不提供读写服务。Kafka的所有节点所有副本假设都在
Stella981 Stella981
3年前
Kafka框架基础
\Kafka框架基础官网:kafka.apache.org框架简介ApacheKafka是分布式发布订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。相关概念\
文盘Rust -- 给程序加个日志 | 京东云技术团队
日志是应用程序的重要组成部分。无论是服务端程序还是客户端程序都需要日志做为错误输出或者业务记录。在这篇文章中,我们结合聊聊rust程序中如何使用日志。类似java生态中的log4j,使用方式也很相似log4rs中的基本概念log4rs的功能组件也由appe