spring boot使用Java并行流发送kafka消息报错

javalover123
• 阅读 420

一、背景

  • 公司业务分 2个Kafka,我们组一个,其他组公用一个
  • 我们组有2个业务在 Java并行流中发消息到 其他组的Kafka,一个是 批量管理接口(app接口公用底层方法,不是批量的,没有用 并行流),另一个是 消费我们组Kafka消息然后发送。
  • 使用 spring-boot-maven-plugin 打包,发布到生产环境以后,偶尔会接到 发送消息到其他组Kafka报错告警,Invalid value org.apache.kafka.common.serialization.StringSerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.StringSerializer could not be found.

二、临时方案

  1. 查了 测试、开发 环境 最近1个月的日志,没有出现过这个错误
  2. 网上搜索的结果,应该是 类加载器的问题
  3. 本地 IDEA开发工具 启动程序、打包jar运行,访问 app接口,都没有重现
  4. 生产环境第一次以后就正常,并且没有重现的情况下,不敢修改上线,只能先加异常处理

三、原因分析

  • 类加载器不一样
  • 使用 spring-boot-maven-plugin 打包出来是 fat jar,其中 BOOT-INF/lib/ 存放依赖jar,BOOT-INF/classes/ 存放项目的 classes,使用spring自定义的 ClassLoader 加载
  • 业务处理使用了parallelStream包括发kafka消息,底层使用ForkJoin线程池,因为是JDK的类,使用BootClassLoader加载,BootClassLoader 加载不到 spring自定义目录 BOOT-INF的类

四、最终方案

  1. 方案一:不使用 并行 接口这边批量不会特别大,并且是 操作Redis,就改回 普通stream

  2. 方案二:自定义线程池 消费Kafka那边,原来加 parallelStream() 就是因为 每条消息要做 几个业务处理,串行的话,性能不够(Kafka分区数公司不允许加太多了,现在9个)

注意:建议使用 自定义ForkJoinPool + submit() + join()。不能用 execute(),因为它是 异步执行的,也就是说,这条消息 可能和 后面的消息同时处理,产生并发问题。

// 自定义ForkJoinPool,默认的使用 BootClassLoader加载,有问题
ForkJoinPool bizPool = new ForkJoinPool(6, new ForkJoinWorkerThreadFactory() {
    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new ForkJoinWorkerThread(pool) {
        };
    }
}, null, false);
for (String msg : msgs) {
    bizPool.submit(() -> list.parallelStream().forEach(item -> {
        // do something, then send to kafka
    })).join();
}

本文首发于 https://www.890808.xyz/spring-boot-kafka-send-error-with-fork-join/ ,其他平台需要审核更新慢一些。

spring boot使用Java并行流发送kafka消息报错

五、参考链接

kafka消费者报错:Class org.apache.kafka.common.serialization.StringDeserializer could not be found._Jaming R的博客-CSDN博客

Kafka Producer - org.apache.kafka.common.serialization.StringSerializer could not be found - Stack Overflow

Fix StringSerializer could not be found when it not in ContextClassLoader by eshizhan · Pull Request #10938 · apache/kafka (github.com)

点赞
收藏
评论区
推荐文章
Stella981 Stella981
3年前
Kafka概述及安装部署
一、Kafka概述1.Kafka是一个分布式流媒体平台,它有三个关键功能:(1)发布和订阅记录流,类似于消息队列或企业消息传递系统;(2)以容错的持久方式存储记录流;(3)记录发送时处理流。2.Kafka通常应用的两大类应用(1)构建在系统或应用程序之间的可靠获取数据的实时流数据管道;(2)构建转换或响应数据流的实施
Stella981 Stella981
3年前
Kafka、RabbitMQ、RocketMQ等消息中间件的对比 —— 消息发送性能和区别
Kafka、RabbitMQ、RocketMQ等消息中间件的对比——消息发送性能和区别那么,消息中间件性能究竟哪家强?带着这个疑问,我们中间件测试组对常见的三类消息产品(Kafka、RabbitMQ、RocketMQ)做了
Stella981 Stella981
3年前
Kafka 简介
Kafka简介_Kafka是分布式流平台。_一个流平台有3个主要特征:发布和订阅消息流,这一点与传统的消息队列相似。以容灾持久化方式的消息流存储。在消息流发生时处理消息流。Kafka通常使用在两大类应用中:在系统或应用之间,构建实时、可靠的消息流管道。构建实时流应用
Stella981 Stella981
3年前
Kafka 中两个重要概念:主题与分区
在Kafka中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。这里补充了对Kafka基本概念(https://www.oschina.net/action
Stella981 Stella981
3年前
Kafka中produer发送消息回调超时错误
Kafka版本0.10.1.1producer发送消息后出现如下错误消息: Theproducerhasaerror:Expiring1record(s)fortesttopic0dueto30039mshaspassedsincelastappendTheproducerhasaerror:Expi
Stella981 Stella981
3年前
Kafka 生产者与可靠性保证ACK(2)
生产者消息发送流程消息发送的整体流程,生产端主要由两个线程协调运行。分别是main线程和sender线程(发送线程)。在Kafka(2.6.0版本)源码中,可以看到。源码地址:kafka\clients\src\main\java\org.apache.kafka.clients.producer.KafkaProdu
Wesley13 Wesley13
3年前
CCBPM工作流引擎的消息机制与设计
CCBPM工作流引擎的消息机制与设计关键字:ccflowjflow消息机制流程引擎自动发送短信发送邮件发送消息流程引擎微信连接消息接口关于ccbpm:我们把ccflowjflow两个版本的工作流引擎统称为ccbpm.工作流引擎的消息产生:在发送、抄送、退回、转发、加签、删除等等操作过程中
Stella981 Stella981
3年前
Kafka生产者发送消息的三种方式
Kafka是一种分布式的基于发布/订阅的消息系统,它的高吞吐量、灵活的offset是其它消息系统所没有的。Kafka发送消息主要有三种方式:1.发送并忘记2.同步发送3.异步发送回调函数下面以单节点的方式分别用三种方法发送1w条消息测试:方式一:发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理)发送并忘记的方式本质上也
扫盲Kafka?看这一篇就够了! | 京东云技术团队
kafka的使用场景为什么要使用Kafka消息队列?解耦、削峰:传统的方式上游发送数据下游需要实时接收,如果上游在某些业务场景:例如上午十点会流量激增至顶峰,那么下游资源可能会扛不住压力。但如果使用消息队列,就可以将消息暂存在消息管道中,下游可以按照自己的
京东云开发者 京东云开发者
2星期前
扫盲Kafka?看这一篇就够了!
作者:京东科技于添馨kafka的使用场景为什么要使用Kafka消息队列?解耦、削峰:传统的方式上游发送数据下游需要实时接收,如果上游在某些业务场景:例如上午十点会流量激增至顶峰,那么下游资源可能会扛不住压力。但如果使用消息队列,就可以将消息暂存在消息管道中
javalover123
javalover123
Lv1
10年Java经验,多个开源项目贡献者。https://github.com/javalover123
文章
16
粉丝
2
获赞
5