Kafka version: 0.10.1.1
After sending a message, the producer logs the following error:
The producer has a error:Expiring 1 record(s) for testtopic-0 due to 30039 ms has passed since last append
Changing timeout.ms, batch.size, and similar settings made no difference; the error persisted.
After deleting the topic and its data and recreating the topic from the command line, the problem went away.
Cause: when a producer sends to a topic that does not exist yet, Kafka auto-creates the topic, but this can lead to the error above. So always create the topic from the command line first, and only then produce to it.
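For reference, and only as a sketch (this is not from the original post): the 30039 ms in the error message lines up with request.timeout.ms, whose default is 30000 ms and which, in the 0.10.x producer, also bounds how long a batch may sit in the record accumulator before it is expired. The broker address below is a placeholder.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.49.173:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // In the 0.10.x producer, a batch that waits in the accumulator longer than
        // request.timeout.ms is expired with the
        // "Expiring N record(s) ... ms has passed since last append" error.
        props.put("request.timeout.ms", "60000"); // default is 30000
        // How long send()/partitionsFor() may block while waiting for topic metadata.
        props.put("max.block.ms", "60000");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}
In this case, though, tuning timeouts did not help; the real fix was recreating the topic as described above.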
Reference article:
The example code below is written against kafka-clients v0.10.0.0 (http://m.blog.csdn.net/article/details?id=51577117).
I. Writing the producer
Step 1: create the topic and its partitions with the ./kafka-topics.sh command
./kafka-topics.sh --create --zookeeper "172.16.49.173:2181" --topic "producer_test" --partitions 10 --replication-factor 3
Step 2: implement the org.apache.kafka.clients.producer.Partitioner interface to customize how messages are assigned to partitions
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyPartition implements Partitioner {
private static Logger LOG = LoggerFactory.getLogger(MyPartition.class);
public MyPartition() {
}
@Override
public void configure(Map<String, ?> configs) {
// No additional configuration is needed for this partitioner.
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// Derive the partition from the record key.
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int partitionNum = 0;
try {
partitionNum = Integer.parseInt((String) key);
} catch (Exception e) {
partitionNum = key.hashCode() ;
}
LOG.info("the message sendTo topic:"+ topic+" and the partitionNum:"+ partitionNum);
return Math.abs(partitionNum % numPartitions);
}
@Override
public void close() {
// Nothing to release.
}
}
Step 3: write the producer
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PartitionTest {
private static Logger LOG = LoggerFactory.getLogger(PartitionTest.class);
public static void main(String[] args) {
Properties props = new Properties();
// bootstrap.servers must be a comma-separated list of host:port pairs
props.put("bootstrap.servers", "172.16.49.173:9092,172.16.49.173:9093");
props.put("retries", 0);
// props.put("batch.size", 16384);
props.put("linger.ms", 1);
// props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.goodix.kafka.MyPartition");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> record = new ProducerRecord<String, String>("producer_test", "2223132132",
"test23_60");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null)
LOG.error("the producer has a error:" + e.getMessage());
else {
LOG.info("The offset of the record we just sent is: " + metadata.offset());
LOG.info("The partition of the record we just sent is: " + metadata.partition());
}
}
});
try {
Thread.sleep(1000);
producer.close();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
Note: create the topic and its partitions from the command line first. Otherwise, when the custom partitioner returns a partition number beyond what the auto-created topic actually has, sending messages to Kafka fails with:
expired due to timeout while requesting metadata from brokers
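As a quick sanity check (a small sketch added here for illustration; it is not part of the original code), the producer itself can report how many partitions a topic really has before you start sending. For a topic that was auto-created with the broker defaults this will usually show a single partition rather than the 10 created above:
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class PartitionCountCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.49.173:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // partitionsFor() fetches the topic metadata; for a topic created with
        // --partitions 10 it should report 10 entries.
        List<PartitionInfo> partitions = producer.partitionsFor("producer_test");
        System.out.println("producer_test has " + partitions.size() + " partitions");
        producer.close();
    }
}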
II. Writing a consumer with the old Consumer (High Level) API
Step 1: write the class that actually processes the messages
import java.io.UnsupportedEncodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
public class Consumerwork implements Runnable {
private static Logger LOG = LoggerFactory.getLogger(Consumerwork.class);
@SuppressWarnings("rawtypes")
private KafkaStream m_stream;
private int m_threadNumber;
@SuppressWarnings("rawtypes")
public Consumerwork(KafkaStream a_stream,int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
@SuppressWarnings("unchecked")
@Override
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
try {
MessageAndMetadata<byte[], byte[]> thisMetadata=it.next();
String jsonStr = new String(thisMetadata.message(),"utf-8") ;
LOG.info("Thread " + m_threadNumber + ": " +jsonStr);
LOG.info("partition:"+thisMetadata.partition()+",offset:"+thisMetadata.offset());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
Step 2: write the main class that starts the consumer
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerGroup {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
private static Logger LOG = LoggerFactory.getLogger(ConsumerGroup.class);
public ConsumerGroup(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}
public static void main(String[] args) {
Scanner sc = new Scanner(System.in);
System.out.println("Enter the ZooKeeper cluster address (e.g. zk1:2181,zk2:2181,zk3:2181):");
String zooKeeper = sc.nextLine();
System.out.println("Enter the consumer group name:");
String groupId = sc.nextLine();
System.out.println("Enter the topic to consume:");
String topic = sc.nextLine();
System.out.println("Enter the number of consumer threads:");
int threads = sc.nextInt();
LOG.info("Starting consumer kafka messages with zk:" + zooKeeper + " and the topic is " + topic);
ConsumerGroup example = new ConsumerGroup(zooKeeper, groupId, topic);
example.run(threads);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
// example.shutdown();
}
private void shutdown() {
if (consumer != null)
consumer.shutdown();
if (executor != null)
executor.shutdown();
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
} catch (InterruptedException e) {
LOG.info("Interrupted during shutdown, exiting uncleanly");
}
}
private void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
LOG.info("the streams size is "+streams.size());
for (final KafkaStream stream : streams) {
executor.submit(new com.goodix.kafka.oldconsumer.Consumerwork(stream, threadNumber));
// consumer.commitOffsets();
threadNumber++;
}
}
private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "60000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
// props.put("rebalance.max.retries", "5");
// props.put("rebalance.backoff.ms", "15000");
return new ConsumerConfig(props);
}
}
1. topicCountMap.put(topic, new Integer(a_numThreads)) tells Kafka how many threads will process the topic's messages.
(1) This thread count must be less than or equal to the number of partitions of the topic; you can inspect the partitions with
./kafka-topics.sh --describe --zookeeper "172.16.49.173:2181" --topic "producer_test"
(2) Kafka decides which partitions each thread consumes according to the strategy set by partition.assignment.strategy. Nothing is configured here, so the default range strategy is used (partitions are split into consecutive ranges of roughly equal size). For example, with 10 partitions and 3 threads, thread 1 consumes partitions 0,1,2,3, thread 2 consumes 4,5,6, and thread 3 consumes 7,8,9; the sketch after the dependency block below illustrates this arithmetic. The alternative is roundrobin; the official documentation lists two preconditions for using it, so it is usually best left alone.
(3) From testing, consumerMap.get(topic).size() appears to return the number of partitions of the topic that currently hold data.
(4) A stream is the sequence of messages coming from one or more partitions on one or more brokers, and each stream is consumed by a single thread, so the client can request however many streams suit its needs. In short, one stream may aggregate messages from several partitions, but each partition feeds only one stream.
2. Executors.newFixedThreadPool(a_numThreads) creates a fixed-size thread pool: a thread is created for each submitted task until the pool reaches its maximum size, after which the size stays constant; if a thread dies because of an exception, the pool replaces it with a new one.
3. props.put("auto.offset.reset", "smallest") makes the consumer start from the earliest offset this group has not yet consumed; if it is not set, the default is largest, in which case the consumer will not see messages the producer published before the consumer started.
4. To use the old consumer API you need both the kafka_2.11 and kafka-clients dependencies.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
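To illustrate the range assignment arithmetic mentioned in note (2) above, here is a small sketch (added for illustration only; it is not Kafka's actual implementation) that reproduces the 10-partition / 3-thread example:
import java.util.ArrayList;
import java.util.List;

public class RangeAssignmentSketch {
    // Assign numPartitions partitions to numThreads threads the way the "range"
    // strategy does: each thread gets a consecutive block, and the first
    // (numPartitions % numThreads) threads get one extra partition.
    static List<List<Integer>> assign(int numPartitions, int numThreads) {
        List<List<Integer>> result = new ArrayList<>();
        int perThread = numPartitions / numThreads;
        int extra = numPartitions % numThreads;
        int next = 0;
        for (int t = 0; t < numThreads; t++) {
            int count = perThread + (t < extra ? 1 : 0);
            List<Integer> block = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                block.add(next++);
            }
            result.add(block);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]], matching the example above.
        System.out.println(assign(10, 3));
    }
}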
III. Writing a consumer with the old SimpleConsumer API
This is a lower-level and more complex API.
When to use it
Because this API leaves much more under your control and is correspondingly more complex, the official documentation lists the scenarios it is suited for; these can also be read as the things the High Level Consumer API cannot do:
1. Read a message multiple times
2. Consume only one partition of a topic within a process
3. Use transactions to make sure each message is processed exactly once
What you have to handle yourself
1. Track the offsets in your own program
2. Find the lead broker for the given topic partition
3. Handle broker changes
Steps for using SimpleConsumer
First, you must know which partition of which topic you want to read.
Then, find the broker that leads that partition, and from it the brokers holding replicas of the partition.
Next, build the fetch requests and fetch the data yourself.
Finally, be prepared to detect and handle changes of the leader broker.
示例
package com.goodix.kafka.oldconsumer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleExample {
private static Logger LOG = LoggerFactory.getLogger(SimpleExample.class);
public static void main(String args[]) {
SimpleExample example = new SimpleExample();
Scanner sc = new Scanner(System.in);
System.out.println("Enter the broker IP address (e.g. 172.16.49.173)");
String brokerIp = sc.nextLine();
List<String> seeds = new ArrayList<String>();
seeds.add(brokerIp);
System.out.println("Enter the broker port (e.g. 9092)");
int port = Integer.parseInt(sc.nextLine());
System.out.println("Enter the topic to subscribe to (e.g. test)");
String topic = sc.nextLine();
System.out.println("Enter the partition to read (e.g. 0)");
int partition = Integer.parseInt(sc.nextLine());
System.out.println("Enter the maximum number of messages to read (e.g. 10000)");
long maxReads = Long.parseLong(sc.nextLine());
try {
example.run(maxReads, topic, partition, seeds, port);
} catch (Exception e) {
LOG.error("Oops:" + e);
e.printStackTrace();
}
}
private List<String> m_replicaBrokers = new ArrayList<String>();
public SimpleExample() {
m_replicaBrokers = new ArrayList<String>();
}
public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
// find the meta data about the topic and partition we are interested in
// Fetch the metadata for the requested topic partition
PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
LOG.error("Can't find metadata for Topic and Partition. Exiting");
return;
}
if (metadata.leader() == null) {
LOG.error("Can't find Leader for Topic and Partition. Exiting");
return;
}
String leadBroker = metadata.leader().host();
String clientName = "Client_" + a_topic + "_" + a_partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
int numErrors = 0;
while (a_maxReads > 0) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
}
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
.build();
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
numErrors++;
// Something went wrong!
short code = fetchResponse.errorCode(a_topic, a_partition);
LOG.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
if (numErrors > 5) break;
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For simple case ask for the last element to reset
readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
continue;
}
consumer.close();
consumer = null;
leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
continue;
}
numErrors = 0;
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
LOG.error("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
continue;
}
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
LOG.info("the message's offset is :"+String.valueOf(messageAndOffset.offset()) + " and the value is :" + new String(bytes, "UTF-8"));
numRead++;
a_maxReads--;
}
if (numRead == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
if (consumer != null) consumer.close();
}
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
LOG.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
/**
* Find a leader broker.
* Iterate over every broker, fetch the topic's metadata, then walk each partition's metadata and return as soon as the requested partition is found.
* The leader broker is then obtained from PartitionMetadata.leader().host().
* @param a_oldLeader
* @param a_topic
* @param a_partition
* @param a_port
* @return
* @throws Exception
*/
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
// first time through if the leader hasn't changed give ZooKeeper a second to recover
// second time, assume the broker did recover before failover, or it was a non-Broker issue
//
goToSleep = true;
} else {
return metadata.leader().host();
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
LOG.error("Unable to find new leader after Broker failure. Exiting");
throw new Exception("Unable to find new leader after Broker failure. Exiting");
}
/**
*
* @param a_seedBrokers
* @param a_port
* @param a_topic
* @param a_partition
* @return
*/
private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
PartitionMetadata returnMetaData = null;
loop:
for (String seed : a_seedBrokers) { // iterate over each seed broker
SimpleConsumer consumer = null;
try {
// Create a SimpleConsumer just for the metadata lookup
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
// Send the TopicMetadataRequest
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
// Read the topic metadata from the response
List<TopicMetadata> metaData = resp.topicsMetadata();
// Walk the metadata of every partition
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
// Is this the partition we are looking for?
if (part.partitionId() == a_partition) {
returnMetaData = part;
// Found it; stop searching
break loop;
}
}
}
} catch (Exception e) {
LOG.info("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
+ ", " + a_partition + "] Reason: " + e);
} finally {
if (consumer != null) consumer.close();
}
}
if (returnMetaData != null) {
m_replicaBrokers.clear();
for (kafka.cluster.BrokerEndPoint replica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host());
}
}
return returnMetaData;
}
}
IV. Using the new consumer API
(1) Automatic offset commits
Properties props = new Properties();
// Kafka broker address list; you do not need to list every broker in the cluster, one or a few is enough
props.put("bootstrap.servers", "172.16.49.173:9092");
// The consumer group name (a_groupId is a variable holding it); this must be set
props.put("group.id", a_groupId);
// Commit offsets automatically; the commit frequency is controlled by auto.commit.interval.ms
props.put("enable.auto.commit", "true");
// Offset commit frequency
props.put("auto.commit.interval.ms", "1000");
// Start from the earliest offset this group has not yet consumed. If unset, the default is latest,
// i.e. the offset of the newest message in the topic; with latest the consumer only sees messages
// produced after it started.
props.put("auto.offset.reset", "earliest");
// Session timeout
props.put("session.timeout.ms", "30000");
// Key and value deserializer classes
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to the topic
consumer.subscribe(Arrays.asList("topic_test"));
while (true) {
// poll() blocks for at most 100 ms waiting for records
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Points to note:
group.id: must be set.
auto.offset.reset: set it to earliest if the consumer should also receive messages produced before it started; if you only need messages produced after the consumer starts, this setting can be left out.
enable.auto.commit (default true): set it to false if you commit offsets manually, and call consumer.commitSync() at the appropriate point; otherwise every restart of the consumer will consume the messages again from the beginning (when auto.offset.reset=earliest).
(2) Controlling offset commits yourself
Often we only want to consider a message consumed after it has been received and run through some processing logic; this can be achieved by committing the offsets ourselves.
Example 1: committing offsets in batches
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Commit offsets manually, in batches.
* @author lxh
*
*/
public class ManualOffsetConsumer {
private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class);
public ManualOffsetConsumer() {
}
public static void main(String[] args) {
Properties props = new Properties();
// Kafka broker address
props.put("bootstrap.servers", "172.16.49.173:9092");
// Consumer group name
props.put("group.id", "manual_g1");
// Offsets will be committed manually
props.put("enable.auto.commit", "false");
// Start from the earliest offset this group has not consumed; the default (latest) would only
// deliver messages produced after the consumer starts.
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
consumer.subscribe(Arrays.asList("producer_test"));
final int minBatchSize = 5; // commit once this many records have been buffered
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
LOG.info("consumer message values is "+record.value()+" and the offset is "+ record.offset());
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
LOG.info("now commit offset");
consumer.commitSync();
buffer.clear();
}
}
}
}
Example 2: committing the offset manually after finishing a partition
package com.goodix.kafka;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Commit the offset manually after consuming each partition's records.
* @author lxh
*
*/
public class ManualCommitPartion {
private static Logger LOG = LoggerFactory.getLogger(ManualCommitPartion.class);
public ManualCommitPartion() {
}
public static void main(String[] args) {
Properties props = new Properties();
// Kafka broker address
props.put("bootstrap.servers", "172.16.49.173:9092");
// Consumer group name
props.put("group.id", "manual_g2");
// Offsets will be committed manually
props.put("enable.auto.commit", "false");
// Start from the earliest offset this group has not consumed; the default (latest) would only
// deliver messages produced after the consumer starts.
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
consumer.subscribe(Arrays.asList("producer_test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
LOG.info("now consumer the message it's offset is :"+record.offset() + " and the value is :" + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
LOG.info("now commit the partition[ "+partition.partition()+"] offset");
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
}
}
(3) Consuming messages from specified partitions
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Consume messages from explicitly assigned partitions.
* @author lxh
*
*/
public class ManualPartion {
private static Logger LOG = LoggerFactory.getLogger(ManualPartion.class);
public ManualPartion() {
}
public static void main(String[] args) {
Properties props = new Properties();
// Kafka broker address
props.put("bootstrap.servers", "172.16.49.173:9092");
// Consumer group name
props.put("group.id", "manual_g4");
// Commit offsets automatically; the frequency is controlled by auto.commit.interval.ms
props.put("enable.auto.commit", "true");
// Offset commit frequency
props.put("auto.commit.interval.ms", "1000");
// Start from the earliest offset this group has not consumed; the default (latest) would only
// deliver messages produced after the consumer starts.
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
TopicPartition partition0 = new TopicPartition("producer_test", 0);
TopicPartition partition1 = new TopicPartition("producer_test", 1);
KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
consumer.assign(Arrays.asList(partition0, partition1));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s \r\n", record.offset(), record.key(), record.value());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Summary
The new consumer API only needs the kafka-clients dependency.
The new consumer API is also easier to understand and easier to use.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
Official recommendations on consumers and partitions
1. Having more consumers than partitions is wasteful: Kafka does not allow concurrent consumers on a single partition, so the number of consumers should not exceed the number of partitions.
2. If there are fewer consumers than partitions, each consumer handles several partitions. The two counts should be matched carefully, otherwise the partitions will be consumed unevenly; ideally the partition count is an integer multiple of the consumer count, which makes the partition count an important choice. 24, for example, makes it easy to pick a matching number of consumers.
3. When a consumer reads from several partitions, there is no ordering guarantee across them; Kafka only guarantees ordering within a single partition, so the interleaving depends on the order in which you read.
4. Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to a consumer may change.
5. With the high-level interface, the consumer blocks when no data is available.
References:
Java线程池使用说明 (Notes on using Java thread pools)
Java并发编程:线程池的使用 (Java concurrency: using thread pools)
如何确定Kafka的分区数、key和consumer线程数 (How to choose Kafka partition counts, keys, and consumer thread counts)
Kafka Consumer接口 (The Kafka Consumer interface)
How to send a 15 MB message to Kafka (answers from Stack Overflow):
Minor changes are required for Kafka 0.10 and the new consumer compared to laughing_man's answer:
- Broker: no changes; you still need to increase message.max.bytes and replica.fetch.max.bytes. message.max.bytes has to be smaller than replica.fetch.max.bytes.
- Producer: increase max.request.size to send the larger message.
- Consumer: increase max.partition.fetch.bytes to receive larger messages.
A second answer:
The idea is to allow the same message size all the way from the Kafka producer to the Kafka broker and then to the Kafka consumer, i.e.
Kafka producer --> Kafka broker --> Kafka consumer
If the requirement is to send messages of 15 MB, then the producer, the broker, and the consumer all three need to be in sync:
Kafka producer sends 15 MB --> Kafka broker allows/stores 15 MB --> Kafka consumer receives 15 MB
The settings should therefore be:
a) On the broker: message.max.bytes=15728640 and replica.fetch.max.bytes=15728640
b) On the consumer: fetch.message.max.bytes=15728640
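For completeness, a minimal client-side sketch (added for illustration; the broker address, group id, and the 15728640 value are just the example figures from the answers above) of where the two client settings go with the 0.10 Java API. The broker-side settings still have to be raised in server.properties:
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

public class LargeMessageConfigSketch {
    public static void main(String[] args) {
        // Producer side: allow requests up to ~15 MB.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "172.16.49.173:9092"); // placeholder broker address
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("max.request.size", "15728640");
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

        // New-consumer side: allow fetches of up to ~15 MB per partition.
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "172.16.49.173:9092");
        consumerProps.put("group.id", "large_message_group"); // placeholder group name
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("max.partition.fetch.bytes", "15728640");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

        producer.close();
        consumer.close();
    }
}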