技术博客 技术博客
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • mysql

    • MySQL 问题汇总
    • MySQL 索引介绍
    • MySQL 锁介绍
    • MySQL 索引优化工具 explain
    • MySQL 主从复制(GTID)
    • MySQL 8安装
    • MySQL 8.x新特性总结
    • MySQL UDF以及新类型JSON
    • MySQL 高可用MGR(一) 理论
    • MySQL 高可用MGR(二) 搭建
    • MySQL 高可用MGR(三) 测试
  • Elasticsearch

    • ES 7.8.0(一) 入门介绍
    • ES 7.8.0(二) 读、写和写索引流程以及文档分析过程
    • ES 7.8.0(三) 文档冲突
  • mongodb

    • mongodb
  • hadoop

    • Hadoop 伪分布式及集群
    • Hadoop 指令
    • Hadoop 读写流程详解
    • Hadoop SpringBoot集成
    • Hadoop MapReduce机制
    • Hadoop YARN
    • Hadoop MapReduce配置和编写job及数据倾斜的解决
    • Hadoop MapReduce自定义格式输入输出
  • clickhouse

    • ClickHouse 介绍及安装
    • ClickHouse 数据类型
    • ClickHouse 表引擎
      • 表引擎的使用
      • TinyLog
      • Memory
      • MergeTree
        • 文件介绍
        • partition by 分区(可选)
        • 作用
        • 如果不填
        • 分区目录
        • 并行
        • 数据写入与分区合并
        • primary key 主键(可选)
        • order by(必选)
        • 二级索引
        • 数据 TTL
        • 适用场景
        • 列级别TTL
        • 表级别TTL
      • ReplacingMergeTree
        • 去重时机
        • 去重范围
        • 具体操作
        • 结论
      • SummingMergeTree
        • 建表
        • 结论
        • 建议
      • integrations 集成引擎
        • MYSQL
        • JDBC
        • KAFKA
        • HDFS
    • ClickHouse SQL操作
    • ClickHouse 副本配置
    • ClickHouse 分片与集群部署
    • ClickHouse Explain及建表优化
    • ClickHouse 语法优化规则
    • ClickHouse 查询优化
    • ClickHouse 数据一致性
    • ClickHouse 物化视图
    • ClickHouse MaterializeMySQL引擎
    • ClickHouse 监控及备份
  • hbase

    • Hbase 介绍及安装
    • Hbase 优化
    • Hbase phoenix安装及使用
    • Hbase LSM-TREE
  • hive

    • Hive 介绍及安装
    • Hive 内外部表、分区表、分桶表概念及hiveSQL命令
    • Hive 数据类型
    • Hive 函数 MySQL联合
    • Hive 数据倾斜和优化
    • Hive Sqoop安装及指令
  • flink

    • Flink 介绍及安装
    • Flink 配置介绍及Demo
    • Flink API讲解
    • Flink 运行架构
    • Flink 时间语义及Watermark
    • Flink 状态管理
    • Flink 容错,检查点,保存点
    • Flink 状态一致性
    • Flink Table API 和 Flink SQL
    • Flink CEP编程
    • Flink Joining编程
    • Flink CDC
  • flume

    • Flume 日志收集系统介绍及安装
    • Flume Source支持的类型
    • Flume Sink支持的类型
    • Flume Channel支持的类型
    • Flume Selector
    • Flume Interceptor拦截器类型
    • Flume Process
  • sqlite

    • SQLite介绍
目录

ClickHouse 表引擎

本文及后续所有文章都以 21.7.3.14-2 做为版本讲解和入门学习

# 表引擎的使用

表引擎是 ClickHouse 的一大特色。可以说, 表引擎决定了如何存储表的数据。包括:

  • 数据的存储方式和位置,写到哪里以及从哪里读取数据。
  • 支持哪些查询以及如何支持。
  • 并发数据访问。
  • 索引的使用(如果存在)。
  • 是否可以执行多线程请求。
  • 数据复制参数。

表引擎的使用方式就是必须显式在创建表时定义该表使用的引擎,以及引擎使用的相关参数。
注意:引擎的名称大小写敏感,都是以驼峰命名,且首字母注意是大写。

# TinyLog

以列文件的形式保存在磁盘上,不支持索引,没有并发控制。一般保存少量数据的小表,生产环境上作用有限。可以用于平时练习测试用。一般用于一些简单的测试使用,生产环境不考虑。

create table t_tinylog ( id String, name String) engine=TinyLog;
1

# Memory

内存引擎,数据以未压缩的原始形式直接保存在内存当中,服务器重启数据就会消失。读写操作不会相互阻塞,不支持索引。简单查询下有非常非常高的性能表现(超过 10G/s)。

一般用到它的地方不多,除了用来测试,就是在需要非常高的性能,同时数据量又不太大(上限大概 1 亿行)的场景。

# MergeTree

ClickHouse 中最强大的表引擎当属 MergeTree(合并树)引擎及该系列(*MergeTree)中的其他引擎,支持索引和分区,地位可以相当于 Mysql 的 innodb。而且基于 MergeTree,还衍生除了很多小弟,也是非常有特色的引擎。

# 建表语句
create table t_order_mt(
 id UInt32,
 sku_id String,
 total_amount Decimal(16,2),
 create_time Datetime
) engine =MergeTree
 partition by toYYYYMMDD(create_time)
 primary key (id)
 order by (id,sku_id);
# 插入数据
insert into t_order_mt values
(101,'sku_001',1000.00,'2020-06-01 12:00:00') ,
(102,'sku_002',2000.00,'2020-06-01 11:00:00'),
(102,'sku_004',2500.00,'2020-06-01 12:00:00'),
(102,'sku_002',2000.00,'2020-06-01 13:00:00'),
(102,'sku_002',12000.00,'2020-06-01 13:00:00'),
(102,'sku_002',600.00,'2020-06-02 12:00:00');
# 查询 
select * from t_order_mt;
┌──id─┬─sku_id──┬─total_amount─┬─────────create_time─┐
│ 102 │ sku_002 │       600.00 │ 2020-06-02 12:00:00 │
└─────┴─────────┴──────────────┴─────────────────────┘
┌──id─┬─sku_id──┬─total_amount─┬─────────create_time─┐
│ 101 │ sku_001 │      1000.00 │ 2020-06-01 12:00:00 │
│ 102 │ sku_002 │      2000.00 │ 2020-06-01 11:00:00 │
│ 102 │ sku_002 │      2000.00 │ 2020-06-01 13:00:00 │
│ 102 │ sku_002 │     12000.00 │ 2020-06-01 13:00:00 │
│ 102 │ sku_004 │      2500.00 │ 2020-06-01 12:00:00 │
└─────┴─────────┴──────────────┴─────────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

MergeTree 其实还有很多参数 (绝大多数用默认值即可),但是三个参数是更加重要的,也涉及了关于 MergeTree 的很多概念。

# 文件介绍

以上我们创建的表,他是一个文件夹,具体位置如下

/var/lib/clickhouse/data/system/t_order_mt
1

该 t_order_mt 文件夹下包含有如下文件

20200601_1_1_0  20200602_2_2_0  detached  format_version.txt
1
  • 20200601_1_1_0 分区目录,PartitionId_MinBlocakNum_MaxBlockNum_Level
    PartitionId 数据分区 ID 生成规则由分区 ID 决定,分区 ID 由 PARTITION BY 分区键决定。根据分区键字段类型,ID 生成规则可分为:
    -> 未定义分区键:没有定义 PARTITION BY ,默认生成一个目录名位 all 的数据分区,所有数据均放在 all 目录下。
    -> 整型分区键:分区键为整型,那么直接用该整型值得字符串形式作为分区 ID。
    -> 日期类分区键:分区键为日期类型,或者可以转换成日期类型。
    -> 其他类型分区键:String、float 类型等,通过 128 位得 Hash 算法取其 Hash 值作为分区 ID。
    MinBlocakNum:最小分区块编号,自增类型,从 1 开始向上递增。每产生一个新的目录分区就向上递增一个数字。
    MaxBlockNum:最大分区块编号,新创建得分区 MinBlocakNum 等于 MaxBlockNum 得编号。
    Level:合并的层级,被合并的次数。合并次数越多,层级值越大。
  • detached 卸载,默认空的
  • format_version.txt 格式化版本,默认是 1

再进入 20200601_1_1_0 文件夹,我们可以见到,如下几个文件

checksums.txt  columns.txt  count.txt  data.bin  data.mrk3  default_compression_codec.txt  minmax_create_time.idx  partition.dat  primary.idx
1
  • data.bin 数据文件,表里面的数据会在这里面,老版本会把每个字段拆开来 比如 id.bin,id.mrk3,name.bin,name.mrk3
  • data.mrk3 标记文件,标记文件在 idx 索引文件和 bin 数据文件之间起到桥梁作用。老版本以 mrk2 结尾的文件,表示该表启用了自适应索引间隔。
  • default_compression_codec.txt 压缩格式
  • count.txt 记录表的行数,如果合并,也会记录合并后的。
  • columns.txt 列的信息,记录列名和类型
  • checksums.txt 校验文件,用于校验各个文件的正确性。存放各个文件的 size 以及 hash 值
  • primary.idx 主键的索引文件,稀疏索引,用于加快查询效率
  • partition.dat 分区信息
  • minmax_create_time.idx 分区键的最大最小值

# partition by 分区 (可选)

# 作用

学过 hive 的应该都不陌生,分区的目的主要是降低扫描的范围,优化查询速度。

# 如果不填

只会使用一个分区,默认分区名称 all。

# 分区目录

MergeTree 是以列文件 + 索引文件 + 表定义文件组成的,但是如果设定了分区那么这些文件就会保存到不同的分区目录中。

# 并行

分区后,面对涉及跨分区的查询统计,ClickHouse 会以分区为单位并行处理。如果按时间分区推荐按 天 进行分区。

# 数据写入与分区合并

任何一个批次的数据写入都会产生一个临时分区,不会纳入任何一个已有的分区。写入后的某个时刻(大概 10-15 分钟后),ClickHouse 会自动执行合并操作(等不及也可以手动通过 optimize 执行),把临时分区的数据,合并到已有分区中。

# 对所有分区进行合并
optimize table xxxx final;
# 指定分区合并
optimize table xxxx partition '20200601' final;
1
2
3
4

执行了该语句,临时文件不会立马删除掉,等到一定的时间会自己删除掉。

# primary key 主键 (可选)

ClickHouse 中的主键,和其他数据库不太一样,它只提供了数据的一级索引,但是却不是唯一约束。这就意味着是可以存在相同 primary key 的数据的。

主键的设定主要依据是查询语句中的 where 条件。

根据条件通过对主键进行某种形式的二分查找,能够定位到对应的 index granularity (索引粒度), 避免了全表扫描。

index granularity: 直接翻译的话就是索引粒度,指在稀疏索引中两个相邻索引对应数据的间隔。ClickHouse 中的 MergeTree 默认是 8192。官方不建议修改这个值,除非该列存在大量重复值,比如在一个分区中几万行才有一个不同数据。

在同一个分区中,列值重复,索引是重复创建还是用第一个创建?用第一个就不会有太多的重复索引,加大索引粒度毫无意义,如果是建立重复索引,如何区分重复值建立索引的区别?
答:会重复建立索引,加大索引粒度可以说是跨度。

稀疏索引的好处就是可以用很少的索引数据,定位更多的数据,代价就是只能定位到索引粒度的第一行,然后再进行进行一点扫描。

# order by(必选)

order by 设定了分区内的数据按照哪些字段顺序进行有序保存。

order by 是 MergeTree 中唯一一个必填项,甚至比 primary key 还重要,因为当用户不设置主键的情况,很多处理会依照 order by 的字段进行处理(比如去重和汇总)。

要求:主键必须是 order by 字段的前缀字段。比如 order by 字段是 (id,sku_id) 那么主键必须是 id 或者 (id,sku_id), 不能跳过前者用 sku_id。

# 二级索引

目前在 ClickHouse 的官网上二级索引的功能在 v20.1.2.4 之前是被标注为实验性的,在这个版本之后默认是开启的。

老版本使用二级索引前需要增加设置,是否允许使用实验性的二级索引 (v20.1.24 开始,这个参数已被删除,默认开启)

set allow_experimental_data_skipping_indices=1;
1

二级索引的使用,其中 GRANULARITY N 是设定二级索引对于一级索引粒度的粒度(跨度)。

create table t_order_mt2(
 id UInt32,
 sku_id String,
 total_amount Decimal(16,2),
 create_time Datetime,
 # 二级索引 INDEX 命名 字段 TYPE minmax GRANULARITY 粒度N
 INDEX a total_amount TYPE minmax GRANULARITY 5
) engine =MergeTree
 partition by toYYYYMMDD(create_time)
 primary key (id)
 order by (id, sku_id);
1
2
3
4
5
6
7
8
9
10
11

该索引对于该分区下的文件名称为 skp_idx_a.mrk3 跳数索引

# 数据 TTL

TTL 即 Time To Live,MergeTree 提供了可以管理数据表或者列的生命周期的功能。

# 适用场景

数仓建设需要考虑数据的生命周期问题,数据的生命周期包括数据最初的写入,存储,处理,查询,归档和销毁几个基本的阶段。实际中数仓数据量的成倍增长,不但产生了巨大容量的存储,同时也造成管理的困难,更换存储方式和存储迁移对项目来讲都是需要考虑成本和风险的。clickhouse 这样的一个设计,可以有效处理解决数据有效的存储周期和销毁的问题。ck 的出现对数据存储的数仓的业务选型又添加一种选择。

# 列级别 TTL

TTL 所监控的列不能是 主键字段,类型必须是日期类型。interval 支持 second (秒),minute (分),hour (小时),day (天),week (周),month (月),quarter (季度),year (年)

# 表创建
create table t_order_mt3(
 id UInt32,
 sku_id String,
 # TTL 列名+interval 10 SECOND
 total_amount Decimal(16,2) TTL create_time + INTERVAL 10 SECOND,
 create_time Datetime
) engine =MergeTree
partition by toYYYYMMDD(create_time)
 primary key (id)
 order by (id, sku_id);
#插入数据
insert into t_order_mt3 values 
(106,'sku_001',1000.00,'2021-10-29 17:01:00'), 
(107,'sku_002',2000.00,'2021-10-29 17:01:00'), 
(110,'sku_003',600.00,'2021-10-29 17:01:00');
# 查询数据
 select * from t_order_mt3;
┌──id─┬─sku_id──┬─total_amount─┬─────────create_time─┐
│ 106 │ sku_001 │      1000.00 │ 2021-10-29 15:13:50 │
│ 107 │ sku_002 │      2000.00 │ 2021-10-29 15:13:50 │
│ 110 │ sku_003 │       600.00 │ 2021-10-29 15:13:50 │
└─────┴─────────┴──────────────┴─────────────────────┘
# 等到了 15:14:00的时候可以手动执行合并,查看效果 到期后,指定的字段数据归 0,不合并不会变
select sleep(10)
optimize table t_order_mt3 final;
select * from t_order_mt3;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 表级别 TTL

创建时指定 TTL

CREATE TABLE example_table
(
    d DateTime,
    a Int
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(d)
ORDER BY d
TTL d + INTERVAL 1 MONTH [DELETE],
    d + INTERVAL 1 WEEK TO VOLUME 'aaa',
    d + INTERVAL 2 WEEK TO DISK 'bbb';
1
2
3
4
5
6
7
8
9
10
11
  • DELETE - 删除过期的行(默认操作);
  • TO DISK 'aaa' - 将数据片段移动到磁盘 aaa;
  • TO VOLUME 'bbb' - 将数据片段移动到卷 bbb.
  • GROUP BY - 聚合过期的行

修改表的 TTL

alter table t_order_mt3 MODIFY TTL create_time + INTERVAL 10 SECOND; 
1

设置列过期做被聚合。列 x 包含每组行中的最大值,y 为最小值,d 为可能任意值。

CREATE TABLE table_for_aggregation
(
    d DateTime,
    k1 Int,
    k2 Int,
    x Int,
    y Int
)
ENGINE = MergeTree
ORDER BY (k1, k2)
TTL d + INTERVAL 1 MONTH GROUP BY k1, k2 SET x = max(x), y = min(y);
1
2
3
4
5
6
7
8
9
10
11

# ReplacingMergeTree

ReplacingMergeTree 是 MergeTree 的一个变种,它得存储特性完全继承 MergeTree,只是多了一个去重的功能。 尽管 MergeTree 可以设置主键,但是 primary key 其实没有唯一约束的功能。如果你想处理掉重复的数据,可以借助这个 ReplacingMergeTree。ReplacingMergeTree 不会根据 primary key 约束去重,只会根据 order by 去重。

# 去重时机

数据的去重只会在合并的过程中出现。合并会在未知的时间在后台进行,所以你无法预先作出计划。有一些数据可能仍未被处理。

# 去重范围

如果表经过了分区,去重只会在分区内部进行去重,不能执行跨分区的去重。

所以 ReplacingMergeTree 能力有限, ReplacingMergeTree 适用于在后台清除重复的数据以节省空间,但是它不保证没有重复的数据出现

# 具体操作

create table t_order_rmt(
 id UInt32,
 sku_id String,
 total_amount Decimal(16,2) ,
 create_time Datetime
) engine = ReplacingMergeTree(create_time)
 partition by toYYYYMMDD(create_time)
 primary key (id)
 order by (id, sku_id);
1
2
3
4
5
6
7
8
9

ReplacingMergeTree (create_time) 填入的参数为 依据 (版本) 字段,重复数据保留 依据 (版本) 字段值最大的。如果不填依据 (版本) 字段,默认按照插入顺序保留最后一条。

# 结论

➢ 实际上是使用 order by 字段作为唯一键
➢ 去重不能跨分区
➢ 只有同一批插入(新版本)或合并分区时才会进行去重
➢ 认定重复的数据保留,版本字段值最大的
➢ 如果版本字段相同则按插入顺序保留最后一笔

# SummingMergeTree

对于不查询明细,只关心以维度进行汇总聚合结果的场景。如果只使用普通的 MergeTree 的话,无论是存储空间的开销,还是查询时临时聚合的开销都比较大。

ClickHouse 为了这种场景,提供了一种能够 “预聚合” 的引擎 SummingMergeTree

# 建表

create table t_order_smt(
 id UInt32,
 sku_id String,
 total_amount Decimal(16,2) ,
 create_time Datetime
) engine = SummingMergeTree(total_amount)
 partition by toYYYYMMDD(create_time)
 primary key (id)
 order by (id,sku_id );
1
2
3
4
5
6
7
8
9

SummingMergeTree (total_amount) 设置你想聚合的列,也可以填多个。会根据 order by 做类似 group by 一样。

# 结论

➢ 分区内聚合,不在一个分区的数据不会被聚合
➢ 分片合并的时候才聚合
➢ 以 order by 的字段去重用作维度列,对指定(必须是数值)字段做聚合汇总数据,如果不填,以所有 ** 非维度列(意思就是除 order by 以外)** 且为数字列的字段做汇总
➢ 其余字段取最早(最小)的数据
➢ 只有在同一批次插入(新版本)或分片合并时才会进行聚合。

# 建议

设计聚合表的话,唯一键值、流水号可以去掉,所有字段全部是维度、度量或者时间戳。

在写 sql 的时候还是需要把 sum () 函数带上,因为可能会包含一些还没来得及聚合的临时明细。如果是要获取汇总值,还是需要使用 sum 进行聚合,这样效率会有一定的提高,但本身 ClickHouse 是列式存储的,效率提升有限,不会特别明显。

select sum(total_amount) from province_name=’’ and create_date=‘xxx’
1

# integrations 集成引擎

# MYSQL

MySQL 引擎允许您对存储在远程 MySQL 服务器上的数据执行 SELECT 和 INSERT 查询.
语法

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause'])
SETTINGS
    [connection_pool_size=16, ]
    [connection_max_tries=3, ]
    [connection_wait_timeout=5, ] /* 0 -- do not wait */
    [connection_auto_close=true ]
;
1
2
3
4
5
6
7
8
9
10
11
12
  • host:port 数据库连接的 IP 和端口
  • database 要选择的库
  • table 对应选择库里的表
  • user MYSQL 用户名
  • password MYSQL 密码
  • replace_query
  • on_duplicate_clause

其中引擎支持可以用 | 查询多个不同的

CREATE TABLE test_replicas (id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL(`mysql{2|3|4}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');
1

先在 mysql 中创建表

mysql> CREATE TABLE `test`.`test` (
    ->   `int_id` INT NOT NULL AUTO_INCREMENT,
    ->   `int_nullable` INT NULL DEFAULT NULL,
    ->   `float` FLOAT NOT NULL,
    ->   `float_nullable` FLOAT NULL DEFAULT NULL,
    ->   PRIMARY KEY (`int_id`));
Query OK, 0 rows affected (0,09 sec)

mysql> insert into test (`int_id`, `float`) VALUES (1,2);
Query OK, 1 row affected (0,00 sec)

mysql> select * from test;
+------+----------+-----+----------+
| int_id | int_nullable | float | float_nullable |
+------+----------+-----+----------+
|      1 |         NULL |     2 |           NULL |
+------+----------+-----+----------+
1 row in set (0,00 sec)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

在 clickhouse 创建对 MySQL 的映射

CREATE TABLE mysql_table
(
    `float_nullable` Nullable(Float32),
    `int_id` Int32
)
ENGINE = MySQL('localhost:3306', 'test', 'test', 'bayonet', '123')
# 查询
SELECT * FROM mysql_table
┌─float_nullable─┬─int_id─┐
│           ᴺᵁᴸᴸ │      1 │
└────────────────┴────────┘
1
2
3
4
5
6
7
8
9
10
11

# JDBC

语法

CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
    columns list...
)
ENGINE = JDBC(datasource_uri, external_database, external_table)
1
2
3
4
5
  • datasource_uri 外部 DBMS 的 URI 或名称
  • external_database 外部 DBMS 中的数据库
  • external_table external_database 中表的名称或可以直接写查询语句,如 select * from table1 where column1=1。

可以在 mysql 中先创建表和数据

mysql> CREATE TABLE `test`.`test` (
    ->   `int_id` INT NOT NULL AUTO_INCREMENT,
    ->   `int_nullable` INT NULL DEFAULT NULL,
    ->   `float` FLOAT NOT NULL,
    ->   `float_nullable` FLOAT NULL DEFAULT NULL,
    ->   PRIMARY KEY (`int_id`));
Query OK, 0 rows affected (0,09 sec)

mysql> insert into test (`int_id`, `float`) VALUES (1,2);
Query OK, 1 row affected (0,00 sec)

mysql> select * from test;
+------+----------+-----+----------+
| int_id | int_nullable | float | float_nullable |
+------+----------+-----+----------+
|      1 |         NULL |     2 |           NULL |
+------+----------+-----+----------+
1 row in set (0,00 sec)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

在 clickhouse 创建映射

CREATE TABLE jdbc_table
(
    `int_id` Int32,
    `int_nullable` Nullable(Int32),
    `float` Float32,
    `float_nullable` Nullable(Float32)
)
ENGINE JDBC('jdbc:mysql://localhost:3306/?user=root&password=root', 'test', 'test')
# 查询
SELECT * FROM jdbc_table
┌─int_id─┬─int_nullable─┬─float─┬─float_nullable─┐
│      1 │         ᴺᵁᴸᴸ │     2 │           ᴺᵁᴸᴸ │
└────────┴──────────────┴───────┴────────────────┘
# 可以从其他表插入数据到 jdbc_table
INSERT INTO jdbc_table(`int_id`, `float`)
SELECT toInt32(number), toFloat32(number * 1.0)
FROM system.numbers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# KAFKA

语法以及可选参数

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  • kafka_broker_list kafka 地址,可以用逗号分隔多个地址 (127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092)
  • kafka_topic_list 可以有多个 topic
  • kafka_group_name 组名称
  • kafka_format 消息格式,可以查看这个地址 https://clickhouse.com/docs/en/interfaces/formats/
  • kafka_row_delimiter 以什么符号来结束消息的接收
  • kafka_schema
  • kafka_num_consumers 消费者的数量,默认 1。消费者总数不应超过主题的分区数,每个分区只能分配一个消费者。
  • kafka_max_block_size poll 一次消息可拉取的大小
  • kafka_skip_broken_messages Kafka 消息解析器对每个块的模式不兼容消息的容忍度。默认值:0。如果 kafka_skip_broken_messages = N,那么引擎会跳过 N 条无法解析的 Kafka 消息(一条消息等于一行数据)
  • kafka_commit_every_batch 提交消息批次(默认值:0)
  • kafka_thread_per_consumer 为每个消费者提供独立的线程(默认值:0)。启用后,每个消费者独立地、并行地刷新数据(否则来自多个消费者的数据被挤压形成阻塞)

具体使用

  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue3 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# HDFS

CREATE TABLE hdfs_engine_table (name String, value UInt32) ENGINE=HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')
# 插入
INSERT INTO hdfs_engine_table VALUES ('one', 1), ('two', 2), ('three', 3)
# 查询 
SELECT * FROM hdfs_engine_table LIMIT 2
┌─name─┬─value─┐
│ one  │     1 │
│ two  │     2 │
└──────┴───────┘
1
2
3
4
5
6
7
8
9

文档地址 https://clickhouse.com/docs/en/engines/table-engines/integrations/jdbc/

上次更新: 6/11/2025, 4:10:30 PM
ClickHouse 数据类型
ClickHouse SQL操作

← ClickHouse 数据类型 ClickHouse SQL操作→

Theme by Vdoing | Copyright © 2023-2025
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式