技术博客 技术博客
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • mysql

    • MySQL 问题汇总
    • MySQL 索引介绍
    • MySQL 锁介绍
    • MySQL 索引优化工具 explain
    • MySQL 主从复制(GTID)
    • MySQL 8安装
    • MySQL 8.x新特性总结
    • MySQL UDF以及新类型JSON
    • MySQL 高可用MGR(一) 理论
    • MySQL 高可用MGR(二) 搭建
    • MySQL 高可用MGR(三) 测试
  • Elasticsearch

    • ES 7.8.0(一) 入门介绍
    • ES 7.8.0(二) 读、写和写索引流程以及文档分析过程
    • ES 7.8.0(三) 文档冲突
  • mongodb

    • mongodb
  • hadoop

    • Hadoop 伪分布式及集群
    • Hadoop 指令
    • Hadoop 读写流程详解
    • Hadoop SpringBoot集成
    • Hadoop MapReduce机制
    • Hadoop YARN
    • Hadoop MapReduce配置和编写job及数据倾斜的解决
    • Hadoop MapReduce自定义格式输入输出
  • clickhouse

    • ClickHouse 介绍及安装
    • ClickHouse 数据类型
    • ClickHouse 表引擎
    • ClickHouse SQL操作
    • ClickHouse 副本配置
    • ClickHouse 分片与集群部署
    • ClickHouse Explain及建表优化
    • ClickHouse 语法优化规则
    • ClickHouse 查询优化
    • ClickHouse 数据一致性
    • ClickHouse 物化视图
    • ClickHouse MaterializeMySQL引擎
    • ClickHouse 监控及备份
  • hbase

    • Hbase 介绍及安装
    • Hbase 优化
    • Hbase phoenix安装及使用
    • Hbase LSM-TREE
  • hive

    • Hive 介绍及安装
    • Hive 内外部表、分区表、分桶表概念及hiveSQL命令
    • Hive 数据类型
    • Hive 函数 MySQL联合
    • Hive 数据倾斜和优化
    • Hive Sqoop安装及指令
  • flink

    • Flink 介绍及安装
    • Flink 配置介绍及Demo
    • Flink API讲解
    • Flink 运行架构
    • Flink 时间语义及Watermark
    • Flink 状态管理
    • Flink 容错,检查点,保存点
    • Flink 状态一致性
    • Flink Table API 和 Flink SQL
      • 简单的用例
      • 基于批处理或流处理的环境配置
      • 表(table)
      • 优化用例
      • 用例数据输出到另一个文件
      • flink table api 之 kafka
      • 将 Table 转换成 DataStream
      • 动态表(Dynamic Tables)
      • 时间特性(Time Attributes)
        • 定义处理时间(Processing Time)
        • 定义事件事件(Event Time)
      • 窗口
        • Group Windows
        • 基于ddl 开窗代码演示
        • Over Windows
        • 无界 Over Windows
        • 有界 Over Windows
        • sql 中的 Over Window
        • 代码演示
      • 函数
        • 用户自定义函数(UDF)
        • 标量函数(Scalar Functions)
        • 表函数(Table Functions)
        • 聚合函数(Aggregate Functions)
      • 表聚合函数
    • Flink CEP编程
    • Flink Joining编程
    • Flink CDC
  • flume

    • Flume 日志收集系统介绍及安装
    • Flume Source支持的类型
    • Flume Sink支持的类型
    • Flume Channel支持的类型
    • Flume Selector
    • Flume Interceptor拦截器类型
    • Flume Process
  • sqlite

    • SQLite介绍
目录

Flink Table API 和 Flink SQL

Table API 和 Flink SQL 是 flink 对批处理和流处理,提供了统一的上层 API。Table API 是一套内嵌在 Java 和 Scala 语言中的查询 API,它允许以非常直观的方式组合来自一些关系运算符的查询; Flink SQL 支持基于实现 SQL 标准的 Apache Calcite。

# 简单的用例

把一个流,转成 table api 来操作数据
依赖

<!-- table API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.12.2</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.2</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.12.2</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

测试数据

1,1623051400,test data,1
2,1623051401,test data,1
3,1623051402,test data,1
1,1623051405,test data,3
2,1623051406,test data,3
3,1623051409,test data,3
1,1623051410,test data,5
1
2
3
4
5
6
7

用例代码

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // 1 读取数据
    DataStream<String> dataStream = env.readTextFile("D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
    // 2 转换数据
    DataStream<EventData> map = dataStream.map(i -> {
        String[] strs = i.split(",");
        return new EventData(
                strs[0],
                Long.valueOf(strs[1]),
                String.valueOf((Long.valueOf(strs[1]) - 5)),
                Integer.valueOf(strs[3])
        );
    });

    // 3 创建表得执行环境
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    // 4 基于流创建一个table
    Table table = tableEnv.fromDataStream(map);
    // 5 调用table api进行转换操作
    Table where = table
            .filter($("id").isEqual("1"))
            // 显示哪些列
            .select($("id"),$("eventTime"));
    // 6 执行sql
    tableEnv.createTemporaryView("event_data",table); //基于table 创建一个匿名视图的表名 eventData
    // 没有创建视图不能使用 表名称 eventData
    String sql = "select id,eventTime from event_data where id = '2'";
    Table sqlTable = tableEnv.sqlQuery(sql);

    // 7 结果转换成行
    tableEnv.toAppendStream(where, Row.class).print("where");
    tableEnv.toAppendStream(sqlTable, Row.class).print("sql");

    env.execute("test");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

结果

where> 1,1623051400
sql> 2,1623051401
where> 1,1623051405
sql> 2,1623051406
where> 1,1623051410
1
2
3
4
5

# 基于批处理或流处理的环境配置

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 基于老版本流处理
    EnvironmentSettings oldSettings = EnvironmentSettings.newInstance()
            .inStreamingMode()// 流处理
            //.inBatchMode()// 批处理
            .useOldPlanner() // 老版本
            //.useBlinkPlanner() // 新版本
            .build();
    StreamTableEnvironment oldStringTableEnv = StreamTableEnvironment.create(env, oldSettings);

    // 基于老版本批处理
    ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment oldBatchEnv = BatchTableEnvironment.create(batchEnv);

    // 基于blink,blink 多了些功能以及架构上真正的批流统一,都是转换成了 DataStream,不像老版本还有 DateSet
    EnvironmentSettings blinkSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamTableEnvironment blinkStringTableEnv = StreamTableEnvironment.create(env, blinkSettings);

    // 基于blink 批处理
    EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inBatchMode()
            .build();
    TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkSettings);
    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 表(table)

TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表,表是由一个 “标识符”(identifier)来指定的,由 3 部分组成: Catalog 名、数据库名、对象名。

表可以是常规的,也可以是虚拟的(视图,View),常规表一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream 转换而来;视图(View)可以从现有的表中创建,通常是 table api 或者 sql 查询的一个结果集。

创建表的执行环境,需要将 flink 流处理的执行环境传入

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
1

TableEnvironment 是 flink 中集成 table api 和 sql 的核心概念,所有对表得操作都基于 TableEnvironment,包括注册 Catalog、在 Catalog 中注册表、执行 sql 查询、注册用户自定义函数(UDF)

# 优化用例

直接创建 TableEnvironment 来读取文件,并用 Table API 做查询。

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 连接外部系统读取数据
    String path = "D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt";
    // 类似于 source
    tableEnv.connect(
            new FileSystem().path(path)
    // 类似于 split
    ).withFormat(
            // 引入 flink csv 依赖,默认 , 分割
            new Csv()
    // 类似于 map 转换
    ).withSchema(
        new Schema().field("id", DataTypes.STRING())
            .field("eventTime", DataTypes.BIGINT())
            .field("data", DataTypes.STRING())
            .field("num", DataTypes.INT())
    ).createTemporaryTable("inputTable");

    Table inputTable = tableEnv.from("inputTable");
    inputTable.printSchema();


    // 简单查询转换
    Table resultTable = inputTable.select($("id"), $("eventTime"))
            // 大于 1623051405
            .filter($("eventTime").isGreater(1623051405))
            // id = 1
            .where($("id").isEqual("1"));

    // 聚合统计
    Table aggTable = inputTable.groupBy($("id"))
            .select($("id").count().as("ct"),$("eventTime").avg().as("et"));

    // sql写法
    Table rt = tableEnv.sqlQuery("select id,eventTime from inputTable where eventTime > 1623051405 and id = '1'");
    Table at = tableEnv.sqlQuery("select id,count(id) as ct,eventTime,avg(eventTime) as ev from inputTable group by id,eventTime");

    //打印输出
    tableEnv.toAppendStream(inputTable, Row.class).print("inputTable");
    tableEnv.toAppendStream(resultTable, Row.class).print("resultTable");
    // group by 操作会让数据发生改变,所以不是普通的 append 追加操作
    tableEnv.toRetractStream(aggTable, Row.class).print("aggTable");
    tableEnv.toAppendStream(rt, Row.class).print("rt");
    tableEnv.toRetractStream(at, Row.class).print("at");

    env.execute("test");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate (groupBy=[id], select=[id, COUNT (id) AS EXPR$0, AVG (eventTime) AS EXPR$1]) 这句话的意思是 使用到 groupy by 有 count 操作等都是值更新操作,不是 append 追加操作结果,所以输出 toRetractStream ,toRetractStream 是撤回流,会把上一次结果撤回改成 false,输出新的结果 true。

# 用例数据输出到另一个文件

对于数据的写入在某些是有要求的,聚合操作的结果集就没办法写到文件,聚合操作会输出两条结果,一个上一次结果的撤回 false,一次新结果的输出 true,是没办法追加到文件系统的。

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 连接外部系统读取数据
    String path = "D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt";
    // 类似于 source
    tableEnv.connect(
            new FileSystem().path(path)
    // 类似于 split
    ).withFormat(
            // 引入 flink csv 依赖,默认 , 分割
            new Csv()
    // 类似于 map 转换
    ).withSchema(
        new Schema().field("id", DataTypes.STRING())
            .field("eventTime", DataTypes.BIGINT())
            .field("data", DataTypes.STRING())
            .field("num", DataTypes.INT())
    ).createTemporaryTable("inputTable");

    Table inputTable = tableEnv.from("inputTable");
    inputTable.printSchema();

    // 简单查询转换
    Table resultTable = inputTable.select($("id"), $("eventTime"))
            // 大于 1623051405
            .filter($("eventTime").isGreater(1623051405))
            // id = 1
            .where($("id").isEqual("1"));

    // 聚合统计
    Table aggTable = inputTable.groupBy($("id"))
            .select($("id").count().as("ct"),$("eventTime").avg().as("et"));

    // 输出到另一个文件
    String outPutPath = "D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\out.txt";
    // 类似于 source
    tableEnv.connect(
            new FileSystem().path(outPutPath)
            // 类似于 split
    ).withFormat(
            // 引入 flink csv 依赖,默认 , 分割
            new Csv()
            // 类似于 map 转换
    ).withSchema(
            new Schema().field("id", DataTypes.STRING())
                    .field("eventTime", DataTypes.BIGINT())
    ).createTemporaryTable("outPutTable");

    // 写道另一个文件,如果是 aggTable 是不能写入到文件的
    resultTable.executeInsert("outPutTable",false);

    env.execute("test");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

# flink table api 之 kafka

KafkaTableSinkBase 实现的是 AppendStreamTableSink 所以也没办法把聚合数据写进去,就像以上 CSVTableSInkBase 也是实现了 AppendStreamTableSink ,都无法去写入聚合的数据。

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 建立源
    tableEnv.connect(
            // 需要引入flink-connector-kafka_2.12<
            new Kafka()
            // kafka 版本
            .version("0.12")
            .topic("topic_consumer")
            .property("zookeeper.connect","localhost:2181")
            .property("bootstrap.servers","localhost:9092")
    ).withFormat(
            new Csv()
    ).withSchema(
            new Schema().field("id", DataTypes.STRING())
                    .field("eventTime", DataTypes.BIGINT())
                    .field("data", DataTypes.STRING())
                    .field("num", DataTypes.INT())
    ).createTemporaryTable("inputTable");

    // 简单转换
    Table inputTable = tableEnv.from("inputTable");

    // 输出kafka
    tableEnv.connect(
            // 需要引入flink-connector-kafka_2.12<
            new Kafka()
                    // kafka 版本
                    .version("0.12")
                    .topic("topic_producer")
                    .property("zookeeper.connect","localhost:2181")
                    .property("bootstrap.servers","localhost:9092")
    ).withFormat(
            new Csv()
    ).withSchema(
            new Schema().field("id", DataTypes.STRING())
                    .field("eventTime", DataTypes.BIGINT())
                    .field("data", DataTypes.STRING())
                    .field("num", DataTypes.INT())
    ).createTemporaryTable("outPutTable");

    // KafkaTableSinkBase 实现的是 AppendStreamTableSink 所以也没办法把聚合数据写进去
    inputTable.executeInsert("outPutTable");

    env.execute("test");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

对于流式查询,需要声明如何在表和外部连接器之间执行转换,与外部系统交换的消息类型,由更新模式(update mode)指定。

  • 追加(Append)
    表制作插入操作,和外部连接器只交换插入(insert)消息
  • 撤回(Retract)
    表和外部连接器交换添加(Add)和撤回(Retract)消息。插入操作(insert)编码为 Add 消息;删除(Delete)编码为 Retract 消息;更新(Upsert)编码为上一条的 Retract 和吓一跳的 Add 消息。
  • 更新插入(Upsert)
    更新和插入都被编码为 Upsert 消息;删除编码为 Delet 消息

kafka 和 文本不支持,但 支持 ES(ElasticSearchUpsertTableSinkBase 类)、Mysql(需要引入 flink-jdbc_2.12)

# 将 Table 转换成 DataStream

表可以转换为 DataStream 或 DataSet,这样自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了。将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成指定数据类型。表作为流式查询的结果,是动态更新的,转换有两种转换模式:追加(Append)模式和撤回(Retract)模式
追加模式,用于表只会被插入(insert)操作场景

DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable,Row.class)
1

撤回模式,用于任何场景。有些类似于更新模式中 Retract 模式,他只有 insert 和 Delete 两类操作。得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(insert),还是被删除的数据(Delete)

DataStream<Tuple2<Boolean,Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable,Row.class)
1

流转换成表

 // 3 创建表得执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 4 基于流创建一个table
Table table = tableEnv.fromDataStream(map);
1
2
3
4

默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来

Table table = tableEnv.fromDataStream(map,"id,eventTime as et,data,num");
1

基于 DataStream 创建临时视图

tableEnv.createTemporaryView("view_name",dataStream);
tableEnv.createTemporaryView("view_name",dataStream,"id,eventTime as et,data,num");
1
2

基于 Table 创建临时视图

tableEnv.createTemporaryView("view_name",table);
1

查看执行计划

tableEnv.explain(resultTable);
1

# 动态表(Dynamic Tables)

动态表是 Flink 对流数据的 Table API 和 SQL 支持的核心概念,它表示批处理数据的静态表不同,动态表是随时间变化的。

动态表可以像静态的批处理一样进行查询,查询一个动态表会产生持续查询(Continuous Query),连续查询永远不会终止,并会生成另一个动态表,查询会不断更新其动态结果表,以反映其动态输入表上的更改。

流式表查询的处理过程:1. 流被转换为动态表;2. 对动态表计算连续查询,生成新的动态表;3. 生成的动态表被转换回流

一常规数据库表一样,动态表可以通过插入(Insert)、更新(Update)和删除(Delete)更改,进行持续的修改,将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码:追加流(Append-only)、撤回流(Retract)、更新插入流(Upsert)

# 时间特性(Time Attributes)

基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间数据来源的信息。

Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳。

时间属性,可以是每个表 schema 的一部分。一旦定义了时间属性,他就可以作为一个字段引用,并且可以在基于时间的操作中使用。时间属性的行为类似于常规的时间戳,可以访问,并且进行计算。

# 定义处理时间(Processing Time)

处理时间语义下,允许表处理程序根据机器的本地时间生成成果。它是时间的最简单概念。它既不需要提取时间戳,也不需要生成 watermark。

由 DataStream 转换成表时指定。在定义 Schema 期间,可以使用 .proctime,指定字段名定义处理时间字段。这个 proctime 属性只能通过附加逻辑字段,来扩展物理 schema。因此,只能 schema 定义的末尾定义它。

Table table = tavleEnv.fromDataStream(dataStream,"id,eventTime,data,num.pt.proctime");
1

connect 中使用

.withSchema(
    new Schema().field("id", DataTypes.STRING())
        .field("eventTime", DataTypes.BIGINT())
        .field("data", DataTypes.STRING())
        .field("num", DataTypes.INT())
        .field("pt", DataTypes.TIMESTAMP(3))
        .proctime()
1
2
3
4
5
6
7

mysql 中定义,PROCTIME 只有 blink 中支持,要引入 blink api

String sinkDDL = "create table dataTable( "+
    "id varchar(20) not null, "+
    "eventTime bigint, "+
    "data varchar(30), "+
    "num int, "+
    "pt as PROCTIME(), "+
    ") with ( "+
    " 'connector.type' = 'filesystem' "+
    " 'connector.path' = '/test.tct' "+
    " 'format.type' = 'csv' ";
tableEnv.sqlUpdate(sinkDDL)
1
2
3
4
5
6
7
8
9
10
11

# 定义事件事件(Event Time)

事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有序乱序事件或延迟事件时,也可以获得正确的结果。

为了处理无须事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展。

定义事件事件,同样有三种方法:由 DataStream 转换成表时指定;定义 Table Schema 时指定;在创建表的 DDL 中定义。

// 将 DataStream 转换位Table,并指定时间字段
Table table = tableEnv.fromDataStream(dataStream,"id,eventTime.rowtime,data,num");

// 或者,直接追加时间字段
Table table = tableEnv.fromDataStream(dataStream,"id,eventTime,data,num,rt.rowtime");
1
2
3
4
5

connect 中定义

.withSchema(
    new Schema().field("id", DataTypes.STRING())
        .field("eventTime", DataTypes.BIGINT())
        .field("data", DataTypes.STRING())
        .field("num", DataTypes.INT())
        .rowtime(
                new Rowtime()
                // 从字段中提取时间戳
                .timestampsFromField("eventTime")
                // waternark 延迟1s
                .watermarksPeriodicBounded(1000)
        )
)
1
2
3
4
5
6
7
8
9
10
11
12
13

sql ddl 定义,需要 blink api

String sinkDDL = "create table dataTable( "+
    "id varchar(20) not null, "+
    "eventTime bigint, "+
    "data varchar(30), "+
    "num int, "+
    "rt as TO_TIMESTAMP( FROM_UNIXTIME(eventTime) ), "+
    "watermark for rt as rt - interval '1' second "+
    ") with ( "+
    " 'connector.type' = 'filesystem' "+
    " 'connector.path' = '/test.tct' "+
    " 'format.type' = 'csv' ";
tableEnv.sqlUpdate(sinkDDL)
1
2
3
4
5
6
7
8
9
10
11
12

# 窗口

时间语义,要配合敞口操作才能发挥作用,在 Table API 和 SQL 中,主要有两种装口:group Windows(分组窗口),根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数;Over Windows,针对每个输入行,计算相邻行范围内的聚合。

# Group Windows

Group Windows 时使用 window 定义的,并且必须又 as 子句指定一个别名。为了按窗口进行分组,窗口的别名必须在 greop by 子句中,像常规的分组字段一样引用。

Table aggTable = inputTable
        .window([w:GroupWindow] as "w") // 定义窗口,别名为 w
        .groupBy("w,a") // 按照字段a和窗口 w分组
        .select("a,b.sum"); // 聚合
1
2
3
4

Table API 提供了一组具有特定语义的预定义 Window 类,这些类会被转换为底层 DataStream 或 DataSet 的窗口操作。
滚动窗口

// 开了一个10分钟的滚动窗口,指定一个字段 rowtime,as 别名
.window(Tumble.over($("10.minutes")).on($("rowtime")).as("w"))
// 处理时间
.window(Tumble.over($("10.minutes")).on($("proctime")).as("w"))
// 计数窗口 10.rows 10行
.window(Tumble.over($("10.rows")).on($("proctime")).as("w"))
1
2
3
4
5
6

滑动窗口

// 事件事件滑动窗口,长度是10分钟,5分钟滑动一次
.window(Slide.over($("10.minutes")).every($("5.minutes")).on("rowtime").as("w"))
// 处理时间滑动窗口
.window(Slide.over($("10.minutes")).every($("5.minutes")).on("proctime").as("w"))
// 计数滑动窗口
.window(Slide.over($("10.rows")).every($("5.rows")).on("proctime").as("w"))
1
2
3
4
5
6

会话窗口

// 最小间隔时间
.window(Session.withGap($("10.minutes")).on("rowtime").as("w"))
.window(Session.withGap($("10.minutes")).on("proctime").as("w"))
1
2
3

sql 中的定义,在查询的 group by 子句中使用

// 定义一个滚动窗口,第一个参数是时间字段,第二个参数是长度
TUMBLE(time_attr,interval)
// 定义一个滑动窗口,第一个参数是时间字段,第二个参数是滑动长度,第三个是窗口长度
HOP(time_attr,interval,interval)
// 定义一个会话窗口,第一个参数是时间字段,第二个参数窗口间隔
SESSION(time_attr,interval)
1
2
3
4
5
6

# 基于 ddl 开窗代码演示

ddl 更多操作 (opens new window),1.12 中不推荐 connect,推荐使用 ddl 来操作。

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 连接外部系统读取数据
    String path = "D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt";
    // 类似于 source
    tableEnv.connect(
            new FileSystem().path(path)
    // 类似于 split
    ).withFormat(
            // 引入 flink csv 依赖,默认 , 分割
            new Csv()
    // 类似于 map 转换
    ).withSchema(
        new Schema().field("id", DataTypes.STRING())
            .field("data", DataTypes.STRING())
            .field("num", DataTypes.INT())
            .field("eventTime",  DataTypes.TIMESTAMP(3))
    ).createTemporaryTable("inputTable");

    String ddl = "CREATE TABLE MyUserTable ( " +
            "  id STRING, " +
            "  eventTime BIGINT, " +
            "  data STRING, " +
            "  num INTEGER, " +
            "  rt as TO_TIMESTAMP( FROM_UNIXTIME(eventTime) ), " +
            "  watermark for rt as rt - interval '5' second  " +
            ") WITH ( " +
            "  'connector.type' = 'filesystem'," +
            "  'connector.path' = 'D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt', " +
            "  'format.type' = 'csv' " +
            ")";

    tableEnv.executeSql(ddl);

    String sql = "select " +
            "count(id) as ct,id,tumble_start(rt,interval '15' second) as st,tumble_end(rt,interval '15' second) as ed " +
            "from MyUserTable " +
            "group by id,tumble(rt,interval '15' second) ";
    // 第一种方式
    Table table = tableEnv.sqlQuery(sql);
    tableEnv.toRetractStream(table, Row.class).print("sql1");

    // 第二种方式 打印sqk结构
    tableEnv.executeSql(sql).print();

    // 第三种方式 使用api
    Table myUserTable = tableEnv.from("MyUserTable");
    Table select = myUserTable.window(Tumble.over(lit(15).seconds()).on($("rt")).as("w"))
            .groupBy($("id"), $("w"))
            .select($("id").count().as("ct"),$("w").start().as("st"),$("w").end().as("ed"));

    tableEnv.toAppendStream(select, Row.class).print("api");

    env.execute("test");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

# Over Windows

Over Window 聚合是标准 SQL 中已有得(over 子句),可以在查询的 SELECT 子句中定义;Over Window 聚合,会针对每个输入行,计算相邻行范围内的聚合;Over Window 使用 windows (w:overwindows*) 子句定义,并在 select () 方法中通过别名来引用。

table table = input.window([e:OverWindow] as "w").select("a,b.sum over w,c.min over w");
1

Table API 提供了 Over 类,来配置 Over 窗口的属性。

# 无界 Over Windows

可以在事件事件或处理事件,以及指定为时间间隔、或行计数的范围内,定义 Over Window,无界的 over window 是使用常量指定的。

// 无界的事件时间 
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RABGE).as("w"))
// 无界的处理时间
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RABGE).as("w"))
// 无界的事件时间
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RABGE).as("w"))
// 无界的处理时间
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w"))
1
2
3
4
5
6
7
8
9

# 有界 Over Windows

// 有界的事件时间
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))
// 有界的处理时间
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
.window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
1
2
3
4
5
6

# sql 中的 Over Window

用 Over 做窗口聚合时,所有聚合必须在同一窗口上定义,也就是说必须是相同的分区、排序和范围;目前仅支持在当前行范围之前的窗口;Order By 必须在单一的时间属性上指定。

select count(id) over(
    partition by id
    order by eventTime
    // 当前行以及前两行
    row between 2 preceding and current row
) from table
1
2
3
4
5
6

# 代码演示

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 连接外部系统读取数据
    String path = "D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt";
    // 类似于 source
    tableEnv.connect(
            new FileSystem().path(path)
    // 类似于 split
    ).withFormat(
            // 引入 flink csv 依赖,默认 , 分割
            new Csv()
    // 类似于 map 转换
    ).withSchema(
        new Schema().field("id", DataTypes.STRING())
            .field("data", DataTypes.STRING())
            .field("num", DataTypes.INT())
            .field("eventTime",  DataTypes.TIMESTAMP(3))
    ).createTemporaryTable("inputTable");

    String ddl = "CREATE TABLE MyUserTable ( " +
            "  id STRING, " +
            "  eventTime BIGINT, " +
            "  data STRING, " +
            "  num INTEGER, " +
            "  rt as TO_TIMESTAMP( FROM_UNIXTIME(eventTime) ), " +
            "  watermark for rt as rt - interval '5' second  " +
            ") WITH ( " +
            "  'connector.type' = 'filesystem'," +
            "  'connector.path' = 'D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt', " +
            "  'format.type' = 'csv' " +
            ")";


    tableEnv.executeSql(ddl);

    Table myUserTable = tableEnv.from("MyUserTable");

    Table select = myUserTable.window(Over.partitionBy($("id")).orderBy($("rt")).preceding(rowInterval(2L)).as("w"))
            .select("id,rt,id.count over w,num.sum over w");

    String sql = "select id,rt,count(id) over w,sum(num) over w "+
            " from MyUserTable "+
            " window w as ( partition by id order by rt rows between 2 preceding and current row ) ";
    Table Table = tableEnv.sqlQuery(sql);

    tableEnv.executeSql(sql).print();
    tableEnv.toAppendStream(Table, Row.class).print("sql");
    tableEnv.toRetractStream(select, Row.class).print("api");

    env.execute("test");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

数据

1,1623051400,test data,1
2,1623051401,test data,3
3,1623051402,test data,5
1,1623051405,test data,7
2,1623051406,test data,9
3,1623051409,test data,2
1,1623051415,test data,4
2,1623051416,test data,6
3,1623051417,test data,7
1,1623051418,test data,9
2,1623051419,test data,3
3,1623051420,test data,2
1,1623051421,test data,2
1,1623051422,test data,3
1,1623051423,test data,1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

结果

+----+--------------------------------+-------------------------+----------------------+-------------+
| op |                             id |                      rt |               EXPR$2 |      EXPR$3 |
+----+--------------------------------+-------------------------+----------------------+-------------+
| +I |                              1 |     2021-06-07T15:36:40 |                    1 |           1 |
| +I |                              2 |     2021-06-07T15:36:41 |                    1 |           3 |
| +I |                              3 |     2021-06-07T15:36:42 |                    1 |           5 |
| +I |                              1 |     2021-06-07T15:36:45 |                    2 |           8 |
| +I |                              2 |     2021-06-07T15:36:46 |                    2 |          12 |
| +I |                              3 |     2021-06-07T15:36:49 |                    2 |           7 |
| +I |                              1 |     2021-06-07T15:36:55 |                    3 |          12 |
| +I |                              2 |     2021-06-07T15:36:56 |                    3 |          18 |
| +I |                              3 |     2021-06-07T15:36:57 |                    3 |          14 |
| +I |                              1 |     2021-06-07T15:36:58 |                    3 |          20 |
| +I |                              2 |     2021-06-07T15:36:59 |                    3 |          18 |
| +I |                              3 |        2021-06-07T15:37 |                    3 |          11 |
| +I |                              1 |     2021-06-07T15:37:01 |                    3 |          15 |
| +I |                              1 |     2021-06-07T15:37:02 |                    3 |          14 |
| +I |                              1 |     2021-06-07T15:37:03 |                    3 |           6 |
+----+--------------------------------+-------------------------+----------------------+-------------+
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

partition by id order by rt rows between 2 preceding and current row, 是以 id 分组,计算当前行的前 2 行,也就是一共 3 行,的值的计算。第一条数据进来,对于他来说就是当前行,前两行没有。
更详细的 over 概念可以看这篇文章 (opens new window)

# 函数

Flink Table API 和 SQL 为用户提供了一组用于数据转换的内置函数,sql 中支持的很多函数,Table API 和 SQL 都已经做了实现。

比较函数                      逻辑函数                      算数函数
SQL:                          SQL:                          SQL:
value1 = value2               boolean1 or boolean2          numeric1+numeric2
value1 > value2               boolean1 is false             power(numeric1,numeric2)
                              not boolean

Table API:                    Table API:                    
ANY1 === ANY2                 boolean1 || boolean2          numeric1+numeric2
ANY1 > ANY2                   boolean.isFalse               numeric1.power(numeric2)
                              !boolean


字符串函数                    时间函数                       聚合函数
SQL:                          SQL:                           SQL:
string1 || string2            date string                    count(*)
upper(string)                 timestamp string               sum(expression)
char_length(string)           current_time                   rank()
                              interval string range          row_number()

Table API:                    Table API:                     Table API:
string1 + string2             string.toDate                  field.count
string.upperCase()            string.toTimestamp             field.sum
string.charLength()           currentTime()
                              numeric.days
                              numeric.minutes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 用户自定义函数(UDF)

用户自定义函数(User defined Functions,UDF)是一个重要得特性,他们显著地扩展了查询的表达能力。

在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用;函数通过调用 registerFunctionn () 方法在 TableEnvironment 的函数目录中,这样 Table API 或 SQL 解析器就可以识别并正确地解释它。

# 标量函数(Scalar Functions)

用户自定义的标量函数,可以将 0、1 或多个标量值,映射到新的标量值;为了定义标量函数,必须在 org.apache.flink.table.functions 中扩展基类 Scalar Function,并实现(一个或多个)求值(eval)方法。简单来说,就是把一个表的字段传入解析,列中可以显示这个被解析的字段的结果。

标量函数的行为由求值方法决定,求值方法必须公开声明并名命为 eval。

 public static void main(String[] args) {
    // 执行环境创建
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 读取数据
    DataStream<String> dataStream = env.readTextFile("D:\\workspace\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
    // 转换数据
    DataStream<EventData> map = dataStream.map(i -> {
        String[] strs = i.split(",");
        return new EventData(
                strs[0],
                Long.valueOf(strs[1]),
                String.valueOf((Long.valueOf(strs[1]) - 5)),
                Integer.valueOf(strs[3])
        );
    });

    // 将流转换成表
    Table dataTable = tableEnv.fromDataStream(map, $("id"), $("eventTime"), $("data"), $("num"));
    HashCode hashCode = new HashCode(20);
    // 创建一个临时UDF 注册到环境中
    tableEnv.createTemporarySystemFunction("hashCode",hashCode);
    // table api 使用自定义函数
    dataTable.select($("id"),call("hashCode",$("id"))).execute().print();

    tableEnv.createTemporaryView("event_data",map);
    tableEnv.sqlQuery("select id,hashCode(id) from event_data").execute().print();

}

public static class HashCode extends ScalarFunction{
    private  int factor = 10;
    // 可以定义构造函数来传标准配置
    public HashCode(int factor){
        this.factor = factor;
    }
    // 必须是 public 返回类型和参数类型随便定,但方法名必须交 eval
    public int eval(String id){
        return id.hashCode() % factor;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

# 表函数(Table Functions)

用户定义的表函数,也可以将 0、1 或多个标量值作为输入参数;与标量函数不同的是,它可以返回任意数量的行为作为输出,而不是单个值。简单来说就是把表的字段传入解析,一行变多行。

为了定义一个表函数,必须扩展 org.apache.flink.table.functions 中的基类 TableFuntion 并实现 (一个或多个) 求值方法;表函数的行为由其求值方法决定,求值方法必须是 public 的,并命名为 eval。

public static void main(String[] args) {
    // 执行环境创建
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 读取数据
    DataStream<String> dataStream = env.readTextFile("D:\\workspace\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
    // 转换数据
    DataStream<EventData> map = dataStream.map(i -> {
        String[] strs = i.split(",");
        return new EventData(
                strs[0],
                Long.valueOf(strs[1]),
                strs[2],
                Integer.valueOf(strs[3])
        );
    });

    // 将流转换成表
    Table dataTable = tableEnv.fromDataStream(map, $("id"), $("eventTime"), $("data"), $("num"));

    Split split = new Split(" ");
    // 创建一个临时UDF 注册到环境中
    tableEnv.createTemporarySystemFunction("split",split);
    // table api 使用自定义函数
    dataTable.joinLateral(call("split",$("data")).as("word","length"))
            .select($("id"),$("data"),$("word"),$("length"))
            .execute()
            .print();

    tableEnv.createTemporaryView("event_data",map);
    tableEnv.sqlQuery("select id,data,word,length from event_data,lateral table(split(data)) as aplitid(word,length)")
            .execute()
            .print();

}

public static class Split extends TableFunction<Tuple2<String,Integer>> {
    private String mark = ",";
    // 可以定义构造函数来传标准配置
    public Split(String mark){
        this.mark = mark;
    }
    // 必须是 public 返回类型和参数类型随便定,但方法名必须交 eval
    public void eval(String data){
        for(String str : data.split(mark)) {
            collect(new Tuple2<String, Integer>(str,str.length()));
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

# 聚合函数(Aggregate Functions)

用户定义聚合函数(User Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值;用户定义的聚合函数,是通过继承 AggregateFunction 抽象类实现的。对表分组聚合,相同组的会聚合成一条数据。

获取最大价格

AggregateFunction 要求必须实现的方法:createAccumulator ()、Accumulate ()、getValue ();

AggregateFunction 的工作原理:首先,它需要一个累加器(Accumulate),用来保存聚合中间结果的数据结构,可以通过调用 createAccumulator () 方法创建空累加器;随后,对每个输入行调用函数的 Accumulate () 方法来更新累加器;处理完所有行,将调用函数的 getValue () 方法来计算并返回最终结果。

public static void main(String[] args) {
    // 执行环境创建
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 读取数据
    DataStream<String> dataStream = env.readTextFile("D:\\workspace\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
    // 转换数据
    DataStream<EventData> map = dataStream.map(i -> {
        String[] strs = i.split(",");
        return new EventData(
                strs[0],
                Long.valueOf(strs[1]),
                strs[2],
                Integer.valueOf(strs[3])
        );
    });

    // 将流转换成表
    Table dataTable = tableEnv.fromDataStream(map, $("id"), $("eventTime"), $("data"), $("num"));

    Avg avg = new Avg();
    // 创建一个临时UDF 注册到环境中
    tableEnv.createTemporarySystemFunction("avg_0",avg);
    // table api 使用自定义函数
    dataTable.groupBy($("id"))
            .select($("id"),call("avg_0",$("num").as("avg_num")))
            .execute()
            .print();


    tableEnv.createTemporaryView("event_data",map);
    tableEnv.sqlQuery("select  id,avg_0(num) as avg_num from event_data group by id ")
            .execute()
            .print();

}

// 求 num 平均值
public static class Avg extends AggregateFunction<Double,Tuple2<Double,Integer>> {

    // 求平均值
    @Override
    public Double getValue(Tuple2<Double, Integer> doubleIntegerTuple2) {
        return doubleIntegerTuple2.f0 / doubleIntegerTuple2.f1;
    }

    // 初始化状态
    @Override
    public Tuple2<Double, Integer> createAccumulator() {
        return new Tuple2<>(0.0,0);
    }

    // 必须实现 accumulate 方法,来数据之后更新状态,acc 为当前状态,tmp 为输入数据
    public void accumulate(Tuple2<Double, Integer> acc,Double tmp){
        acc.f0 += tmp;
        acc.f1 += 1;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

# 表聚合函数

表聚合函数 和 聚合函数的区别是,对表分组聚合,相同组的会聚合成多行多列的结果表;表聚合函数通过继承 TableAggregateFunction 抽象类来实现的。

保留两个价格最高的

public static void main(String[] args) {
    // 执行环境创建
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 读取数据
    DataStream<String> dataStream = env.readTextFile("D:\\workspace\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
    // 转换数据
    DataStream<EventData> map = dataStream.map(i -> {
        String[] strs = i.split(",");
        return new EventData(
                strs[0],
                Long.valueOf(strs[1]),
                strs[2],
                Integer.valueOf(strs[3])
        );
    });

    // 将流转换成表
    Table dataTable = tableEnv.fromDataStream(map, $("id"), $("eventTime"), $("data"), $("num"));

//        Top2 top2 = new Top2();
    // 创建一个临时UDF 注册到环境中
//        tableEnv.createTemporarySystemFunction("top2",top2);
    tableEnv.registerFunction("top2", new Top2());
    // table api 使用自定义函数
    dataTable.groupBy($("id"))
            .flatAggregate("top2(num) as (TOP1, TOP2)")
            .select($("id"),$("TOP1"),$("TOP2"))
            .execute()
            .print();
}

// 求 num 平均值
// 第一个泛型是输出,第二个泛型是状态
public static class Top2 extends TableAggregateFunction<Tuple2<Integer,Integer>,Tuple2<Integer,Integer>> {

    // 输出
    public void emitValue(Tuple2<Integer, Integer> acc, Collector<Tuple2<Integer, Integer>> out){
        out.collect(acc);
    }

    // 初始化状态
    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return new Tuple2<>(0,0);
    }

    // 必须实现 accumulate 方法,来数据之后更新状态,acc 为当前状态,tmp 为输入数据
    public void accumulate(Tuple2<Integer, Integer> acc,Integer tmp){
        // 如果 tem 大于 f0,且 f0 > f1 那么
        // 三个数降序 取前两个
        List<Integer> doubles = Arrays
                .asList(acc.f0, acc.f1,tmp)
                .stream()
                .sorted(new Comparator<Integer>() {

                    @Override
                    public int compare(Integer o1, Integer o2) {
                        return Integer.compare(o1,o2);
                    }
                }.reversed())
                .collect(Collectors.toList());
        acc.f0 = doubles.get(0);
        acc.f1 = doubles.get(1);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
上次更新: 6/11/2025, 4:10:30 PM
Flink 状态一致性
Flink CEP编程

← Flink 状态一致性 Flink CEP编程→

Theme by Vdoing | Copyright © 2023-2025
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式