Spark官方文档-Spark编程指南(第四节)

Spark官方文档-Spark编程指南(第四节)

理解闭包(closures)

关于Spark最难的一个问题是理解当在集群上执行代码时,变量和方法的作用域和生命周期。变量在其作用域之外被RDD操作修改是一个常见的问题。下面的例子我们将会看到使用foreach()来增加一个计数器的代码,不过其他操作也会出现类似的问题。

示例

考虑下面这个幼稚的RDD元素加和程序,它会依赖是否在相同的JVM上执行而表现出不同的行为。一个常见的例子是分别以本地模式(–master = local[n])运行Spark和将Spark应用部署到集群上(例如通过spark-submit部署到YARN):

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

本地模式 vs. 集群模式

上述代码的行为是未定义的,可能不会按照预想的工作。为了执行jobs,Spark将对RDD操作的处理分解成多个任务,每个任务都由一个executor执行。执行任务之前,Spark会先计算任务的闭包。闭包就是对于executor来说必须可见的那些变量和方法,从而在RDD上执行相关的计算(这个例子中就是foreach())。 闭包会被序列化并发送给每个executor。

被发送给各个executor的在闭包内的变量都是复制品,因此在函数foreach内引用的counter不再是在驱动节点上的counter了。在驱动节点的内存里仍然存在一个counter,但是它对executors不再可见!executors只能看到来自序列化闭包的拷贝。由于所有操作的counter都引用的是序列化的闭包内的值,因此counter的最终值仍然还是0,

在本地模式中,某些情况下函数foreach将会与驱动程序在同一个JVM内执行,并会引用相同的原始counter,这就可能会更新counter的值。

为了确保这类情况的行为是定义明确的,我们应该使用一个Accumulator。当任务被划分到一个集群中的worker节点上时,Spark中的Accumulator被用来提供一种可以安全更新一个变量的机制。本指南的Accumulators章节会更加详细地讨论这些。

一般而言,闭包-像循环或者局部定义的方法这些结构,不应该用来修改一些全局状态。Spark没有定义或者保证修改引用闭包外面的对象的行为。在本地模式下一些代码这么做可能是有效的,但那只是巧合,并且在分布式模式下它表现的就不会符合预期了。如果需要某种全局的聚合就要使用一个Accumulator。

打印一个RDD的元素

另外一个常见的用法是使用rdd.foreach(println)或者rdd.map(println)尝试打印一个RDD的元素。在单个机器上,这将会生成符合预期的产出,并打印RDD的所有元素。然而,在集群模式下,由executors调用的到stdout的输出就会写到executor的stdout,而不是写到驱动的stdout,所以在驱动上的stdout将不会展示这些输出!为了在驱动上打印所有元素,可以使用collect()方法先将RDD放到驱动节点上:rdd.collect().foreach(println)。由于collect()会将整个RDD放到一个机器上,这样可能会导致驱动内存溢出;如果你只需要打印RDD中的一些元素,一个更安全的方法是使用take():rdd.take(100).foreach(println)

操作Key-Value对

尽管大部分Spark操作对于包含任何类型的对象的RDDs都是有效的,但是有一些特殊的操作只对元素为key-value对的RDDs有效。最常见的一种是分布式的“shuffle”操作,例如通过key对元素进行分组或者聚合。

在Scala中,这些操作对于包含Tuple2对象(语言中内置的元组,通过写作(a, b)就可以创建)的RDDs是自动有效的。在PairRDDFunctions类中,key-value对操作是有效的,该类会自动将由元组组成的RDD封装起来。

例如,下面对key-value对使用reduceByKey操作来统计文件中各行文本出现次数的代码:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

我们还可以使用counts.sortByKey()对key-value对按字母顺序排列,最后用counts.collect()将它们以对象数组的形式带回到驱动程序。

注意:当在key-value对操作中使用自定义对象作为key时,必须确保有一个自定义的方法equals(),并伴随有一个相匹配的方法hashCode()。想了解完整的细节,请查看Object.hashCode()文档。

上一篇:Spark官方文档-Spark编程指南(第三节)

下一篇:Spark官方文档-Spark编程指南(第五节)

参考资料

本文译自Spark Programming Guide

发表评论

电子邮件地址不会被公开。 必填项已用*标注