ClickHouse 使用插入代替更新

参考链接:https://blog.csdn.net/qq_42651904/article/details/112802212

由于ClickHouse的更新操作十分佛系,无法满足日常业务的更新需求,所以我们需要有新的方式去实现数据的更新。

OLAP 数据库中,可变数据通常不受欢迎。ClickHouse 也不例外。与其他 OLAP 产品一样,ClickHouse 最初甚至不支持更新。后来,添加了更新,但是和其他很多东西一样,它们是以“ ClickHouse”的方式添加的

即使是现在,ClickHouse 的更新也是异步的,这使得它们很难在交互式应用程序中使用。尽管如此,在许多用例中,用户需要对现有数据进行修改,并希望立即看到效果。能做到这一点吗?当然可以。

早在2016年,ClickHouse 团队发表了一篇题为“如何在 ClickHouse 中更新数据”的文章。当时 ClickHouse 不支持数据修改。只能使用特殊的插入结构来模拟更新,数据必须通过分区删除。

GDPR 要求的压力下,ClickHouse 团队在2018年发布了 UPDATESDELETES。后续文章“ ClickHouse 中的更新和删除”仍然是 Altinity 博客中阅读量最高的文章之一。这些异步的、非原子的更新是以 alter table update 语句的形式实现的,并且可能会传输大量数据。这对于不需要立即结果的批量操作和不经常的更新非常有用。“普通” SQL 更新仍然缺失在 ClickHouse 中,尽管它们每年都可靠地出现在路线图中。如果需要实时更新行为,我们必须使用不同的方法。让我们考虑一个实际的用例,并比较在 ClickHouse 中实现它的不同方法。

存储引擎选择

ClickHouse中的表引擎有以下几种:

- MergeTree
- ReplacingMergeTree
- SummingMergeTree
- AggregatingMergeTree
- CollapsingMergeTree
- VersionedCollapsingMergeTree
- GraphiteMergeTree

我们主要尝试使用以下三种:

  • ReplacingMergeTree

  • Aggregate functions

  • AggregatingMergeTree

ReplacingMergeTree

让我们从创建一个存储警报的表开始。

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE alerts(
tenant_id UInt32,
alert_id String,
timestamp DateTime Codec(Delta, LZ4),
alert_data String,
acked UInt8 DEFAULT 0,
ack_time DateTime DEFAULT toDateTime(0),
ack_user LowCardinality(String) DEFAULT ''
)
ENGINE = ReplacingMergeTree(ack_time)
PARTITION BY tuple()
ORDER BY (tenant_id, timestamp, alert_id);

为简单起见,所有警报特定列都被打包到一个通用的“ alert _ data”列中。但是你可以想象这个警报可能包含几十个甚至几百个列。另外,alert _ id在我们的示例中是一个随机字符串。

注意 ReplacingMergeTree 引擎。ReplacingMergeTee 是一个特殊的表引擎,它用主键替换数据(ORDER BY)ーー具有相同键值的新版本行将替换旧版本的行。在我们的例子中,“新鲜感”是由一个列决定的,“回到过去”。在后台合并操作期间执行替换。它不会立即发生,也不能保证会发生,因此查询结果的一致性是个问题。不过,ClickHouse 有一种特殊的语法来处理这些表,我们将在下面的查询中使用它。

在运行查询之前,让我们用一些数据填充表格,我们为1000个租户生成1000万个警报:

1
2
3
4
5
6
7
INSERT INTO alerts(tenant_id, alert_id, timestamp, alert_data)
SELECT
toUInt32(rand(1)%1000+1) AS tenant_id,
randomPrintableASCII(64) as alert_id,
toDateTime('2020-01-01 00:00:00') + rand(2)%(3600*24*30) as timestamp,
randomPrintableASCII(1024) as alert_data
FROM numbers(10000000);

接下来,让我们确认99% 的警报,为ackedack userack time列提供新的值。我们只是插入一个新行,而不是更新。

1
2
3
4
5
INSERT INTO alerts (tenant_id, alert_id, timestamp, alert_data, acked, ack_user, ack_time)
SELECT tenant_id, alert_id, timestamp, alert_data,
1 as acked,
concat('user', toString(rand()%1000)) as ack_user, now() as ack_time
FROM alerts WHERE cityHash64(alert_id) % 99 != 0;

如果我们现在查询这个表,我们会看到如下内容:

1
2
3
4
5
6
7
SELECT count() FROM alerts

┌──count()─┐
19898060
└──────────┘

1 rows in set. Elapsed: 0.008 sec.

因此,我们在表中肯定有已确认行和未确认行。因此,替换还没有发生。为了查看真实的数据,我们必须添加一个 FINAL 关键字。

1
2
3
4
5
6
7
SELECT count() FROM alerts FINAL

┌──count()─┐
10000000
└──────────┘

1 rows in set. Elapsed: 3.693 sec. Processed 19.90 million rows, 1.71 GB (5.39 million rows/s., 463.39 MB/s.)

计数现在是正确的,但是请查看查询时间!使用 FINALClickHouse 必须在查询时扫描所有行并按主键合并它们。这样就得到了正确的答案,但是开销很大。让我们看看是否可以通过仅过滤未被确认的行来做得更好

1
2
3
4
5
6
7
SELECT count() FROM alerts FINAL WHERE NOT acked

┌─count()─┐
101940
└─────────┘

1 rows in set. Elapsed: 3.570 sec. Processed 19.07 million rows, 1.64 GB (5.34 million rows/s., 459.38 MB/s.)

查询时间和处理的数据量都是相同的,尽管计数要小得多。过滤无助于加快查询速度。随着表大小的增加,成本可能会更高。它无法扩大规模。

好吧,查询整个表并没有多大帮助。我们仍然可以使用 ReplacingMergeTree 作为我们的用例吗?让我们随机选择一个租户 id,然后选择所有尚未确认的记录ー假设有一个用户正在查看的仪表板。我喜欢雷 · 布拉德伯里,所以我选了451。因为alert data只是随机的垃圾,我们将计算一个校验和,并用它来确认不同方法的结果是相同的:

1
2
3
4
5
6
7
8
9
10
11
SELECT 
count(),
sum(cityHash64(*)) AS data
FROM alerts FINAL
WHERE (tenant_id = 451) AND (NOT acked)

┌─count()─┬─────────────────data─┐
9018441617166277032220
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.278 sec. Processed 106.50 thousand rows, 119.52 MB (383.45 thousand rows/s., 430.33 MB/s.)

真够快的!在278毫秒内,我们可以查询所有未确认的数据。这次为什么这么快?不同之处在于过滤条件。tenant _ id是主键的一部分,因此 ClickHouse 可以在 FINAL 之前过滤数据。在这种情况下,ReplacingMergeTree 变得有效。

让我们也尝试一个用户过滤器,并查询特定用户承认的警报数量。列的基数是相同的ー我们有1000个用户,可以尝试 user451

1
2
3
4
5
6
7
8
SELECT count() FROM alerts FINAL
WHERE (ack_user = 'user451') AND acked

┌─count()─┐
9725
└─────────┘

1 rows in set. Elapsed: 4.778 sec. Processed 19.04 million rows, 1.69 GB (3.98 million rows/s., 353.21 MB/s.)

由于没有使用索引,现在的速度非常慢。点击之家扫描了所有1904万行。注意,我们不能向索引中添加ack _ user ,因为它会破坏 ReplacingMergeTree 语义。不过,我们可以在 PREWHERE 问题上耍一把:

1
2
3
4
5
6
7
8
SELECT count() FROM alerts FINAL
PREWHERE (ack_user = 'user451') AND acked

┌─count()─┐
9725
└─────────┘

1 rows in set. Elapsed: 0.639 sec. Processed 19.04 million rows, 942.40 MB (29.80 million rows/s., 1.48 GB/s.)

PREWHERE 是让 ClickHouse 以不同方式应用过滤器的特殊提示。通常 ClickHouse 非常聪明,可以自动将条件移动到 PREWHERE,因此用户不应该在意。这次没有发生,所以我们已经检查过了。

Aggregate Functions

众所周知,ClickHouse 支持各种各样的聚合函数。在最新的版本中,它已经有了超过100个。结合9个聚合函数组合子(见 https://clickhouse.tech/docs/en/query_language/agg_functions/combinators/) ,这给有经验的用户提供了巨大的灵活性。对于这个用例,我们不需要任何高级的,将只使用3个函数: argMaxmaxany

451st tenant 的相同查询可以用一个argMax聚合函数执行,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SELECT count(), sum(cityHash64(*)) data FROM (
SELECT tenant_id, alert_id, timestamp,
argMax(alert_data, ack_time) alert_data,
argMax(acked, ack_time) acked,
max(ack_time) ack_time_,
argMax(ack_user, ack_time) ack_user
FROM alerts
GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;

┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.059 sec. Processed 73.73 thousand rows, 82.74 MB (1.25 million rows/s., 1.40 GB/s.)

相同的结果,相同的行数,但是性能要好4倍!这就是 ClickHouse 的聚合效率。缺点是查询变得更加复杂。但是我们可以让它变得更简单。

让我们注意,当收到警报时,我们只更新了3栏:

  • acked: 0 => 1

  • ack_time: 0 => now()

  • ack_user: ‘’ => ‘user1’

在所有3种情况下,列值都会增加!所以我们可以用 max 来代替笨重的 argMax因为我们不更改alert _ data ,所以我们不需要在这个列上进行任何实际的聚合。有一个很好的any聚合功能。它选择任何没有额外开销的价值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SELECT count(), sum(cityHash64(*)) data FROM (
SELECT tenant_id, alert_id, timestamp,
any(alert_data) alert_data,
max(acked) acked,
max(ack_time) ack_time,
max(ack_user) ack_user
FROM alerts
GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;

┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.055 sec. Processed 73.73 thousand rows, 82.74 MB (1.34 million rows/s., 1.50 GB/s.)

查询变得简单,而且稍微快一点!原因是使用 any 函数,ClickHouse 不需要在 alert _ data 列上计算 max

AggregatingMergeTree

AggregatingMergeTree 是最强大的 ClickHouse 功能之一。当与物化视图结合时,它支持实时数据聚合。既然我们在前面的方法中使用了聚合函数,那么我们可以使用 AggregatingMergeTree 使其更好吗?事实上,这并不是什么大的进步。

我们只更新一行,所以一个组只有两行需要聚合。在这种情况下,AggregatingMergeTree 并不是最佳选择。不过,我们可以玩个小把戏。我们知道,警报总是作为非确认首先插入,然后成为确认。一旦用户确认了警报,只需要修改3列。如果不重复其他列的数据,是否可以节省磁盘空间并提高性能?

让我们创建一个表,它使用“ max”聚合函数实现聚合。除了max之外,我们还可以使用any ,但是这需要将列设置为 Nullable ーany将选择一个非空值。

1
2
3
4
5
6
7
8
9
10
11
12
13
DROP TABLE alerts_amt_max;

CREATE TABLE alerts_amt_max (
tenant_id UInt32,
alert_id String,
timestamp DateTime Codec(Delta, LZ4),
alert_data SimpleAggregateFunction(max, String),
acked SimpleAggregateFunction(max, UInt8),
ack_time SimpleAggregateFunction(max, DateTime),
ack_user SimpleAggregateFunction(max, LowCardinality(String))
)
Engine = AggregatingMergeTree()
ORDER BY (tenant_id, timestamp, alert_id);

由于原始数据是随机的,我们将使用来自“ alerts”的现有数据填充新表。我们将像前面一样分为两个插入,一个插入用于非确认警报,另一个插入用于确认警报:

请注意,对于已确认的事件,我们插入一个空字符串,而不是“ alert _ data”。我们知道数据不会改变,我们只能存储一次!聚合函数将填补这个空白。在真正的应用程序中,我们可以跳过所有不更改的列,让它们获得默认值。

一旦我们有了数据,让我们首先检查数据大小:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SELECT 
table,
sum(rows) AS r,
sum(data_compressed_bytes) AS c,
sum(data_uncompressed_bytes) AS uc,
uc / c AS ratio
FROM system.parts
WHERE active AND (database = 'last_state')
GROUP BY table

┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬──────────────ratio─┐
│ alerts │ 1903943920926009562210493077101.0058921003373666
│ alerts_amt_max │ 1903943910723636061109020481781.0166372782501314
└────────────────┴──────────┴─────────────┴─────────────┴────────────────────┘

多亏了随机字符串,我们几乎没有压缩。但是由于我们不需要存储两次‘ alerts data’ ,因此总量要小两倍。

现在让我们尝试对聚合表进行查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SELECT count(), sum(cityHash64(*)) data FROM (
SELECT tenant_id, alert_id, timestamp,
max(alert_data) alert_data,
max(acked) acked,
max(ack_time) ack_time,
max(ack_user) ack_user
FROM alerts_amt_max
GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;

┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.036 sec. Processed 73.73 thousand rows, 40.75 MB (2.04 million rows/s., 1.13 GB/s.)

多亏了 AggregatingMergeTree,我们处理的数据更少了(以前是40mb82mb) ,而且现在效率更高了。

Materializing The Update

ClickHouse 将尽最大努力在后台合并数据,删除重复行并执行聚合。然而,有时强制合并是有意义的,例如,为了释放磁盘空间。这可以通过 OPTIMIZE FINAL 语句来完成。OPTIMIZE 是一个阻塞和昂贵的操作,因此不能执行太频繁。让我们看看它是否对查询性能有任何影响。

1
2
3
4
5
6
7
OPTIMIZE TABLE alerts FINAL
Ok.
0 rows in set. Elapsed: 105.675 sec.

OPTIMIZE TABLE alerts_amt_max FINAL
Ok.
0 rows in set. Elapsed: 70.121 sec.

在 OPTIMIZE FINAL 之后,两个表具有相同的行数和相同的数据。

1
2
3
4
┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬────────────ratio─┐
│ alerts │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
│ alerts_amt_max │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
└────────────────┴──────────┴─────────────┴─────────────┴──────────────────┘

不同方法之间的性能差异变得不那么显著。以下是总表:

After inserts After OPTIMIZE FINAL
ReplacingMergeTree FINA 0.278 0.037
argMax 0.059 0.034
any/max 0.055 0.029
AggregatingMergeTree 0.036 0.026

总结

ClickHouse 提供了一个丰富的工具集来处理实时更新,比如 ReplacingMergeTreeCollapsingMergeTree (这里没有讨论)、 AggregatingMergeTree 和聚合函数。所有这些方法都有三个共同属性:

数据通过插入新版本进行“修改”。在 ClickHouse 中插入的速度非常快。
有一些高效的方法可以仿真更新类似 OLTP 数据库的语义
然而,实际的修改不会立即发生。
特定方法的选择取决于应用程序用例。ReplacingMergeTree 对于用户来说是最简单和最方便的,但是可能只用于中小型表,或者数据总是由主键查询。使用聚合函数提供了更多的灵活性和性能,但需要大量的查询重写。最后,AggregatingMergeTree 允许存储保存,只保留修改过的列。这些都是 ClickHouse DB 设计器的工具库中的好工具,并在需要时应用。

下面是实践过程:

使用表 引擎为AggregatingMergeTree

创建测试表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
CREATE TABLE default.test2
(
`pk` SimpleAggregateFunction(any, Int64),
`accNO` Nullable(String),
`createdat` SimpleAggregateFunction(max,DateTime),
`hiscode` String,
`last_update` Nullable(DateTime),
`modality` Nullable(String),
`patientId` Nullable(String),
`seriesInstUID` Nullable(String),
`seriesNo` Nullable(String),
`seriesdate` Nullable(String),
`seriesdescr` Nullable(String),
`seriestime` Nullable(String),
`sopInstUID` Nullable(String),
`studyId` Nullable(String),
`studyInstUID` Nullable(String),
`studydate` Nullable(String),
`studytime` Nullable(String),
`uploadflag` Nullable(Int32)
)
ENGINE = AggregatingMergeTree
ORDER BY pk
SETTINGS index_granularity = 8192

创建表时指定了表的主键和更新字段的数据覆盖策略,SimpleAggregateFunction(name, types_of_arguments…) 数据类型存储聚合函数的当前值,max 则指定保留最新的数据。

由于AggregatingMergeTree数据聚合是在后台进行,并没有确定的时候,所以如果想要查询出最新的数据需要在查询语句的后面加上 final 关键字,如:SELECT * from test2 st final

也可以定时执行optimize table series_test2 强制进行数据合并操作,但是OPTIMIZE 是一个阻塞和昂贵的操作,因此不能执行太频繁。

执行完以上操作之后,可以再次查询series_test2表,数据应该是合并之后的。

PS:如果其他字段也许要进行聚合更新,那么创建表时字段函数依然需要指定为SimpleAggregateFunction,里面的第一个参数按需要配置,如果配置any可能会不生效。

经过 SimpleAggregateFunction 修饰的字段查出来的类型都是String,应该是该函数bug,所以需要自行数据类型转换。