Spark 聚合原理
spark在触发shuffle的时候,在一些场景下会涉及到聚合操作。聚合操作起到了一个优化整体计算效率的作用。
聚合算法简介
Aggregator类表示Spark的聚合算法。 聚合只能用于数据源是key,value类型。
它有三个重要的函数,都在算法中起着不同的作用。里使用了泛型,分别代表着不同的数据类型。K表示数据源key的类型,V表示数据源value的类型,C表示中间数据Combiner的类型:
- createCombiner,V => C,将value数据,返回Combiner类型
- mergeValue,(C, V) => C, 将value数据,合并到Combiner类型
- mergeCombiners, (C, C) => C), 合并Combiner类型的数据
这里演示一个例子, 如下图所示:
(key_0, value_0), (key_0, value_1), (key_0, value_2)
(key_1, value_3), (key_1, value_4)
首先将相同的Key分组。然后对每一个分组,做聚合。以key_0分组为例
第一步,将第一个value数据转化为Combiner类型。遇到(key_0, value_0),将value_0 生成 combine_0数据。这里会调用了createCombiner函数
第二步,将后面的value,依次合并到C类型的数据。这就是mergeValue的作用。当遇到(key_0, value_1),讲述value_1添加到combine_0数据里
第三部,然后继续遇到(key_0, value_2),恰好这时候触发spill,会新建一个combiner_1数据, 将数据value_2添加combiner_1里。
第四部,最后将所有的combiner数据汇总,比如 合并combiner_0 和 combiner_1,这里调用了mergeCombiners函数。
任务运行
Spark生成Aggregator, 是在combineByKeyWithClassTag方法。它根据前后rdd的分区器是否一样,来判断是否需要shuffle。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
// 生成aggregator
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
// 如果新的分区器和原有的一样,则表示key的分布是一样。所以没必要shuffle,直接调用mapPartitions
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
// 否则,生成ShuffledRDD
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
|
上面的代码可以看到,如果分区器是一样的,这里仅仅是调用了mapPartitions方法。传递的函数是aggregator调用combineValuesByKey方法返回的。阅读下面的代码,这里是调用了ExternalAppendOnlyMap类,实现了聚合的执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
class Aggregator[K, V, C] (
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
def combineValuesByKey(
iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
combiners.insertAll(iter)
updateMetrics(context, combiners)
combiners.iterator
}
}
|
而如果分区器不是一样的,会生成ShuffleRDD。aggregator会保存在ShuffleRDD里面,提供后续的shuffle计算使用。
RDD常见聚合操作
这里需要说明下,RDD类指定了隐式转换 ,可以转换成PairRDDFunctions类。PairRDDFunctions类支持聚合操作。
groupby 操作
当rdd触发groupby操作时,就会触发聚合。先看看Rdd的groupby方法
1
2
3
4
5
6
7
8
9
10
11
12
|
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
// 使用默认的分区器
groupBy[K](f, defaultPartitioner(this))
}
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
: RDD[(K, Iterable[T])] = withScope {
val cleanF = sc.clean(f)
// 首先使用函数f,生成key。调用map返回(key, value)类型的RDD
// 这里RDD隐式转换成PairRDDFunctions, 然后调用groupByKey方法
this.map(t => (cleanF(t), t)).groupByKey(p)
}
|
接下来看看groupByKey方法,这里生成了Aggregator的函数。它使用Combiner类型是CompactBuffer。
CompactBuffer可以简单的理解成一个array,只不过将第一个和第二个元素单独存储,将后面的元素存到array里,但是它对外提供了和array一样的接口。这样对于存储数量小的集合,减少了数组的分配。
1
2
3
4
5
6
7
8
9
10
11
12
|
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// 生成CompactBuffer,然后将value添加到CompactBuffer里
val createCombiner = (v: V) => CompactBuffer(v)
// 将新的value添加到CompactBuffer
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
// 将多个CompactBuffer合并
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
// 调用combineByKeyWithClassTag 生成RDD
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
|
reduceByKey 操作
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
// 这里可以看到createCombiner函数,只是返回value值
// mergeValues函数,是传入的func函数
// mergeCombiners函数,也还是传入的func函数
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
reduceByKey(new HashPartitioner(numPartitions), func)
}
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
countByKey 操作
1
2
3
4
|
def countByKey(): Map[K, Long] = self.withScope {
// 调用mapValues生成(key, 1)类型的RDD,然后定义了相加的函数
self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}
|