ClickHouse 表引擎
本文及后续所有文章都以 21.7.3.14-2 做为版本讲解和入门学习
# 表引擎的使用
表引擎是 ClickHouse 的一大特色。可以说, 表引擎决定了如何存储表的数据。包括:
- 数据的存储方式和位置,写到哪里以及从哪里读取数据。
- 支持哪些查询以及如何支持。
- 并发数据访问。
- 索引的使用(如果存在)。
- 是否可以执行多线程请求。
- 数据复制参数。
表引擎的使用方式就是必须显式在创建表时定义该表使用的引擎,以及引擎使用的相关参数。
注意:引擎的名称大小写敏感,都是以驼峰命名,且首字母注意是大写。
# TinyLog
以列文件的形式保存在磁盘上,不支持索引,没有并发控制。一般保存少量数据的小表,生产环境上作用有限。可以用于平时练习测试用。一般用于一些简单的测试使用,生产环境不考虑。
create table t_tinylog ( id String, name String) engine=TinyLog;
# Memory
内存引擎,数据以未压缩的原始形式直接保存在内存当中,服务器重启数据就会消失。读写操作不会相互阻塞,不支持索引。简单查询下有非常非常高的性能表现(超过 10G/s)。
一般用到它的地方不多,除了用来测试,就是在需要非常高的性能,同时数据量又不太大(上限大概 1 亿行)的场景。
# MergeTree
ClickHouse 中最强大的表引擎当属 MergeTree(合并树)引擎及该系列(*MergeTree)中的其他引擎,支持索引和分区,地位可以相当于 Mysql 的 innodb。而且基于 MergeTree,还衍生除了很多小弟,也是非常有特色的引擎。
# 建表语句
create table t_order_mt(
id UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time Datetime
) engine =MergeTree
partition by toYYYYMMDD(create_time)
primary key (id)
order by (id,sku_id);
# 插入数据
insert into t_order_mt values
(101,'sku_001',1000.00,'2020-06-01 12:00:00') ,
(102,'sku_002',2000.00,'2020-06-01 11:00:00'),
(102,'sku_004',2500.00,'2020-06-01 12:00:00'),
(102,'sku_002',2000.00,'2020-06-01 13:00:00'),
(102,'sku_002',12000.00,'2020-06-01 13:00:00'),
(102,'sku_002',600.00,'2020-06-02 12:00:00');
# 查询
select * from t_order_mt;
┌──id─┬─sku_id──┬─total_amount─┬─────────create_time─┐
│ 102 │ sku_002 │ 600.00 │ 2020-06-02 12:00:00 │
└─────┴─────────┴──────────────┴─────────────────────┘
┌──id─┬─sku_id──┬─total_amount─┬─────────create_time─┐
│ 101 │ sku_001 │ 1000.00 │ 2020-06-01 12:00:00 │
│ 102 │ sku_002 │ 2000.00 │ 2020-06-01 11:00:00 │
│ 102 │ sku_002 │ 2000.00 │ 2020-06-01 13:00:00 │
│ 102 │ sku_002 │ 12000.00 │ 2020-06-01 13:00:00 │
│ 102 │ sku_004 │ 2500.00 │ 2020-06-01 12:00:00 │
└─────┴─────────┴──────────────┴─────────────────────┘
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
MergeTree 其实还有很多参数 (绝大多数用默认值即可),但是三个参数是更加重要的,也涉及了关于 MergeTree 的很多概念。
# 文件介绍
以上我们创建的表,他是一个文件夹,具体位置如下
/var/lib/clickhouse/data/system/t_order_mt
该 t_order_mt 文件夹下包含有如下文件
20200601_1_1_0 20200602_2_2_0 detached format_version.txt
- 20200601_1_1_0 分区目录,PartitionId_MinBlocakNum_MaxBlockNum_Level
PartitionId 数据分区 ID 生成规则由分区 ID 决定,分区 ID 由 PARTITION BY 分区键决定。根据分区键字段类型,ID 生成规则可分为:
-> 未定义分区键:没有定义 PARTITION BY ,默认生成一个目录名位 all 的数据分区,所有数据均放在 all 目录下。
-> 整型分区键:分区键为整型,那么直接用该整型值得字符串形式作为分区 ID。
-> 日期类分区键:分区键为日期类型,或者可以转换成日期类型。
-> 其他类型分区键:String、float 类型等,通过 128 位得 Hash 算法取其 Hash 值作为分区 ID。
MinBlocakNum:最小分区块编号,自增类型,从 1 开始向上递增。每产生一个新的目录分区就向上递增一个数字。
MaxBlockNum:最大分区块编号,新创建得分区 MinBlocakNum 等于 MaxBlockNum 得编号。
Level:合并的层级,被合并的次数。合并次数越多,层级值越大。 - detached 卸载,默认空的
- format_version.txt 格式化版本,默认是 1
再进入 20200601_1_1_0 文件夹,我们可以见到,如下几个文件
checksums.txt columns.txt count.txt data.bin data.mrk3 default_compression_codec.txt minmax_create_time.idx partition.dat primary.idx
- data.bin 数据文件,表里面的数据会在这里面,老版本会把每个字段拆开来 比如 id.bin,id.mrk3,name.bin,name.mrk3
- data.mrk3 标记文件,标记文件在 idx 索引文件和 bin 数据文件之间起到桥梁作用。老版本以 mrk2 结尾的文件,表示该表启用了自适应索引间隔。
- default_compression_codec.txt 压缩格式
- count.txt 记录表的行数,如果合并,也会记录合并后的。
- columns.txt 列的信息,记录列名和类型
- checksums.txt 校验文件,用于校验各个文件的正确性。存放各个文件的 size 以及 hash 值
- primary.idx 主键的索引文件,稀疏索引,用于加快查询效率
- partition.dat 分区信息
- minmax_create_time.idx 分区键的最大最小值
# partition by 分区 (可选)
# 作用
学过 hive 的应该都不陌生,分区的目的主要是降低扫描的范围,优化查询速度。
# 如果不填
只会使用一个分区,默认分区名称 all。
# 分区目录
MergeTree 是以列文件 + 索引文件 + 表定义文件组成的,但是如果设定了分区那么这些文件就会保存到不同的分区目录中。
# 并行
分区后,面对涉及跨分区的查询统计,ClickHouse 会以分区为单位并行处理。如果按时间分区推荐按 天 进行分区。
# 数据写入与分区合并
任何一个批次的数据写入都会产生一个临时分区,不会纳入任何一个已有的分区。写入后的某个时刻(大概 10-15 分钟后),ClickHouse 会自动执行合并操作(等不及也可以手动通过 optimize 执行),把临时分区的数据,合并到已有分区中。
# 对所有分区进行合并
optimize table xxxx final;
# 指定分区合并
optimize table xxxx partition '20200601' final;
2
3
4
执行了该语句,临时文件不会立马删除掉,等到一定的时间会自己删除掉。
# primary key 主键 (可选)
ClickHouse 中的主键,和其他数据库不太一样,它只提供了数据的一级索引,但是却不是唯一约束。这就意味着是可以存在相同 primary key 的数据的。
主键的设定主要依据是查询语句中的 where 条件。
根据条件通过对主键进行某种形式的二分查找,能够定位到对应的 index granularity (索引粒度), 避免了全表扫描。
index granularity: 直接翻译的话就是索引粒度,指在稀疏索引中两个相邻索引对应数据的间隔。ClickHouse 中的 MergeTree 默认是 8192。官方不建议修改这个值,除非该列存在大量重复值,比如在一个分区中几万行才有一个不同数据。
在同一个分区中,列值重复,索引是重复创建还是用第一个创建?用第一个就不会有太多的重复索引,加大索引粒度毫无意义,如果是建立重复索引,如何区分重复值建立索引的区别?
答:会重复建立索引,加大索引粒度可以说是跨度。
稀疏索引的好处就是可以用很少的索引数据,定位更多的数据,代价就是只能定位到索引粒度的第一行,然后再进行进行一点扫描。
# order by(必选)
order by 设定了分区内的数据按照哪些字段顺序进行有序保存。
order by 是 MergeTree 中唯一一个必填项,甚至比 primary key 还重要,因为当用户不设置主键的情况,很多处理会依照 order by 的字段进行处理(比如去重和汇总)。
要求:主键必须是 order by 字段的前缀字段。比如 order by 字段是 (id,sku_id) 那么主键必须是 id 或者 (id,sku_id), 不能跳过前者用 sku_id。
# 二级索引
目前在 ClickHouse 的官网上二级索引的功能在 v20.1.2.4 之前是被标注为实验性的,在这个版本之后默认是开启的。
老版本使用二级索引前需要增加设置,是否允许使用实验性的二级索引 (v20.1.24 开始,这个参数已被删除,默认开启)
set allow_experimental_data_skipping_indices=1;
二级索引的使用,其中 GRANULARITY N 是设定二级索引对于一级索引粒度的粒度(跨度)。
create table t_order_mt2(
id UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time Datetime,
# 二级索引 INDEX 命名 字段 TYPE minmax GRANULARITY 粒度N
INDEX a total_amount TYPE minmax GRANULARITY 5
) engine =MergeTree
partition by toYYYYMMDD(create_time)
primary key (id)
order by (id, sku_id);
2
3
4
5
6
7
8
9
10
11
该索引对于该分区下的文件名称为 skp_idx_a.mrk3 跳数索引
# 数据 TTL
TTL 即 Time To Live,MergeTree 提供了可以管理数据表或者列的生命周期的功能。
# 适用场景
数仓建设需要考虑数据的生命周期问题,数据的生命周期包括数据最初的写入,存储,处理,查询,归档和销毁几个基本的阶段。实际中数仓数据量的成倍增长,不但产生了巨大容量的存储,同时也造成管理的困难,更换存储方式和存储迁移对项目来讲都是需要考虑成本和风险的。clickhouse 这样的一个设计,可以有效处理解决数据有效的存储周期和销毁的问题。ck 的出现对数据存储的数仓的业务选型又添加一种选择。
# 列级别 TTL
TTL 所监控的列不能是 主键字段,类型必须是日期类型。interval 支持 second (秒),minute (分),hour (小时),day (天),week (周),month (月),quarter (季度),year (年)
# 表创建
create table t_order_mt3(
id UInt32,
sku_id String,
# TTL 列名+interval 10 SECOND
total_amount Decimal(16,2) TTL create_time + INTERVAL 10 SECOND,
create_time Datetime
) engine =MergeTree
partition by toYYYYMMDD(create_time)
primary key (id)
order by (id, sku_id);
#插入数据
insert into t_order_mt3 values
(106,'sku_001',1000.00,'2021-10-29 17:01:00'),
(107,'sku_002',2000.00,'2021-10-29 17:01:00'),
(110,'sku_003',600.00,'2021-10-29 17:01:00');
# 查询数据
select * from t_order_mt3;
┌──id─┬─sku_id──┬─total_amount─┬─────────create_time─┐
│ 106 │ sku_001 │ 1000.00 │ 2021-10-29 15:13:50 │
│ 107 │ sku_002 │ 2000.00 │ 2021-10-29 15:13:50 │
│ 110 │ sku_003 │ 600.00 │ 2021-10-29 15:13:50 │
└─────┴─────────┴──────────────┴─────────────────────┘
# 等到了 15:14:00的时候可以手动执行合并,查看效果 到期后,指定的字段数据归 0,不合并不会变
select sleep(10)
optimize table t_order_mt3 final;
select * from t_order_mt3;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 表级别 TTL
创建时指定 TTL
CREATE TABLE example_table
(
d DateTime,
a Int
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(d)
ORDER BY d
TTL d + INTERVAL 1 MONTH [DELETE],
d + INTERVAL 1 WEEK TO VOLUME 'aaa',
d + INTERVAL 2 WEEK TO DISK 'bbb';
2
3
4
5
6
7
8
9
10
11
- DELETE - 删除过期的行(默认操作);
- TO DISK 'aaa' - 将数据片段移动到磁盘 aaa;
- TO VOLUME 'bbb' - 将数据片段移动到卷 bbb.
- GROUP BY - 聚合过期的行
修改表的 TTL
alter table t_order_mt3 MODIFY TTL create_time + INTERVAL 10 SECOND;
设置列过期做被聚合。列 x 包含每组行中的最大值,y 为最小值,d 为可能任意值。
CREATE TABLE table_for_aggregation
(
d DateTime,
k1 Int,
k2 Int,
x Int,
y Int
)
ENGINE = MergeTree
ORDER BY (k1, k2)
TTL d + INTERVAL 1 MONTH GROUP BY k1, k2 SET x = max(x), y = min(y);
2
3
4
5
6
7
8
9
10
11
# ReplacingMergeTree
ReplacingMergeTree 是 MergeTree 的一个变种,它得存储特性完全继承 MergeTree,只是多了一个去重的功能。 尽管 MergeTree 可以设置主键,但是 primary key 其实没有唯一约束的功能。如果你想处理掉重复的数据,可以借助这个 ReplacingMergeTree。ReplacingMergeTree 不会根据 primary key 约束去重,只会根据 order by 去重。
# 去重时机
数据的去重只会在合并的过程中出现。合并会在未知的时间在后台进行,所以你无法预先作出计划。有一些数据可能仍未被处理。
# 去重范围
如果表经过了分区,去重只会在分区内部进行去重,不能执行跨分区的去重。
所以 ReplacingMergeTree 能力有限, ReplacingMergeTree 适用于在后台清除重复的数据以节省空间,但是它不保证没有重复的数据出现
# 具体操作
create table t_order_rmt(
id UInt32,
sku_id String,
total_amount Decimal(16,2) ,
create_time Datetime
) engine = ReplacingMergeTree(create_time)
partition by toYYYYMMDD(create_time)
primary key (id)
order by (id, sku_id);
2
3
4
5
6
7
8
9
ReplacingMergeTree (create_time) 填入的参数为 依据 (版本) 字段,重复数据保留 依据 (版本) 字段值最大的。如果不填依据 (版本) 字段,默认按照插入顺序保留最后一条。
# 结论
➢ 实际上是使用 order by 字段作为唯一键
➢ 去重不能跨分区
➢ 只有同一批插入(新版本)或合并分区时才会进行去重
➢ 认定重复的数据保留,版本字段值最大的
➢ 如果版本字段相同则按插入顺序保留最后一笔
# SummingMergeTree
对于不查询明细,只关心以维度进行汇总聚合结果的场景。如果只使用普通的 MergeTree 的话,无论是存储空间的开销,还是查询时临时聚合的开销都比较大。
ClickHouse 为了这种场景,提供了一种能够 “预聚合” 的引擎 SummingMergeTree
# 建表
create table t_order_smt(
id UInt32,
sku_id String,
total_amount Decimal(16,2) ,
create_time Datetime
) engine = SummingMergeTree(total_amount)
partition by toYYYYMMDD(create_time)
primary key (id)
order by (id,sku_id );
2
3
4
5
6
7
8
9
SummingMergeTree (total_amount) 设置你想聚合的列,也可以填多个。会根据 order by 做类似 group by 一样。
# 结论
➢ 分区内聚合,不在一个分区的数据不会被聚合
➢ 分片合并的时候才聚合
➢ 以 order by 的字段去重用作维度列,对指定(必须是数值)字段做聚合汇总数据,如果不填,以所有 ** 非维度列(意思就是除 order by 以外)** 且为数字列的字段做汇总
➢ 其余字段取最早(最小)的数据
➢ 只有在同一批次插入(新版本)或分片合并时才会进行聚合。
# 建议
设计聚合表的话,唯一键值、流水号可以去掉,所有字段全部是维度、度量或者时间戳。
在写 sql 的时候还是需要把 sum () 函数带上,因为可能会包含一些还没来得及聚合的临时明细。如果是要获取汇总值,还是需要使用 sum 进行聚合,这样效率会有一定的提高,但本身 ClickHouse 是列式存储的,效率提升有限,不会特别明显。
select sum(total_amount) from province_name=’’ and create_date=‘xxx’
# integrations 集成引擎
# MYSQL
MySQL 引擎允许您对存储在远程 MySQL 服务器上的数据执行 SELECT 和 INSERT 查询.
语法
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause'])
SETTINGS
[connection_pool_size=16, ]
[connection_max_tries=3, ]
[connection_wait_timeout=5, ] /* 0 -- do not wait */
[connection_auto_close=true ]
;
2
3
4
5
6
7
8
9
10
11
12
- host:port 数据库连接的 IP 和端口
- database 要选择的库
- table 对应选择库里的表
- user MYSQL 用户名
- password MYSQL 密码
- replace_query
- on_duplicate_clause
其中引擎支持可以用 | 查询多个不同的
CREATE TABLE test_replicas (id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL(`mysql{2|3|4}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');
先在 mysql 中创建表
mysql> CREATE TABLE `test`.`test` (
-> `int_id` INT NOT NULL AUTO_INCREMENT,
-> `int_nullable` INT NULL DEFAULT NULL,
-> `float` FLOAT NOT NULL,
-> `float_nullable` FLOAT NULL DEFAULT NULL,
-> PRIMARY KEY (`int_id`));
Query OK, 0 rows affected (0,09 sec)
mysql> insert into test (`int_id`, `float`) VALUES (1,2);
Query OK, 1 row affected (0,00 sec)
mysql> select * from test;
+------+----------+-----+----------+
| int_id | int_nullable | float | float_nullable |
+------+----------+-----+----------+
| 1 | NULL | 2 | NULL |
+------+----------+-----+----------+
1 row in set (0,00 sec)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
在 clickhouse 创建对 MySQL 的映射
CREATE TABLE mysql_table
(
`float_nullable` Nullable(Float32),
`int_id` Int32
)
ENGINE = MySQL('localhost:3306', 'test', 'test', 'bayonet', '123')
# 查询
SELECT * FROM mysql_table
┌─float_nullable─┬─int_id─┐
│ ᴺᵁᴸᴸ │ 1 │
└────────────────┴────────┘
2
3
4
5
6
7
8
9
10
11
# JDBC
语法
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
columns list...
)
ENGINE = JDBC(datasource_uri, external_database, external_table)
2
3
4
5
- datasource_uri 外部 DBMS 的 URI 或名称
- external_database 外部 DBMS 中的数据库
- external_table external_database 中表的名称或可以直接写查询语句,如 select * from table1 where column1=1。
可以在 mysql 中先创建表和数据
mysql> CREATE TABLE `test`.`test` (
-> `int_id` INT NOT NULL AUTO_INCREMENT,
-> `int_nullable` INT NULL DEFAULT NULL,
-> `float` FLOAT NOT NULL,
-> `float_nullable` FLOAT NULL DEFAULT NULL,
-> PRIMARY KEY (`int_id`));
Query OK, 0 rows affected (0,09 sec)
mysql> insert into test (`int_id`, `float`) VALUES (1,2);
Query OK, 1 row affected (0,00 sec)
mysql> select * from test;
+------+----------+-----+----------+
| int_id | int_nullable | float | float_nullable |
+------+----------+-----+----------+
| 1 | NULL | 2 | NULL |
+------+----------+-----+----------+
1 row in set (0,00 sec)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
在 clickhouse 创建映射
CREATE TABLE jdbc_table
(
`int_id` Int32,
`int_nullable` Nullable(Int32),
`float` Float32,
`float_nullable` Nullable(Float32)
)
ENGINE JDBC('jdbc:mysql://localhost:3306/?user=root&password=root', 'test', 'test')
# 查询
SELECT * FROM jdbc_table
┌─int_id─┬─int_nullable─┬─float─┬─float_nullable─┐
│ 1 │ ᴺᵁᴸᴸ │ 2 │ ᴺᵁᴸᴸ │
└────────┴──────────────┴───────┴────────────────┘
# 可以从其他表插入数据到 jdbc_table
INSERT INTO jdbc_table(`int_id`, `float`)
SELECT toInt32(number), toFloat32(number * 1.0)
FROM system.numbers
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# KAFKA
语法以及可选参数
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_thread_per_consumer = 0]
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- kafka_broker_list kafka 地址,可以用逗号分隔多个地址 (127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092)
- kafka_topic_list 可以有多个 topic
- kafka_group_name 组名称
- kafka_format 消息格式,可以查看这个地址 https://clickhouse.com/docs/en/interfaces/formats/
- kafka_row_delimiter 以什么符号来结束消息的接收
- kafka_schema
- kafka_num_consumers 消费者的数量,默认 1。消费者总数不应超过主题的分区数,每个分区只能分配一个消费者。
- kafka_max_block_size poll 一次消息可拉取的大小
- kafka_skip_broken_messages Kafka 消息解析器对每个块的模式不兼容消息的容忍度。默认值:0。如果 kafka_skip_broken_messages = N,那么引擎会跳过 N 条无法解析的 Kafka 消息(一条消息等于一行数据)
- kafka_commit_every_batch 提交消息批次(默认值:0)
- kafka_thread_per_consumer 为每个消费者提供独立的线程(默认值:0)。启用后,每个消费者独立地、并行地刷新数据(否则来自多个消费者的数据被挤压形成阻塞)
具体使用
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue3 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# HDFS
CREATE TABLE hdfs_engine_table (name String, value UInt32) ENGINE=HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')
# 插入
INSERT INTO hdfs_engine_table VALUES ('one', 1), ('two', 2), ('three', 3)
# 查询
SELECT * FROM hdfs_engine_table LIMIT 2
┌─name─┬─value─┐
│ one │ 1 │
│ two │ 2 │
└──────┴───────┘
2
3
4
5
6
7
8
9
文档地址 https://clickhouse.com/docs/en/engines/table-engines/integrations/jdbc/