ClickHouse是如何批量写入的?

Stella981
• 阅读 1330

ClickHouse是如何批量写入的?

简介

批量写入又称为bulk write,对于单表插入多条数据的场景,可以减少插入请求数量,提高吞吐量和效率。clickhouse官方Golang驱动clickhouse-go[1]支持该关键特性,但是文档的介绍不是很详细,只有一句:

Bulk write support : begin->prepare->(in loop exec)->commit

并没有详细介绍用法和原理,笔者在开发业务时使用的库是sqlx[2],sql也支持clickhouse-go驱动。参考了官方样例代码[3]:

`...
tx, err := connect.Begin()
checkErr(err)
stmt, err := tx.Prepare("INSERT INTO example (country_code, os_id, browser_id, categories, action_day, action_time) VALUES (?, ?, ?, ?, ?, ?)")
checkErr(err)

for i := 0; i < 100; i++ {
 if _, err := stmt.Exec(
  "RU",
  10+i,
  100+i,
  []int16{1, 2, 3},
  time.Now(),
  time.Now(),
 ); err != nil {
  log.Fatal(err)
 }
}
...
`

我写的bulk write类似上面的代码,但是提交给同事review时,他提出了疑问:stmt.Exec是每次执行都发送写请求到数据库吗?这个问题其实我不敢肯定,官方文档也说得不明确。考虑到严谨性,让自己的PR更有说服力,自己去翻看了相关源代码。

这里需要指出,如果利用编辑器里的代码跳转功能会跳到database/sql库中的Exec函数实现,实际上我们要看的代码是clickhouse-go中的实现,至于编辑器跳转到database/sql中的原因,书写此文时笔者也没弄清楚,先挖个坑吧

核心实现

stmt.Exec的核心代码如下[4]:

func (stmt *stmt) execContext(ctx context.Context, args []driver.Value) (driver.Result, error) {  if stmt.isInsert {   stmt.counter++   if err := stmt.ch.block.AppendRow(args); err != nil {    return nil, err   }   if (stmt.counter % stmt.ch.blockSize) == 0 {    stmt.ch.logf("[exec] flush block")    if err := stmt.ch.writeBlock(stmt.ch.block); err != nil {     return nil, err    }    if err := stmt.ch.encoder.Flush(); err != nil {     return nil, err    }   }   return emptyResult, nil  }  if err := stmt.ch.sendQuery(stmt.bind(convertOldArgs(args))); err != nil {   return nil, err  }  if err := stmt.ch.process(); err != nil {   return nil, err  }  return emptyResult, nil }

上面的代码不多,非常清晰,当执行Exec时,stmt.ch.block.AppendRow(args)会先把sql参数附加到本地缓存block中,然后(stmt.counter % stmt.ch.blockSize)判断本地缓存大小是否到达阈值,到达则执行Flush(),将数据写入远端。综上,clickhouse-go中的核心实现逻辑是:

  1. 底层维护一个缓存block,同时设置block_size控制缓存大小

  2. 执行stmt.Exec时,不会直接写入远程ClickHouse中,而是将插入参数Append到block中

  3. 每次Append后,判断block的size和block_size的关系,如果正好整除,则刷新block(即写入clickhouse)

因此block_size这个参数很重要,它表示本地缓存的上限,如果很大的话,程序会占用一些内存。笔者起初设置为100000,在调试日志中看不到stmt.ch.logf("[exec] flush block")打印的log,设置小后就看到下面的输出:

... [clickhouse][connect=1][begin] tx=false, data=false [clickhouse][connect=1][prepare] [clickhouse][connect=1][read meta] <- data: packet=1, columns=6, rows=0 [clickhouse][connect=1][exec] flush block [clickhouse][connect=1][exec] flush block ....

总结

很多数据库驱动都支持bulk write特性,clickhouse-go这个驱动也不例外,但是它的文档写得不是很详细,只是在文档中指明要放在begin/commit中做。再加上clickhouse不支持事务,begin/commit这种写法会让人困惑。

本文通过分析clickhouse-go的源代码,了解bulk write的执行过程,帮助大家梳理其具体实现。

参考资料

[1]

clickhouse-go: https://github.com/ClickHouse/clickhouse-go

[2]

sqlx: https://github.com/jmoiron/sqlx

[3]

官方样例代码: https://github.com/ClickHouse/clickhouse-go/blob/master/examples/sqlx.go#L35-L51

[4]

核心代码如下: https://github.com/clickhouse/clickhouse-go/blob/master/stmt.go#L44-L68

[5]

INSERT INTO Statement: https://clickhouse.tech/docs/en/sql-reference/statements/insert-into/

[6]

go-clickhouse-batchinsert: https://github.com/MaruHyl/go-clickhouse-batchinsert/blob/master/batch.go#L349-L354

本文分享自微信公众号 - 机器学习与系统(aimlsystem)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Clickhouse表引擎探究-ReplacingMergeTree
作者:耿宏宇1表引擎简述1.1官方描述MergeTree系列的引擎被设计用于插入极大量的数据到一张表当中。数据可以以数据片段的形式一个接着一个的快速写入,数据片段在后台按照一定的规则进行合并。相比在插入时不断修改(重写)已存储的数据,这种策略会高效很多。R
Stella981 Stella981
3年前
Flink 写数据到ClickHouse
目录一、导入clickhousejdbc依赖二、编写Flink写入ClickHouse代码三、创建ClickHouse表四、运行向localhost,7777端口发送数据,并启动Flink应用程序五、查询ClickHouse数据结果,验证数据是否写入成功一、导入clickhousejdbc依赖
Stella981 Stella981
3年前
GreenPlum tidb 性能比较
主要的需求  针对大体量表的OLAP统计查询,需要找到一个稳定,高性能的大数据数据库,具体使用  数据可以实时的写入和查询,并发的tps不是很高建立数据仓库,模式上主要采用星星模型、雪花模型,或者宽表前端展示分为3类 saiku、granafa、c代码开发数据体量:事实表在35亿、维度表大的在500
Stella981 Stella981
3年前
PostgreSQL_如何实现批量更新、删除、插入
标签PostgreSQL,批量,batch,insert,update,delete,copy背景如何一次插入多条记录?如何一次更新多条记录?如何一次批量删除多条记录?批量操作可以减少数据库与应用程序的交互次数,提高数据处理的吞吐量。批量插入批量插入1使用insert
Stella981 Stella981
3年前
Mybatisd对MySQL批量插入、批量更新及批量删除语句
1、批量插入<insertid"insertBatch"parameterType"java.util.List"insertintot_student(name,age,class)values<forea
Wesley13 Wesley13
3年前
MySQL批量插入多条数据方便测试
批量插入流程_数据库字段__!(https://oscimg.oschina.net/oscnet/76e6a5939257a8370d3a253a72224e38935.png)_1delimiter2createproceduredoinsert3()3begin
Stella981 Stella981
3年前
PHP如何避免高并发下insert into 重复入库
场景:用户签到/分享功能,每天只能签到一次或分享一次数据库:id  user\_id  add\_time  逻辑分析:用户每天进行分享或签到,得到积分,数据库通过以上字段进行记录,同一时间不可插入多条,一天只能有一条记录,插入前判断是否当天已插入过问题点:用户连点、并发请求等会导致同时插入多条记录,导致积分异常解决方案:使用文件锁,经过
Wesley13 Wesley13
3年前
mysql 插入数据
简单用法:insertintotb\_name(字段1,字段2,.........)values(值1,值2,.....)注意,字段个数必须和值的个数一致。字符用引号引起来,数字不用,插入空值使用null批量插入:insertintotb\_name(字段1,字段2,.........)values(值1,值2,.....
Wesley13 Wesley13
3年前
mysql——批量插入数据
要测试一下新功能,需要测试环境下的数据库有大量的数据,一个个插入显然不现实,需要了解一下存储过程https://www.cnblogs.com/endtel/p/5407455.html(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.cnblogs.com%2Fendtel