Queries #
The sqlQuery() method of a TableEnvironment executes SELECT and VALUES statements. The method returns the result of the SELECT statement (or the VALUES statement) as a Table. A Table can be used in subsequent SQL and Table API queries, be converted into a DataStream, or be written to a TableSink. SQL and Table API queries can be seamlessly mixed, and are holistically optimized and translated into a single program.
In order to access a table in a SQL query, it must be registered in the TableEnvironment. A table can be registered from a TableSource, a Table, a CREATE TABLE statement, or a DataStream. Alternatively, users can also register catalogs in a TableEnvironment to specify the location of the data sources.
For convenience, Table.toString() automatically registers the table under a unique name in its TableEnvironment and returns the name. Hence, Table objects can be directly inlined into SQL queries, as shown in the examples below.
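For instance, a minimal sketch of this round trip, assuming a StreamTableEnvironment named tableEnv with a registered table Orders (both names are illustrative):

```java
// chain SQL queries: the Table object is inlined into the second query
// under its auto-registered unique name (Table.toString())
Table orders = tableEnv.sqlQuery("SELECT product, amount FROM Orders");
Table large  = tableEnv.sqlQuery("SELECT * FROM " + orders + " WHERE amount > 10");

// the result can also be bridged back to the DataStream API
DataStream<Row> stream = tableEnv.toDataStream(large);
```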
Note: A query throws a TableException if it includes an unsupported SQL feature. The supported features of SQL on batch and streaming tables are listed in the following sections.
Specifying a Query #
The following examples show how to specify a SQL query on registered and inlined tables.
Java:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

// SQL query with an inlined (unregistered) table
Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
Table result = tableEnv.sqlQuery(
  "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

// SQL query with a registered table
// register the DataStream as view "Orders"
tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"));
// run a SQL query on the Table and retrieve the result as a new Table
Table result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// create and register a TableSink
final Schema schema = Schema.newBuilder()
    .column("product", DataTypes.STRING())
    .column("amount", DataTypes.INT())
    .build();

final TableDescriptor sinkDescriptor = TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/path/to/file")
    .format(FormatDescriptor.forFormat("csv")
        .option("field-delimiter", ",")
        .build())
    .build();

tableEnv.createTemporaryTable("RubberOrders", sinkDescriptor);

// run an INSERT SQL on the Table and emit the result to the TableSink
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
```

Scala:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// SQL query with an inlined (unregistered) table
val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
val result = tableEnv.sqlQuery(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

// SQL query with a registered table
// register the DataStream under the name "Orders"
tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
// run a SQL query on the Table and retrieve the result as a new Table
val result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

// create and register a TableSink
val schema = Schema.newBuilder()
  .column("product", DataTypes.STRING())
  .column("amount", DataTypes.INT())
  .build()

val sinkDescriptor = TableDescriptor.forConnector("filesystem")
  .schema(schema)
  .format(FormatDescriptor.forFormat("csv")
    .option("field-delimiter", ",")
    .build())
  .build()

tableEnv.createTemporaryTable("RubberOrders", sinkDescriptor)

// run an INSERT SQL on the Table and emit the result to the TableSink
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
```

Python:

```python
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# SQL query with an inlined (unregistered) table
# elements data type: BIGINT, STRING, BIGINT
table = table_env.from_elements(..., ['user', 'product', 'amount'])
result = table_env \
    .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)

# register the table as view "Orders" so the INSERT below can reference it
table_env.create_temporary_view("Orders", table)

# create and register a TableSink
schema = Schema.new_builder() \
    .column("product", DataTypes.STRING()) \
    .column("amount", DataTypes.INT()) \
    .build()

sink_descriptor = TableDescriptor.for_connector("filesystem") \
    .schema(schema) \
    .format(FormatDescriptor.for_format("csv")
            .option("field-delimiter", ",")
            .build()) \
    .build()

table_env.create_temporary_table("RubberOrders", sink_descriptor)

# run an INSERT SQL on the Table and emit the result to the TableSink
table_env \
    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
```

Executing a Query #
A SELECT or VALUES statement can be executed via the TableEnvironment.executeSql() method, which collects the content to the local client and returns the result of the SELECT statement (or the VALUES statement) as a TableResult. Similarly, a Table object can be executed via the Table.execute() method to collect its content to the local client. TableResult.collect() returns a closeable row iterator. The SELECT job will not finish until all result data has been collected, so the iterator should be actively closed via CloseableIterator#close() to avoid resource leaks. Alternatively, TableResult.print() prints the result of the SELECT statement to the client's console. The result data in a TableResult can only be accessed once, so collect() and print() are mutually exclusive: call one or the other.
TableResult.collect() and TableResult.print() behave slightly differently under different checkpointing settings (to enable checkpointing for a streaming job, see the checkpointing config).
- For batch jobs or streaming jobs without checkpointing, TableResult.collect() and TableResult.print() have neither exactly-once nor at-least-once guarantees. Query results are immediately accessible by the client once they are produced, but exceptions will be thrown when the job fails and restarts.
- For streaming jobs with exactly-once checkpointing, TableResult.collect() and TableResult.print() guarantee end-to-end exactly-once record delivery. A result is accessible by the client only after its corresponding checkpoint completes.
- For streaming jobs with at-least-once checkpointing, TableResult.collect() and TableResult.print() guarantee end-to-end at-least-once record delivery. Query results are immediately accessible by the client once they are produced, but the same result may be delivered multiple times.
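For reference, a minimal Java sketch of enabling exactly-once checkpointing through the table config before collecting results (the Scala and Python snippets below set the same execution.checkpointing.* options):

```java
// enable exactly-once checkpointing so that collect()/print()
// deliver each record exactly once
tableEnv.getConfig().set("execution.checkpointing.mode", "EXACTLY_ONCE");
tableEnv.getConfig().set("execution.checkpointing.interval", "10s");
```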
Java:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

// execute SELECT statement
TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
// use try-with-resources statement to make sure the iterator will be closed automatically
try (CloseableIterator<Row> it = tableResult1.collect()) {
    while(it.hasNext()) {
        Row row = it.next();
        // handle row
    }
}

// execute Table
TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
tableResult2.print();
```

Scala:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val tableEnv = StreamTableEnvironment.create(env, settings)
// enable checkpointing
tableEnv.getConfig
  .set(CheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig
  .set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))

tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

// execute SELECT statement
val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
val it = tableResult1.collect()
try while (it.hasNext) {
  val row = it.next
  // handle row
} finally it.close() // close the iterator to avoid resource leak

// execute Table
val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
tableResult2.print()
```

Python:

```python
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env, settings)
# enable checkpointing
table_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
table_env.get_config().set("execution.checkpointing.interval", "10s")

table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

# execute SELECT statement
table_result1 = table_env.execute_sql("SELECT * FROM Orders")
table_result1.print()

# execute Table
table_result2 = table_env.sql_query("SELECT * FROM Orders").execute()
table_result2.print()
```

Syntax #
Flink parses SQL using Apache Calcite, which supports standard ANSI SQL.
The following BNF-grammar describes the superset of supported SQL features in batch and streaming queries. The Operations section shows examples of the supported features and indicates which features are only supported for batch or streaming queries.
Grammar
```text
query:
    values
  | WITH withItem [ , withItem ]* query
  | {
        select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

withItem:
    name
    [ '(' column [, column ]* ')' ]
    AS '(' query ')'

orderItem:
    expression [ ASC | DESC ]

select:
    SELECT [ ALL | DISTINCT ]
    { * | projectItem [, projectItem ]* }
    FROM tableExpression
    [ WHERE booleanExpression ]
    [ GROUP BY { groupItem [, groupItem ]* } ]
    [ HAVING booleanExpression ]
    [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
    SELECT [ ALL | DISTINCT ]
    { * | projectItem [, projectItem ]* }

projectItem:
    expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
    tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
    ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
    tablePrimary
    [ matchRecognize ]
    [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
    [ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | [ LATERAL ] '(' query ')'
  | UNNEST '(' expression ')'

tablePath:
    [ [ catalogName . ] databaseName . ] tableName

systemTimePeriod:
    FOR SYSTEM_TIME AS OF dateTimeExpression

dynamicTableOptions:
    /*+ OPTIONS(key=val [, key=val]*) */

key:
    stringLiteral

val:
    stringLiteral

values:
    VALUES expression [, expression ]*

groupItem:
    expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

matchRecognize:
    MATCH_RECOGNIZE '('
    [ PARTITION BY expression [, expression ]* ]
    [ ORDER BY orderItem [, orderItem ]* ]
    [ MEASURES measureColumn [, measureColumn ]* ]
    [ ONE ROW PER MATCH ]
    [ AFTER MATCH
      ( SKIP TO NEXT ROW
      | SKIP PAST LAST ROW
      | SKIP TO FIRST variable
      | SKIP TO LAST variable
      | SKIP TO variable )
    ]
    PATTERN '(' pattern ')'
    [ WITHIN intervalLiteral ]
    DEFINE variable AS condition [, variable AS condition ]*
    ')'

measureColumn:
    expression AS alias

pattern:
    patternTerm [ '|' patternTerm ]*

patternTerm:
    patternFactor [ patternFactor ]*

patternFactor:
    variable [ patternQuantifier ]

patternQuantifier:
    '*'
  | '*?'
  | '+'
  | '+?'
  | '?'
  | '??'
  | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  | '{' repeat '}'
```

Flink SQL uses a lexical policy for identifiers (table, attribute, function names) similar to Java:
- The case of identifiers is preserved whether or not they are quoted.
- Identifiers are matched case-sensitively.
- Unlike Java, back-ticks (`` ` ``) allow identifiers to contain non-alphanumeric characters (e.g. ``SELECT a AS `my field` FROM t``).
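A minimal sketch of these rules, assuming a tableEnv as in the earlier examples (the table and column names are illustrative):

```java
// identifier case is preserved and matched case-sensitively,
// so this table must later be referenced as "MyTable", not "mytable"
tableEnv.executeSql("CREATE TABLE MyTable (`my field` STRING) WITH (...)");

// back-ticks allow the non-alphanumeric column name `my field`
Table t = tableEnv.sqlQuery("SELECT `my field` FROM MyTable");
```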
String literals must be enclosed in single quotes (e.g., SELECT 'Hello World'). Duplicate a single quote to escape it (e.g., SELECT 'It''s me').
```sql
Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
|      EXPR$0 |  EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set
```

Unicode characters are supported in string literals. If explicit Unicode code points are required, use the following syntax:
- Use the backslash (`\`) as the escaping character (default): `SELECT U&'\263A'`
- Use a custom escaping character: `SELECT U&'#263A' UESCAPE '#'`
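For instance, a hedged sketch (assuming a tableEnv as above); both queries should return the single character U+263A:

```java
// default backslash escaping: U&'\263A' denotes the code point U+263A
tableEnv.sqlQuery("SELECT U&'\\263A'").execute().print();

// the same code point with a custom escape character '#'
tableEnv.sqlQuery("SELECT U&'#263A' UESCAPE '#'").execute().print();
```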
Starting with Flink 2.0, C-style escape sequences are available:
| Backslash Escape Sequence | Interpretation |
|---|---|
| \b | backspace |
| \f | form feed |
| \n | newline |
| \r | carriage return |
| \t | tab |
| \o, \oo, \ooo (o = 0–7) | octal byte value |
| \xh, \xhh (h = 0–9, A–F) | hexadecimal byte value |
| \uxxxx, \Uxxxxxxxx (x = 0–9, A–F) | 16 or 32-bit hexadecimal Unicode character value |
Example: `SELECT e'\u0061\x61\141' AS c` or `SELECT E'\u0061\x61\141' AS c;`
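Since `\u0061` (16-bit hex), `\x61` (hex byte), and `\141` (octal byte) each encode the letter a, the example above should return the string 'aaa'. A minimal sketch of the same statement issued through the Table API (assuming a tableEnv as above):

```java
// the three escape forms all denote 'a', so the result column c
// should contain the string "aaa"
tableEnv.sqlQuery("SELECT E'\\u0061\\x61\\141' AS c").execute().print();
```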