```
Scala-Flink> val text = benv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()
```
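The print() command automatically sends the specified tasks to the JobManager for execution and shows the result of the computation in the terminal.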
You can also write the results to a file. In that case, however, you need to call execute to run your program:
```
Scala-Flink> benv.execute("MyProgram")
```
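The following plain-Scala sketch mirrors the batch pipeline using standard collections, without Flink. `flatMap` tokenizes each line, `map` pairs each word with 1, and grouping on field 0 then summing field 1 yields one final count per word. The object `BatchWordCountSketch` and its methods are hypothetical and not part of Flink's API. The sketch runs in a single local process and ignores distribution.

```scala
// Hypothetical, Flink-free sketch of:
//   text.flatMap(_.toLowerCase.split("\\W+")).map((_, 1)).groupBy(0).sum(1)
object BatchWordCountSketch {
  val hamlet: Seq[String] = Seq(
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,")

  def wordCounts(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+")) // split on runs of non-word characters
      .map(word => (word, 1))               // like .map { (_, 1) }
      .groupBy(_._1)                        // like .groupBy(0): key on the word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // like .sum(1)

  def main(args: Array[String]): Unit =
    // Print the counts, most frequent first, ties broken alphabetically.
    wordCounts(hamlet).toSeq.sortBy { case (word, count) => (-count, word) }.foreach(println)
}
```

For the four input lines, this gives a count of 4 for "to" and 3 for "the". The Flink job computes the same pairs, although the order in which print() emits them is not guaranteed.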
DataStream API
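As with the batch program above, you can run a streaming program through the DataStream API. Here print() only adds a sink to the job, so nothing runs until senv.execute is called: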
```
Scala-Flink> val textStreaming = senv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")
```
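The streaming job differs from the batch job in what it emits. On a `DataStream`, `keyBy(0).sum(1)` is a rolling aggregation: each incoming `(word, 1)` produces an updated `(word, runningCount)` record rather than a single final count per word. The hypothetical `StreamingWordCountSketch` below reproduces this behavior sequentially in plain Scala, without Flink, and keeps the per-key state in a `Map`. In the real job, records are handled by parallel subtasks, so the printed order can differ.

```scala
// Hypothetical, Flink-free sketch of a keyed rolling sum like keyBy(0).sum(1).
object StreamingWordCountSketch {
  def rollingCounts(lines: Seq[String]): Seq[(String, Int)] = {
    // Same tokenization and (word, 1) pairing as the streaming job.
    val pairs = lines.flatMap(_.toLowerCase.split("\\W+")).map(word => (word, 1))
    // Keyed state: running count per word. Each element updates its key's
    // count and emits the new (word, count), one output record per input.
    val (_, emitted) = pairs.foldLeft((Map.empty[String, Int], Vector.empty[(String, Int)])) {
      case ((counts, out), (word, one)) =>
        val running = counts.getOrElse(word, 0) + one
        (counts.updated(word, running), out :+ (word -> running))
    }
    emitted
  }

  def main(args: Array[String]): Unit =
    rollingCounts(Seq("To be, or not to be")).foreach(println)
}
```

For the input `"To be, or not to be"`, it emits `(to,1)`, `(be,1)`, `(or,1)`, `(not,1)`, `(to,2)`, `(be,2)`. The last record for each word equals the batch count.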
```
Command: local [options]
Starts Flink scala shell with a local Flink cluster
  -a <path/to/jar> | --addclasspath <path/to/jar>
        Specifies additional jars to be used in Flink
Command: remote [options] <host> <port>
Starts Flink scala shell connecting to a remote cluster
  <host>
        Remote host name as string
  <port>
        Remote port as integer

  -a <path/to/jar> | --addclasspath <path/to/jar>
        Specifies additional jars to be used in Flink
Command: yarn [options]
Starts Flink scala shell connecting to a yarn cluster
  -n arg | --container arg
        Number of YARN container to allocate (= Number of TaskManagers)
  -jm arg | --jobManagerMemory arg
        Memory for JobManager container with optional unit (default: MB)
  -nm <value> | --name <value>
        Set a custom name for the application on YARN
  -qu <arg> | --queue <arg>
        Specifies YARN queue
  -s <arg> | --slots <arg>
        Number of slots per TaskManager
  -tm <arg> | --taskManagerMemory <arg>
        Memory per TaskManager container with optional unit (default: MB)
  -a <path/to/jar> | --addclasspath <path/to/jar>
        Specifies additional jars to be used in Flink
  --configDir <value>
        The configuration directory.
  -h | --help
        Prints this usage text
```
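Two of the `yarn` options interact. `-n` sets the number of TaskManager containers and `-s` sets the slots per TaskManager, so the session offers `n * s` task slots in total. The memory options (`-jm`, `-tm`) accept a bare number, which is read as MB. The helper below is a hypothetical illustration, not Flink's own option parser. In particular, the accepted unit suffixes (`m`/`mb`, `g`/`gb`) are an assumption.

```scala
// Hypothetical illustration of two rules from the usage text; not Flink code.
object YarnOptionsSketch {
  // A number with an optional alphabetic unit, e.g. "1024", "512mb", "2g".
  private val MemoryValue = """(\d+)\s*([a-z]*)""".r

  // A bare number is read as MB, matching "(default: MB)" in the usage text.
  // Assumption: only m/mb and g/gb suffixes are handled here.
  def memoryInMb(value: String): Long =
    value.trim.toLowerCase match {
      case MemoryValue(num, "" | "m" | "mb") => num.toLong
      case MemoryValue(num, "g" | "gb")      => num.toLong * 1024
      case _ => throw new IllegalArgumentException(s"Unsupported memory value: $value")
    }

  // -n (TaskManager containers) times -s (slots per TaskManager).
  def totalSlots(containers: Int, slotsPerTaskManager: Int): Int =
    containers * slotsPerTaskManager

  def main(args: Array[String]): Unit = {
    println(totalSlots(2, 4))
    println(memoryInMb("2g"))
  }
}
```

For example, `-n 2 -s 4` corresponds to `totalSlots(2, 4)`, which gives 8 slots, and `memoryInMb("2g")` gives 2048.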