Spark官方文档-Spark编程指南(第三节)

Spark官方文档-Spark编程指南(第三节)

RDD操作

RDDs支持两种类型的操作:transformations,从一个已有的RDD创建出一个新的数据集;actions,针对数据集运行一种计算之后给驱动程序返回一个值。例如,map是一个transformation,它将数据集的每个元素传递给一个函数,并返回一个表示结果的新的RDD。与之相反,reduce是一个action,它使用某种函数将RDD的所有元素聚合起来,并将最终结果返回给驱动程序(然而还有一个并行的reduceByKey,它返回一个分布式数据集)。

Spark中的所有transformations都是懒惰的,它们不会立刻计算结果。相反,它们只是记下应用到一些基本数据集(例如一个文件)的transformations。只有当action需要给驱动程序返回结果时,这些transformations才会计算。这种设计使得Spark运行地更加高效。例如,一个通过map创建的数据集,它将会被reduce使用,并只把reduce的结果返回给驱动程序,而不是返回更大的mapped数据集。

默认地,你每次针对一个transformed RDD运行一个action时可能都会重新计算一次。然而,你也可以用方法persist(或者cache)将RDD持久化在内存中,这样Spark就会将RDD元素保存在集群中,下次当你请求它时就可以更快地访问了。Spark还支持将RDDs持久化到磁盘,或者跨多个节点备份。

基础

为了说明RDD的基础知识,考虑如下这个简单的程序:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

第一行代码从一个外部文件定义了一个基本的RDD。这个数据集并没有加载到内存:lines只是一个指向文件的指针。第二行代码将lineLengths定义为map这种transformation的结果。同样地,由于懒惰特性,lineLengths也不会立刻计算。最后,我们执行redcue这种action。此时Spark会将计算划分成在不同机器上运行的多个任务,每个机器运行map的一部分以及一个局部reduction,最后只将结果返回给驱动程序。

如果想以后重复使用lineLengths,可以在reduce之前加上如下代码:

lineLengths.persist()

这样就会使lineLengths在第一次计算出来之后,保存到内存中。

给Spark传递函数

Spark的API非常依赖给在集群上运行的驱动程序传递函数。有两种推荐的传递函数的方法:

  • 匿名函数语法,这种方法可用于代码量比较小的情况。
  • 在一个全局单实例对象中的静态方法。

例如,你可以像下面这样定义对象MyFunctions,然后传递MyFunctions.func1:

object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

注意虽然也可以传递对一个类的实例(与单实例对象正相反)中的方法的引用,但是要求传递的对象包含连同该方法一起的类。例如,考虑如下类:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

如果创建一个新的MyClass实例,并且调用doStuff,doStuff中的map引用了该MyClass实例中的方法func1,因此整个对象都需要被发送给集群。这与rdd.map(x => this.func1(x))是相似的。

类似地,访问外部对象的字段将会引用整个对象:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

上面的程序等价于rdd.map(x => this.field + x),引用了整个this。为了避免这个问题,最简单的方法是把字段拷贝到局部变量中,而不是从外部访问它:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

上一篇:Spark官方文档-Spark编程指南(第二节)

下一篇:Spark官方文档-Spark编程指南(第四节)

参考资料

本文译自Spark Programming Guide

发表评论

电子邮件地址不会被公开。 必填项已用*标注