1、在 pom.xml 中加入 spring-kafka 依赖
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.4</version>
</dependency>
2、配置(application.properties)
# Kafka configuration
# host:port of the Kafka broker(s) the client connects to first.
spring.kafka.bootstrap-servers=110.121.233.203:9092
# A value greater than 0 makes the producer retry a failed send.
spring.kafka.producer.retries=1
# Upper bound of one producer batch, in bytes (16384 = 16 KiB); this is a size, not a message count.
spring.kafka.producer.batch-size=16384
# Memory the producer may use to buffer records awaiting send, in bytes (33554432 = 32 MiB).
spring.kafka.producer.buffer-memory=33554432
# Serializers used to encode the message key and the message body.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Default consumer group id; same value as Constants.GROUP_USER.
spring.kafka.consumer.group-id=group_user
3、Application
/**
 * Application entry point, annotated with {@code @SpringBootApplication} and {@code @EnableKafka}.
 *
 * <p>NOTE(review): {@code Main} is not defined in this snippet. Spring Boot applications are
 * normally launched through {@code SpringApplication.run} — confirm which launcher is intended.
 */
@SpringBootApplication
@EnableKafka
public class KafkaApplication {

    /**
     * Hands this class and the command-line arguments to {@code Main.run}.
     *
     * @param args command-line arguments, forwarded unchanged
     */
    public static void main(String[] args) {
        Main.run(KafkaApplication.class, args);
    }
}
4、配置类
/**
 * Configuration class that declares the Kafka topic bean used by this example.
 */
@Configuration
public class KafkaConfig {

    /** Partition count handed to the {@code NewTopic} constructor. */
    private static final int PARTITION_COUNT = 1;

    /** Replication factor handed to the {@code NewTopic} constructor. */
    private static final short REPLICATION_FACTOR = (short) 1;

    /**
     * Describes the topic named {@link Constants#TOPIC_USER} with one partition and a
     * replication factor of one.
     *
     * @return the topic description exposed as a bean
     */
    @Bean
    public NewTopic topic() {
        final NewTopic userTopic =
                new NewTopic(Constants.TOPIC_USER, PARTITION_COUNT, REPLICATION_FACTOR);
        return userTopic;
    }
}
5、其它测试代码
/**
 * Kafka names shared by the topic configuration, the producer side and the listener.
 *
 * <p>This is a pure constant holder: it is {@code final} and cannot be instantiated.
 */
public final class Constants {

    /** Name of the Kafka topic that carries user messages. */
    public static final String TOPIC_USER = "topic_user";

    /** Consumer group id used by the listener on {@link #TOPIC_USER}. */
    public static final String GROUP_USER = "group_user";

    /** Not instantiable; the class only exposes static constants. */
    private Constants() {
    }
}
/**
 * Component that holds the {@code KafkaTemplate<String, String>} it receives through its
 * constructor and exposes it to callers.
 */
@Component
public class UserProducer {

    /** Template supplied at construction time; never reassigned. */
    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Stores the given template.
     *
     * @param kafkaTemplate template to hand out from {@link #getKafkaTemplate()}
     */
    public UserProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Returns the template passed to the constructor.
     *
     * @return the stored {@code KafkaTemplate}
     */
    public KafkaTemplate<String, String> getKafkaTemplate() {
        return this.kafkaTemplate;
    }
}
消息生产者和消费者:
@Api(value = "用户管理")
@Log4j2
@RestController
@RequestMapping("/user")
public class UserController {
private UserProducer userProducer;
private UserService userService;
public UserController(UserProducer userProducer, UserService userService) {
this.userProducer = userProducer;
this.userService = userService;
}
@ApiOperation(value = "保存或更新用户", notes = "userName是必输入项")
@ApiImplicitParams(value = {
@ApiImplicitParam(name = "id", value = "用户主键", dataTypeClass = Long.class, required = false),
@ApiImplicitParam(name = "userName", value = "用户姓名", dataTypeClass = String.class, required = true)
})
@PostMapping("/saveOrUpdate")
public ResponseEntity<Result<User>> saveOrUpdate(
@RequestParam(required = false) Long id,
@RequestParam String userName
) {
User user = new User(id, userName, new Date());
userService.saveOrUpdate(user);
userProducer.getKafkaTemplate().send(Constants.TOPIC_USER, JSON.toJSONString(user).toString());
return ResultUtils.ok(user);
}
@ApiOperation(value = "查询所有用户", notes = "")
@GetMapping("/queryAll")
public ResponseEntity<Result<List<User>>> queryAll() {
List<User> userList = userService.list();
return ResultUtils.ok(userList, "查询成功");
}
}
/**
 * User entity annotated for the {@code t_base_user} table.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and both constructors come
 * from the Lombok annotations on the class.
 */
@ApiModel(value = "用户实体")
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("t_base_user")
public class User implements Serializable {

    /**
     * Explicit serialization version, so the serialized form does not depend on a
     * compiler-computed default that changes whenever the class shape changes.
     */
    private static final long serialVersionUID = 1L;

    /** Primary key. */
    @ApiModelProperty(value = "主键")
    @TableId(value = "id", type = IdType.NONE)
    private Long id;

    /** User name. */
    @ApiModelProperty(value = "姓名")
    private String userName;

    /** Creation time. */
    @ApiModelProperty(value = "创建时间")
    private Date createTime;
}
/**
 * Kafka listener for {@link Constants#TOPIC_USER} in consumer group
 * {@link Constants#GROUP_USER}.
 */
@Log4j2
@Component
public class UserConsumer {

    /**
     * Logs each received message between a start marker and an end marker.
     *
     * <p>Receiving a message is normal operation, so the lines are written at info level
     * rather than error level.
     *
     * @param message message payload as a string
     */
    @KafkaListener(topics = {Constants.TOPIC_USER}, groupId = Constants.GROUP_USER)
    public void onMessage(String message) {
        log.info("[onMessage start]");
        log.info(message);
        log.info("[onMessage end]");
    }
}