Table of Contents
1. Import the clickhouse-jdbc dependency
2. Write the Flink-to-ClickHouse code
3. Create the ClickHouse table
4. Send data to localhost:7777 and start the Flink application
5. Query ClickHouse to verify the data was written
1. Import the clickhouse-jdbc dependency
<!-- Write data to ClickHouse -->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.1.54</version>
</dependency>
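Note that 0.1.54 is an old release of the Yandex driver. The official ClickHouse JDBC driver has since moved to the com.clickhouse group on Maven Central; if you are starting a new project, coordinates like the following may be preferable (the version shown is illustrative, so check Maven Central for the latest, and note the newer driver class is com.clickhouse.jdbc.ClickHouseDriver). The rest of this article sticks with the older ru.yandex.clickhouse artifact.

<!-- Newer official driver coordinates (illustrative version, verify on Maven Central) -->
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.4.6</version>
</dependency>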
2. Write the Flink-to-ClickHouse code
The Java Bean entity class:
package com.lei.domain;

// Simple entity representing one record to be written to ClickHouse
public class J_User {
    public int id;
    public String name;
    public int age;

    public J_User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    // Static factory for concise construction
    public static J_User of(int id, String name, int age) {
        return new J_User(id, name, age);
    }
}
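One detail worth knowing: Flink only treats a class as a POJO (and uses its efficient POJO serializer) if the class is public with a public no-arg constructor and public fields or getters/setters. As written, J_User has no default constructor, so Flink falls back to generic Kryo serialization. The small addition below, inside J_User, would satisfy the POJO rules; the job works either way, so this is optional.

// Optional: a public no-arg constructor lets Flink recognize J_User as a POJO
// instead of serializing it generically with Kryo.
public J_User() {
}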
The ClickHouseUtil helper class:
package com.lei.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ClickHouseUtil {
    private static Connection connection;

    public static Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
        // Load the ClickHouse JDBC driver and open a connection over the HTTP port
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
        connection = DriverManager.getConnection(address);
        return connection;
    }

    public static Connection getConn(String host, int port) throws SQLException, ClassNotFoundException {
        return getConn(host, port, "default");
    }

    public static Connection getConn() throws SQLException, ClassNotFoundException {
        return getConn("node-01", 8123);
    }

    // Made static to match the static connection it closes
    public static void close() throws SQLException {
        connection.close();
    }
}
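For reference, here is a minimal standalone smoke test of the helper. The class name ConnSmokeTest is just an illustration, and "node-01"/8123 are the article's host and ClickHouse HTTP port; substitute your own.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import com.lei.util.ClickHouseUtil;

public class ConnSmokeTest {
    public static void main(String[] args) throws Exception {
        Connection conn = ClickHouseUtil.getConn("node-01", 8123, "default");
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT 1")) {
            while (rs.next()) {
                System.out.println(rs.getInt(1)); // prints 1 if the connection works
            }
        } finally {
            conn.close();
        }
    }
}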
The sink class that writes records to ClickHouse:
package com.lei.util;

import com.lei.domain.J_User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;

public class J_MyClickHouseUtil extends RichSinkFunction<J_User> {
    Connection connection = null;
    String sql;

    public J_MyClickHouseUtil(String sql) {
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // One connection per sink subtask, opened when the task starts
        connection = ClickHouseUtil.getConn("node-01", 8123, "default");
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(J_User user, Context context) throws Exception {
        // try-with-resources so the statement is always closed, even on failure
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            preparedStatement.setInt(1, user.id);
            preparedStatement.setString(2, user.name);
            preparedStatement.setInt(3, user.age);
            preparedStatement.addBatch();
            long startTime = System.currentTimeMillis();
            int[] ints = preparedStatement.executeBatch();
            // ClickHouse JDBC connections run in auto-commit mode, so no explicit commit() is needed
            long endTime = System.currentTimeMillis();
            System.out.println("Batch insert finished in " + (endTime - startTime) + " ms -- rows inserted = " + ints.length);
        }
    }
}
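As written, invoke() prepares and flushes a "batch" of exactly one row per incoming record, so the batching API adds no throughput benefit. A common variation is to keep one prepared statement open and flush every N rows. Below is a minimal sketch of that idea; the class name, BATCH_SIZE value, and flush-on-close logic are illustrative assumptions, and a production version would typically also flush on a timer so slow streams do not sit buffered indefinitely.

package com.lei.util;

import com.lei.domain.J_User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;

// Hypothetical buffered variant: accumulate rows and flush every BATCH_SIZE records
public class J_BufferedClickHouseSink extends RichSinkFunction<J_User> {
    private static final int BATCH_SIZE = 100; // illustrative value
    private transient Connection connection;
    private transient PreparedStatement stmt;
    private int buffered = 0;
    private final String sql;

    public J_BufferedClickHouseSink(String sql) {
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = ClickHouseUtil.getConn("node-01", 8123, "default");
        stmt = connection.prepareStatement(sql);
    }

    @Override
    public void invoke(J_User user, Context context) throws Exception {
        stmt.setInt(1, user.id);
        stmt.setString(2, user.name);
        stmt.setInt(3, user.age);
        stmt.addBatch();
        if (++buffered >= BATCH_SIZE) {
            stmt.executeBatch(); // one round-trip for BATCH_SIZE rows
            buffered = 0;
        }
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) {
            if (buffered > 0) {
                stmt.executeBatch(); // flush any remainder before shutting down
            }
            stmt.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}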
The Flink job class that wires the pipeline together:
package com.lei.sinktest;

import com.lei.domain.J_User;
import com.lei.util.J_MyClickHouseUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/*
Run in clickhouse-client first:
use default;
drop table if exists user_table;
CREATE TABLE default.user_table (id UInt16, name String, age UInt16) ENGINE = TinyLog;
*/
public class J05_ClickHouseSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        // Source: read comma-separated lines from a local socket
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // Transform: parse "id,name,age" lines into J_User records
        SingleOutputStreamOperator<J_User> dataStream = inputStream.map(new MapFunction<String, J_User>() {
            @Override
            public J_User map(String data) throws Exception {
                String[] split = data.split(",");
                return J_User.of(Integer.parseInt(split[0]),
                        split[1],
                        Integer.parseInt(split[2]));
            }
        });

        // Sink: write each record into ClickHouse
        String sql = "INSERT INTO default.user_table (id, name, age) VALUES (?,?,?)";
        J_MyClickHouseUtil jdbcSink = new J_MyClickHouseUtil(sql);
        dataStream.addSink(jdbcSink);
        dataStream.print();

        env.execute("clickhouse sink test");
    }
}
3. Create the ClickHouse table
-- Run in clickhouse-client:
use default;
drop table if exists user_table;
CREATE TABLE default.user_table (id UInt16, name String, age UInt16) ENGINE = TinyLog;
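TinyLog is fine for a quick local test, but it is a minimal log engine with no primary key, indexing, or background merges. For anything beyond a demo, a MergeTree table is the usual choice; a sketch of an equivalent DDL follows, where ORDER BY id is an illustrative sort-key choice for this schema.

-- MergeTree variant for non-demo use; ORDER BY id is an illustrative choice
CREATE TABLE default.user_table
(
    id   UInt16,
    name String,
    age  UInt16
)
ENGINE = MergeTree()
ORDER BY id;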
4. Send data to port 7777 on localhost and start the Flink application
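Flink's socketTextStream connects to the port as a client, so a socket server must be listening before the job starts. On Linux/macOS, netcat works well: start it, launch J05_ClickHouseSinkTest, then type comma-separated records matching the "id,name,age" format the map function expects. The sample records below are illustrative.

nc -lk 7777
1,lei,25
2,ting,24
3,wang,30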
5. Query ClickHouse to verify the data was written
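Back in clickhouse-client, a simple query confirms the rows landed; the count should match the number of lines you typed into the socket.

SELECT * FROM default.user_table;
SELECT count() FROM default.user_table;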