Spark 2: iterating over partitions to create new partitions

This article describes an approach to iterating over the partitions of a Spark 2 DataFrame in order to produce a new, reduced DataFrame.

Problem Description

I have been scratching my head trying to come up with a way to reduce a dataframe in Spark to a frame which records gaps in the dataframe, preferably without completely killing parallelism. Here is a much-simplified example (it's a bit lengthy because I wanted it to be able to run):

import org.apache.spark.sql.SparkSession

case class Record(typ: String, start: Int, end: Int)

object Sample {
  def main(argv: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .master("local")
      .getOrCreate()

    val df = sparkSession.createDataFrame(
      Seq(
        Record("One", 0, 5),
        Record("One", 10, 15),
        Record("One", 5, 8),
        Record("Two", 10, 25),
        Record("Two", 40, 45),
        Record("Three", 30, 35)
      )
    )

    df.repartition(df("typ")).sortWithinPartitions(df("start")).show()
  }
}

When I get done, I would like to be able to output a dataframe like this:

typ    start  end
---    -----  ---
One    0      8
One    10     15
Two    10     25
Two    40     45
Three  30     35

I guessed that partitioning by the 'typ' value would give me partitions with each distinct data value, one-to-one; e.g. in the sample I would end up with three partitions, one each for 'One', 'Two' and 'Three'. Furthermore, the sortWithinPartitions call is intended to give me each partition in sorted order on 'start' so that I can iterate from the beginning to the end and record gaps. That last part is where I am stuck. Is this possible? If not, is there another approach?
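For reference, the "iterate within each partition" idea can be sketched with mapPartitions on a typed Dataset. This is an illustrative sketch only, not from the original post: it assumes the Record case class and sparkSession from the snippet above, and it sorts within partitions by both typ and start, because hash partitioning may place several typ values in the same partition.

import sparkSession.implicits._

val mergedPerPartition = df.as[Record]
  .repartition($"typ")
  .sortWithinPartitions($"typ", $"start")           // keep records of the same typ adjacent
  .mapPartitions { records =>
    records.foldLeft(List.empty[Record]) {
      case (prev :: done, r) if r.typ == prev.typ && r.start <= prev.end =>
        prev.copy(end = prev.end max r.end) :: done // overlaps the previous record: extend it
      case (acc, r) =>
        r :: acc                                    // gap or new typ: start a new merged record
    }.reverse.iterator
  }

mergedPerPartition.show()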

Recommended Answer

I propose to skip the repartitioning and sorting steps and jump directly to a distributed compressed merge sort (I have just invented the name for the algorithm, just like the algorithm itself).

Here is the part of the algorithm that is supposed to be used as the reduce operation:

type Gap = (Int, Int)

def mergeIntervals(as: List[Gap], bs: List[Gap]): List[Gap] = {
  require(!as.isEmpty, "as must be non-empty")
  require(!bs.isEmpty, "bs must be non-empty")

  @annotation.tailrec
  def mergeRec(
    gaps: List[Gap],
    gapStart: Int,
    gapEndAccum: Int,
    as: List[Gap],
    bs: List[Gap]
  ): List[Gap] = {
    as match {
      case Nil => {
        bs match {
          case Nil => (gapStart, gapEndAccum) :: gaps
          case notEmpty => mergeRec(gaps, gapStart, gapEndAccum, bs, Nil)
        }
      }
      case (a0, a1) :: at => {
        if (a0 <= gapEndAccum) {
          mergeRec(gaps, gapStart, gapEndAccum max a1, at, bs)
        } else {
          bs match {
            case Nil =>
              mergeRec((gapStart, gapEndAccum) :: gaps, a0, gapEndAccum max a1, at, bs)
            case (b0, b1) :: bt =>
              if (b0 <= gapEndAccum) {
                mergeRec(gaps, gapStart, gapEndAccum max b1, as, bt)
              } else {
                if (a0 < b0) {
                  mergeRec((gapStart, gapEndAccum) :: gaps, a0, a1, at, bs)
                } else {
                  mergeRec((gapStart, gapEndAccum) :: gaps, b0, b1, as, bt)
                }
              }
          }
        }
      }
    }
  }

  val (a0, a1) :: at = as
  val (b0, b1) :: bt = bs
  val reverseRes =
    if (a0 < b0) mergeRec(Nil, a0, a1, at, bs)
    else mergeRec(Nil, b0, b1, as, bt)
  reverseRes.reverse
}

Here is how it works:

println(mergeIntervals(
  List((0, 3), (4, 7), (9, 11), (15, 16), (18, 22)),
  List((1, 2), (4, 5), (6, 10), (12, 13), (15, 17))
))
// Outputs:
// List((0,3), (4,11), (12,13), (15,17), (18,22))

Now, if you combine it with Spark's parallel reduce,

val mergedIntervals = df.
  as[(String, Int, Int)].
  rdd.
  map { case (t, s, e) => (t, List((s, e))) }.              // convert start/end to a list with one interval
  reduceByKey(mergeIntervals).                              // perform the parallel compressed merge-sort
  flatMap { case (k, vs) => vs.map(v => (k, v._1, v._2)) }. // explode resulting lists of merged intervals
  toDF("typ", "start", "end")                               // convert back to a DataFrame

mergedIntervals.show()

you obtain something like a parallel merge sort that works directly on compressed representations of integer sequences (hence the name).

Result:

+-----+-----+---+
|  typ|start|end|
+-----+-----+---+
|  Two|   10| 25|
|  Two|   40| 45|
|  One|    0|  8|
|  One|   10| 15|
|Three|   30| 35|
+-----+-----+---+

Discussion

The mergeIntervals method implements a commutative, associative operation for merging lists of non-overlapping intervals that are already sorted in increasing order. All the overlapping intervals are then merged and again stored in increasing order. This procedure can be repeated in a reduce step until all interval sequences are merged.
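To make the commutativity and associativity concrete, here is a tiny illustrative check (not part of the original answer) that reuses the mergeIntervals defined above:

val a = List((0, 5))
val b = List((10, 15))
val c = List((5, 8))

// The merge result does not depend on the order of the operands ...
assert(mergeIntervals(a, b) == mergeIntervals(b, a))
// ... nor on how partial results are grouped, which is exactly what reduceByKey needs.
assert(mergeIntervals(mergeIntervals(a, b), c) == mergeIntervals(a, mergeIntervals(b, c)))
// Both groupings yield List((0, 8), (10, 15)) for this data.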

The interesting property of the algorithm is that it maximally compresses every intermediate result of the reduction. Thus, if you have many intervals with a lot of overlap, this algorithm might actually be faster than other algorithms that are based on sorting the input intervals.

However, if you have very many intervals that seldom overlap, this method might run out of memory and not work at all, so other algorithms must be used that first sort the intervals and then perform some kind of scan, merging adjacent intervals locally. So whether this will work or not depends on the use case.
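As an illustration of that alternative (my own sketch, not from the original answer), the "sort first, then merge adjacent intervals" strategy can be expressed with Spark window functions, i.e. the classic gaps-and-islands pattern; it trades the memory pressure of large merged lists for a shuffle and sort per key:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("typ").orderBy("start")

val mergedViaWindows = df
  // largest end seen in any earlier row of the same typ (null for the first row)
  .withColumn("maxEndSoFar", max("end").over(w.rowsBetween(Window.unboundedPreceding, -1)))
  // a new island starts whenever the current interval begins after everything seen so far
  .withColumn("isNewIsland", when(col("start") > col("maxEndSoFar"), 1).otherwise(0))
  .withColumn("islandId", sum("isNewIsland").over(w))
  .groupBy("typ", "islandId")
  .agg(min("start").as("start"), max("end").as("end"))
  .drop("islandId")

mergedViaWindows.show()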

Complete Code

val df = Seq(
  ("One", 0, 5),
  ("One", 10, 15),
  ("One", 5, 8),
  ("Two", 10, 25),
  ("Two", 40, 45),
  ("Three", 30, 35)
).toDF("typ", "start", "end")

type Gap = (Int, Int)

/** The `merge`-step of a variant of merge-sort
  * that works directly on compressed sequences of integers,
  * where instead of individual integers, the sequence is
  * represented by sorted, non-overlapping ranges of integers.
  */
def mergeIntervals(as: List[Gap], bs: List[Gap]): List[Gap] = {
  require(!as.isEmpty, "as must be non-empty")
  require(!bs.isEmpty, "bs must be non-empty")

  // Assuming that `as` and `bs` both are either lists with a single
  // interval, or sorted lists that arise as output of
  // this method, recursively merges them into a single list of
  // gaps, merging all overlapping gaps.
  @annotation.tailrec
  def mergeRec(
    gaps: List[Gap],
    gapStart: Int,
    gapEndAccum: Int,
    as: List[Gap],
    bs: List[Gap]
  ): List[Gap] = {
    as match {
      case Nil => {
        bs match {
          case Nil => (gapStart, gapEndAccum) :: gaps
          case notEmpty => mergeRec(gaps, gapStart, gapEndAccum, bs, Nil)
        }
      }
      case (a0, a1) :: at => {
        if (a0 <= gapEndAccum) {
          mergeRec(gaps, gapStart, gapEndAccum max a1, at, bs)
        } else {
          bs match {
            case Nil =>
              mergeRec((gapStart, gapEndAccum) :: gaps, a0, gapEndAccum max a1, at, bs)
            case (b0, b1) :: bt =>
              if (b0 <= gapEndAccum) {
                mergeRec(gaps, gapStart, gapEndAccum max b1, as, bt)
              } else {
                if (a0 < b0) {
                  mergeRec((gapStart, gapEndAccum) :: gaps, a0, a1, at, bs)
                } else {
                  mergeRec((gapStart, gapEndAccum) :: gaps, b0, b1, as, bt)
                }
              }
          }
        }
      }
    }
  }

  val (a0, a1) :: at = as
  val (b0, b1) :: bt = bs
  val reverseRes =
    if (a0 < b0) mergeRec(Nil, a0, a1, at, bs)
    else mergeRec(Nil, b0, b1, as, bt)
  reverseRes.reverse
}

val mergedIntervals = df.
  as[(String, Int, Int)].
  rdd.
  map { case (t, s, e) => (t, List((s, e))) }.              // convert start/end to a list with one interval
  reduceByKey(mergeIntervals).                              // perform the parallel compressed merge-sort
  flatMap { case (k, vs) => vs.map(v => (k, v._1, v._2)) }. // explode resulting lists of merged intervals
  toDF("typ", "start", "end")                               // convert back to a DataFrame

mergedIntervals.show()

Testing

The implementation of mergeIntervals has been tested a little. If you want to actually incorporate it into your codebase, here is at least a sketch of one repeated randomized test for it:

def randomIntervalSequence(): List[Gap] = {
  def recHelper(acc: List[Gap], open: Option[Int], currIdx: Int): List[Gap] = {
    if (math.random > 0.999) acc.reverse
    else {
      if (math.random > 0.90) {
        if (open.isEmpty) {
          recHelper(acc, Some(currIdx), currIdx + 1)
        } else {
          recHelper((open.get, currIdx) :: acc, None, currIdx + 1)
        }
      } else {
        recHelper(acc, open, currIdx + 1)
      }
    }
  }
  recHelper(Nil, None, 0)
}

def intervalsToInts(is: List[Gap]): List[Int] = is.flatMap { case (a, b) => a to b }

var numNonTrivialTests = 0
while (numNonTrivialTests < 1000) {
  val as = randomIntervalSequence()
  val bs = randomIntervalSequence()
  if (!as.isEmpty && !bs.isEmpty) {
    numNonTrivialTests += 1
    val merged = mergeIntervals(as, bs)
    assert((intervalsToInts(as).toSet ++ intervalsToInts(bs)) == intervalsToInts(merged).toSet)
  }
}

You would obviously have to replace the raw assert with something more civilized, depending on your test framework.
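For example, here is a hypothetical ScalaTest version of the same property (it assumes a scalatest dependency is available and that mergeIntervals, randomIntervalSequence and intervalsToInts from above are in scope):

import org.scalatest.funsuite.AnyFunSuite

class MergeIntervalsSuite extends AnyFunSuite {
  test("mergeIntervals covers exactly the union of both inputs") {
    var numNonTrivialTests = 0
    while (numNonTrivialTests < 1000) {
      val as = randomIntervalSequence()
      val bs = randomIntervalSequence()
      if (as.nonEmpty && bs.nonEmpty) {
        numNonTrivialTests += 1
        val merged = mergeIntervals(as, bs)
        // the merged intervals must cover exactly the union of the input intervals
        assert((intervalsToInts(as).toSet ++ intervalsToInts(bs)) == intervalsToInts(merged).toSet)
      }
    }
  }
}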
