Spark与Scala
在学习Spark之前务必对Scala有所理解,否则面对完全陌生的语法是很痛苦的。
Scala的一种入门方式是:
- 学习Scala 函数式程序设计原理。这是Scala作者自己开的课程。没什么比语言作者更加能理解这门语言的了,是切入Scala编程的最好入门方式。课程习题参考了《计算机程序的构造和解释》一书,非常经典。
- 阅读《Scala in depth》一书,对一些Scala的重点概念有更加详细的讨论。
- 根据特定的topic,Google各种网络资料。
RDD (Resilient Distributed Datasets)
RDD的含义
RDD是spark中用于记录数据的数据结构。根据具体的RDD类型,数据有不同的组织形式。一个RDD包含多个partition,partition是并行的基本单位。RDD可能存在内存中,也可能存在硬盘里,或者两者皆有。一个RDD可以由数据源创建,也可能由其它RDD计算得到,所有参与计算RDD的RDD称为父RDD。若对mapreduce有所了解,可以把partition看作mapper的一个split。
RDD中的窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)
若一个RDD中每一条记录仅仅依赖父RDD中唯一一条记录,则其为窄依赖,否则为宽依赖。比如在map中,每一条子RDD中的记录就对应着唯一父RDD中的对应记录。而groupByKey这样的操作中,子RDD中的一条记录,我们并不知道它究竟来自哪个父RDD中的哪个partition。
利用mapreduce的概念来理解,一组连续的窄依赖操作可以用一个mapper来实现,而宽依赖操作则只能依赖reducer。正因如此,一组连续窄依赖中产生的“中间结果”(实际并不需要产生这些中间结果)是没有存在的意义的,只要知道输入、操作就能直接计算输出了。举个具体的例子:
1 | val resRDD = srcRDD.map(_ + 1).map(_ + 2).filter( _ % 2 == 0) |
中的transformation链可以看作mapreduce下的一个mapper,一条记录从左到右执行不依赖其它记录。若把上面例子改为:
1 | val resRDD = srcRDD.map(_ + 1).distinct().filter( _ % 2 == 0) |
其中加入了distinct意味着一条记录从左到右无法利用一个mapper就完成,必须截断加入一个reducer。这里需要理解mapreduce中的一个mapper并不是等价于spark中的一个map操作,而是对应所有窄操作的组合,例如filter、flatMap、union等等。
补充材料:why spark’s mapPartitions transformation is faster than map。其中的一句话讲的非常清楚——you probably know that “narrow” transformations/tasks happen independently on each of the partitions. 即窄操作在单机即可完成,不需要依赖保存在其它主机上的partition。
RDD中persist和checkpoint的逻辑
persist的目的是为了加快后续对该RDD的操作;checkpoint的目的是为了减少长执行链失败带来的开销。由于目的不同,如果persist的RDD丢失了,可以重新计算一遍(这就是普通cache的基本逻辑)。反过来,如果checkpoint丢失了,则无法重新计算,因为该checkpoint之前的内容都遗忘了。cache只是persist的一个子操作,其storage level为memory_only。
persist和checkpoint都是异步操作,执行persist或checkpoint命令仅仅给对应的RDD加上一个mark,后续交给block manager完成具体的物化操作(???)。persist有多种storage level,包括memory, off heap memory, disk等等。在spark中,block manager负责所有的数据存储管理,包括persist、checkpoint、或shuffle产生的中间数据等。
值得一提的是关于off heap memory的概念说明。简而言之,off heap memory就是不受JVM管控的一块内存空间,由于不受管控所以不存在GC的开销;另一方面由于并非JVM native环境,所以并不能识别其中存储的Java对象这样的结构,需要序列化和反序列化来支持。off heap memory的典型应用场景则是缓存一些较大的静态数据。
重要的方法
compute
def compute(split: Partition, context: TaskContext): Iterator[T]
根据给定的partition计算一个interator,可以遍历该partition下的所有记录。有意思的是partition的名字为split,与mapreduce下mapper的处理单位名字一样。
RDD中的基础transformation
map
def map[U: ClassTag](f: T => U): RDD[U]
返回的RDD为MapPartitionsRDD类型,其compute方法会对其父RDD中的记录执行f映射。
mapPartitions
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
与map的区别在于映射f的作用对象是整个partition,而不是一条partition中的记录。在一些初始化代价较高的场景下,mapPartition比map更加合理和高效。
补充材料:why spark’s mapPartitions transformation is faster than map。
flatMap
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
与map类似,仅仅将对iterator的map操作换成flatMap操作。这里f映射的输出类型为TraversableOnce,表示只要能完成单次遍历即可,可以是Traversable或Iterable。
filter
def filter(f: T => Boolean): RDD[T]
与map类似,仅仅将对iterator的map操作换作filter操作。
distinct
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
首先这个方法存在一个implicit的参数ord,类型为scala.math.Ordering。Ordering中实现了各种基础类型(Int, Long, Double, String等)的比较方法,这意味着如果T是一种基础类型则无须实现自己的比较方法,只需要import scala.math.Ordering即可。
与前几种transformation最大的不同在于distinct依赖reduce,即它是一种宽依赖操作。其具体实现代码如下:
1 | map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) |
可见其首先将一条记录映射为一个pair,然后执行reduceByKey的操作。这里reduceByKey方法并非RDD所有,之所以可以调用是因为object RDD里定义了从RDD转换为PairRDDFunctions的implicit方法。这种针对特定情况下的RDD增加操作的抽象方式可以学习。reduceByKey中给出了合并两个value的方式,即把相同的key的alue合并为一个(在此为null),然后根据给定的numPartitions数量进行hash partition。最终结果通过map仅保留key即可。
与mapreduce一致,这里的合并会发生在本地和reducer处,类似mapreduce中的combiner。在调用reduceByKey后的调用逻辑为:
1 | reduceByKey((x, y) => x, numPartitions) |
在combineByKeyWithClassTag中会根据传入的三个映射分别创建createCombiner、mergeValue和mergeCombiner。其中,createCombiner用于产生合并的初始值;mergeValue用于合并两条记录;mergeCombiner用于将mergeValue得到的结果再次合并。上述三者组成一个Aggregator对象。
coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) : RDD[T]
连接的作用是重新整理原有的RDD。有两种情况:(1)若shuffle\=\=false,表示一种虚拟的RDD分区变化,此时numPartitions应该比原来的少,否则无意义。注意此时是不会发生真实的IO的;(2)若shuffle\=\=true,表示要做一次真实的shuffle,即会带有真实的数据IO。对于第二种情况,在coalesce方法内部会做一次随机的mapping操作,把每个元素与结果RDD中的partition做一次mapping。在第二种情况下,numPartitions可以比父RDD的分区数量更多。
虽然前一种情况只是虚拟的分区变化,但究竟把哪些父partition分入同一个子partition是可以考虑locality因素的,CoalescedRDD的balanceSlack参数用来控制locality在分配父partition时起的权重。
看代码中1
2
3
4
5// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions).values
这段话比较难懂,而实际上是做了几件事:首先,在ShuffledRDD中根据随机生成的key将父RDD各partiton中的数据分散到子RDD的各partiton中;然后,隐式转换为PairRDDFunctions的values方法转换成普通的RDD。
sample
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] = withScope
对当前RDD的每个partition进行一次sample。withReplacement用于控制是否可出现重复sample,fraction控制sample的比例,seed即随机种子。
randomSplit
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
给定一组weights,例如Array(2.0,3.0,5.0),将父RDD按这样的比例划分,得到一个子RDD数组。
示例:1
2
3
4
5
6
7
8
9
10
11
12
13scala> val rdd = sc.makeRDD(1 to 10,10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:27
scala> rdd.collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val randomSplittedRDD = rdd.randomSplit(Array(2.0, 3.0, 5.0))
randomSplittedRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[12] at randomSplit at <console>:29, MapPartitionsRDD[13] at randomSplit at <console>:29, MapPartitionsRDD[14] at randomSplit at <console>:29)
scala> randomSplittedRDD.foreach(x => println(x.collect.mkString(" ")))
9 10
2 4 8
1 3 5 6 7
其内部实现实际上是利用了BernoulliCellSampler完成的,每次把父RDD的某个partition做一次sample得到一个子partition,通过一个MapPartitionsRDD实现从父RDD到子RDD的映射。但由于产生的是一组子RDD,因此每多一个子RDD就需要把父RDD做一次sample。由于每次调用时random seed是在内部保持不变的,所以即使多次sample,也不会导致某个元素被分到不同的子RDD里去。这一点是开始一直想不通的,因为我一直以为只需要sample一遍就能完成整个过程。
takeSample
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
返回指定数量的sample。
union(同++)
def union(other: RDD[T]): RDD[T]
获取两个RDD的并集,若一个元素出现多次,并不会通过union操作去重,因此union本身属于窄依赖。根据partitioner的情况,分两种情况处理:(1)如果两个RDD的partitioner都定义了且相同,那两RDD的partition数量一样,得到的并集RDD也有相同数量的partition。在考虑locality时,会按照多数原则处理,即如果大多数属于某个并集partition的父partition都倾向某个locality选择,那么就以此多数为准;(2)如果不满足(1)的情况,则并集RDD的partition数量为两父RDD的数量之和,即简单的合并关系。
keyBy
def keyBy[K](f: T => K): RDD[(K, T)]
根据映射f抽取原RDD中每条记录的key,使结果RDD中每条记录为一个kv二元组。
sortBy
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
对RDD排序,key由映射f抽取。这个方法的实现比较有趣,如下1
2
3this.keyBy[K](f) //生成一个基于kv二元组的RDD
.sortByKey(ascending, numPartitions) //sortByKey是OrderedRDDFunctions中的方法,由隐式转换rddToOrderedRDDFunctions支持
.values //排好序的RDD再退化由原来的元素组成,也是隐式转换支持
实现过程经过两次隐式转换,非常有scala的特色,这种隐式转换往往发生在特殊的RDD之上。排序的具体过程参考Shuffle一节。
intersection
def intersection(other: RDD[T]): RDD[T]
计算两个父RDD的交集,得到子RDD,交集元素无重复。实现如下:1
2
3this.map(v => (v, null)).cogroup(other.map(v => (v, null))) //map成kv二元组后,隐式转换PairRDDFunctions调用其cogroup方法得到(k, (v1, v2))的结构
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty } //把两边都不是空的情况筛选出来
.keys //退化为普通的RDD
其中cogroup依赖shuffle,所以是宽依赖操作。intersection操作还有一些重载,但基本实现是相同的。
glom
def glom(): RDD[Array[T]]
将原来的RDD变成新的RDD,其原有的每个partition变成一个数组。例如:1
2
3
4scala> val a = sc.parallelize(1 to 9, 3)
scala> a.glom.collect
res66: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))
这篇文章把glom的作用讲的非常清楚。其中的例1和例2都是在处理一个数组要比挨个处理每个元素好很多的时候。当然,这消耗的内存要更大(TODO: 具体使用情况如何?是否会导致OOM?),是一个折衷。
cartesian
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
生成当前RDD与另一个RDD的笛卡尔积,即列举所有a in this和b in other而组成的(a,b)的集合。生成的新RDD的partition数量等于原两个RDD各自的partition数量的乘积。
groupBy
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null) : RDD[(K, Iterable[T])]
将当前RDD中的元素按f映射的key做group操作,结果RDD可根据传入的partitioner来进行分区。源代码中有如下注释:1
2
3
4
5
6* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
其中指出当前实现中一个key的所有value会需要保存在内存中,从而可能导致OOM,这可能是combine的过程中必须将所有value保存在内存中有关(推测)。另外,聚合或reduce可以解决大部分问题,而不需要groupBy,依此推测这个操作仅用于一些value较少又不得不获取这个中间结果的场景。
这篇文章很好的讲述了groupBy引入的内存问题的原因。
pipe
def pipe(command: String): RDD[String]
pipe类似于mapreduce中的streaming,即能通过stdin来把数据发往外部进程,在通过stdout把结果读回来。这篇文章讲的非常清楚。但是这似乎只是map的过程,并不能包括reduce。
其内部实现实际上就是把参数中包含的command启动一个进程,然后通过stdin/out来完成上述算子操作过程。
zip
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
将当前RDD与other组合成一个新的包含二元组的RDD,要求两个RDD包含相同数量的partition,且每对partition包含相同数量的元素。
zipPartitions
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
与zip的关系类似map与mapPartitions的关系,但又不完全一样。zip要求对应的partition里包含的元素数量也完全一样,但这里f映射并不需要两个partiton里元素数量相同。但显然可以利用zipPartitions来实现zip的功能,且与zip比较起来应该有更好的效率。
subtract
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
得到在当前RDD中且不在other中的元素组成的RDD,由于需要按元素做key,属于宽依赖。
DataFrame.repartion vs. DataFrameWriter.partitionBy
def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame
def partitionBy(colNames: String*): DataFrameWriter
这里的讨论非常清楚。repartition的参数是numPartitions和partitionExprs,partitionExprs将指定的列做hash后对numPartitions求模,得到对应的partition的index。这样得到的最终分区数量是numPartitions,但实际上如果numPartitons大于分组数量,可能有一些partition是空的;反之,如果numPartitions小于分组数量,有一些partiton里包含多个分组。partitionBy是把每个partition按照指定的列拆分为一到多个文件。
一个应用实力:如果希望输出的文件里,每个文件有且仅有一个分组,那么就可以dataframe.repartiton(n, columns).write.partitionBy(columns).csv(xxx)。其中n可以控制并发的数量,跟实际的数据分布有关。
zipWithUniqueId
def zipWithUniqueId(): RDD[(T, Long)]
为了解决zipWithIndex带来的性能问题,这里放松了条件,只要求id是唯一的。zipWithUniqueId只是个算子,第k个partition的元素对应的id分别为k, k+n, k+2n, …,这里的n是partition的数量。
RDD中的actions
foreach
def foreach(f: T => Unit): Unit
将映射f应用到每个元素上。
foreachPartition
def foreachPartition(f: Iterator[T] => Unit): Unit
将映射f应用到每个partition上。
collect
def collect(): Array[T]
将RDD中所有元素作为一个数组返回。注意不要将collect作用于一个过大的RDD,否则会抛出内存异常,可先利用take和takeSample只取一个子集。
reduce
def reduce(f: (T, T) => T): T
执行映射f对应的reduce操作。其操作基本步骤是:(1)每个partition执行f映射对应的reduce过程;(2)在driver的host机器上执行基于f映射的reduce过程,输入来自各个partition的输出。步骤(2)的复杂度与partition的数量呈线性增加。
treeReduce
def treeReduce(f: (T, T) => T, depth: Int = 2): T
为了改进reduce里步骤(2)的瓶颈问题,对各partition的输出先逐层聚合,最后再到driver处生成最终结果,类似一棵树的聚合过程。在文章里有详细的描述。reduce和treeReduce的关系类似aggregate和treeAggregate的关系。
fold
def fold(zeroValue: T)(op: (T, T) => T): T
将映射op应用到每对元素上面。在实现过程中,spark不限定元素之间的执行顺序,实际上是先在partition内部做,然后再在partition之间,所以不能保证一个预先设定好的顺序来执行。因此,fold算子适用于那种不需要考虑左右操作元素的顺序,例如max。
aggregate
def aggregateU: ClassTag\(seqOp: (U, T) => U, combOp: (U, U) => U): U
与fold的不同在于aggregate可以返回一个新的类型U,而不是原来的类型T。从定义的角度,fold是aggregate的一种特例。例如:
1 | scala> val a = sc.parallelize(1 to 9, 3) |
treeAggregate
def treeAggregateU: ClassTag(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
aggregate与treeAggregate和reduce与treeReduce的关系类似。
count
def count(): Long
计算整个RDD中元素的个数。
countApprox
countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
在给定timeout期限的情况下,返回RDD中元素个数的估计。其中confidence是认为评估结果符合高斯分布的假设条件下估算的置信度,而不是结果的可信度。其核心代码如下:
1 | override def currentResult(): BoundedDouble = { |
其中totalOutputs是partition的个数。上面代码的逻辑是:如果已经计算了所有partition,则返回的结果是100%准确的;如果一个partition都未完成,那么结果完全不可信;否则,按比例计算mean,variance跟已返回比例有关,越多则variance越小,其low/high都是根据confidence和mean算出来的。
countByValue
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
实际上就是一个map + reduce的过程,而所得结果因为需要转化为Map,需要把所得内容完全载入driver的内存,所以只适合不同的value的数量比较小的情况。
countByValueApprox
def countByValueApprox(timeout: Long, confidence: Double = 0.95)(implicit ord: Ordering[T] = null) : PartialResult[Map[T, BoundedDouble]]
与前面提到的countApprox实现类似。
zipWithIndex
def zipWithIndex(): RDD[(T, Long)]
获得一个新的RDD,其中每个元素都是一个二元组,其中value是元素所在RDD中的全局index。该操作不保证重复时index的顺序不变。这个操作表面上是一个算子,但实际上会触发一个spark job,因为在执行之前需要知道每个partition的起始index,而这只能通过count每个partition来得到。
take
def take(num: Int): Array[T]
take的作用是从一个RDD中获取给定数量num个数的元素,得到一个数组。实现的基本思路是,首先尝试读一个partition,然后根据得到的元素数量与num的比较决定是否需要再探索其它的partition,以及探索的partition数量。这个探索数量的策略似乎比较heuristic,大体上是每次探索的partition数量小于等于已探索的4倍,而具体的值跟已探索到的元素数量与num的关系来定。从实现上看,take返回的所有元素都保存在一个数组内,所以如果num数量过大会引起内存问题。
takeOrdered
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
takeOrdered除了获取num个元素外,还要求这些元素按照ord给出的排序方式排序。其实现的核心代码如下:
1 | val mapRDDs = mapPartitions { items => |
首先,对每个partition需要得到一个BoundedPriorityQueue,其大小固定为num。若partition内元素少于num个,则queue不满。随后,在一个reduce中,把每个partition得到的queue拼接为一个queue。BoundedPriorityQueue的拼接会按照每个元素插入队列。根据这个实现,每次takeOrdered或top操作都需要对所有partition排序,然后在结果里拼出一个大小为num的队列,代价是比较大的。
常见的RDD派生类
Spark Architecture
http://datastrophic.io/core-concepts-architecture-and-internals-of-apache-spark/
Shuffle
Shuffle的目的是把key相同的记录发送到相同的parition以供后续处理。Mapreduce中同样存在shuffle阶段。回顾mapreduce中shuffle的过程:(1)mapper将数据分为多个partition,然后parition内按照key排序(实际分两步完成),这些partition一部分写入磁盘,一部分缓存在内存里;(2)mapper输出的partition分发到对应的reducer;(3)reducer对已经排好续的记录再次进行合并排序;(4)key相同的记录被group为一个iterable交给reduce方法处理。
补充材料:《Hadoop: The Definitive Guide》英文版,197页
Shuffle的两种方法
Spark中shuffle“目前”有两种实现,分别是基于hash和sort。
基于hash的方式在spark 1.2.0以前是默认的方式。其实现思路非常简单,对于任意输入RDD中的partition,根据hash结果产生N个文件。N表示“reducer”的数量。由于没有排序,每条记录经过hash后直接写入文件,因此速度较快。对于后续处理不需要排序的情况,基于hash的shuffle性能较好。其缺陷是产生的文件数量较大。
基于sort的方式达到的效果与mapreduce里的shuffle一样,但实现上有较大的差异。首先,从“mapper”写出的数据是不做本地排序的,只有在“reducer”从远端获取数据时才会触发排序过程。这里需要了解spark中的AppendOnlyMap的数据结构。简单来说,在数据量足够小的情况下,“mapper”输出的数据会保存在内存一个AppendOnlyMap中。如果数据较多,则会将AppendOnlyMap变换为一个priority queue,按key排序后保存到外部文件中。这样一来,一次map操作的所有数据会保存在一个内存里的AppendOnlyMap加若干外部的文件。当“reducer”请求数据的时候,这些数据分片会被组织成一个最小堆,每次读取一个key最小的记录,从而实现了排序的功能。“Reducer“拿到各个数据分片后,采用TimSort来对所有数据排序,而不是mapreduce中的合并排序。
补充材料:
Spark Architecture: Shuffle
spark的外排:AppendOnlyMap与ExternalAppendOnlyMap
Block Manager
Block Manager在spark中作为一层存储抽象层存在。RDD的iterator方法里有读取缓存的partition的入口getOrCompute,其中block的id定义为:
1 | val key = RDDBlockId(rdd.id, partition.index) |
从实现上看每个RDD的partition都有一个唯一的key,用于blockmanager存储的键值。一个partition应该与一个block一一对应的。Block的存储完全由block manager来管理。
关于block size不能超过2g限制的issue tracker
不错的参考资料
Spark源码分析之-Storage模块
Spark缓存机制分析
Apache Spark源码走读之6 – 存储子系统分析
DAG
DAGScheduler
Adaptive execution in Spark
DataFrame
可以反复学习的blog
http://dataknocker.github.io