Flink批处理示例(java和Scala)
本文于2067天之前发表,文中内容可能已经过时。
以下示例程序展示了Flink的不同应用程序,从简单的字数统计到图形算法。代码示例说明了Flink的DataSet API的使用。
可以在Flink源存储库的flink-examples-batch模块中找到以下和更多示例的完整源代码。
运行一个例子
为了运行Flink示例,我们假设您有一个正在运行的Flink实例。导航中的“快速入门”和“设置”选项卡描述了启动Flink的各种方法。
最简单的方法是运行./bin/start-cluster.sh
,默认情况下启动一个带有一个JobManager和一个TaskManager的本地集群。
Flink的每个二进制版本都包含一个examples
目录,其中包含此页面上每个示例的jar文件。
要运行WordCount示例,请发出以下命令:
1 | ./bin/flink run ./examples/batch/WordCount.jar |
其他示例可以以类似的方式启动。
请注意,通过使用内置数据,许多示例在不传递任何参数的情况下运行。要使用实际数据运行WordCount,您必须将路径传递给数据:
1 | ./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result |
请注意,非本地文件系统需要模式前缀,例如hdfs://
。
字数
WordCount是大数据处理系统的“Hello World”。它计算文本集合中单词的频率。该算法分两步进行:首先,文本将文本分成单个单词。其次,对单词进行分组和计数。
- Java
1 | ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); |
- Scala
1 | val env = ExecutionEnvironment.getExecutionEnvironment |
该字计数示例实现上述算法的输入参数:--input <path> --output <path>
。作为测试数据,任何文本文件都可以。
网页排名
PageRank算法计算链接定义的图形中页面的“重要性”,链接指向一个页面到另一个页面。它是一种迭代图算法,这意味着它重复应用相同的计算。在每次迭代中,每个页面在其所有邻居上分配其当前等级,并将其新等级计算为从其邻居接收的等级的纳税总和。PageRank算法由Google搜索引擎推广,该搜索引擎利用网页的重要性对搜索查询的结果进行排名。
在这个简单的例子中,PageRank通过批量迭代和固定数量的迭代来实现。
- Java
1 | ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); |
- Scala
1 | // User-defined types |
所述的PageRank程序实现上述实施例。它需要运行以下参数:--pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
。
输入文件是纯文本文件,必须格式如下:
- 页面表示为由新行字符分隔的(长)ID。
- 例如,
"1\n2\n12\n42\n63\n"
给出五个页面ID为1,2,12,42和63的页面。
- 例如,
- 链接表示为由空格字符分隔的页面ID对。链接由换行符分隔:
- 例如,
"1 2\n2 12\n1 12\n42 63\n"
给出四个(定向)链接(1) - >(2),(2) - >(12),(1) - >(12)和(42) - >(63)。
- 例如,
对于这个简单的实现,要求每个页面至少有一个传入链接和一个传出链接(页面可以指向自身)。
连接组件
连通分量算法通过为同一连接部分中的所有顶点分配相同的组件ID来识别较大图形的部分。与PageRank类似,Connected Components是一种迭代算法。在每个步骤中,每个顶点将其当前组件ID传播到其所有邻居。如果顶点小于其自己的组件ID,则顶点接受来自邻居的组件ID。
此实现使用增量迭代:未更改其组件ID的顶点不参与下一步。这会产生更好的性能,因为后面的迭代通常只处理一些异常值顶点。
- Java
1 | // read vertex and edge data |
- Scala
1 | // set up execution environment |
该ConnectedComponents程序实现上述实施例。它需要运行以下参数:--vertices <path> --edges <path> --output <path> --iterations <n>
。
输入文件是纯文本文件,必须格式如下:
- 顶点表示为ID并用换行符分隔。
- 例如,
"1\n2\n12\n42\n63\n"
给出五个顶点(1),(2),(12),(42)和(63)。
- 例如,
- 边缘表示为由空格字符分隔的顶点ID的对。边线由换行符分隔:
- 例如,
"1 2\n2 12\n1 12\n42 63\n"
给出四个(无向)链路(1) - (2),(2) - (12),(1) - (12)和(42) - (63)。
- 例如,