多读书多实践,勤思考善领悟

Flink命令行界面

本文于1823天之前发表,文中内容可能已经过时。

Flink提供命令行界面(CLI)来运行打包为JAR文件的程序,并控制它们的执行。CLI是任何Flink设置的一部分,可在本地单节点设置和分布式设置中使用。它位于<flink-home>/bin/flink 默认情况下,并连接到从同一安装目录启动的正在运行的Flink主服务器(JobManager)。

使用命令行界面的先决条件是Flink主机(JobManager)已启动(通过 <flink-home>/bin/start-cluster.sh)或YARN环境可用。

例子

  • 运行没有参数的示例程序:

    1
    ./bin/flink run ./examples/batch/WordCount.jar
  • 使用输入和结果文件的参数运行示例程序:

    1
    2
    ./bin/flink run ./examples/batch/WordCount.jar \
    --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • 运行带有并行性的示例程序16以及输入和结果文件的参数:

    1
    2
    ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
    --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • 运行禁用flink log输出的示例程序:

    1
    ./bin/flink run -q ./examples/batch/WordCount.jar
  • 以分离模式运行示例程序:

    1
    ./bin/flink run -d ./examples/batch/WordCount.jar
  • 在特定的JobManager上运行示例程序:

    1
    2
    3
    ./bin/flink run -m myJMHost:8081 \
    ./examples/batch/WordCount.jar \
    --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • 以特定类作为入口点运行示例程序:

    1
    2
    3
    ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
    ./examples/batch/WordCount.jar \
    --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • 使用具有2个TaskManagers 的每作业YARN群集运行示例程序:

    1
    2
    3
    ./bin/flink run -m yarn-cluster -yn 2 \
    ./examples/batch/WordCount.jar \
    --input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
  • 以Word为单位显示WordCount示例程序的优化执行计划:

    1
    2
    ./bin/flink info ./examples/batch/WordCount.jar \
    --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • 列出计划和正在运行的作业(包括其JobID):

    1
    ./bin/flink list
  • 列出预定作业(包括其作业ID):

    1
    ./bin/flink list -s
  • 列出正在运行的作业(包括其作业ID):

    1
    ./bin/flink list -r
  • 列出所有现有工作(包括其作业ID):

    1
    ./bin/flink list -a
  • 列出在Flink YARN会话中运行Flink作业:

    1
    ./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
  • 取消工作:

    1
    ./bin/flink cancel <jobID>
  • 使用保存点取消作业:

    1
    ./bin/flink cancel -s [targetDirectory] <jobID>
  • 停止工作(仅限流处理工作):

    1
    ./bin/flink stop <jobID>
  • 修改正在运行的作业(仅限流式处理作业):. / bin/flink modify -p

注意:取消和停止(流处理)作业的区别如下:

在取消呼叫中,作业中的算子立即接收cancel()方法调用以尽快取消它们。如果算子在取消呼叫后没有停止,Flink将开始定期中断线程,直到它停止。

“停止”呼叫是一种更优雅的方式来停止正在运行的流处理作业。Stop仅适用于使用实现StoppableFunction接口的源的作业。当用户请求停止作业时,所有源都将接收stop()方法调用。该工作将继续运行,直到所有资源正常关闭。这允许作业完成处理所有飞行数据。

保存点

保存点通过命令行客户端控制:

触发保存点

1
./bin/flink savepoint <jobId> [savepointDirectory]

这将触发具有ID的作业的保存点jobId,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。

此外,您可以选择指定目标文件系统目录以存储保存点。该目录需要可由JobManager访问。

如果未指定目标目录,则需要配置默认目录。否则,触发保存点将失败。

使用YARN触发保存点

1
./bin/flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>

这将触发具有ID jobId和YARN应用程序ID 的作业的保存点yarnAppId,并返回创建的保存点的路径。

其他所有内容与上面触发保存点部分中描述的相同。

使用保存点取消

您可以自动触发保存点并取消作业。

1
./bin/flink cancel -s [savepointDirectory] <jobID>

如果未配置保存点目录,则需要为Flink安装配置默认保存点目录。

只有保存点成功,才会取消该作业。

恢复保存点

1
./bin/flink run -s <savepointPath> ...

run命令有一个保存点标志来提交作业,该作业从保存点恢复其状态。savepoint trigger命令返回保存点路径。

默认情况下,我们尝试将所有保存点状态与正在提交的作业进行匹配。如果要允许跳过无法使用新作业恢复的保存点状态,可以设置allowNonRestoredState标志。如果在触发保存点并且仍想使用保存点时从程序中删除了作为程序一部分的 算子,则需要允许此 算子操作。

1
./bin/flink run -s <savepointPath> -n ...

如果您的程序删除了属于保存点的 算子,这将非常有用。

配置保存点

1
./bin/flink savepoint -d <savepointPath>

在给定路径处理保存点。savepoint trigger命令返回保存点路径。

如果使用自定义状态实例(例如自定义还原状态或RocksDB状态),则必须指定触发保存点的程序JAR的路径,以便使用用户代码类加载器处理保存点:

1
./bin/flink savepoint -d <savepointPath> -j <jarFile>

否则,你会遇到一个ClassNotFoundException

用法

命令行语法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.

Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action options:
-c,--class <classname> Class with the program entry point
("main" method or "getPlan()" method.
Only needed if the JAR file does not
specify the class in its manifest.
-C,--classpath <url> Adds a URL to each user code
classloader on all nodes in the
cluster. The paths must specify a
protocol (e.g. file://) and be
accessible on all nodes (e.g. by means
of a NFS share). You can use this
option multiple times for specifying
more than one URL. The protocol must
be supported by the {@link
java.net.URLClassLoader}.
-d,--detached If present, runs the job in detached
mode
-n,--allowNonRestoredState Allow to skip savepoint state that
cannot be restored. You need to allow
this if you removed an operator from
your program that was part of the
program when the savepoint was
triggered.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.
-q,--sysoutLogging If present, suppress logging output to
standard out.
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job
from (for example
hdfs:///flink/savepoint-1537).
Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to
connect to a different JobManager than
the one specified in the
configuration.
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container
with optional unit (default: MB)
-yn,--yarncontainer <arg> Number of YARN container to allocate
(=Number of Task Managers)
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yst,--yarnstreaming Start Flink in streaming mode
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container
with optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-ynl,--yarnnodeLabel <arg> Specify YARN node label for
the YARN application
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode

Options for default mode:
-m,--jobmanager <arg> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode



Action "info" shows the optimized execution plan of the program (JSON).

Syntax: info [OPTIONS] <jar-file> <arguments>
"info" action options:
-c,--class <classname> Class with the program entry point ("main"
method or "getPlan()" method. Only needed
if the JAR file does not specify the class
in its manifest.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.


Action "list" lists running and scheduled programs.

Syntax: list [OPTIONS]
"list" action options:
-r,--running Show only running programs and their JobIDs
-s,--scheduled Show only scheduled programs and their JobIDs
Options for yarn-cluster mode:
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode

Options for default mode:
-m,--jobmanager <arg> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode



Action "stop" stops a running program (streaming jobs only).

Syntax: stop [OPTIONS] <Job ID>
"stop" action options:

Options for yarn-cluster mode:
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode

Options for default mode:
-m,--jobmanager <arg> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode



Action "cancel" cancels a running program.

Syntax: cancel [OPTIONS] <Job ID>
"cancel" action options:
-s,--withSavepoint <targetDirectory> Trigger savepoint and cancel job.
The target directory is optional. If
no directory is specified, the
configured default directory
(state.savepoints.dir) is used.
Options for yarn-cluster mode:
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode

Options for default mode:
-m,--jobmanager <arg> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode



Action "savepoint" triggers savepoints for a running job or disposes existing ones.

Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
"savepoint" action options:
-d,--dispose <arg> Path of savepoint to dispose.
-j,--jarfile <jarfile> Flink program JAR file.
Options for yarn-cluster mode:
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode

Options for default mode:
-m,--jobmanager <arg> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode



Action "modify" modifies a running job (e.g. change of parallelism).

Syntax: modify <Job ID> [OPTIONS]
"modify" action options:
-h,--help Show the help message for the CLI
Frontend or the action.
-p,--parallelism <newParallelism> New parallelism for the specified job.
-v,--verbose This option is deprecated.
Options for yarn-cluster mode:
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode

Options for default mode:
-m,--jobmanager <arg> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode