Kafka生产者发送消息的三种方式

Stella981
• 阅读 751

Kafka是一种分布式的基于发布/订阅的消息系统,它的高吞吐量、灵活的offset是其它消息系统所没有的。

Kafka发送消息主要有三种方式:

1.发送并忘记 2.同步发送 3.异步发送+回调函数

下面以单节点的方式分别用三种方法发送1w条消息测试:

方式一:发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理)

发送并忘记的方式本质上也是一种异步的方式,只是它不会获取消息发送的返回结果,这种方式的吞吐量是最高的,但是无法保证消息的可靠性:

 1 import pickle
 2 import time
 3 from kafka import KafkaProducer
 4 
 5 producer = KafkaProducer(bootstrap_servers=['192.168.33.11:9092'],
 6                          key_serializer=lambda k: pickle.dumps(k),
 7                          value_serializer=lambda v: pickle.dumps(v))
 8 
 9 start_time = time.time()
10 for i in range(0, 10000):
11     print('------{}---------'.format(i))
12     future = producer.send('test_topic', key='num', value=i, partition=0)
13 
14 # 将缓冲区的全部消息push到broker当中
15 producer.flush()
16 producer.close()
17 
18 end_time = time.time()
19 time_counts = end_time - start_time
20 print(time_counts)

测试结果:1.88s

方式二:同步发送(通过get方法等待Kafka的响应,判断消息是否发送成功)

以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断, 可以明确地知道每条消息的发送情况,但是由于同步的方式会阻塞,只有当消息通过get返回future对象时,才会继续下一条消息的发送:

 1 import pickle
 2 import time
 3 from kafka import KafkaProducer
 4 from kafka.errors import kafka_errors
 5 
 6 producer = KafkaProducer(
 7     bootstrap_servers=['192.168.33.11:9092'],
 8     key_serializer=lambda k: pickle.dumps(k),
 9     value_serializer=lambda v: pickle.dumps(v)
10 )
11 
12 start_time = time.time()
13 for i in range(0, 10000):
14     print('------{}---------'.format(i))
15     future = producer.send(topic="test_topic", key="num", value=i)
16     # 同步阻塞,通过调用get()方法进而保证一定程序是有序的.
17     try:
18         record_metadata = future.get(timeout=10)
19         # print(record_metadata.topic)
20         # print(record_metadata.partition)
21         # print(record_metadata.offset)
22     except kafka_errors as e:
23         print(str(e))
24 
25 end_time = time.time()
26 time_counts = end_time - start_time
27 print(time_counts)

测试结果:16s

方式三:异步发送+回调函数(消息以异步的方式发送,通过回调函数返回消息发送成功/失败)

在调用send方法发送消息的同时,指定一个回调函数,服务器在返回响应时会调用该回调函数,通过回调函数能够对异常情况进行处理,当调用了回调函数时,只有回调函数执行完毕生产者才会结束,否则一直会阻塞:

 1 import pickle
 2 import time
 3 from kafka import KafkaProducer
 4 
 5 producer = KafkaProducer(
 6     bootstrap_servers=['192.168.33.11:9092'],
 7     key_serializer=lambda k: pickle.dumps(k),
 8     value_serializer=lambda v: pickle.dumps(v)
 9 )
10 
11 
12 def on_send_success(*args, **kwargs):
13     """
14     发送成功的回调函数
15     :param args:
16     :param kwargs:
17     :return:
18     """
19     return args
20 
21 
22 def on_send_error(*args, **kwargs):
23     """
24     发送失败的回调函数
25     :param args:
26     :param kwargs:
27     :return:
28     """
29 
30     return args
31 
32 
33 start_time = time.time()
34 for i in range(0, 10000):
35     print('------{}---------'.format(i))
36     # 如果成功,传进record_metadata,如果失败,传进Exception.
37     producer.send(
38         topic="test_topic", key="num", value=i
39     ).add_callback(on_send_success).add_errback(on_send_error)
40 
41 producer.flush()
42 producer.close()
43 
44 end_time = time.time()
45 time_counts = end_time - start_time
46 print(time_counts)

测试结果:2.15s

三种方式虽然在时间上有所差别,但并不是说时间越快的越好,具体要看业务的应用场景:

场景1:如果业务要求消息必须是按顺序发送的,那么可以使用同步的方式,并且只能在一个partation上,结合参数设置retries的值让发送失败时重试,设置max_in_flight_requests_per_connection=1,可以控制生产者在收到服务器晌应之前只能发送1个消息,从而控制消息顺序发送;

场景2:如果业务只关心消息的吞吐量,容许少量消息发送失败,也不关注消息的发送顺序,那么可以使用发送并忘记的方式,并配合参数acks=0,这样生产者不需要等待服务器的响应,以网络能支持的最大速度发送消息;

场景3:如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用异步+回调的方式来发送消息,配合参数retries=0,并将发送失败的消息记录到日志文件中;

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Easter79 Easter79
3年前
thinkcmf+jsapi 实现微信支付
首先从小程序端接收订单号、金额等参数,然后后台进行统一下单,把微信支付的订单号返回,在把订单号发送给前台,前台拉起支付,返回参数后更改支付状态。。。回调publicfunctionnotify(){$wechatDb::name('wechat')where('status',1)find();
Stella981 Stella981
3年前
Kafka概述及安装部署
一、Kafka概述1.Kafka是一个分布式流媒体平台,它有三个关键功能:(1)发布和订阅记录流,类似于消息队列或企业消息传递系统;(2)以容错的持久方式存储记录流;(3)记录发送时处理流。2.Kafka通常应用的两大类应用(1)构建在系统或应用程序之间的可靠获取数据的实时流数据管道;(2)构建转换或响应数据流的实施
Stella981 Stella981
3年前
FusionInsight大数据开发
Kafka应用开发1.了解Kafka应用开发适用场景2.熟悉Kafka应用开发流程3.熟悉并使用Kafka常用API4.进行Kafka应用开发Kafka的定义Kafka是一个高吞吐、分布式、基于发布订阅的消息系统Kafka有如下几个特点:1.高吞吐量2.消息持久化到磁
Stella981 Stella981
3年前
Kafka笔记
第1章Kafka简介1.1kafka起源Kafka是由LinkedIn开发并开源的分布式消息系统,2012年捐赠给Apache基金会,采用Scala语言,运行在JVM中,最新版本1.0.0。1.2kafka设计目标Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:①以时间复杂度O(1)的方式提供消息持久化能力,即
Stella981 Stella981
3年前
Kafka 中两个重要概念:主题与分区
在Kafka中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。这里补充了对Kafka基本概念(https://www.oschina.net/action
Stella981 Stella981
3年前
Kafka 生产者与可靠性保证ACK(2)
生产者消息发送流程消息发送的整体流程,生产端主要由两个线程协调运行。分别是main线程和sender线程(发送线程)。在Kafka(2.6.0版本)源码中,可以看到。源码地址:kafka\clients\src\main\java\org.apache.kafka.clients.producer.KafkaProdu
Wesley13 Wesley13
3年前
JMS消息的概念解释
1、默认生产者消息是持久的:会存数据库\消费者的持久:createDurableSubscriber是指消费者能收到所有它订阅时间点之后的消息,即使消费者注册后关闭,当它重启就能收到注册时间点之后所有的消息;即当此消费用户ID(AAA)在producer发送之前就已经注册,那么此id能收到producer发送的所有消息,如果是在produce
融云IM即时通讯 融云IM即时通讯
11个月前
消息丢失排查方法?
遇到丢消息问题,如果是单聊,群聊,聊天室,系统消息可以在开发者后台北极星自助查询一下消息是否发送成功。根据您实际发送的相关信息(发送者、接收者、时间、消息ID……)看是否可以查到消息如果消息查不到一般有几种可能:信息有误(获取token的用户id跟您系统中