XML及属性配置
1.业务处理(kafka.worker.xml)
<?xml version="1.0" encoding="UTF-8"?>
<!-- Declares, per consumed topic, the worker class that handles its messages and the thread counts to use. -->
<root>
<topics>
<!--topic: the topic to consume (the attribute is named "topic", not "name")-->
<!--worker: the worker that runs the consume logic; the value is the fully qualified class name-->
<!--consumerThreads: number of threads that fetch from the queue; recommended to match the number of brokers-->
<!--workerThreads: number of threads that run the business logic; adjust according to the consumed queue volume-->
<topic topic="TEST_TOPIC" worker="com.zzj.worker.ServiceWorker" consumerThreads="3" workerThreads="20"/>
</topics>
</root>
2.消费者连接属性(kafka.consumer.properties)
# Consumer connection settings.
# Comma-separated host:port list; replace the "your brokerN" placeholders with real hosts.
# NOTE(review): the placeholders say "broker", but the key name and port 2181 suggest these
# should be the ZooKeeper nodes - confirm before deploying.
zookeeper.connect=your broker1:2181,your broker2:2181,your broker3:2181
# Consumer group identifier.
group.id=sddzj
# ZooKeeper session timeout, in milliseconds.
zookeeper.session.timeout.ms=1000
# ZooKeeper sync time, in milliseconds.
zookeeper.sync.time.ms=500
# Interval between automatic commits, in milliseconds (presumably offset commits - confirm).
auto.commit.interval.ms=1000
Jar包结构
1.自定义消息处理父类
/**
 * Base class for per-topic message handlers.
 *
 * <p>Subclasses put their business logic in {@link #execute(Object)}; the topic the handler
 * is subscribed to is stored through {@link #setTopic(String)}.
 *
 * @param <T> type of the message handed to {@link #execute(Object)}
 */
public abstract class Worker<T> {
    /** Subscribed topic. */
    private String topic;

    /**
     * Business-processing hook, called once per message.
     *
     * @param message the message to handle
     */
    public abstract void execute(T message);

    /**
     * @return the topic this worker is subscribed to, or {@code null} if none was set
     */
    public String getTopic() {
        return topic;
    }

    /**
     * @param topic the topic this worker is subscribed to
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }
}
2.业务处理类(继承消息处理父类)
/**
 * Business handler for consumed messages.
 *
 * <p>The message type is bound to {@code String} because {@code KafkaConsumer} decodes each
 * payload into a {@code String} before calling {@link #execute(String)}. The original
 * {@code extends Worker<T>} referenced a type variable this class never declared.
 */
public class ServiceWorker extends Worker<String> {
    /**
     * Handles one consumed message.
     *
     * @param message the decoded message content
     */
    @Override
    public void execute(String message) {
        // Business processing: read the message content and act on it accordingly.
    }
}
3.消费者接收消息
/**
 * Reads messages from one Kafka stream and hands each one to a {@link Worker} on a
 * dedicated fixed-size worker pool.
 */
public class KafkaConsumer implements Runnable {
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(KafkaConsumer.class.getName());

    private final KafkaStream<byte[], byte[]> stream;
    /** Stored from the constructor; not read anywhere in this class. */
    private final int consumerThreadNumber;
    private final Worker<String> worker;
    private final ExecutorService workerExecute;

    /**
     * @param stream               the stream to read messages from
     * @param consumerThreadNumber number of consumer threads configured for the topic
     * @param worker               handler invoked for every message
     * @param workerThreadNum      size of the pool that runs {@code worker.execute}
     */
    // The parameter types stay raw so existing callers compile unchanged; the fields are
    // parameterized, hence the suppressed unchecked conversions.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public KafkaConsumer(KafkaStream stream, int consumerThreadNumber, Worker worker, int workerThreadNum) {
        this.consumerThreadNumber = consumerThreadNumber;
        this.stream = stream;
        this.worker = worker;
        this.workerExecute = Executors.newFixedThreadPool(workerThreadNum);
    }

    /**
     * Iterates the stream until it ends, submitting one worker task per message.
     * The worker pool is shut down when iteration stops; tasks already submitted still run.
     */
    @Override
    public void run() {
        try {
            // Fixed: the original referenced an undefined name "m_stream".
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                // Decode with an explicit charset instead of the platform default.
                // NOTE(review): assumes producers encode payloads as UTF-8 - confirm.
                final String message =
                        new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8);
                workerExecute.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            worker.execute(message);
                        } catch (Exception ex) {
                            // Keep consuming after a failed message, but do not hide the failure.
                            LOG.log(java.util.logging.Level.SEVERE,
                                    "Worker failed to process a message from topic " + worker.getTopic(), ex);
                        }
                    }
                });
            }
        } finally {
            // Nothing is submitted after this point; release the pool's threads once the
            // queued tasks finish.
            workerExecute.shutdown();
        }
    }
}
4.创建topic消费线程池
/**
 * Starts the consumers for one configured topic.
 *
 * <p>Reads the {@code topic}, {@code worker}, {@code consumerThreads} and {@code workerThreads}
 * attributes from {@code topicConfig}, instantiates the worker class reflectively, and submits
 * one {@code KafkaConsumer} per stream of that topic to a fixed-size consumer pool.
 * If the topic has no streams or the worker cannot be created, the problem is logged and
 * nothing is started.
 *
 * @param topicConfig configuration node of a single topic entry
 * @param consumerMap streams keyed by topic name
 */
@SuppressWarnings("rawtypes") // the worker's message type is not known at this point
private static void startWorker(HierarchicalConfiguration topicConfig, Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap) {
    final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Worker.class.getName());

    String topic = topicConfig.getString("[@topic]");
    String workerCls = topicConfig.getString("[@worker]");
    int consumerThreadNum = topicConfig.getInt("[@consumerThreads]");
    int workerThreadNum = topicConfig.getInt("[@workerThreads]");

    // The map has no entry when the topic was never subscribed; the original hit a
    // NullPointerException in the loop below in that case.
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    if (streams == null || streams.isEmpty()) {
        log.warning("No streams available for topic " + topic + "; no consumer started");
        return;
    }

    // Load the worker that consumes this topic.
    Worker worker;
    try {
        // asSubclass rejects classes that do not extend Worker instead of relying on an
        // unchecked cast; getDeclaredConstructor().newInstance() replaces the deprecated
        // Class.newInstance().
        worker = Class.forName(workerCls).asSubclass(Worker.class).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
        // Best effort as before (a bad worker does not abort startup), but no longer silent.
        log.log(java.util.logging.Level.SEVERE,
                "Cannot create worker " + workerCls + " for topic " + topic, e);
        return;
    }
    worker.setTopic(topic);

    // Create the per-topic consumer pool only once there is something to run.
    ExecutorService consumerExecutor = Executors.newFixedThreadPool(consumerThreadNum);
    for (KafkaStream<byte[], byte[]> stream : streams) {
        consumerExecutor.submit(new KafkaConsumer(stream, consumerThreadNum, worker, workerThreadNum));
    }
}