Spark累加器与广播变量
本文于2077天之前发表,文中内容可能已经过时。
一、简介
在Spark中,提供了两种类型的共享变量:累加器(accumulator)与广播变量(broadcast variable):
- 累加器:用来对信息进行聚合,主要用于累计计数等场景;
- 广播变量:主要用于在节点间高效分发大对象。
二、累加器
这里先看一个具体的场景,对于正常的累计求和,如果在集群模式中使用下面的代码进行计算,会发现执行结果并非预期:
1 | var counter = 0 |
counter最后的结果是0,导致这个问题的主要原因是闭包。

2.1 理解闭包
1. Scala中闭包的概念
这里先介绍一下Scala中关于闭包的概念:
1 | var more = 10 |
如上函数addMore
中有两个变量x和more:
- x : 是一个绑定变量(bound variable),因为其是该函数的入参,在函数的上下文中有明确的定义;
- more : 是一个自由变量(free variable),因为函数字面量本生并没有给more赋予任何含义。
按照定义:在创建函数时,如果需要捕获自由变量,那么包含指向被捕获变量的引用的函数就被称为闭包函数。
2. Spark中的闭包
在实际计算时,Spark会将对RDD操作分解为Task,Task运行在Worker Node上。在执行之前,Spark会对任务进行闭包,如果闭包内涉及到自由变量,则程序会进行拷贝,并将副本变量放在闭包中,之后闭包被序列化并发送给每个执行者。因此,当在foreach函数中引用counter
时,它将不再是Driver节点上的counter
,而是闭包中的副本counter
,默认情况下,副本counter
更新后的值不会回传到Driver,所以counter
的最终值仍然为零。
需要注意的是:在Local模式下,有可能执行foreach
的Worker Node与Diver处在相同的JVM,并引用相同的原始counter
,这时候更新可能是正确的,但是在集群模式下一定不正确。所以在遇到此类问题时应优先使用累加器。
累加器的原理实际上很简单:就是将每个副本变量的最终值传回Driver,由Driver聚合后得到最终值,并更新原始变量。

2.2 使用累加器
SparkContext
中定义了所有创建累加器的方法,需要注意的是:被中横线划掉的累加器方法在Spark 2.0.0之后被标识为废弃。

使用示例和执行结果分别如下:
1 | val data = Array(1, 2, 3, 4, 5) |

三、广播变量
在上面介绍中闭包的过程中我们说道每个Task任务的闭包都会持有自由变量的副本,如果变量很大且Task任务很多的情况下,这必然会对网络IO造成压力,为了解决这个情况,Spark提供了广播变量。
广播变量的做法很简单:就是不把副本变量分发到每个Task中,而是将其分发到每个Executor,Executor中的所有Task共享一个副本变量。
1 | // 把一个数组定义为一个广播变量 |