Spring Boot 2.x 快速集成Kafka

Stella981
• 阅读 735

1 Kafka

Kafka是一个开源分布式的流处理平台,一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。Kafka由Scala和Java编写,2012年成为Apache基金会下顶级项目。

2 Kafka优点

  • 低延迟:Kafka支持低延迟消息传递,速度极快,能达到200w写/秒
  • 高性能:Kafka对于消息的分布,订阅都有高吞吐量。即使存储了TB级的信息,依然能够保证稳定的性能
  • 可靠性:Kafka是分布式,分区,复制和容错的,保证零停机和零数据丢失
  • 可扩展:用户可以从但个代理Broker开始作POC,然后慢慢扩展到由三个Broker组成的小型开发集群,接着扩展到数十个甚至数百个Broker集群进入生产阶段,可以在集群联机时进行扩展,而不会影响整个系统的可用性
  • 多个生产者:无论这些客户使用相同Topic还是多个Topic,Kafka都能无缝处理多个生产者,使得系统可以非常容易聚合来自许多前端系统的数据并使其保持一致
  • 多个消费者:Kafka具有多个消费者设计,可以读取任何但个消息流而不会相互干扰。多个Kafka消费者可以组成一个消费组进行操作并共享消息流,从而确保每一条消息只被整个消费组处理一次
  • 基于磁盘的保留:Kafka使用分布式提交日志,消息能够快速持久化到磁盘上。消息持久化意味着如果消费者落后,无论是由于处理速度缓慢还是突然的消息涌入都不会有丢失数据的危险,也意味着消费者可以被停止。消息将保留在Kafka中,允许消费者重新启动并且从中断处获取处理信息而不会丢失数据

3 Kafka相关术语

  • Broker:Kafka集群包含一个或多个服务器,这种服务器称为Broker
  • Topic:每条发布到Kafka的消息都有一个类别,这个类别称为Topic。物理上不同Topic的消息分开存储,逻辑上Topic的消息虽然保存在一个或多个Broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存放于何处
  • Partition:每个Topic包含一个或多个Partition
  • Producer:生产者,负责发布消息到Broker
  • Consumer:消费者,向Broker读取消息的客户端
  • Consumer Group:每个Consumer属于一个特定的Consumer Group,可以为每个Consumer指定Group Name,否则属于默认Group

4 动手干活

4.1 环境

  • Spring Boot 2.3.1
  • IDEA 2020.1.1
  • OpenJDK 11.0.7
  • Kafka 2.5.0
  • Kotlin 1.3.72

4.2 下载Kafka

官网戳这里。

下载并解压(注意需要Kafka与Spring Boot版本对应,可以参考这里):

tar -xvf kafka_2.12-2.5.0.tgz
cd kafka_2.12-2.5.0

接着启动ZooKeeper与Kafka:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Kafka需要用到ZooKeeper,需要在启动Kafka之前启动ZooKeeper(ZooKeeper是一个开源的分布式应用程序协调服务,是Hadoop的组件,主要用于解决分布式应用中的一些数据管理问题)。

Kafka默认使用9092端口,部署在服务器上需要注意防火墙以及安全组的处理。

4.3 新建工程

考虑到Spring Boot在2.3.0M1中(截至本文写作日期2020.07.14Spring Boot已更新到2.4.0M1)首次采用Gradle而不是Maven来构建项目,换句话说日后Spring Boot的构建工具将从Maven迁移到Gradle,Spring Boot团队给出的主要原因是可以减少项目构建所花费的时间,详情可以戳这里瞧瞧。

另外由于另一个基于JVM的语言Kotlin的日渐崛起,后端开始逐渐有人采用Kotlin(尽管不多,不过语法糖真的香,JetBrains家的语言配合IDE,爽得飞起),因此本示例项目将采用两种方式搭建:

  • Java+Maven
  • Kotlin+Gradle

选择的依赖如下(当然您喜欢的话可以在pom.xml或者build.gradle.kts里面加,对于Kotlin不需要Lombok):

Spring Boot 2.x 快速集成Kafka

4.4 项目结构

Java版:

Spring Boot 2.x 快速集成Kafka

Kotlin版:

Spring Boot 2.x 快速集成Kafka

  • serialize:序列化/反序列化实体类
  • Constant.java/Constant.kt:常量类
  • Consumer.java/Consumer.kt:消费者类
  • Entity.java/Entity.kt:实体类
  • Producer.java/Product.kt:生产者类
  • TestApplicationTests:测试类

4.5 常量类

包含Topic与GroupId,Java版:

public class Constants {
    public static final String TOPIC = "TestTopic";
    public static final String GROUP_ID = "TestGroupId";
}

Kotlin版:

object Constants
{
    const val TOPIC = "TestTopic"
    const val GROUP_ID = "TestGroupId"
}

4.6 实体类

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Entity {
    private long id;
    private String name;
    private int num;
}

说一下Lombok的几个注解:

  • @AllArgsConstructor/@NoArgsConstructor:生成所有参数/无参数构造方法
  • @Data:等价于@Setter+@Getter+@RequiredArgsConstrucotr+@ToString+@EqualAndHashCode,自动生成Setter+Getter+toString()+equals()+hashCode(),还有@RequireArgsConstructor为类的每一个final或非空字段生成一个构造方法
  • @Builder:可以通过建造者模式创建对象

Kotlin版:

class Entity {
    var id: Long = 0
    var name: String = ""
    var num: Int = 0

    constructor()

    constructor(id:Long,name:String,num:Int)
    {
        this.id = id
        this.name = name
        this.num = num
    }
}

4.7 生产者

@Component
@Slf4j
//防止出现Field injection not recommended警告,代替了原来的直接在字段上@Autowired
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class Producer {

    private final KafkaTemplate<String, Entity> kafkaTemplate;

    public void send(Entity entity) {
        //发送消息
        //类型一般为String+自定义消息内容,String代表消息Topic,这里消息内容用Entity表示
        ListenableFuture<SendResult<String, Entity>> future =
                kafkaTemplate.send(Constants.TOPIC, entity);
        //回调函数
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("Send message failed");
            }

            @Override
            public void onSuccess(SendResult<String, Entity> stringEntitySendResult) {
                log.info("Send message success");
            }
        });
    }
}

这里的send有两个参数,对应于sendResult<>中的参数类型,第一个为消息的Topic,第二个为消息体,一般使用String或者Json。

Kotlin版:

@Component
class Producer
{
    @Autowired
    private var kafkaTemplate:KafkaTemplate<String,Entity> ? = null
    private val log = LoggerFactory.getLogger(this.javaClass)

    fun send(entity: Entity)
    {
        val future = kafkaTemplate!!.send(Constants.TOPIC,entity);
        future.addCallback(object : ListenableFutureCallback<SendResult<String?, Entity?>?>{
            override fun onSuccess(result : SendResult<String?,Entity?>?)
            {
                log.info("Send success");
            }

            override fun onFailure(e:Throwable)
            {
                log.info("Send failed");
            }
        })
    }
}

4.8 消费者

@Component
@Slf4j
public class Consumer {
    @KafkaListener(topics = Constants.TOPIC,groupId = Constants.GROUP_ID)
    public void consume(Entity entity)
    {
        log.info("Consume a entity, id is "+entity.getId());
    }
}

使用@KafkaListener注解,第一个参数表示需要消费的消息的Topic,可以是String [],第二个是消费者组的id。生产者的消息Topic必须与消费者的Topic保持一致否则不能消费,这里简单处理打印日志。

Kotlin版:

@Component
class Consumer {
    private val log = LoggerFactory.getLogger(this.javaClass)

    @KafkaListener(topics = [Constants.TOPIC],groupId = Constants.GROUP_ID)
    fun consume(entity: Entity) {
        log.info("Consume a entity, id is "+entity.id.toString())
    }
}

4.9 序列化/反序列化

这里自定义了序列化/反序列化类,序列化/反序列化类需要实现org.apache.kafka.common.serialization.Serializer<T>/Deserializer<T>接口,其中T是想要序列化的类型,这里是Entity。序列化接口反编译如下:

public interface Serializer<T> extends Closeable {
    default void configure(Map<String, ?> configs, boolean isKey) {
    }

    byte[] serialize(String var1, T var2);

    default byte[] serialize(String topic, Headers headers, T data) {
        return this.serialize(topic, data);
    }

    default void close() {
    }
}

反序列化反编译接口如下:

public interface Deserializer<T> extends Closeable {
    default void configure(Map<String, ?> configs, boolean isKey) {
    }

    T deserialize(String var1, byte[] var2);

    default T deserialize(String topic, Headers headers, byte[] data) {
        return this.deserialize(topic, data);
    }

    default void close() {
    }
}

也就是只需要实现其中的serialize/deserialize方法即可。这里序列化/反序列化用到了自带的Jackson:

@Slf4j
public class Serializer implements org.apache.kafka.common.serialization.Serializer<Entity> {
    public byte [] serialize(String topic, Entity entity)
    {
        try {
            return entity == null ? null : new ObjectMapper().writeValueAsBytes(entity);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            log.error("Can not serialize entity in Serializer");
        }
        return null;
    }
}

反序列化:

@Slf4j
public class Deserializer implements org.apache.kafka.common.serialization.Deserializer<Entity> {
    public Entity deserialize(String topic,byte [] data)
    {
        try {
            return data == null ? null : new ObjectMapper().readValue(data,Entity.class);
        } catch (IOException e) {
            e.printStackTrace();
            log.error("Can not deserialize entity in Deserializer");
        }
        return null;
    }
}

Kotlin版:

class Serializer : org.apache.kafka.common.serialization.Serializer<Entity?>
{
    private val log = LoggerFactory.getLogger(this.javaClass)

    override fun serialize(topic: String?, data: Entity?): ByteArray? {
        try {
            return if (data == null) null else ObjectMapper().writeValueAsBytes(data)
        }
        catch (e:JsonProcessingException)
        {
            e.printStackTrace()
            log.error("Can not serialize entity in Serializer")
        }
        return null
    }
}


class Deserializer : org.apache.kafka.common.serialization.Deserializer<Entity?>
{
    private val log = LoggerFactory.getLogger(this.javaClass)

    override fun deserialize(topic: String?, data: ByteArray?): Entity? {
        try
        {
            return ObjectMapper().readValue(data, Entity::class.java)
        }
        catch (e:IOException)
        {
            e.printStackTrace()
            log.error("Can not deserialize entity in Deserializer")
        }
        return null
    }
}

4.10 配置文件

application.properties

# 地址,本地直接localhost,部署可以使用公网ip
spring.kafka.bootstrap-servers=localhost:9092
# 消费者组id
spring.kafka.consumer.group-id=TestGroupId
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者键反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费者值反序列化类
spring.kafka.consumer.value-deserializer=com.test.serialize.Deserializer

# 生产者键序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者值序列化类
spring.kafka.producer.value-serializer=com.test.serialize.Serializer

对于auto-offest-rest,该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下怎么处理,有四个取值:

  • earliest:当各分区有已提交的offest时,从提交的offest开始消费,无提交的offest时,从头开始消费
  • latest(默认):当各分区有已提交的offest时,从提交的offest开始消费,无提交的offest时,消费新产生的该分区下的数据
  • none:各分区都存在已提交的offest时,从offest后消费,只要有一个分区不存在已提交的offest,则抛出异常
  • exception:其他情况将抛出异常给消费者

对于序列化/反序列化,String可以使用自带的序列化/反序列化类:

org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.StringDeserializer

至于Json可以使用:

org.springframework.kafka.support.serializer.JsonSerializer
org.springframework.kafka.support.serializer.JsonDeserializer

其他自定义的请实现org.apache.kafka.common.serialization.Serializer<T>/Deserializer<T>接口。

yml版:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: TestGroupId
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.test.serialize.Deserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.test.serialize.Serializer

5 测试

5.1 测试类

@SpringBootTest
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
class TestApplicationTests {

    private final Producer producer;

    @Test
    void contextLoads() {
        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            long id = i+1;
            String name = UUID.randomUUID().toString();
            int num = random.nextInt();
            producer.send(Entity.builder().id(id).name(name).num(num).build());
        }
    }
}

生产者发送1000条消息。 Kotlin版:

@SpringBootTest
class TestApplicationTests {

    @Autowired
    private val producer:Producer? = null

    @Test
    fun contextLoads() {
        for(i in 0..1000)
        {
            val id = (i + 1).toLong()
            val name = java.util.UUID.randomUUID().toString()
            val num = (0..100000).random()
            producer!!.send(Entity(id,name,num))
        }
    }
}

5.2 测试

控制台输出如下:

Spring Boot 2.x 快速集成Kafka

所有消息被成功发送并且被成功消费。

最后可以去验证一下Kafka的Topic列表,可以看到配置文件中的Topic的值(TestTopic),进入Kafka目录:

bin/kafka-topics.sh --list --zookepper localhost:2181

Spring Boot 2.x 快速集成Kafka

6 源码

7 参考

1、CSDN-Kafka优点 2、简书-Spring Boot 2.x 快速集成整合消息中间件 Kafka 3、简书-springboot 之集成kafka

如果觉得文章好看,欢迎点赞。

同时欢迎关注微信公众号:氷泠之路。

Spring Boot 2.x 快速集成Kafka

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这