Spark源码分析之分区器的作用

发表于:2017-4-25 09:48

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:xingoo    来源:36大数据

  确定边界
  最后就可以通过获取的样本数据,确定边界了。
def determineBounds[K : Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
// 数据格式为(key,权重)
val ordered = candidates.sortBy(_._1)
val numCandidates = ordered.size
val sumWeights = ordered.map(_._2.toDouble).sum
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0
var previousBound = Option.empty[K]
while ((i < numCandidates) && (j < partitions - 1)) {
val (key, weight) = ordered(i)
cumWeight += weight
if (cumWeight >= target) {
// Skip duplicate values.
if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
bounds += key
target += step
j += 1
previousBound = Some(key)
}
}
i += 1
}
bounds.toArray
}
  直接看代码,还是有些晦涩难懂,我们举个例子,一步一步解释下:
  按照上面的算法流程,大致可以理解:
  抽样-->确定边界(排序)
  首先对spark有一定了解的都应该知道,在spark中每个RDD可以理解为一组分区,这些分区对应了内存块block,他们才是数据最终的载体。那么一个RDD由不同的分区组成,这样在处理一些map,filter等算子的时候,就可以直接以分区为单位并行计算了。直到遇到shuffle的时候才需要和其他的RDD配合。
  在上面的图中,如果我们不特殊设置的话,一个RDD由3个分区组成,那么在对它进行groupbykey的时候,就会按照3进行分区。
  按照上面的算法流程,如果分区数为3,那么采样的大小为:
  val sampleSize = math.min(20.0 * partitions, 1e6)
  即采样数为60,每个分区取60个数。但是考虑到数据倾斜的情况,有的分区可能数据很多,因此在实际的采样时,会按照3倍大小采样:
  val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
  也就是说,最多会取60个样本数据。
  然后就是遍历每个分区,取对应的样本数。
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
  val seed = byteswap32(idx ^ (shift << 16))
  val (sample, n) = SamplingUtils.reservoirSampleAndCount(
  iter, sampleSizePerPartition, seed)
  //包装成三元组,(索引号,分区的内容个数,抽样的内容)
  Iterator((idx, n, sample))
  }.collect()
  然后检查,是否有分区的样本数过多,如果多于平均值,则继续采样,这时直接用sample 就可以了
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.size).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
//基于RDD获取采样数据
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
  取出样本后,就到了确定边界的时候了。
  注意每个key都会有一个权重,这个权重是 【分区的数据总数/样本数】
  RangePartitioner.determineBounds(candidates, partitions)
  首先排序val ordered = candidates.sortBy(_._1),然后确定一个权重的步长
  val sumWeights = ordered.map(_._2.toDouble).sum
  val step = sumWeights / partitions
  基于该步长,确定边界,最后就形成了几个范围数据。
  然后分区器形成二叉树,遍历该数确定每个key对应的分区id
  partition = binarySearch(rangeBounds, k)
  实践 —— 自定义分区器
  自定义分区器,也是很简单的,只需要实现对应的两个方法就行:
public class MyPartioner extends Partitioner {
@Override
public int numPartitions() {
return 1000;
}
@Override
public int getPartition(Object key) {
String k = (String) key;
int code = k.hashCode() % 1000;
System.out.println(k+":"+code);
return  code < 0?code+1000:code;
}
@Override
public boolean equals(Object obj) {
if(obj instanceof MyPartioner){
if(this.numPartitions()==((MyPartioner) obj).numPartitions()){
return true;
}
return false;
}
return super.equals(obj);
}
}
  使用的时候,可以直接new一个对象即可。
  pairRdd.groupbykey(new MyPartitioner())
22/2<12
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号