```
➜ flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 5e20cb6b0f357591171dfcca2eea09de
```
Once running, the job uses the default parallelism of 1:
Click "Task Manager" in the left sidebar, then "Stdout", to see the output log:
Alternatively, check the *.out files under the local log directory:
List
List the submitted jobs:
```
➜ flink-1.7.2 bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
24.03.2019 10:14:06 : 5e20cb6b0f357591171dfcca2eea09de : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
```
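When scripting around the CLI (for example, to cancel every running job in a loop), the `flink list` output can be parsed. A minimal Python sketch, assuming the 1.7 output format shown above; the helper name is hypothetical, not part of Flink:

```python
import re

def running_job_ids(list_output: str) -> list:
    """Collect the 32-hex-char JobIDs of RUNNING jobs from `flink list` output."""
    return re.findall(r": ([0-9a-f]{32}) : \S+ \(RUNNING\)", list_output)

# Sample output copied from the session above.
sample = """Waiting for response...
------------------ Running/Restarting Jobs -------------------
24.03.2019 10:14:06 : 5e20cb6b0f357591171dfcca2eea09de : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs."""

print(running_job_ids(sample))  # ['5e20cb6b0f357591171dfcca2eea09de']
```

Each returned ID could then be fed to `bin/flink cancel <JobID>`.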
```
➜ flink-1.7.2 bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb
Stopping job d67420e52bd051fae2fddbaa79e046bb.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop the job d67420e52bd051fae2fddbaa79e046bb.
	at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:554)
	at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
	at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:547)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1062)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
	at org.apache.flink.client.program.rest.RestClusterClient.stop(RestClusterClient.java:392)
	at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:552)
	... 9 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]
	at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:380)
	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:364)
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
```
➜ flink-1.7.2 bin/flink savepoint -m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5ff62bbb /tmp/savepoint
Triggering savepoint for job ec53edcfaeb96b2a5dadbfbe5ff62bbb.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-ec53ed-84b00ce500ee
You can resume your program from this savepoint with the run command.
```
A savepoint is always a full snapshot, so each one takes relatively long and produces a relatively large amount of data, and it must be triggered explicitly by the user. Savepoints are typically used for application version upgrades (see the documentation), bug fixes, A/B testing, and similar scenarios, and the user must specify the savepoint path.
Use the -s flag to start from a given savepoint:
```
➜ flink-1.7.2 bin/flink run -d -s /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ./examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
```
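The trigger-then-resume flow can be scripted. Here is a minimal Python sketch that pulls the savepoint path out of the `flink savepoint` output so it can be passed to `run -s`; the parsing is an assumption based on the 1.7 CLI output shown earlier:

```python
import re

def savepoint_path(cli_output: str) -> str:
    """Extract the savepoint path from `flink savepoint` CLI output."""
    m = re.search(r"Savepoint completed\. Path: (\S+)", cli_output)
    if not m:
        raise ValueError("no savepoint path found in CLI output")
    return m.group(1)

# Sample output copied from the earlier session.
output = """Triggering savepoint for job ec53edcfaeb96b2a5dadbfbe5ff62bbb.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-ec53ed-84b00ce500ee
You can resume your program from this savepoint with the run command."""

path = savepoint_path(output)
print(path)  # file:/tmp/savepoint/savepoint-ec53ed-84b00ce500ee
```

The extracted path could then be interpolated into `bin/flink run -d -s <path> <jar>`.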
In the JobManager log you should see entries like these:
```
2019-03-28 10:30:53,957 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job 790d7b98db6f6af55d04aec1d773852d from savepoint /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ()
2019-03-28 10:30:53,959 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job 790d7b98db6f6af55d04aec1d773852d to 2.
2019-03-28 10:30:53,959 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 1 @ 0 for 790d7b98db6f6af55d04aec1d773852d.
```
```
➜ flink-1.7.2 bin/flink modify -p 4 7752ea7b0e7303c780de9d86a5ded3fa
Modify job 7752ea7b0e7303c780de9d86a5ded3fa.
Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 4.
➜ flink-1.7.2 ll /tmp/savepoint
total 0
drwxr-xr-x  3 baoniu  96 Jun 17 09:05 savepoint-7752ea-00c05b015836/
➜ flink-1.7.2 bin/flink modify -p 3 7752ea7b0e7303c780de9d86a5ded3fa
Modify job 7752ea7b0e7303c780de9d86a5ded3fa.
Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 3.
➜ flink-1.7.2 ll /tmp/savepoint
total 0
drwxr-xr-x  3 baoniu  96 Jun 17 09:08 savepoint-7752ea-449b131b2bd4/
```
In the JobManager log you can see:
```
2019-06-17 09:05:11,179 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job 7752ea7b0e7303c780de9d86a5ded3fa from savepoint file:/tmp/savepoint/savepoint-790d7b-3581698f007e ()
2019-06-17 09:05:11,182 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job 7752ea7b0e7303c780de9d86a5ded3fa to 3.
2019-06-17 09:05:11,182 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 2 @ 0 for 7752ea7b0e7303c780de9d86a5ded3fa.
2019-06-17 09:05:11,184 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore
2019-06-17 09:05:11,184 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job CarTopSpeedWindowingExample (7752ea7b0e7303c780de9d86a5ded3fa) switched from state RUNNING to SUSPENDING.
org.apache.flink.util.FlinkException: Job is being rescaled.
```
```
[admin@z17.sqa.zth /home/admin/flink/flink-1.7.2]
$ echo $HADOOP_CONF_DIR
/etc/hadoop/conf/
[admin@z17.sqa.zth /home/admin/flink/flink-1.7.2]
$ ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
2019-06-17 09:15:24,511 INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:15:24,690 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:15:24,690 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:15:24,907 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=4}
2019-06-17 09:15:25,430 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-17 09:15:25,438 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-06-17 09:15:36,239 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0724
2019-06-17 09:15:36,276 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0724
2019-06-17 09:15:36,276 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-06-17 09:15:36,281 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-06-17 09:15:40,426 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
... ...
(would,2)
(wrong,1)
(you,1)
Program execution finished
Job with JobID 8bfe7568cb5c3254af30cbbd9cd5971e has finished.
Job Runtime: 9371 ms
Accumulator Results:
- 2bed2c5506e9237fb85625416a1bc508 (java.util.ArrayList) [170 elements]
```
```
./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar
```
Single-job detached mode
In detached mode, the client exits as soon as the job has been submitted.
In YARN, the application shows up as a Flink per-job cluster.
```
$ ./bin/flink run -yd -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar
2019-06-18 09:21:59,247 INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-18 09:21:59,428 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-18 09:21:59,428 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-18 09:21:59,940 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=4}
2019-06-18 09:22:00,427 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-18 09:22:00,436 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-06-18 09:22:12,113 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0729
2019-06-18 09:22:12,151 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0729
2019-06-18 09:22:12,151 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-06-18 09:22:12,155 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-06-18 09:22:16,275 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-06-18 09:22:16,275 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1532332183347_0729
Please also note that the temporary files of the YARN session in the home directory will not be removed.
Job has been submitted with JobID e61b9945c33c300906ad50a9a11f36df
```
```
➜ flink-1.7.2 ./bin/yarn-session.sh -tm 2048 -s 3
2019-06-17 09:21:50,177 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-06-17 09:21:50,179 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-06-17 09:21:50,179 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-06-17 09:21:50,179 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-06-17 09:21:50,179 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-06-17 09:21:50,179 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-06-17 09:21:50,180 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-06-17 09:21:50,180 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2019-06-17 09:21:50,644 WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-06-17 09:21:50,746 INFO  org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to baoniu (auth:SIMPLE)
2019-06-17 09:21:50,848 INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:21:51,148 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048, numberTaskManagers=1, slotsPerTaskManager=3}
2019-06-17 09:21:51,588 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-17 09:21:51,596 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-06-17 09:22:03,304 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0726
2019-06-17 09:22:03,336 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0726
2019-06-17 09:22:03,336 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-06-17 09:22:03,340 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-06-17 09:22:07,722 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-06-17 09:22:08,050 INFO  org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on z07.sqa.net:37109 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://z07.sqa.net:37109
```
By default the client stays attached and does not exit:
You can press Ctrl+C to exit, then re-attach later with ./bin/yarn-session.sh -id application_1532332183347_0726.
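The application id needed for re-attaching is also recorded in a hidden properties file (the logs above mention paths like /tmp/.yarn-properties-admin). A minimal Python sketch for reading it; the `applicationID` key and the sample content are assumptions based on the file Flink 1.7 writes:

```python
def yarn_properties(text: str) -> dict:
    """Parse Java-style key=value properties; comments and blank lines skipped."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

# Hypothetical file content; applicationID is the value that
# `yarn-session.sh -id ...` needs to re-attach.
sample = """#Generated YARN properties file
applicationID=application_1532332183347_0726
parallelism=3
"""
print(yarn_properties(sample)["applicationID"])  # application_1532332183347_0726
```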
```
➜ flink-1.7.2 ./bin/flink run ./examples/batch/WordCount.jar
2019-06-17 09:26:42,767 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu.
2019-06-17 09:26:42,767 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu.
2019-06-17 09:26:43,058 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 3
2019-06-17 09:26:43,058 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 3
YARN properties set default parallelism to 3
2019-06-17 09:26:43,097 INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:26:43,229 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:26:43,229 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:26:43,327 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'z05c07216.sqa.zth.tbsite.net' and port '37109' from supplied application id 'application_1532332183347_0726'
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
... ...
(wrong,1)
(you,1)
Program execution finished
Job with JobID ad9b0f1feed6d0bf6ba4e0f18b1e65ef has finished.
Job Runtime: 9152 ms
Accumulator Results:
- fd07c75d503d0d9a99e4f27dd153114c (java.util.ArrayList) [170 elements]
```
When the job finishes, the TaskManager's resources are released.
Submitting to a specific session
Use the -yid flag to submit to a specific session.
```
$ ./bin/flink run -d -p 30 -m yarn-cluster -yid application_1532332183347_0708 ./examples/streaming/TopSpeedWindowing.jar
2019-03-24 12:36:33,668 INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-03-24 12:36:33,773 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-24 12:36:33,773 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-24 12:36:33,837 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'z05c05218.sqa.zth.tbsite.net' and port '60783' from supplied application id 'application_1532332183347_0708'
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 58d5049ebbf28d515159f2f88563f5fd
```
```
$ ./bin/start-scala-shell.sh yarn -n 2 -jm 1024 -s 2 -tm 1024 -nm flink-yarn
Starting Flink Shell:
2019-03-25 09:47:44,695 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-25 09:47:44,697 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-25 09:47:44,697 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-25 09:47:44,697 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-25 09:47:44,697 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-03-25 09:47:44,698 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-03-25 09:47:44,698 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-03-25 09:47:44,698 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2019-03-25 09:47:44,717 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-admin.
2019-03-25 09:47:45,041 INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-03-25 09:47:45,098 WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-25 09:47:45,266 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-25 09:47:45,275 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
2019-03-25 09:47:45,357 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2}
2019-03-25 09:47:45,711 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-03-25 09:47:45,718 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/home/admin/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-25 09:47:46,514 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0710
2019-03-25 09:47:46,534 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0710
2019-03-25 09:47:46,534 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-03-25 09:47:46,535 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-03-25 09:47:51,051 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-03-25 09:47:51,222 INFO  org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Connecting to Flink cluster (host: 10.10.10.10, port: 56942).
```
After you exit the shell with Ctrl+C, this Flink cluster keeps running; it does not shut down.
```
➜ flink-1.7.2 bin/stop-cluster.sh
No taskexecutor daemon to stop on host zkb-MBP.local.
No standalonesession daemon to stop on host zkb-MBP.local.
➜ flink-1.7.2 bin/start-scala-shell.sh local
Starting Flink Shell:
Starting local Flink cluster (host: localhost, port: 8081).
Connecting to Flink cluster (host: localhost, port: 8081).

scala> val text = benv.fromElements("To be, or not to be,--that is the question:--")
text: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@5b407336

scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@6ee34fe4

scala> counts.print()
(be,2)
(is,1)
(not,1)
(or,1)
(question,1)
(that,1)
(the,1)
(to,2)
```
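For reference, the same word count in plain Python (lowercasing and splitting on non-word characters, as the Scala snippet does) reproduces the counts printed above:

```python
import re
from collections import Counter

text = "To be, or not to be,--that is the question:--"
# split("\\W+") in the Scala shell corresponds to re.split(r"\W+", ...);
# drop the empty string produced by the trailing punctuation.
words = [w for w in re.split(r"\W+", text.lower()) if w]
counts = Counter(words)
for word, cnt in sorted(counts.items()):
    print((word, cnt))
```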
```
➜ flink-1.7.2 bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.
➜ flink-1.7.2 ./bin/sql-client.sh embedded
No default environment specified.
Searching for '/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml
No session environment specified.
Validating current environment...done.
… …

Flink SQL> help;
The following commands are available:

QUIT            Quits the SQL CLI client.
CLEAR           Clears the current terminal.
HELP            Prints the available commands.
SHOW TABLES     Shows all registered tables.
SHOW FUNCTIONS  Shows all registered user-defined functions.
DESCRIBE        Describes the schema of a table with the given name.
EXPLAIN         Describes the execution plan of a query or table with the given name.
SELECT          Executes a SQL SELECT query on the Flink cluster.
INSERT INTO     Inserts the results of a SQL SELECT query into a declared table sink.
CREATE VIEW     Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DROP VIEW       Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
SOURCE          Reads a SQL SELECT query from a file and executes it on the Flink cluster.
SET             Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
RESET           Resets all session configuration properties.

Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.
```
```
Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.

Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
```
The result is shown in the figure below:
Changelog mode
```
Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.

Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
```
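To see why the two modes differ, here is a rough Python simulation (an illustration, not Flink's actual runtime) of a retract stream for this COUNT query: each new row for an already-seen key first retracts the old count and then adds the new one, while table mode would only show the final state:

```python
from collections import defaultdict

rows = ["Bob", "Alice", "Greg", "Bob"]
counts = defaultdict(int)
changelog = []

for name in rows:
    if counts[name] > 0:
        changelog.append(("-", name, counts[name]))  # retract previous count
    counts[name] += 1
    changelog.append(("+", name, counts[name]))      # add updated count

for op, name, cnt in changelog:
    print(op, name, cnt)
# Table mode corresponds to the final state only:
print(dict(counts))
```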
```
➜ flink-1.7.2 ./bin/sql-client.sh embedded -e /tmp/env.yaml
No default environment specified.
Searching for '/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml
Reading session environment from: file:/tmp/env.yaml
Validating current environment...done.

Flink SQL> show tables;
MyCustomView
MyTableSink
MyTableSource

Flink SQL> describe MyTableSource;
root
 |-- MyField1: Integer
 |-- MyField2: String

Flink SQL> describe MyCustomView;
root
 |-- MyField2: String

Flink SQL> create view MyView1 as select MyField1 from MyTableSource;
[INFO] View has been created.

Flink SQL> show tables;
MyCustomView
MyTableSource
MyView1

Flink SQL> describe MyView1;
root
 |-- MyField1: Integer

Flink SQL> select * from MyTableSource;
```
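For reference, the source-table entry of an environment file like the one used above might look as follows. This is a hypothetical sketch following the Flink 1.7 SQL client YAML layout; the connector details (filesystem path, CSV format) are invented for illustration, and the sink and view entries are omitted:

```yaml
tables:
  - name: MyTableSource
    type: source
    update-mode: append
    connector:
      type: filesystem
      path: "/tmp/input.csv"   # hypothetical input file
    format:
      type: csv
      fields:
        - name: MyField1
          type: INT
        - name: MyField2
          type: VARCHAR
    schema:
      - name: MyField1
        type: INT
      - name: MyField2
        type: VARCHAR
```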
Use INSERT INTO to write to a result table:
```
Flink SQL> insert into MyTableSink select * from MyTableSource;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: 3fac2be1fd891e3e07595c684bb7b7a0
Web interface: http://localhost:8081
```