欢迎访问我的GitHub
https://github.com/zq2599/blog\_demos
内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;
本篇概览
本文是《Flink的sink实战》系列的第二篇,前文《Flink的sink实战之一:初探》对sink有了基本的了解,本章来体验将数据sink到kafka的操作;
全系列链接
- 《Flink的sink实战之一:初探》
- 《Flink的sink实战之二:kafka》
- 《Flink的sink实战之三:cassandra3》
- 《Flink的sink实战之四:自定义》
版本和环境准备
本次实战的环境和版本如下:
- JDK:1.8.0_211
- Flink:1.9.2
- Maven:3.6.0
- 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA:2018.3.5 (Ultimate Edition)
- Kafka:2.4.0
- Zookeeper:3.5.5
请确保上述环境和服务已经就绪;
源码下载
如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog\_demos):
名称
链接
备注
项目主页
https://github.com/zq2599/blog\_demos
该项目在GitHub上的主页
git仓库地址(https)
https://github.com/zq2599/blog\_demos.git
该项目源码的仓库地址,https协议
git仓库地址(ssh)
git@github.com:zq2599/blog_demos.git
该项目源码的仓库地址,ssh协议
这个git项目中有多个文件夹,本章的应用在flinksinkdemo文件夹下,如下图红框所示:
准备完毕,开始开发;
准备工作
正式编码前,先去官网查看相关资料了解基本情况:
- 地址:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html
- 我这里用的kafka是2.4.0版本,在官方文档查找对应的库和类,如下图红框所示:
kafka准备
创建名为test006的topic,有四个分区,参考命令:
./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 4 --topic test006
在控制台消费test006的消息,参考命令:
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test006
此时如果该topic有消息进来,就会在控制台输出;
接下来开始编码;
创建工程
用maven命令创建flink工程:
mvn \archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.2
根据提示,groupid输入com.bolingcavalry,artifactid输入flinksinkdemo,即可创建一个maven工程;
在pom.
org.apache.flink flink-connector-kafka_2.11 1.9.0 工程创建完成,开始编写flink任务的代码;
发送字符串消息的sink
先尝试发送字符串类型的消息:
创建KafkaSerializationSchema接口的实现类,后面这个类要作为创建sink对象的参数使用:
package com.bolingcavalry.addsink;import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;import org.apache.kafka.clients.producer.ProducerRecord;import java.nio.charset.StandardCharsets;public class ProducerStringSerializationSchema implements KafkaSerializationSchema
{ private String topic; public ProducerStringSerializationSchema(String topic) { super(); this.topic = topic; } @Override public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) { return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8)); }} 创建任务类KafkaStrSink,请注意FlinkKafkaProducer对象的参数,FlinkKafkaProducer.Semantic.EXACTLY_ONCE表示严格一次:
package com.bolingcavalry.addsink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.util.ArrayList;import java.util.List;import java..........