springboot+kafka集成

Easter79
• 阅读 765

kafka概念

  Topic:消息根据Topic进行归类,可以理解为一个队里。
  Producer:消息生产者,就是向kafka broker发消息的客户端。
  Consumer:消息消费者,向kafka broker取消息的客户端。
  broker:每个kafka实例(server),一台kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic。
  Zookeeper:依赖集群保存meta信息。

1.安装

启动kafka需要zookeeper,所以需要安装zookeeper和kafka,这里就不介绍安装过程了。

2.引入kafka的依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
         <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
    </dependencies>

3.定义消息

public class Message {
    private Long id;    //id

    private String msg; //消息

    private Date sendTime;  //时间戳

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Date getSendTime() {
        return sendTime;
    }

    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
    
}

4.提供者,注意加注解

发送消息时会有一个topic

@Component
public class KafkaSender {
    
    private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass());
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    //发送消息方法
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));
        kafkaTemplate.send("zhisheng", gson.toJson(message));
    }
}

5.消费者,注意加注解

需要加一个kafka监听的注解,里面需要指定topic,表示消费哪一种消息。

@Component
public class KafkaReceiver {
    private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass());
    
    @KafkaListener(topics = {"zhisheng"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();

            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
        }

    }
}

7.启动类编写

这里调用消息提供方,写入了三条消息

@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);

        KafkaSender sender = context.getBean(KafkaSender.class);

        for (int i = 0; i < 3; i++) {
            //调用消息发送类中的消息发送方法
            sender.send();

            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

8.配置文件application.properties

#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=127.0.0.1:9092

#=============== provider  =======================

spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
作为一名程序员我不忘初心,复习指南
01kafka入门1.1什么是kafka 1.2kafka中的基本概念  1.2.1消息和批次  1.2.2主题和分区  1.2.3生产者和消费者、偏移量、消费者群组  1.2.4Broker和集群  1.2.5保留消息02为什么选择kafka2.1优点 2.2常见场景  2.2.1活动跟踪  2.2.2传递
Stella981 Stella981
3年前
Kafka安装步骤
基本概念1.Producer:消息生产者,就是向kafkabroker发消息的客户端2.Consumer:消息消费者,向kafkabroker取消息的客户端3.ConsumerGroup(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一
Stella981 Stella981
3年前
Kafka
1kafka是用于构建实时数据管道和流应用程序。具有横向扩展,容错wickedfast(变态快)等优点.kafka名词解释producer:消息和数据生产者,向kakfa的一个topic发布消息的过程叫做producerconsumers:消息和数据消费者,订阅topics并发处理其发布的消息的过程叫做consumertopic特指
Stella981 Stella981
3年前
Kafka 中两个重要概念:主题与分区
在Kafka中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。这里补充了对Kafka基本概念(https://www.oschina.net/action
Stella981 Stella981
3年前
Kafka 原理详解
Kafka原理详解1kakfa基础概念说明Broker:消息服务器,就是我们部署的一个kafka服务Partition:消息的水平分区,一个Topic可以有多个分区,这样实现了消息的无限量存储Replica:消息的副本,即备份消息,存储在其他的broker上,当leader挂掉
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
5
获赞
1.2k