一、背景
在springboot实际项目开发中,kafka可能需要消费多个不同服务器地址的数据,这时懂得如何进行配置就显得非常必要了。
二、配置
1、KafkaConfig.java配置
package com.lantaiyuan.ebus.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.bootstrap.servers}")
private String kafkaBootstrapServers;
@Value("${kafka.session.timeout-ms}")
private Integer sessionTimeoutMs;
@Value("${kafka.auto-commit.enable}")
private boolean enableAutoCommit;
@Value("${kafka.auto-commit.interval-ms}")
private Integer autoCommitIntervalMs;
@Value("${kafka.auto-offset.reset}")
private String autoOffsetReset;
@Value("${kafka.group.id}")
private String groupId;
@Value("${kafka.topic.city-code}")
private String topicCityCodeStr;
/**
* 以下是新增配置,获取司机上下班打卡信息(注入新增的kafka服务消费地址)
*/
@Value("${kafka.bootstrap.driver-servers}")
private String kafkaBootstrapDriverServers;
@Bean
@Primary//重要!!!指定该ContainerFactory为主要的容器工厂,kafka消费者默认关联该容器
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/**
* 以下为司机打卡kafka配置,即在原有kafka消费配置基础上重新再复制一份(共3个方法,记得同步更改方法名)
*/
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryForDriverSchedule() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryForDriverSchedule());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactoryForDriverSchedule() {
return new DefaultKafkaConsumerFactory<>(consumerConfigsForDriverSchedule());
}
/**
*
* 司机打卡信息kafka消费者配置
*/
@Bean
public Map<String, Object> consumerConfigsForDriverSchedule() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapDriverServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean(name="topicCityMap")
public Map<String, String> topicCityMap() {
Map<String, String> map = new HashMap<>();
String[] topicCityArray = topicCityCodeStr.split("\\|");
for (String topicCity : topicCityArray) {
String[] array = topicCity.split(":");
map.put(array[0], array[1]);
}
return map;
}
}
2、KafkaConsumer.java更改说明
package com.lantaiyuan.ebus.kafka;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import com.lantaiyuan.ebus.dao.BaseDriverScheduleMapper;
import com.lantaiyuan.ebus.model.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lantaiyuan.ebus.constants.KafkaConsts;
import com.lantaiyuan.ebus.service.BaseRouteServiceI;
import com.lantaiyuan.ebus.service.BaseStationServiceI;
import com.lantaiyuan.ebus.service.JpushServiceI;
import com.lantaiyuan.ebus.service.MyTrailServiceI;
import com.lantaiyuan.ebus.service.NoticeHistoryServiceI;
import com.lantaiyuan.ebus.service.OnBusInfoServiceI;
import com.lantaiyuan.ebus.service.RelRouteBusServiceI;
import com.lantaiyuan.ebus.service.TaskTimerServiceI;
import com.lantaiyuan.ebus.util.Tools;
import com.lantaiyuan.ebus.virtual.card.model.VirtualCardSwipeHistory;
import com.lantaiyuan.ebus.virtual.card.model.enummodel.VirtualCardRecordHeaderTypeEnum;
import com.lantaiyuan.ebus.virtual.card.service.VirtualCardSwipeHistoryServiceI;
@Component
public class KafkaConsumer {
//.........................
//containerFactory默认关联kafkaListenerContainerFactory容器工厂
@KafkaListener(topics = "${kafka.topic.od-topic}")
public void odConsumer(ConsumerRecord<Integer, String> msg) {
String record = msg.value();
JSONObject jsonObject = JSONObject.parseObject(record);
if (!StringUtils.isEmpty(jsonObject.getString("startStationId"))) { // O点
ProduceOriginPoint originPoint = JSONObject.parseObject(record, ProduceOriginPoint.class);
myTrailService.insertOriginPoint(originPoint);
} else { // D点
ProduceDestPoint destPoint = JSONObject.parseObject(record, ProduceDestPoint.class);
myTrailService.updateDestPoint(destPoint);
}
}
//.........................
/***
*
* <p>Title: </p>
* <p>Description: 消费司机考勤消息</p>
*/
//containerFactory手动关联到kafkaListenerContainerFactoryForDriverSchedule容器工厂
@KafkaListener(topics = "${kafka.topic.driverSheduleTopic}", containerFactory = "kafkaListenerContainerFactoryForDriverSchedule")
public void driverSheduleConsumer(ConsumerRecord<Integer, String> msg) {
String record = msg.value();
JSONObject jsonObject = JSONObject.parseObject(record);
BaseDriverSchedule baseDriverSchedule = new BaseDriverSchedule();
baseDriverSchedule.setCityCode(jsonObject.getString("cityCode"));
baseDriverSchedule.setDriverName(jsonObject.getString("driverName"));
baseDriverSchedule.setOccurTime(jsonObject.getDate("occurTime"));
//jsonObject的typeId为int类型,model为String类型,但是可以jsonObject.getString()进行自动类型转换
baseDriverSchedule.setOnboardId(jsonObject.getString("onBoardId"));
//jsonObject的typeId为int类型,model为byte类型,但是可以jsonObject.getByte()进行自动类型转换
baseDriverSchedule.setTypeId(jsonObject.getByte("typeId"));
baseDriverSchedule.setWorkId(jsonObject.getString("workerId"));
this.baseDriverScheduleMapper.insertSelective(baseDriverSchedule);
}
}
三、附KafkaListener注解源码
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.messaging.handler.annotation.MessageMapping;
/**
* Annotation that marks a method to be the target of a Kafka message listener on the
* specified topics.
*
* The {@link #containerFactory()} identifies the
* {@link org.springframework.kafka.config.KafkaListenerContainerFactory
* KafkaListenerContainerFactory} to use to build the Kafka listener container. If not
* set, a <em>default</em> container factory is assumed to be available with a bean name
* of {@code kafkaListenerContainerFactory} unless an explicit default has been provided
* through configuration.
*
* <p>
* Processing of {@code @KafkaListener} annotations is performed by registering a
* {@link KafkaListenerAnnotationBeanPostProcessor}. This can be done manually or, more
* conveniently, through {@link EnableKafka} annotation.
*
* <p>
* Annotated methods are allowed to have flexible signatures similar to what
* {@link MessageMapping} provides, that is
* <ul>
* <li>{@link org.apache.kafka.clients.consumer.ConsumerRecord} to access to the raw Kafka
* message</li>
* <li>{@link org.springframework.kafka.support.Acknowledgment} to manually ack</li>
* <li>{@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated
* method arguments including the support of validation</li>
* <li>{@link org.springframework.messaging.handler.annotation.Header @Header}-annotated
* method arguments to extract a specific header value, defined by
* {@link org.springframework.kafka.support.KafkaHeaders KafkaHeaders}</li>
* <li>{@link org.springframework.messaging.handler.annotation.Headers @Headers}-annotated
* argument that must also be assignable to {@link java.util.Map} for getting access to
* all headers.</li>
* <li>{@link org.springframework.messaging.MessageHeaders MessageHeaders} arguments for
* getting access to all headers.</li>
* <li>{@link org.springframework.messaging.support.MessageHeaderAccessor
* MessageHeaderAccessor} for convenient access to all method arguments.</li>
* </ul>
*
* <p>When defined at the method level, a listener container is created for each method.
* The {@link org.springframework.kafka.listener.MessageListener} is a
* {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter},
* configured with a {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}.
*
* <p>When defined at the class level, a single message listener container is used to
* service all methods annotated with {@code @KafkaHandler}. Method signatures of such
* annotated methods must not cause any ambiguity such that a single method can be
* resolved for a particular inbound message. The
* {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter} is
* configured with a
* {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}.
*
* @author Gary Russell
*
* @see EnableKafka
* @see KafkaListenerAnnotationBeanPostProcessor
* @see KafkaListeners
*/
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
/**
* The unique identifier of the container managing for this endpoint.
* <p>If none is specified an auto-generated one is provided.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";
/**
* The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>If not specified, the default container factory is used, if any.
* @return the container factory bean name.
*/
String containerFactory() default "";
/**
* The topics for this listener.
* The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
* Expression must be resolved to the topic name.
* Mutually exclusive with {@link #topicPattern()} and {@link #topicPartitions()}.
* @return the topic names or expressions (SpEL) to listen to.
*/
String[] topics() default {};
/**
* The topic pattern for this listener.
* The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
* Expression must be resolved to the topic pattern.
* Mutually exclusive with {@link #topics()} and {@link #topicPartitions()}.
* @return the topic pattern or expression (SpEL).
*/
String topicPattern() default "";
/**
* The topicPartitions for this listener.
* Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
* @return the topic names or expressions (SpEL) to listen to.
*/
TopicPartition[] topicPartitions() default {};
/**
* If provided, the listener container for this listener will be added to a bean
* with this value as its name, of type {@code Collection<MessageListenerContainer>}.
* This allows, for example, iteration over the collection to start/stop a subset
* of containers.
* @return the bean name for the group.
*/
String group() default "";
}