ClickHouse和他的朋友们(4)Pipeline处理器和调度器

Stella981
• 阅读 735

原文出处:https://bohutang.me/2020/06/11/clickhouse-and-friends-processor/

本文谈下 ClickHouse 核心科技:处理器 Processor 和有向无环调度器 DAG Scheduler。

这些概念并不是 ClickHouse 首创,感兴趣的同学可以关注下 materialize的 timely-dataflow,虎哥用golang 也写过一个原型。

拼的是实现细节,正是这些模块的精良设计,才有了 ClickHous e整体的高性能。

Pipeline问题

在传统数据库系统中,一个 Query 处理流程大体是:ClickHouse和他的朋友们(4)Pipeline处理器和调度器

其中在Plan阶段,往往会增加一个 Pipeline 组装(一个 transformer 代表一次数据处理):

ClickHouse和他的朋友们(4)Pipeline处理器和调度器

所有 transformer 被编排成一个流水线(pipeline),然后交给 executor 串行式执行,每执行一个 transformer 数据集就会被加工并输出,一直到下游的 sinker。可以看到,这种模型的优点是简单,缺点是性能低,无法发挥 CPU 的并行能力,通常叫火山模型(_volcano_-style),对于 OLTP 低延迟来说足够,对于计算密集的 OLAP 来说是远远不够的,CPU 不到 100% 就是犯罪!

对于上面的例子,如果 transformer1 和 transformer2 没有交集,那么它们就可以并行处理:

ClickHouse和他的朋友们(4)Pipeline处理器和调度器

这样就涉及到一些比较灵魂的问题:

  1. 如何实现 transformer 的灵活编排?

  2. 如何实现 transformer 间的数据同步?

  3. 如何实现 transformer 间的并行调度?

Processor 和 DAG Scheduler

1. Transformer 编排

ClickHouse 实现了一系列基础 transformer 模块,见 src/Processors/Transforms,比如:

  • FilterTransform -- WHERE 条件过滤

  • SortingTransform -- ORDER BY 排序

  • LimitByTransform -- LIMIT 裁剪

当我们执行:

SELECT * FROM t1 WHERE id=1 ORDER BY time DESC LIMIT 10

对于 ClickHouse 的 QueryPipeline 来说,它会按照以下方式进行编排组装:

QueryPipeline::addSimpleTransform(Source)QueryPipeline::addSimpleTransform(FilterTransform)QueryPipeline::addSimpleTransform(SortingTransform)QueryPipeline::addSimpleTransform(LimitByTransform)QueryPipeline::addSimpleTransform(Sinker)

这样就实现了 Transformer 的编排,但是执行时数据如何进行同步呢?

2. Transformer 数据同步

当 QueryPipeline 进行 transformer 编排时,我们还需要进行更加底层的 DAG 连通构建。

connect(Source.OutPort, FilterTransform.InPort)connect(FilterTransform.OutPort, SortingTransform.InPort)connect(SortingTransform.OutPort, LimitByTransform.InPort)connect(LimitByTransform.OutPort, Sinker.InPort)

这样就实现了数据的流向关系,一个 transformer 的 OutPort 对接另外一个的 InPort,就像我们现实中的水管管道一样,接口有 3 通甚至多通。

3. Transformer 执行调度

现在管道组装起来了,那么管道内的水如何进行处理和给压流动呢?

ClickHouse 定义了一套 transform 状态,processor 根据这些状态来实现调度。

    enum class Status    {        NeedData  // 等待数据流进入        PortFull, // 管道流出端阻塞        Finished, // 完成状态,退出        Ready,    // 切换到 work 函数,进行逻辑处理        Async,    // 切换到 schedule 函数,进行异步处理        Wait,     // 等待异步处理        ExpandPipeline,      // Pipeline 需要裂变    };

当 source 生成数据后,它的状态会设置为 PortFull,意思是等着流入其他 transformer 的 InPort,processor 会开始调度 FilterTransformer(NeedData) 的 Prepare,进行 PullData,然后它的状态设置为 Ready,等待 processor 调度 Work 方法进行数据Filter处理,大家就这样靠状态让 processor 去感知,来调度和做状态迁移,直到 Finished 状态。

这里值得一提的是 ExpandPipeline 状态,它会根据 transformer 的实现,可以把一个 transformer 裂变出更多个 transformer 并行执行,达到一个爆炸效果。

Example

SELECT number + 1 FROM t1;

为了更加深入理解 ClickHouse 的 processor 和 scheduler 机制,我们来一个原生态的 example:

  1. 一个 Source:{0,1,2,3,4}

  2. AdderTransformer 对每个数字做加1操作

  3. 一个 Sinker,输出结果

1. Source

class MySource : public ISource{public:    String getName() const override { return "MySource"; }    MySource(UInt64 end_)        : ISource(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})), end(end_)    {    }private:    UInt64 end;    bool done = false;    Chunk generate() override    {        if (done)        {            return Chunk();        }        MutableColumns columns;        columns.emplace_back(ColumnUInt64::create());        for (auto i = 0U; i < end; i++)            columns[0]->insert(i);        done = true;        return Chunk(std::move(columns), end);    }};

2. MyAddTransform

class MyAddTransformer : public IProcessor{public:    String getName() const override { return "MyAddTransformer"; }    MyAddTransformer()        : IProcessor(            {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})},            {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})})        , input(inputs.front())        , output(outputs.front())    {    }    Status prepare() override    {        if (output.isFinished())        {            input.close();            return Status::Finished;        }        if (!output.canPush())        {            input.setNotNeeded();            return Status::PortFull;        }        if (has_process_data)        {            output.push(std::move(current_chunk));            has_process_data = false;        }        if (input.isFinished())        {            output.finish();            return Status::Finished;        }        if (!input.hasData())        {            input.setNeeded();            return Status::NeedData;        }        current_chunk = input.pull(false);        return Status::Ready;    }    void work() override    {        auto num_rows = current_chunk.getNumRows();        auto result_columns = current_chunk.cloneEmptyColumns();        auto columns = current_chunk.detachColumns();        for (auto i = 0U; i < num_rows; i++)        {            auto val = columns[0]->getUInt(i);            result_columns[0]->insert(val+1);        }        current_chunk.setColumns(std::move(result_columns), num_rows);        has_process_data = true;    }    InputPort & getInputPort() { return input; }    OutputPort & getOutputPort() { return output; }protected:    bool has_input = false;    bool has_process_data = false;    Chunk current_chunk;    InputPort & input;    OutputPort & output;};

3. MySink

class MySink : public ISink{public:    String getName() const override { return "MySinker"; }    MySink() : ISink(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})) { }private:    WriteBufferFromFileDescriptor out{STDOUT_FILENO};    FormatSettings settings;    void consume(Chunk chunk) override    {        size_t rows = chunk.getNumRows();        size_t columns = chunk.getNumColumns();        for (size_t row_num = 0; row_num < rows; ++row_num)        {            writeString("prefix-", out);            for (size_t column_num = 0; column_num < columns; ++column_num)            {                if (column_num != 0)                    writeChar('\t', out);                getPort()                    .getHeader()                    .getByPosition(column_num)                    .type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);            }            writeChar('\n', out);        }        out.next();    }};

4. DAG Scheduler

int main(int, char **){    auto source0 = std::make_shared<MySource>(5);    auto add0 = std::make_shared<MyAddTransformer>();    auto sinker0 = std::make_shared<MySink>();    /// Connect.    connect(source0->getPort(), add0->getInputPort());    connect(add0->getOutputPort(), sinker0->getPort());    std::vector<ProcessorPtr> processors = {source0, add0, sinker0};    PipelineExecutor executor(processors);    executor.execute(1);}

总结

从开发者角度看还是比较复杂,状态迁移还需要开发者自己控制,不过 upstream 已经做了大量的基础工作,比如对 source的封装 ISource,对 sink 的封装 ISink,还有一个基础的 ISimpleTransform,让开发者在上层使用 processor 时更加容易,可以积木式搭建出自己想要的 pipeline。

ClickHouse 的 transformer 数据单元是 Chunk,transformer 对上游 OutPort 流过来的 Chunk 进行加工,然后输出给下游的 InPort,图连通式的流水线并行工作,让 CPU 尽量满负荷工作。

当一个 SQL 被解析成 AST 后,ClickHouse 根据 AST 构建 Query Plan,然后根据 QueryPlan 构建出 pipeline,最后由 processor 负责调度和执行。目前,ClickHouse 新版本已经默认开启 QueryPipeline,同时这块代码也在不停的迭代。

文内链接

延伸阅读

全文完。

Enjoy ClickHouse :)

叶老师的「MySQL核心优化」大课已升级到MySQL 8.0,扫码开启MySQL 8.0修行之旅吧

ClickHouse和他的朋友们(4)Pipeline处理器和调度器

本文分享自微信公众号 - 老叶茶馆(iMySQL_WX)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Stella981 Stella981
3年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Stella981 Stella981
3年前
ClickHouse和他的朋友们(1)编译、开发、测试
原文出处:https://bohutang.me/2020/06/05/clickhouseandfriendsdevelopment/一次偶然的机会,和ClickHouse团队做了一次线下沟通,Alexey提到ClickHouse的设计哲学:1.Theproductmustsolveactualproblem
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(