1、引入依赖jar包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、配置kafka信息
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: group1
listener:
missing-topics-fatal: false
启动报错,需要配置missing-topics-fatal为false
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: Topic(s) [topic1] is/are not present and missingTopicsFatal is true
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
at com.zhoulp.SyncMessageWebApplication.main(SyncMessageWebApplication.java:24)
Caused by: java.lang.IllegalStateException: Topic(s) [topic1] is/are not present and missingTopicsFatal is true
at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:383)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:144)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
... 14 common frames omitted
3、实现生产者
package com.zhoulp.producer;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
*
* @author zhoulp
* @date 2020-08-03
*
*/
@Component("kafkaProducer")
public class KafkaProducer {
private static Logger log = LoggerFactory.getLogger(KafkaProducer.class);
@Inject
private KafkaTemplate<String, String> template;
public void sendMessage(String topic, String data) {
log.info("send: topic = {}, data = {}", topic, data);
template.send(topic, data);
}
}
4、实现消费者
package com.zhoulp.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
*
* @author zhoulp
* @date 2020-08-03
*
*/
@Component("kafkaConsumer")
public class KafkaConsumer {
private static Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = "topic1")
public void listenTopic1(ConsumerRecord<String, String> consumerRecord) {
log.info("listenTopic1");
log.info(consumerRecord.toString());
log.info(consumerRecord.topic());
log.info(consumerRecord.value());
}
}