1. What is the Table API
1.1 Overview of Flink APIs
Flink provides three levels of API, ordered by increasing ease of use. From the bottom up these are the low-level ProcessFunction layer, the DataStream/DataSet API, and at the top:

Table API & SQL
Declarative: users only care about what to compute, not how to compute it.
High performance: queries go through an optimizer, so better execution performance can be obtained.
Unified streaming and batch: the same logic can run in either streaming mode or batch mode.
Stable semantics: the semantics follow the SQL standard and are unlikely to change.
Easy to understand: the semantics are explicit; what you see is what you get.
Table API: tab.groupBy("word").select("word, count(1) as count")
SQL: SELECT word, COUNT(*) AS cnt FROM MyTable GROUP BY word
1.2 Table API Features
1. The Table API makes it easy to write multi-statement data processing programs.
For example, suppose we want to write the rows where a > 10 to external table xxx and, at the same time, insert the rows where a < 10 into external table yyy. This is very convenient with the Table API (a runnable sketch follows below):
table.filter("a > 10").insertInto("xxx")
table.filter("a < 10").insertInto("yyy")
2. The Table API makes it easier to extend standard SQL (when, and only when, that is needed).
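A minimal runnable sketch of the two-sink pattern above, written against the pre-1.9 Java Table API used throughout this post. The source table "MyTable", the sink names xxx/yyy, the output paths, and the choice of CsvTableSink are illustrative assumptions, not part of the original example:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;

public class MultiSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Assumes a table "MyTable" with an int column "a" has already been registered.
        Table source = tEnv.scan("MyTable");

        // Two illustrative file sinks registered as "xxx" and "yyy".
        tEnv.registerTableSink("xxx", new String[]{"a"},
                new TypeInformation<?>[]{Types.INT}, new CsvTableSink("/tmp/xxx", ","));
        tEnv.registerTableSink("yyy", new String[]{"a"},
                new TypeInformation<?>[]{Types.INT}, new CsvTableSink("/tmp/yyy", ","));

        // Two declarations over the same scan; the planner compiles them into one job with two sinks.
        source.filter("a > 10").insertInto("xxx");
        source.filter("a < 10").insertInto("yyy");

        env.execute();
    }
}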

2. Table API Programming
2.1 WordCount Example
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class JavaStreamWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        String path = JavaStreamWordCount.class.getClassLoader().getResource("words.txt").getPath();
        tEnv.connect(new FileSystem().path(path))
            .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
            .withSchema(new Schema().field("word", Types.STRING))
            .inAppendMode()
            .registerTableSource("fileSource");

        Table result = tEnv.scan("fileSource")
            .groupBy("word")
            .select("word, count(1) as count");

        tEnv.toRetractStream(result, Row.class).print();
        env.execute();
    }
}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class JavaBatchWordCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        String path = JavaBatchWordCount.class.getClassLoader().getResource("words.txt").getPath();
        tEnv.connect(new FileSystem().path(path))
            .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
            .withSchema(new Schema().field("word", Types.STRING))
            .registerTableSource("fileSource");

        Table result = tEnv.scan("fileSource")
            .groupBy("word")
            .select("word, count(1) as count");

        tEnv.toDataSet(result, Row.class).print();
    }
}
Reference: https://github.com/hequn8128/TableApiDemo
Environment

Improvements to how the TableEnvironment is used (flink-dev mailing-list discussion and FLIP-32):
https://mail.google.com/mail/u/0/?tab=wm#search/label%3Aflink-dev+table+environment/FMfcgxvzMBlhTWVjxlzCnVZsLvvDkmph
https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions
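For reference, a minimal sketch of how the table environments are created in the pre-1.9 API that the examples in this post use (the factory methods have since been restructured under FLIP-32):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class EnvironmentSetup {
    public static void main(String[] args) {
        // Streaming: wrap a StreamExecutionEnvironment.
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnv = TableEnvironment.getTableEnvironment(senv);

        // Batch: wrap an ExecutionEnvironment.
        ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(benv);
    }
}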
2.2 Table API Operations
(1) How to get a Table
Table myTable = tableEnvironment.scan("MyTable"); // a Table is obtained by scanning it from the TableEnvironment. So how does MyTable get registered there in the first place, i.e. how to register a table? There are roughly three ways, sketched below:
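The figure listing the three approaches is missing here; the sketch below shows three common ways to register a table in the pre-1.9 Java API. The table names, the file path, and the sample DataStream are illustrative assumptions:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;

public class RegisterTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // 1. Register a TableSource under a name.
        CsvTableSource csvSource = new CsvTableSource(
                "/tmp/words.txt", new String[]{"word"}, new TypeInformation<?>[]{Types.STRING});
        tEnv.registerTableSource("csvTable", csvSource);

        // 2. Register a DataStream, naming its field(s).
        DataStream<String> stream = env.fromElements("flink", "table", "api");
        tEnv.registerDataStream("streamTable", stream, "word");

        // 3. Register an existing Table object under a new name.
        Table projected = tEnv.scan("csvTable").select("word");
        tEnv.registerTable("projectedTable", projected);
    }
}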

(2) How to emit a Table
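The accompanying figure is missing as well; below is a minimal sketch of the usual ways to emit a Table in the pre-1.9 API: writing it into a registered TableSink, or converting it back to a DataStream. The table and sink names and the output path are illustrative assumptions:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

public class EmitTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Assumes a table "MyTable" with a string column "word" is already registered.
        Table result = tEnv.scan("MyTable");

        // 1. Emit into a registered TableSink.
        tEnv.registerTableSink("csvSink", new String[]{"word"},
                new TypeInformation<?>[]{Types.STRING}, new CsvTableSink("/tmp/out", ","));
        result.insertInto("csvSink");

        // 2. Convert to a DataStream (append or retract mode) and use DataStream sinks.
        tEnv.toAppendStream(result, Row.class).print();
        tEnv.toRetractStream(result, Row.class).print();

        env.execute();
    }
}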

(3) How to query a Table
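A small sketch of typical queries on a Table, using both the Table API and SQL. The registered table "Orders" and its columns a, b, c are illustrative assumptions:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class QueryTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Assumes a table "Orders" with columns a, b, c is already registered.
        Table orders = tEnv.scan("Orders");

        // Projection, filtering and grouping with string expressions.
        Table result = orders
                .filter("a > 10")
                .groupBy("b")
                .select("b, count(1) as cnt");

        // The same query expressed in SQL over the registered table.
        Table sqlResult = tEnv.sqlQuery(
                "SELECT b, COUNT(*) AS cnt FROM Orders WHERE a > 10 GROUP BY b");
    }
}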


Columns Operation & Function
Columns Operation - Usability
Suppose we have a table with 100 columns and we need to drop one of them. How do we do that?
Operators and examples:

AddColumns
    Table orders = tableEnv.scan("Orders");
    Table result = orders.addColumns("concat(c, 'sunny') as desc");

AddOrReplaceColumns
    Table orders = tableEnv.scan("Orders");
    Table result = orders.addOrReplaceColumns("concat(c, 'sunny') as desc");

DropColumns
    Table orders = tableEnv.scan("Orders");
    Table result = orders.dropColumns("b, c");

RenameColumns
    Table orders = tableEnv.scan("Orders");
    Table result = orders.renameColumns("b as b2, c as c2");
Suppose we have a table with 100 columns and we need to select columns 20 through 80. How do we do that?
Columns Function - Usability

Syntax
The proposed column operation syntax is as follows:
columnOperation:
    withColumns(columnExprs) / withoutColumns(columnExprs)
columnExprs:
    columnExpr [, columnExpr]*
columnExpr:
    columnRef | columnIndex to columnIndex | columnName to columnName
columnRef:
    columnName (a field name that exists in the table) | columnIndex (a positive integer starting at 1)
Example: withColumns(a, b, 2 to 10, w to z)
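Putting the two 100-column questions together, a small runnable sketch, assuming a Flink version that ships these column operations and column functions (1.9 or later); the four-column sample table here merely stands in for the wide table from the text:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class ColumnOpsExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // A small stand-in for the "100-column" table from the text.
        Table t = tEnv.fromDataSet(
                env.fromElements(Tuple4.of(1, 2L, "x", 3.0)), "a, b, c, d");

        // Drop one unwanted column instead of re-selecting all the others.
        Table pruned = t.dropColumns("b");

        // Select a contiguous range of columns by index (columns 2 to 3).
        Table middle = t.select("withColumns(2 to 3)");

        // Or everything except that range.
        Table edges = t.select("withoutColumns(2 to 3)");

        tEnv.toDataSet(middle, Row.class).print();
    }
}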
Summary

API and examples:
Columns Operation: table.dropColumns('a, 'b)
Columns Function: table.select(withColumns('a, 1 to 10))
Row-based Operation
Map operation - Usability
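The illustration for map is missing; below is a sketch in the string-expression style, assuming a Flink version with the FLIP-29 row-based operations (1.9 or later). The function name, the column c, and the output names a, b are illustrative. A ScalarFunction that returns a Row maps each input row to exactly one output row:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// One row in, one row out: keep the input column and add a prefixed copy.
public class MyMapFunction extends ScalarFunction {

    public Row eval(String c) {
        return Row.of(c, "prefix-" + c);
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.ROW(Types.STRING, Types.STRING);
    }
}

// Usage (tEnv and the input Table are assumed to exist):
// tEnv.registerFunction("myMap", new MyMapFunction());
// Table result = input.map("myMap(c)").as("a, b");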

FlatMap operation - Usability
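Likewise for flatMap, a sketch under the same assumptions (Flink 1.9+, illustrative names). A TableFunction lets one input row produce zero or more output rows:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// One row in, zero or more rows out: split a '#'-separated string into rows.
public class MyFlatMapFunction extends TableFunction<Row> {

    public void eval(String str) {
        for (String s : str.split("#")) {
            collect(Row.of(s, s.length()));
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.INT);
    }
}

// Usage (tEnv and the input Table are assumed to exist):
// tEnv.registerFunction("split", new MyFlatMapFunction());
// Table result = input.flatMap("split(c)").as("word, length");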

Aggregate operation - Usability
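A sketch of aggregate under the same assumptions: an AggregateFunction folds all rows of a group into a single result row, here the (min, max) of an illustrative int column a:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

// Many rows in, one row out per group: emit (min, max).
public class MinMax extends AggregateFunction<Row, MinMax.MinMaxAcc> {

    // Accumulator holding the running min and max.
    public static class MinMaxAcc {
        public int min = Integer.MAX_VALUE;
        public int max = Integer.MIN_VALUE;
    }

    @Override
    public MinMaxAcc createAccumulator() {
        return new MinMaxAcc();
    }

    public void accumulate(MinMaxAcc acc, int value) {
        acc.min = Math.min(acc.min, value);
        acc.max = Math.max(acc.max, value);
    }

    @Override
    public Row getValue(MinMaxAcc acc) {
        return Row.of(acc.min, acc.max);
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.INT, Types.INT);
    }
}

// Usage (tEnv and the input Table are assumed to exist):
// tEnv.registerFunction("minMax", new MinMax());
// Table result = input.groupBy("key").aggregate("minMax(a) as (x, y)").select("key, x, y");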
FlatAggregate operation - Functionality
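A sketch of flatAggregate under the same assumptions: a TableAggregateFunction can emit several rows per group, here the top two values of an illustrative column with their rank:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

// Many rows in, possibly many rows out per group: emit the two largest values.
public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2.Top2Acc> {

    // Accumulator holding the current largest and second-largest values.
    public static class Top2Acc {
        public Integer first = Integer.MIN_VALUE;
        public Integer second = Integer.MIN_VALUE;
    }

    @Override
    public Top2Acc createAccumulator() {
        return new Top2Acc();
    }

    public void accumulate(Top2Acc acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    // Emit (value, rank) pairs for the top two values seen so far.
    public void emitValue(Top2Acc acc, Collector<Tuple2<Integer, Integer>> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}

// Usage (tEnv and the input Table are assumed to exist):
// tEnv.registerFunction("top2", new Top2());
// Table result = input.groupBy("key").flatAggregate("top2(a) as (v, rank)").select("key, v, rank");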

Aggregate vs. TableAggregate
An AggregateFunction (aggregate) emits exactly one result row per group, for example a max, while a TableAggregateFunction (flatAggregate) can emit any number of rows per group, for example a top-2.

Summary

3. Table API Developments
3.1 FLIP-29
https://issues.apache.org/jira/browse/FLINK-10972
3.2 Python Table API
https://issues.apache.org/jira/browse/FLINK-12308
3.3 Interactive Programming
https://issues.apache.org/jira/browse/FLINK-11199
3.4 Iterative Processing
https://issues.apache.org/jira/browse/FLINK-11199