ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

完成Pipeline。-- 代码源自虎哥博客。感谢

2022-01-11 22:31:37  阅读:226  来源: 互联网

标签:std Pipeline return -- chunk 虎哥 num input columns


  当时想实践一下Pipeline的构建。

  未能实现的原因主要是在于 

  1. ClickHouse架构认识不足。

  2.CMakeLists.txt 功力不足。

  各占一半一半。

  参见虎哥的博客: https://bohutang.me/2020/06/11/clickhouse-and-friends-processor/

 

1. Source

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class MySource : public ISource
{
public:
String getName() const override { return "MySource"; }

MySource(UInt64 end_)
: ISource(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})), end(end_)
{
}

private:
UInt64 end;
bool done = false;

Chunk generate() override
{
if (done)
{
return Chunk();
}
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create());
for (auto i = 0U; i < end; i++)
columns[0]->insert(i);

done = true;
return Chunk(std::move(columns), end);
}
};

2. MyAddTransform

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class MyAddTransformer : public IProcessor
{
public:
String getName() const override { return "MyAddTransformer"; }

MyAddTransformer()
: IProcessor(
{Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})},
{Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})})
, input(inputs.front())
, output(outputs.front())
{
}

Status prepare() override
{
if (output.isFinished())
{
input.close();
return Status::Finished;
}

if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}

if (has_process_data)
{
output.push(std::move(current_chunk));
has_process_data = false;
}

if (input.isFinished())
{
output.finish();
return Status::Finished;
}

if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
current_chunk = input.pull(false);
return Status::Ready;
}

void work() override
{
auto num_rows = current_chunk.getNumRows();
auto result_columns = current_chunk.cloneEmptyColumns();
auto columns = current_chunk.detachColumns();
for (auto i = 0U; i < num_rows; i++)
{
auto val = columns[0]->getUInt(i);
result_columns[0]->insert(val+1);
}
current_chunk.setColumns(std::move(result_columns), num_rows);
has_process_data = true;
}

InputPort & getInputPort() { return input; }
OutputPort & getOutputPort() { return output; }

protected:
bool has_input = false;
bool has_process_data = false;
Chunk current_chunk;
InputPort & input;
OutputPort & output;
};

3. MySink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class MySink : public ISink
{
public:
String getName() const override { return "MySinker"; }

MySink() : ISink(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})) { }

private:
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
FormatSettings settings;

void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
size_t columns = chunk.getNumColumns();

for (size_t row_num = 0; row_num < rows; ++row_num)
{
writeString("prefix-", out);
for (size_t column_num = 0; column_num < columns; ++column_num)
{
if (column_num != 0)
writeChar('\t', out);
getPort()
.getHeader()
.getByPosition(column_num)
.type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
}
writeChar('\n', out);
}

out.next();
}
};

4. DAG Scheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int main(int, char **)
{
auto source0 = std::make_shared<MySource>(5);
auto add0 = std::make_shared<MyAddTransformer>();
auto sinker0 = std::make_shared<MySink>();

/// Connect.
connect(source0->getPort(), add0->getInputPort());
connect(add0->getOutputPort(), sinker0->getPort());

std::vector<ProcessorPtr> processors = {source0, add0, sinker0};
PipelineExecutor executor(processors);
executor.execute(1);
}

 

标签:std,Pipeline,return,--,chunk,虎哥,num,input,columns
来源: https://www.cnblogs.com/wyc2021/p/15790409.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有