Springboot集成Kafka

Easter79
• 阅读 607

 Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。支持通过Kafka服务器和消费机集群来分区消息。支持Hadoop并行数据加载。

Springboot的基本搭建和配置我在之前的文章已经给出代码示例了,如果还不了解的话可以先按照 SpringMVC配置太多?试试SpringBoot 进行学习哦。 那么如今很火的Springboot与kafka怎么完美的结合呢?多说无宜,放码过来 (talk is cheap,show me your code)!

安装Kafka

因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookeeper先安装上,然后将kafka安装好就可以了。 下面我给出Mac安装的步骤以及需要注意的点吧,windows的配置除了所在位置不太一样其他几乎没什么不同。

brew install kafka

对,就是这么简单,mac上一个命令就可以搞定了,这个安装过程可能需要等一会儿,应该是和网络状况有关系。安装提示信息可能有错误消息,如"Error: Could not link: /usr/local/share/doc/homebrew" 这个没关系,自动忽略掉了。 最终我们看到下面的样子就成功咯。

==> Summary 🍺/usr/local/Cellar/kafka/1.1.0: 157 files, 47.8MB

安装的配置文件位置如下,根据自己的需要修改端口号什么的就可以了。

安装的zoopeeper和kafka的位置 /usr/local/Cellar/

配置文件 /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties

启动zookeeper

  ./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

启动kafka 

./bin/kafka-server-start /usr/local/etc/kafka/server.properties &

为kafka创建Topic,topic 名为test,可以配置成自己想要的名字,回头再代码中配置正确就可以了。

 ./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

代码示例

pom.xml

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>RELEASE</version>
        </dependency>

    </dependencies>

application.yml

server:
  servlet:
    context-path: /
  port: 8080
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    #生产者的配置,大部分我们可以使用默认的,这里列出几个比较重要的属性
    producer:
      #每批次发送消息的数量
      batch-size: 16
      #设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
      retries: 0
      #producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
      buffer-memory: 33554432
      #key序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #消费者的配置
    consumer:
      #Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
      auto-offset-reset: latest
      #是否开启自动提交
      enable-auto-commit: true
      #自动提交的时间间隔
      auto-commit-interval: 100
      #key的解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #value的解码方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #在/usr/local/etc/kafka/consumer.properties中有配置
      group-id: test-consumer-group

Producer 消息生产者

@Component
public class Producer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    private static Gson gson = new GsonBuilder().create();

    //发送消息方法
    public void send() {
        Message message = new Message();
        message.setId("KFK_"+System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        kafkaTemplate.send("test", gson.toJson(message));
    }

}


public class Message {

    private String id;

    private String msg;

    private Date sendTime;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Date getSendTime() {
        return sendTime;
    }

    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}

Consumer 消息消费者

public class Consumer {

    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record){

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("---->"+record);
            System.out.println("---->"+message);

        }

    }
}

测试接口用例

这里我们用一个接口来测试我们的消息发送会不会被消费者接收。

@RestController
@RequestMapping("/kafka")
public class SendController {

    @Autowired
    private Producer producer;

    @RequestMapping(value = "/send")
    public String send() {
        producer.send();
        return "{\"code\":0}";
    }
}

在Springboot启动类启动后在浏览器访问http://127.0.0.1:8080/kafka/send,我们可以再IDE控制台中看到输出的结果,这时候我们的整合基本上就完成啦。 具体代码可以在SpringBootKafkaDemo@github获取哦。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Stella981 Stella981
3年前
Spring Boot 2.x 快速集成Kafka
1KafkaKafka是一个开源分布式的流处理平台,一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。Kafka由Scala和Java编写,2012年成为Apache基金会下顶级项目。2Kafka优点低延迟:Kafka支持低延迟消息传递,速度极快,能达到200w写/秒
Stella981 Stella981
3年前
FusionInsight大数据开发
Kafka应用开发1.了解Kafka应用开发适用场景2.熟悉Kafka应用开发流程3.熟悉并使用Kafka常用API4.进行Kafka应用开发Kafka的定义Kafka是一个高吞吐、分布式、基于发布订阅的消息系统Kafka有如下几个特点:1.高吞吐量2.消息持久化到磁
Stella981 Stella981
3年前
Kafka实战解惑
一、Kafka简介Kafka是LinkedIn使用Scala开发的一个分布式消息中间件,它以水平扩展能力和高吞吐率著称,被广泛用于日志处理、ETL等应用场景。Kafka具有以下主要特点:\\消息的发布、订阅均具有高吞吐量:\\据统计数字表明,Kafka每秒可以生产约25万消息(50MB),每秒处理55万消息(110MB)。
Stella981 Stella981
3年前
Kafka笔记
第1章Kafka简介1.1kafka起源Kafka是由LinkedIn开发并开源的分布式消息系统,2012年捐赠给Apache基金会,采用Scala语言,运行在JVM中,最新版本1.0.0。1.2kafka设计目标Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:①以时间复杂度O(1)的方式提供消息持久化能力,即
可莉 可莉
3年前
2020Kafka最新最全面试题!
1、请说明什么是ApacheKafka?ApacheKafka是由Apache开发的一种发布订阅消息系统,它是一个分布式的、分区的和可复制的提交日志服务。2、说说Kafka的使用场景?①异步处理②应用解耦③流量削峰④日志处理⑤消息通讯等。3、使用Kafka有什么优点和缺点?优点:①支持跨数据中心的消息复制;②单
Stella981 Stella981
3年前
2020Kafka最新最全面试题!
1、请说明什么是ApacheKafka?ApacheKafka是由Apache开发的一种发布订阅消息系统,它是一个分布式的、分区的和可复制的提交日志服务。2、说说Kafka的使用场景?①异步处理②应用解耦③流量削峰④日志处理⑤消息通讯等。3、使用Kafka有什么优点和缺点?优点:①支持跨数据中心的消息复制;②单
Wesley13 Wesley13
3年前
MQ之对比
activeMQ:高效、可扩展、稳定安全企业级消息通信rabbitMQ:分布式系统可靠、可扩展、功能丰富,内存式堆积,某些条件下触发换页动作将内存中消息换页到磁盘;支持多租户  不支持重试队列,二次封装延迟队列实现呢  拉模式,不回溯,支持消息追踪  多租户kafka:高吞吐量分布式发布订阅消息系统,可水平扩展,磁盘式堆积,冗余功能
Stella981 Stella981
3年前
Kafka生产者发送消息的三种方式
Kafka是一种分布式的基于发布/订阅的消息系统,它的高吞吐量、灵活的offset是其它消息系统所没有的。Kafka发送消息主要有三种方式:1.发送并忘记2.同步发送3.异步发送回调函数下面以单节点的方式分别用三种方法发送1w条消息测试:方式一:发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理)发送并忘记的方式本质上也
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
6
获赞
1.2k