
Flink Table API Programming


1. What is the Table API

Flink offers three levels of API, ordered by how convenient they are to use. From bottom to top these are the low-level ProcessFunction layer, the DataStream/DataSet API, and the Table API & SQL.

1.1 Table API & SQL

  1. Declarative: users only describe what they want to do, not how to do it

  2. High performance: queries go through an optimizer, so better execution performance can be obtained

  3. Unified stream and batch: the same query logic can run in streaming mode or in batch mode

  4. Stable standard: the semantics follow the SQL standard and rarely change

  5. Easy to understand: the semantics are explicit, what you see is what you get

Table API: tab.groupBy("word").select("word, count(1) as count")

SQL: SELECT word, COUNT(*) AS cnt FROM MyTable GROUP BY word

1.2 Features of the Table API

1. The Table API makes multi-statement data processing easy to express.

  For example, suppose we want to write rows with a > 10 to the external table xxx and, at the same time, insert rows with a < 10 into the external table yyy. With the Table API this takes two short statements (a fuller sketch follows after this list):

    table.filter("a > 10").insertInto("xxx")

    table.filter("a < 10").insertInto("yyy")

2. The Table API makes it easy to extend standard SQL (when, and only when, that is needed).

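A minimal sketch of the multi-sink pattern from point 1, assuming an already registered table "Orders" with an integer column a, and using the pre-1.9 registerTableSink/CsvTableSink API; the sink names, schema, and output paths are assumptions for illustration:

// Register two hypothetical CSV sinks, "xxx" and "yyy"
tEnv.registerTableSink("xxx",
        new String[]{"a"}, new TypeInformation<?>[]{Types.INT},
        new CsvTableSink("/tmp/xxx", ","));
tEnv.registerTableSink("yyy",
        new String[]{"a"}, new TypeInformation<?>[]{Types.INT},
        new CsvTableSink("/tmp/yyy", ","));

// Route one source table into both sinks with two short statements
Table orders = tEnv.scan("Orders");
orders.filter("a > 10").insertInto("xxx");
orders.filter("a < 10").insertInto("yyy");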

2. Table API Programming

2.1 WordCount example

Streaming WordCount in Java:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class JavaStreamWordCount {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment and its table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register a file-based source table with a single STRING column "word"
        String path = JavaStreamWordCount.class.getClassLoader().getResource("words.txt").getPath();
        tEnv.connect(new FileSystem().path(path))
            .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
            .withSchema(new Schema().field("word", Types.STRING))
            .inAppendMode()
            .registerTableSource("fileSource");

        // Count the occurrences of each word
        Table result = tEnv.scan("fileSource")
            .groupBy("word")
            .select("word, count(1) as count");

        // A streaming group-by produces updates, so convert to a retract stream to print
        tEnv.toRetractStream(result, Row.class).print();
        env.execute();
    }
}

Batch WordCount in Java:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class JavaBatchWordCount {

    public static void main(String[] args) throws Exception {
        // Set up the batch execution environment and its table environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Register the same file-based source; append mode is not needed in batch
        String path = JavaBatchWordCount.class.getClassLoader().getResource("words.txt").getPath();
        tEnv.connect(new FileSystem().path(path))
            .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
            .withSchema(new Schema().field("word", Types.STRING))
            .registerTableSource("fileSource");

        // Count the occurrences of each word
        Table result = tEnv.scan("fileSource")
            .groupBy("word")
            .select("word, count(1) as count");

        // In batch mode the result can be converted to a DataSet and printed directly
        tEnv.toDataSet(result, Row.class).print();
    }
}

Reference: https://github.com/hequn8128/TableApiDemo

Environment

In the pre-1.9 API used here, the Table API ships a separate table environment per API and language: StreamTableEnvironment and BatchTableEnvironment, each with a Java and a Scala variant, created from a StreamExecutionEnvironment or an ExecutionEnvironment respectively.

For the ongoing restructuring and optimization of the table environments, see FLIP-32:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions

2.2 Table API operations

(1) How to get a Table

    Table myTable = tableEnvironment.scan("MyTable"); // a Table is obtained by scanning a table that is already registered in the TableEnvironment

So how does "MyTable" get registered in the first place, i.e. how to register a table? There are roughly three ways, sketched below:

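A hedged sketch of three common registration paths in the pre-1.9 API used throughout this post (all table names are placeholders, csvSource stands in for any TableSource implementation, and the CsvTableSink schema and path are assumptions for illustration):

// 1) Register an existing Table object under a name
Table projection = tEnv.scan("fileSource").select("word");
tEnv.registerTable("projection", projection);

// 2) Register a TableSource directly (or via tEnv.connect(...) as in the WordCount example)
tEnv.registerTableSource("csvSource", csvSource);

// 3) Register a TableSink so that queries can later insertInto(...) it
tEnv.registerTableSink("csvSink",
        new String[]{"word", "cnt"},
        new TypeInformation<?>[]{Types.STRING, Types.LONG},
        new CsvTableSink("/tmp/out", ","));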

(2) How to emit a Table

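A short hedged sketch of emitting a Table, assuming the sink table "csvSink" registered above and a query result named result:

// Emit a query result into a registered sink table
result.insertInto("csvSink");

// On a streaming table environment a Table can also be emitted by converting it
// to a DataStream, e.g. as a retract stream, as in the WordCount example
tEnv.toRetractStream(result, Row.class).print();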

(3) How to query a Table

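As a hedged illustration of typical relational operations on a Table (the table and column names are invented for this sketch):

Table orders  = tEnv.scan("Orders");
Table ratings = tEnv.scan("Ratings");

Table result = orders
        .filter("amount > 10")                        // selection
        .join(ratings, "order_id = rating_order_id")  // join on a predicate
        .groupBy("user")                              // grouping
        .select("user, amount.sum as total");         // projection + aggregation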

Columns Operation & Function

Columns Operation - usability

Suppose a table has 100 columns and we need to drop a single column. How do we do that?

Operators and examples:

AddColumns:          Table orders = tableEnv.scan("Orders"); Table result = orders.addColumns("concat(c, 'sunny') as desc");
AddOrReplaceColumns: Table orders = tableEnv.scan("Orders"); Table result = orders.addOrReplaceColumns("concat(c, 'sunny') as desc");
DropColumns:         Table orders = tableEnv.scan("Orders"); Table result = orders.dropColumns("b, c");
RenameColumns:       Table orders = tableEnv.scan("Orders"); Table result = orders.renameColumns("b as b2, c as c2");

Now suppose we need to select columns 20 through 80 of that 100-column table. How do we do that?

Columns Function - usability


Syntax

The proposed column operation syntax is as follows:

columnOperation:
    withColumns(columnExprs) | withoutColumns(columnExprs)

columnExprs:
    columnExpr [, columnExpr]*

columnExpr:
    columnRef | columnIndex to columnIndex | columnName to columnName

columnRef:
    columnName (a field name that exists in the table) | columnIndex (a positive integer starting at 1)

Example: withColumns(a, b, 2 to 10, w to z)

Summary

API and example:

Columns Operation: table.dropColumns('a, 'b)
Columns Function:  table.select(withColumns('a, 1 to 10))

Row-based Operation

Map operation - usability

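The map operation applies a user-defined ScalarFunction to every row and produces exactly one output row per input row. A minimal sketch in the style of the Flink 1.9 documentation (the function, its registered name, and the input table are illustrative):

// A scalar function that turns one String column into a two-field Row
public class MyMapFunction extends ScalarFunction {
    public Row eval(String a) {
        return Row.of(a, "pre-" + a);
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.ROW(Types.STRING, Types.STRING);
    }
}

tEnv.registerFunction("myMap", new MyMapFunction());
Table result = input
        .map("myMap(c)").as("a, b");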

FlatMap operation - usability

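flatMap applies a user-defined TableFunction, so one input row can produce zero or more output rows. A hedged sketch that splits a '#'-separated column (all names are illustrative):

// A table function that emits one row per '#'-separated token
public class MySplitFunction extends TableFunction<Row> {
    public void eval(String str) {
        for (String s : str.split("#")) {
            collect(Row.of(s, s.length()));
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.INT);
    }
}

tEnv.registerFunction("split", new MySplitFunction());
Table result = input
        .flatMap("split(c)").as("word, length");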

Aggregate operation - usability

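aggregate applies a user-defined AggregateFunction and emits exactly one row per group. A hedged sketch of a min/max aggregate over a column a, grouped by key (names are illustrative):

// Accumulator and aggregate function computing min and max of an int column
public class MinMaxAcc {
    public int min = Integer.MAX_VALUE;
    public int max = Integer.MIN_VALUE;
}

public class MyMinMax extends AggregateFunction<Row, MinMaxAcc> {

    @Override
    public MinMaxAcc createAccumulator() {
        return new MinMaxAcc();
    }

    public void accumulate(MinMaxAcc acc, int value) {
        acc.min = Math.min(acc.min, value);
        acc.max = Math.max(acc.max, value);
    }

    @Override
    public Row getValue(MinMaxAcc acc) {
        return Row.of(acc.min, acc.max);
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.INT, Types.INT);
    }
}

tEnv.registerFunction("minMax", new MyMinMax());
Table result = orders
        .groupBy("key")
        .aggregate("minMax(a) as (x, y)")
        .select("key, x, y");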

FlatAggregate operation - functionality

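flatAggregate applies a user-defined TableAggregateFunction, which may emit several rows per group, e.g. a Top-N. A hedged sketch of the Top2 pattern described in the Flink documentation (names are illustrative):

// Accumulator holding the two largest values seen so far
public class Top2Accum {
    public Integer first = Integer.MIN_VALUE;
    public Integer second = Integer.MIN_VALUE;
}

public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

    @Override
    public Top2Accum createAccumulator() {
        return new Top2Accum();
    }

    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    // Unlike an AggregateFunction, emitValue may collect more than one row per group
    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
        if (acc.first != Integer.MIN_VALUE) out.collect(Tuple2.of(acc.first, 1));
        if (acc.second != Integer.MIN_VALUE) out.collect(Tuple2.of(acc.second, 2));
    }
}

tEnv.registerFunction("top2", new Top2());
Table result = orders
        .groupBy("key")
        .flatAggregate("top2(a) as (v, rank)")
        .select("key, v, rank");

In streaming mode the result of a flatAggregate can only be emitted as a retract stream.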

Aggregate VS TableAggregate

The key difference: an AggregateFunction (aggregate) emits exactly one row per group, while a TableAggregateFunction (flatAggregate) can emit zero, one, or several rows per group, which is what makes patterns like Top-N per key possible.

Summary

The row-based operations map, flatMap, aggregate, and flatAggregate make the Table API both easier to use and more expressive, while keeping its declarative style.

3. Table API: recent developments

3.1 FLIP-29

https://issues.apache.org/jira/browse/FLINK-10972

3.2 Python Table API

https://issues.apache.org/jira/browse/FLINK-12308

3.3 Interactive Programming

https://issues.apache.org/jira/browse/FLINK-11199

3.4 Iterative Processing

https://issues.apache.org/jira/browse/FLINK-11199