Spark官方文档-Spark编程指南(第二节)

Spark官方文档-Spark编程指南(第二节)

弹性分布式数据集(RDDs)

Spark是围绕着弹性分布式数据集(RDD)来解决问题的,RDD是一个可容错的并且可以并行操作的元素集合。有两种创建RDDs的方式:并行化一个已经存在于你的驱动程序中的集合,或者引用一个存在于外部存储系统中的数据集,例如共享文件系统,HDFS,HBase,或者任何可以提供Hadoop InputFormat的数据源。

并行化集合

并行化集合是通过在一个存在于你的驱动程序中的集合(一个Scala Seq)上调用SparkContext的并行化方法来创建的。集合的元素被复制来生成一个可以并行操作的分布式数据集。例如,下面是如何创建一个具有从数字1到5的并行化集合的方法:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

分布式数据集(distData)一旦创建就可以被并行操作了。例如,我们可以通过调用distData.reduce((a, b) => a + b)来将数组的元素加起来。我们稍后会介绍在分布式数据集上的操作。

对于并行集合来说一个重要的参数是将数据集切分成多少个partition。Spark将会为集群的每个partition运行一个任务。一般都会为集群中的每个CPU分配2-4个partition。正常情况,Spark会尝试基于你的集群来自动设置partition的个数。然而,你也可以通过给parallelize传入第二个参数人为设置partition的个数(例如sc.parallelize(data, 10))。注意:代码中的一些地方使用术语slices(partitions的同义词) 来保持后续兼容。

外部数据集

Spark可以从Hadoop支持的任何存储源创建分布式数据集,包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3等等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat

Text file RDDs可以使用SparkContext的方法textFile来创建。这个方法以文件的URI(或者是机器上的本地路径,或者是hdfs://,s3n://等等)为参数,并且读取文件生成文本行的集合。下面是一个例子:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

一经创建,对distFile就可以执行数据集操作了。例如,我们可以像下面这样使用map和reduce操作将所有行的单词个数加起来:distFile.map(s => s.length).reduce((a, b) => a + b)

使用Spark读取文件的一些注意事项:

  • 如果使用一个本地文件系统上的路径,则该文件必须也可以在worker节点上通过相同的路径被访问。或者将该文件拷贝到所有worker上,或者使用网络挂载的共享文件系统。

  • Spark所有的基于文件的输入方法,包括textFile,都支持目录,压缩文件和通配符。例如,你可以使用textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")

  • 还可以给方法textFile传入一个可选的第二个参数,用来控制文件的partition个数。默认地,Spark会为文件的每个block创建一个partition(HDFS的block大小默认128MB),但是你也可以通过传入一个更大的值来要求创建更多的partition。注意你不能创建比block个数少的partition。

除了文本文件,Spark的Scala API还支持一些其他的数据格式:

  • SparkContext.wholeTextFiles支持读取一个包含多个小文本文件的目录,并且针对每个文件返回(文件名,文件内容)对。这与textFile是相反的,textFile会为每个文件中的每一行返回一条记录。根据数据位置切分partition,在某些情况下,可能会导致创建的partition太少。针对这些情况,wholeTextFiles提供了一个可选的第二参数来控制partition的最小个数。

  • 针对SequenceFiles,使用SparkContext的sequenceFile[K, V]方法,其中K和V是文件中key和value的类型。这些应该是Hadoop的Writable接口的子类,例如IntWritableText。另外,Spark允许你为一些常用的Writables指定native类型;例如,sequenceFile[Int, String]将会自动读取IntWritables和Texts。

  • 对于其他Hadoop的InputFormats,你可以使用方法SparkContext.hadoopRDD,需要给该方法传入一个任意的JobConf以及输入格式类,包括key类和value类。这与为一个Hadoop job设置输入源是相同的。你也可以为InputFormats使用方法SparkContext.newAPIHadoopRDD,该方法是基于新的MapReduce API (org.apache.hadoop.mapreduce)。

  • RDD.saveAsObjectFile和SparkContext.objectFile支持以一种包含序列化Java对象的简单格式来保存一个RDD。虽然这种格式不能像Avro这些专门的格式那样高效,但是它提供了一种保存RDD的简单方法。

上一篇:Spark官方文档-Spark编程指南(第一节)

下一篇:Spark官方文档-Spark编程指南(第三节)

参考资料

本文译自Spark Programming Guide

发表评论

电子邮件地址不会被公开。 必填项已用*标注