Spark官方文档-Spark编程指南(第七节)

Spark官方文档-Spark编程指南(第七节)

共享变量

一般地,当一个函数被传递给一个在远程集群节点上执行的Spark操作时,在函数中用到的所有变量都是以独立拷贝的形式被使用。这些变量被拷贝到各个机器上,并且在远程机器上对变量的更新不会被传递回驱动程序。在任务之间读写通常支持的共享变量是很低效的。然而,Spark为两种常见的使用模式提供了共享变量的两种限制类型:broadcast变量和accumulators。

Broadcast变量

Broadcast变量允许我们将一个只读变量保存在各个机器的缓存中,而不是用任务发送它的拷贝。例如可以用broadcast变量以一种高效的方式将大量输入数据集的拷贝发送给每个节点。Spark也会尝试使用高效的broadcast算法来分发broadcast变量,以减少通信开销。

Spark的actions是通过一系列阶段来执行的,这些阶段是被分布式的“shuffle”操作划分开的。Spark会自动广播被各个阶段内的任务所需要的共用数据。用这种方式广播的数据是按序列化形式缓存的,并且在运行各个任务之前被反序列化。这意味着只有当跨越多个阶段的任务需要相同的数据或者当以反序列化形式缓存数据很重要时,才有必要显式地创建broadcast变量。

通过调用SparkContext.broadcast(v)从一个变量v创建broadcast变量。Broadcast变量是对v的一层包装,它的值可以通过调用方法value来访问。下面的代码展示了这一点:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

在创建了broadcast变量之后,就应该在运行在集群上的所有函数中用broadcast变量代替值v,这样v被发送给节点的次数就不会超过一次。另外,对象v在被广播之后就不应该被修改了,以确保所有节点得到的这个broadcast变量的值都是相同的(例如该变量稍后被发送给一个新节点)。

Accumulators

Accumulators是只能通过支持结合律和交换律的操作来“增加”的变量,因此它可以支持高效的并发操作。Accumulators可以被用来实现counters(就像在MapReduce中的)或者sums。Spark原生地支持数字类型的accumulators,大家可以为新类型添加accumulators。

作为用户,你可以创建命名的或者未命名的accumulators。如下面的图片所示,一个命名的accumulator(在这个例子中是counter)将会把那些修改了accumulator的阶段展现在web用户界面上。Spark会把在“Tasks”表格中的各个任务修改的accumulator的值展现出来。

enter image description here

在用户界面跟踪accumulators对于理解运行各个阶段的过程很有帮助(注意:这个功能在Python中还没有支持)。

一个数字类型的accumulator可以通过调用SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()来创建,它们分别计算Long或者Double类型的值。然后运行在集群上的任务就可以用方法add来增加到它上面。然而,这些任务无法读取它的值。只有驱动程序可以用方法value来读取accumulator的值。

下面的代码展示了一个用来加和一个数组的各个元素的accumulator:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

上面的代码使用的是内置支持的Long类型accumulators,我们也可以通过创建AccumulatorV2的子类来创建自己的类型。AccumulatorV2抽象类有几个方法是必须要覆盖的:将accumulator重置为0的reset,将另一个值加到这个accumulator上的add,将另一个相同类型的accumulator合并进这个accumulator的merge。其它必须覆盖的方法请查阅API文档。例如,假如我们有一个表示数字的vectors的MyVector类,可以这样写:

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

请注意当我们定义自己的AccumulatorV2类型时,结果的类型可以与做加和的元素的类型不同。

对于只在actions中执行更新accumulator的情况,Spark保证每个任务对accumulator的更新都只会执行一次,也就是说重启任务不会更新accumulator的值。在transformations中,用户应该知道如果任务或者job的阶段被重新执行,则每个任务的更新操作可能会被执行多次。

Accumulators不会改变Spark的懒惰求值模式。如果accumulators在一个RDD的一个操作中被更新,则只有当那个RDD作为一个action的一部分被计算时它们的值才会被更新。因此,当在一个像map()这种懒惰的transformation中更新accumulator时,是无法保证更新能被执行的。下面的代码片段证明了这种特性:

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.

上一篇:Spark官方文档-Spark编程指南(第六节)

下一篇:Spark官方文档-Spark编程指南(第八节)

参考资料

本文译自Spark Programming Guide

发表评论

电子邮件地址不会被公开。 必填项已用*标注