1.maven依赖
<!-- Maven dependency on org.apache.kafka:kafka_2.11, version 1.0.0. -->
<!-- NOTE(review): the Java snippets in this file only import org.apache.kafka.clients.* and
     org.apache.kafka.common.* classes; the lighter kafka-clients artifact may be sufficient. TODO confirm. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
</dependency>
2.生产者
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
/**
* Created by zhangpeiran on 2018/9/30.
*/
public class MyProducer {
private static class DemoProducerCallback implements Callback{
public void onCompletion(RecordMetadata recordMetadata,Exception e){
if(e != null)
e.printStackTrace();
else
System.out.print("send message success");
}
}
public static void main(String[] args){
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","ip1:9092,ip2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<String,String>(kafkaProps);
ProducerRecord<String,String> record = new ProducerRecord<String, String>("DemoTopic","DemoKey","DemoValue");
//async
producer.send(record,new DemoProducerCallback());
producer.close();
}
}
3.自定义分区
import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import org.apache.kafka.common.record.InvalidRecordException;import org.apache.kafka.common.utils.Utils;import java.util.Map;/** * Created by zhangpeiran on 2018/10/8. */public class MyPartitioner implements Partitioner { public void configure(Map<String,?> configs){} public void close(){} public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { if((keyBytes == null) || !(key instanceof String)) throw new InvalidRecordException("Message should has a key"); int numPartitions = cluster.partitionsForTopic(topic).size(); return Math.abs(Utils.murmur2(keyBytes)) % numPartitions; }}
需要在Producer中加入配置:
// Tell the producer which partitioner class to use by setting "partitioner.class".
kafkaProps.put("partitioner.class","MyPartitioner");
4.消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
/**
* Created by zhangpeiran on 2018/10/9.
*/
/**
 * Minimal Kafka consumer demo: subscribes to "DemoTopic" as part of the
 * "DemoConsumerGroup" group and prints every record it receives.
 */
public class MyConsumer {

    /**
     * Entry point: configures the consumer and polls in an endless loop.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "ip1:9092,ip2:9092,ip3:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "DemoConsumerGroup");
        // The default is "latest": when the partition being read has no committed offset,
        // or the offset is invalid, the consumer starts from the newest record.
        // A consumer group subscribing to a topic for the first time is such a case, so
        // data produced before the consumer started would not be read.
        // "earliest" makes the consumer start from the beginning of the partition instead.
        properties.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("DemoTopic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // One record per line; print() without a newline ran records together.
                    System.out.println(record.topic() + "," + record.partition() + ","
                            + record.offset() + "," + record.key() + "," + record.value());
                }
            }
        } finally {
            // Release the consumer if the poll loop exits with an exception.
            consumer.close();
        }
    }
}