KafkaConsumer实现精确一次消费

Stella981
• 阅读 432

转自 https://blog.csdn.net/qq_18581221/article/details/89766073

简介

在使用kafka时,大多数场景对于数据少量的不一致(重复或者丢失)并不关注,比如日志,因为不会影响最终的使用或者分析,但是在某些应用场景(比如业务数据),需要对任何一条消息都要做到精确一次的消费,才能保证系统的正确性,kafka并不提供准确一致的消费API,需要我们在实际使用时借用外部的一些手段来保证消费的精确性,下面我们介绍如何实现

kafka消费机制

这篇文章KafkaConsumer使用介绍、参数配置介绍了如何kafka具有两种提交offset(消费偏移量)方式,我们在Kafka简介以及安装和使用可知每个分区具备一offset记录消费位置,如果消费者一直处于正常的运行转态,那么offset将没有什么用处,因为正常消费时,consumer记录了本次消费的offset和下一次将要进行poll数据的offset起始位置,但是如果消费者发生崩溃或者有新的消费者加入消费者组,就会触发再均衡Rebalance,Rebalance之后,每个消费者将会分配到新的分区,而消费者对于新的分区应该从哪里进行起始消费,这时候提交的offset信息就起作用了,提交的offset信息包括消费者组所有分区的消费进度,这时候消费者可以根据消费进度继续消费,提交offset提交自动提交是最不具确定性的,所以要使用手动提交来控制offset

消费时出现几种异常情况

自动提交

  • 重复消费:当数据已经被处理,然后自动提交offset时消费者出现故障或者有新消费者加入组导致再均衡,这时候offset提交失败,导致这批已经处理的数据的信息没有记录,后续会重复消费一次
  • 丢失数据:如果业务处理时间较长一点,这时候数据处理业务还未完成,offset信息已经提交了,但是在后续处理数据过程中程序发生了崩溃,导致这批数据未正常消费,这时候offset已经提交,消费者后续将不在消费这批数据,导致这批数据将会丢失

手动提交

  • 重复消费(最少一次消费语义实现):消费数据处理业务完成后进行offset提交,可以保证数据最少一次消费,因为在提交offset的过程中可能出现提交失败的情况,导致数据重复消费

    /**

    • 手动提交offset
    • 实现至少一次的消费语义 at least once
    • 当手动提交位移失败,会重复消费数据

    */ @Test public void testCommitOffset() { String topic = "first-topic"; String group = "g1";

    Properties props = new Properties();
    props.put("bootstrap.servers", "node00:9092,node03:9092");   //required
    props.put("group.id", group);   //required
    props.put("enable.auto.commit", "false"); // 关闭自动提交
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "latest");     //从最早的消息开始读取
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //required
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //required
    
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));       //订阅topic
    final int minBatchSize = 10;
    // 缓存
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>(minBatchSize);
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            records.forEach(buffer::add);
    
            // 缓存满了才对数据进行处理
            if (buffer.size() >= minBatchSize) {
    
                // 业务逻辑--插入数据库
                // insertIntoDb(buffer);
                // 等数据插入数据库之后,再异步提交位移
    
                // 通过异步的方式提交位移
                consumer.commitAsync(((offsets, exception) -> {
                    if (exception == null) {
                        offsets.forEach((topicPartition, metadata) -> {
                            System.out.println(topicPartition + " -> offset=" + metadata.offset());
                        });
                    } else {
                        exception.printStackTrace();
                        // 如果出错了,同步提交位移
                        consumer.commitSync(offsets);
                    }
                }));
    
               
                // 如果提交位移失败了,那么重启consumer后会重复消费之前的数据,再次插入到数据库中
                // 清空缓冲区
                buffer.clear();
            }
        }
    } finally {
        consumer.close();
    }
    

    }

  • 丢失数据(最多一次消费语义实现):在消费数据业务处理前进行offset提交,可以保证最多一次消费,在后续数据业务处理程序出现故障,将导致数据丢失

代码实现

/**
 * 实现最多一次语义
 * 在消费前提交位移,当后续业务出现异常时,可能丢失数据
 */
@Test
public void testAtMostOnce() {
    Properties props = new Properties();
    props.put("enable.auto.commit", "false");
    KafkaConsumer<String, String> kafkaConsumer = KafkaFactory.buildConsumer(props);
    kafkaConsumer.subscribe(Arrays.asList("first-topic"));
    try {

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(500);
            // 处理业务之前就提交位移
            kafkaConsumer.commitAsync();
            // 下面是业务逻辑
            records.forEach(record -> {
                System.out.println(record.value() + ", offset=" + record.offset());
            });
        }
    } catch (Exception e) {

    } finally {
        kafkaConsumer.close();
    }

}

精确一次消费实现

从kafka的消费机制,我们可以得到是否能够精确的消费关键在消费进度信息的准确性,如果能够保证消费进度的准确性,也就保证了消费数据的准确性

  • 数据有状态:可以根据数据信息进行确认数据是否重复消费,这时候可以使用手动提交的最少一次消费语义实现,即使消费的数据有重复,可以通过状态进行数据去重,以达到幂等的效果
  • 存储数据容器具备幂等性:在数据存入的容器具备天然的幂等(比如ElasticSearch的put操作具备幂等性,相同的数据多次执行Put操作和一次执行Put操作的结果是一致的),这样的场景也可以使用手动提交的最少一次消费语义实现,由存储数据端来进行数据去重
  • 数据无状态,并且存储容器不具备幂等:这种场景需要自行控制offset的准确性,今天文章主要说明这种场景下的处理方式,这里数据不具备状态,存储使用关系型数据库,比如MySQL

这里简单说明一下实现思路

  1. 利用consumer api的seek方法可以指定offset进行消费,在启动消费者时查询数据库中记录的offset信息,如果是第一次启动,那么数据库中将没有offset信息,需要进行消费的元数据插入,然后从offset=0开始消费

  2. 关系型数据库具备事务的特性,当数据入库时,同时也将offset信息更新,借用关系型数据库事务的特性保证数据入库和修改offset记录这两个操作是在同一个事务中进行

  3. 使用ConsumerRebalanceListener来完成在分配分区时和Relalance时作出相应的处理逻辑

  4. 要弄清楚的是,我们在消费的时候,关闭了自动提交,我们也没有通过consumer.commitAsync()手动提交我们的位移信息,而是在每次启动一个新的consumer的时候,触发rebalance时,读取数据库中的位移信息,从该位移中开始读取partition的信息(初始化的时候为0),在没有出现异常的情况下,我们的consumer会不断从producer读取信息,这个位移是最新的那个消息位移,而且会同时把这个位移更新到数据库中,但是,当出现了rebalance时,那么consumer就会从数据库中读取开始的位移。

表设计

create table kafka_info(
    topic_group_partition varchar(32) primary key, //主题+组名+分区号 这里冗余设计方便通过这个主键进行更新提升效率 
    topic_group varchar(30), //主题和组名
    partition_num tinyint,//分区号
    offsets bigint default 0 //offset信息
);

代码

/**
 * @Description: 实现Kafka的精确一次消费
 * @author: HuangYn
 * @date: 2019/10/15 21:10
 */
public class ExactlyOnceConsume {

    private final KafkaConsumer<String, String> consumer;
    private Map<TopicPartition, Long> tpOffsetMap;
    private List<ConsumerRecord> list;
    private JDBCHelper jdbcHelper = JDBCHelper.getInstance();
    private String groupId;
    private String topic;

    public ExactlyOnceConsume(Properties props, String topic, String groupId) {
        this.consumer = KafkaFactory.buildConsumer(props);
        this.list = new ArrayList<>(100);
        this.tpOffsetMap = new HashMap<>();
        this.groupId = groupId;
        this.topic = topic;
        this.consumer.subscribe(Arrays.asList(this.topic), new HandleRebalance());
    }

    public void receiveMsg() {
        try {

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                if (!records.isEmpty()) {
                    // 处理每个partition的记录
                    records.partitions().forEach(tp -> {
                        List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
                        // 记录加到缓存中
                        tpRecords.forEach(record -> {
                            System.out.println("partition=" + record.partition() +
                                    ", offset= " + record.offset() +
                                    ", value=" + record.value());
                            list.add(record);
                        });
                        // 将partition对应的offset加到map中, 获取partition中最后一个元素的offset,
                        // +1 就是下一次读取的位移,就是本次需要提交的位移
                        tpOffsetMap.put(tp, tpRecords.get(tpRecords.size() - 1).offset() + 1);
                    });
                }
                // 缓存中有数据
                if (!list.isEmpty()) {
                    // 将数据插入数据库,并且将位移信息也插入数据库
                    // 因此,每次读取到数据,都要更新本consumer在数据库中的位移信息
                    boolean success = insertIntoDB(list, tpOffsetMap);
                    if (success) {
                        list.clear();
                        tpOffsetMap.clear();
                    }
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    private boolean insertIntoDB(List<ConsumerRecord> list,
                                 Map<TopicPartition, Long> tpOffsetMap) {

        // 这里应该是在同一个事务中进行的
        // 为了方便就省略了

        try {
            // TODO 将数据入库,这里省略了

            // 将partition位移更新
            String sql = "UPDATE kafka_info SET offsets = ? WHERE topic_group_partition = ?";
            List<Object[]> params = new ArrayList<>(tpOffsetMap.size());
            tpOffsetMap.forEach((tp, offset) -> {
                Object[] param = new Object[]{offset, topic + "_" + groupId + "_" + tp.partition()};
                params.add(param);
            });
            jdbcHelper.batchExecute(sql, params);
            return true;
        } catch (Exception e) {
            // 回滚事务
        }
    }

    /**
     * rebalance触发的处理器
     */
    private class HandleRebalance implements ConsumerRebalanceListener {

        // rebalance之前触发
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            //发生Rebalance时,只需要将list中数据和记录offset信息清空即可
            //这里为什么要清除数据,应为在Rebalance的时候有可能还有一批缓存数据在内存中没有进行入库,
            //并且offset信息也没有更新,如果不清除,那么下一次还会重新poll一次这些数据,将会导致数据重复
            System.out.println("==== onPartitionsRevoked ===== ");
            list.clear();
            tpOffsetMap.clear();
        }

        // rebalance后调用,consumer抓取数据之前触发
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("== onPartitionsAssigned ==");

            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            // 从数据库读取当前partition的信息
            Map<TopicPartition, Long> partitionOffsetMapFromDB = getPartitionOffsetMapFromDB(partitionInfos.size());

            // 在分配分区时指定消费位置
            for (TopicPartition partition : partitions) {
                // 指定consumer在每个partition上的消费开始位置
                // 如果在数据库中有对应partition的信息则使用,否则将默认从offset=0开始消费
                if (partitionOffsetMapFromDB.get(partition) != null) {
                    consumer.seek(partition, partitionOffsetMapFromDB.get(partition));
                } else {
                    consumer.seek(partition, 0L);
                }
            }
        }
    }

    /**
     * 从数据库读取offset信息
     *
     * @param size
     * @return
     */
    private Map<TopicPartition, Long> getPartitionOffsetMapFromDB(int size) {
        Map<TopicPartition, Long> partitionOffsetMapFromDB = new HashMap<>();
        String sql = "SELECT partition_num, offsets FROM kafka_info WHERE topic_group = ?";
        jdbcHelper.executeQuery(sql, new Object[]{topic + "_" + groupId}, resultSet -> {

            int partition_num = -1;
            long offsets = -1;
            while (resultSet.next()) {
                partition_num = resultSet.getInt("partition_num");
                offsets = resultSet.getLong("offsets");
                System.out.println("partition_num=" + partition_num + ", offset=" + offsets);
                partitionOffsetMapFromDB.put(new TopicPartition(topic, partition_num), offsets);
            }

            System.out.println("partitionOffsetMapFromDB.size = " + partitionOffsetMapFromDB.size());

            //判断数据库是否存在所有的分区的信息,如果没有,则需要进行初始化
            if (partitionOffsetMapFromDB.size() < size) {
                String insert = "INSERT INTO kafka_info (topic_group_partition,topic_group,partition_num) VALUES(?,?,?)";
                List<Object[]> params = new ArrayList<>();
                for (int p_num = 0; p_num < size; p_num++) {
                    Object[] param = new Object[]{
                            topic + "_" + groupId + "_" + p_num,
                            topic + "_" + groupId,
                            p_num
                    };
                    params.add(param);
                }
                jdbcHelper.batchExecute(insert, params);
            }

        });
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return partitionOffsetMapFromDB;
    }

}

 数据库中记录

KafkaConsumer实现精确一次消费

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
3年前
swap空间的增减方法
(1)增大swap空间去激活swap交换区:swapoff v /dev/vg00/lvswap扩展交换lv:lvextend L 10G /dev/vg00/lvswap重新生成swap交换区:mkswap /dev/vg00/lvswap激活新生成的交换区:swapon v /dev/vg00/lvswap
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这