Spark官方文档-快速入门

Spark官方文档-快速入门

本文为使用Spark提供了简明介绍。我们首先会通过Spark的交互shell(使用Python或者Scala语言)介绍API,然后展示如何用Java,Scala和Python写应用程序。更加完整的介绍请看编程指南

为了能跟随本文进行实践,首先需要搭建Spark运行环境(可以参考搭建Spark单节点本地运行环境)。由于我们不会用到HDFS,你可以下载任何版本的Hadoop包。

使用Spark Shell进行交互的分析

基础

Spark的shell提供了一种学习API的简单方式,也是一种交互地分析数据的强大工具。既适用于Scala(Scala运行在Java虚拟机上,因此可以很好的使用已有的Java库),也适用于Python。在Spark目录下执行如下命令来启动shell:

./bin/spark-shell

Spark的基本抽象是叫做RDD的弹性分布式数据集合。RDDs可以从Hadoop InputFormats(例如HDFS文件)创建或者从其它RDDs转变而来。让我们从位于Spark目录的README文件创建一个新的RDD:

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:25

RDDs具有可以返回values的actions,以及返回指向新RDDs的指针的transformations。首先从一些actions开始:

scala> textFile.count() // Number of items in this RDD
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark

现在让我们使用一个transformation。我们将会用filter transformation来返回一个新的RDD,这个新的RDD是文件中的数据子集。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27

我们可以将transformations和actions链到一起:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

关于RDD的更多操作

RDD actions和transformations可以被用在更加复杂的计算中。例如我们想找到单词数最多的一行:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

首先将一行映射为一个整数值,创建了一个新的RDD。在这个新RDD之上调用reduce,来找到最大值。map和reduce的参数是Scala函数,这里的参数可以使用任何语言特征或者Scala/Java库。例如,我们可以简单地调用其它地方声明的函数。我们将会使用函数Math.max()使得代码更容易理解:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

MapReduce是一种通用的数据流模式,通过Hadoop普及开来。Spark可以很容易地实现MapReduce流:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:28

这里,我们将flatMap,map和reduceByKey transformations组合在一起作为(String, Int) pairs的RDD,来计算文件中每个单词的个数。为了在shell中收集单词个数,我们可以使用collect action:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

缓存

Spark也支持将数据集合拉进集群范围内的缓存中。当数据被重复访问时,这很有用,例如当查询一小部分热点数据或者运行一个类似PageRank的迭代算法时。作为一个简单的例子,让我们将linesWithSpark数据集合标记为缓存的:

scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:27

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

使用Spark探索并缓存一个100行的文本文件可能看起来很傻。有趣的是这些函数同样可以应用到非常大的数据集合,即使它们是分布在几十甚至几百个节点上的。你也可以通过将bin/spark-shell连接到集群上做这种交互,正如编程指南上所描述的那样。

独立的应用程序

假如我们想使用Spark API写一个独立的应用程序。我们将会学习一个用Scala(使用sbt管理项目),Java(使用Maven管理项目)和Python编写的简单的应用程序。

我们将会创建一个非常简单的Spark应用程序,将其命名为SimpleApp.scala:

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    sc.stop()
  }
}

注意应用程序应该定义一个方法main(),而不是扩展scala.Appscala.App的子类可能无法正确工作。

这个程序只是统计Spark目录下文件README.md含有’a’的行数以及含有’b’的行数。注意你需要将YOUR_SPARK_HOME替换成Spark安装的位置路径。和之前使用Spark shell的例子不同,这里我们需要在程序中初始化一个SparkContext

我们给SparkContext构造函数传递了一个SparkConf对象,SparkConf对象中包含了我们的应用程序的信息。

我们的应用程序依赖Spark API,所以需要一个sbt配置文件build.sbt。这个文件还增加了Spark依赖的一个库:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.1"

为了使sbt工作正常,我们需要将SimpleApp.scala和build.sbt放到正确的目录结构中。一旦放好之后,我们就可以创建一个包含了应用程序代码的JAR包,然后使用脚本spark-submit运行我们的程序。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23

下一步学习什么

恭喜运行了你的第一个Spark应用程序!

为了深入地学习API,从Spark编程指南开始学习,或者查看编程指南菜单学习其它组成部分。

为了在集群上运行应用程序,可以学习集群模式

最后,Spark在目录examples中提供了一些例子(Scala, Java, Python, R)。你可以按如下方式运行它们:

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R

参考资料

本文译自Quick Start

发表评论

电子邮件地址不会被公开。 必填项已用*标注