Kafka——SpringBoot整合(消费者位移的提交)

Stella981
• 阅读 636

消费者位移的提交方式以及提交时机需要根据不同的业务场景进行选择,可以看之前的博客kafka消费者相关。 这里只做应用相关,更多的使用场景,该怎么用、何时用要看前面的博客了解原理。

参考博客:https://blog.csdn.net/yy756127197/article/details/103895810

自动提交偏移量

    // 自动提交偏移量
        // 如果设置成true,偏移量由auto.commit.interval.ms控制自动提交的频率
        // 如果设置成false,不需要定时的提交offset,可以自己控制offset,当消息认为已消费过了,这个时候再去提交它们的偏移量。
        // 这个很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 自动提交的频率
        configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

手动提交偏移量

主要步骤: 1.消费者配置 configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “false”); 2.消费者配置ack模式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); 3.消费者手动提交 consumer.commitSync();

ConsumerConfig

@Configuration
@EnableKafka
public class ManualConsumerConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.topic.manual}")
    private String topic;

    @Bean
    public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-group");
        // 手动提交
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
        // ack模式,详细见下文注释
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }

    /**
     * AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:
     *
     * RECORD
     * 每处理一条commit一次
     *
     * BATCH(默认)
     * 每次poll的时候批量提交一次,频率取决于每次poll的调用频率
     *
     * TIME
     * 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
     *
     * COUNT
     * 累积达到ackCount次的ack去commit
     *
     * COUNT_TIME
     * ackTime或ackCount哪个条件先满足,就commit
     *
     * MANUAL
     * listener负责ack,但是背后也是批量上去
     *
     * MANUAL_IMMEDIATE
     * listner负责ack,每调用一次,就立即commit
     *
     */

}

Consumer

@Component
@Slf4j
public class ManualConsumer {

    @KafkaListener(topics = "${kafka.topic.manual}", containerFactory = "manualKafkaListenerContainerFactory")
    public void receive(@Payload String message,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        Consumer consumer,
                        Acknowledgment ack) {
        System.out.println(String.format("From partition %d : %s", partition, message));
        // 同步提交
        consumer.commitSync();

        // ack这种方式提交也可以
        // ack.acknowledge();
    }

    /**
     * commitSync和commitAsync组合使用
     * <p>
     * 手工提交异步 consumer.commitAsync();
     * 手工同步提交 consumer.commitSync()
     * <p>
     * commitSync()方法提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,
     * commitSync()会一直重试,但是commitAsync()不会。
     * <p>
     * 一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,
     * 那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。
     * 因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。
     */
//    @KafkaListener(topics = "${kafka.topic.manual}", containerFactory = "manualKafkaListenerContainerFactory")
    public void manual(@Payload String message,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       Consumer consumer,
                       Acknowledgment ack) {
        try {
            System.out.println(String.format("From partition %d : %s", partition, message));
            // 同步提交
            consumer.commitSync();
        } catch (Exception e) {
            System.out.println("commit failed");
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }

    }


    /**
     * 手动提交,指定偏移量
     *
     * @param record
     * @param consumer
     */
//    @KafkaListener(topics = "${kafka.topic.manual}", containerFactory = "manualKafkaListenerContainerFactory")
    public void offset(ConsumerRecord record, Consumer consumer) {
        System.out.println(String.format("From partition %d : %s", record.partition(), record.value()));

        Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
        currentOffset.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        consumer.commitSync(currentOffset);
    }
    
}
点赞
收藏
评论区
推荐文章
作为一名程序员我不忘初心,复习指南
01kafka入门1.1什么是kafka 1.2kafka中的基本概念  1.2.1消息和批次  1.2.2主题和分区  1.2.3生产者和消费者、偏移量、消费者群组  1.2.4Broker和集群  1.2.5保留消息02为什么选择kafka2.1优点 2.2常见场景  2.2.1活动跟踪  2.2.2传递
Wesley13 Wesley13
3年前
Dubbo与Zookeeper、SpringMVC整合和使用(入门级)
!(https://static.oschina.net/uploads/space/2017/0503/130638_HzVw_1444646.png)介绍就不过多的说明。可以参考http://blog.csdn.net/congcong68/article/details/41113239博客里面写的相关介绍。后续会补充完善SpringMVC
Stella981 Stella981
3年前
Mac002
MacGit安装注意:在安装Git前,可先安装homebrew,应用brew命令安装Git即可。一。Mac安装homebrew参考博客:https://blog.csdn.net/yuexiaxiaoxi27172319/article/details/51279369Homebr
Stella981 Stella981
3年前
Kafka监控工具kafka
KafkaMonitor为Kafka的可视化管理与监控工具,为Kafka的稳定运维提供高效、可靠、稳定的保障,这里主要简单介绍KafkaMonitor的相关功能与页面的介绍;  KafkaMonitorv0.1主要功能有:Kafka基本信息仪表盘、broker列表、topic列表、当前消费者列表、Topic添加删除、Topic数据
Stella981 Stella981
3年前
Kafka入门(2):消费与位移
摘要在这篇文章中,我将从消息在Kafka中的物理存储方式讲起,介绍分区日志段日志的各个层次。然后我将接着上一篇文章的内容,把消费者的内容展开讲一讲,区分消费者与消费者组,以及这么设计有什么用。根据消费者的消费可能引发的问题,我将介绍Kafka中的位移主题,以及消费者要怎么提交位移到这个位移主题中。最后,我将聊一聊消费者Rebalan
Stella981 Stella981
3年前
Kafka消费者 之 指定位移消费
!(https://oscimg.oschina.net/oscnet/efac2f4e1ab5a626cba566851c0719af782.gif)每一个成功人士的背后,必定曾经做出过勇敢而又孤独的决定。放弃不难,但坚持很酷~由于消费者模块的知识涉及太多,所以决定先按模块来整理知识,最后再进行知识模块汇总。今天学习一下消费者如何指定位
Wesley13 Wesley13
3年前
ActiveMQ学习(六)
_消费者集群负载均衡_!(https://oscimg.oschina.net/oscnet/c6f4857e2e675380e23ec1f33dc45e08711.jpg)broker服务器集群已经做好了,现在来看怎么做消费者集群。就是一个broker端有很多的消费者,在这一个端的消费者之间是集群。消费者端可以用多线程来
Stella981 Stella981
3年前
Spring Kafka中关于Kafka的配置参数
consumer的配置参数(开始)如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。spring.kafka.consumer.autocommitinterval;当Kafka中
Stella981 Stella981
3年前
Kafka在哪些场景下会造成重复消费或消息丢失?
kafka消费者在消费的时候对于位移提交的具体时机的把握也很有讲究,有可能会造成重复消费和消息丢失的现象。!(https://oscimg.oschina.net/oscnet/3566bbd82538478396a25dd6e023974e.png"image.png")参考上图,当前一次poll()操作所拉取的消
Wesley13 Wesley13
3年前
Java多线程导致的的一个事物性问题
业务场景我们现在有一个类似于文件上传的功能,各个子站点接受业务,业务上传文件,各个子站点的文件需要提交到总站点保存,文件是按批次提交到总站点的,也就是说,一个批次下面约有几百个文件。      考虑到白天提交这么多文件会影响到子站点其他系统带宽,我们将分站点的文件提交到总站点这个操作过程独立出来,放到晚上来做,具体时间是晚上7:00到早上7:00。