Scala-Flink> val text = benv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text
    .flatMap { _.toLowerCase.split("\\W+") }
    .map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()
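The print() command automatically submits the specified job to the JobManager for execution and displays the result of the computation in the terminal.
Results can also be written to a file. In that case, however, you need to call execute to run your program: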
Scala-Flink> benv.execute("MyProgram")
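The file-writing step itself is not shown here. A minimal sketch of what it could look like, assuming the `counts` DataSet from the batch example is still defined in the same shell session. The output path `file:///tmp/wordcount-output` is a placeholder chosen for illustration, and passing `WriteMode.OVERWRITE` is optional:

```scala
// Assumes the Flink Scala shell, where `benv` is pre-bound and `counts`
// was defined by the batch word count example.
import org.apache.flink.core.fs.FileSystem.WriteMode

// Register a text file sink. Nothing runs yet. The path is a placeholder.
counts.writeAsText("file:///tmp/wordcount-output", WriteMode.OVERWRITE)

// Unlike print(), a file sink does not trigger execution on its own,
// so the job has to be started explicitly.
benv.execute("MyProgram")
```

With a parallelism greater than 1, the output path is typically created as a directory containing one file per parallel task rather than as a single file.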
DataStream API
As with the batch program above, we can run a streaming program through the DataStream API:
Scala-Flink> val textStreaming = senv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming
    .flatMap { _.toLowerCase.split("\\W+") }
    .map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")
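The streaming result differs from the batch result in one respect: a keyed `sum` is a rolling aggregate, so it emits an updated count for every incoming record instead of one final count per word. The sketch below emulates that behaviour with plain Scala collections, with no Flink involved. The names `RunningCounts` and `runningCounts` are hypothetical and exist only for this illustration; a real job may interleave records of different keys differently and prefixes printed lines with a subtask index when parallelism is greater than 1.

```scala
// Plain Scala emulation of the running counts produced by keyBy(0).sum(1).
// This is an illustration only and is not part of the Flink API.
object RunningCounts {
  def runningCounts(lines: Seq[String]): Seq[(String, Int)] = {
    // Same tokenization as the streaming example.
    val words = lines.flatMap(_.toLowerCase.split("\\W+"))
    // Carry the per-word state forward and emit the updated count for each word.
    val (_, emitted) =
      words.foldLeft((Map.empty[String, Int], Vector.empty[(String, Int)])) {
        case ((state, out), word) =>
          val updated = state.getOrElse(word, 0) + 1
          (state.updated(word, updated), out :+ (word -> updated))
      }
    emitted
  }

  def main(args: Array[String]): Unit = {
    // Prints (to,1) (be,1) (or,1) (not,1) (to,2) (be,2), one pair per line.
    runningCounts(Seq("To be, or not to be")).foreach(println)
  }
}
```

The repeated words "to" and "be" appear twice in the output, once with count 1 and once with count 2, whereas the batch version would report only the final count for each.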
Command: local [options]
Starts Flink scala shell with a local Flink cluster
  -a <path/to/jar> | --addclasspath <path/to/jar>
        Specifies additional jars to be used in Flink
Command: remote [options] <host> <port>
Starts Flink scala shell connecting to a remote cluster
  <host>
        Remote host name as string
  <port>
        Remote port as integer

  -a <path/to/jar> | --addclasspath <path/to/jar>
        Specifies additional jars to be used in Flink
Command: yarn [options]
Starts Flink scala shell connecting to a yarn cluster
  -n arg | --container arg
        Number of YARN container to allocate (= Number of TaskManagers)
  -jm arg | --jobManagerMemory arg
        Memory for JobManager container with optional unit (default: MB)
  -nm <value> | --name <value>
        Set a custom name for the application on YARN
  -qu <arg> | --queue <arg>
        Specifies YARN queue
  -s <arg> | --slots <arg>
        Number of slots per TaskManager
  -tm <arg> | --taskManagerMemory <arg>
        Memory per TaskManager container with optional unit (default: MB)
  -a <path/to/jar> | --addclasspath <path/to/jar>
        Specifies additional jars to be used in Flink
  --configDir <value>
        The configuration directory.
  -h | --help
        Prints this usage text