spark: join rdd based on sequence of another rdd



I have an RDD, say sample_rdd, of type RDD[(String, String, Int)] with 3 columns: id, item, count. Sample data:

id1|item1|1
id1|item2|3
id1|item3|4
id2|item1|3
id2|item4|2

I want to join each id against a lookup_rdd like this:

item1|0
item2|0
item3|0
item4|0
item5|0

The output should give me the following for id1 (outer join with the lookup table):

item1|1
item2|3
item3|4
item4|0
item5|0

Similarly, for id2 I should get:

item1|3
item2|0
item3|0
item4|2
item5|0

Finally, the output for each id should have all the counts along with the id:

id1,1,3,4,0,0
id2,3,0,0,2,0

IMPORTANT: this output should always be ordered according to the order in the lookup.
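The desired per-id behavior can be sketched without Spark on plain Scala collections (a sketch only; `lookup` and `counts` are hand-built from the sample data above):

```scala
// Lookup items in the required output order; missing items default to 0.
val lookup = Seq("item1", "item2", "item3", "item4", "item5")

// Per-id counts taken from the sample data above.
val counts = Map(
  "id1" -> Map("item1" -> 1, "item2" -> 3, "item3" -> 4),
  "id2" -> Map("item1" -> 3, "item4" -> 2)
)

// For each id, emit counts in lookup order, filling gaps with 0.
val rows = counts.toSeq.sortBy(_._1).map { case (id, m) =>
  id + "," + lookup.map(item => m.getOrElse(item, 0)).mkString(",")
}
rows.foreach(println)
// id1,1,3,4,0,0
// id2,3,0,0,2,0
```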

This is what I have tried:

val line = rdd_sample
  .map { case (id, item, count) => (id, (item, count)) }
  .map(row => (row._1, row._2))
  .groupByKey()

get(line).map(l => (l._1, l._2))
  .mapValues(item_count => lookup_rdd.leftOuterJoin(item_count))

def get(line: RDD[(String, Iterable[(String, Int)])]) = {
  for {
    (id, item_cnt) <- line
    i = item_cnt.map(tuple => (tuple._1, tuple._2))
  } yield (id, i)
}

Accepted answer


Try the below. Run each step in your local console to understand what's happening in detail.

The idea is to zipWithIndex and form a sequence based on lookup_rdd: (i1,0), (i2,1) .. (i5,4) and (id1,0), (id2,1).

Index of final result wanted = [delta (length of the lookup_rdd seq) * index of id1..id2] + index of i1..i5

So the base seq generated will be (0,(i1,id1)), (1,(i2,id1)) ... (8,(i4,id2)), (9,(i5,id2)).
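The composite-index scheme can be demonstrated on plain Scala collections before moving to RDDs (item and id names taken from the sample data; the for-comprehension stands in for the cartesian product):

```scala
val items = Seq("item1", "item2", "item3", "item4", "item5").zipWithIndex
val ids   = Seq("id1", "id2").zipWithIndex
val delta = items.length  // length of the lookup sequence

// Composite index = delta * idIndex + itemIndex, covering 0..9.
val base = for {
  (id, j)   <- ids
  (item, i) <- items
} yield (delta * j + i, (item, id))

base.foreach(println)
// (0,(item1,id1)), (1,(item2,id1)) ... (9,(item5,id2))
```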

Then reduce based on the key (i1,id1) and calculate the count.

val res2 = sc.parallelize(arr)   // sample_rdd
val res3 = sc.parallelize(cart)  // lookup_rdd
val delta = res3.count

// Pair every (item, id) combination with its composite index and a default count of 0.
val res83 = res3.map(_._1).zipWithIndex
  .cartesian(res2.map(_._1).distinct.zipWithIndex)
  .map(x => ((x._1._1, x._2._1), ((delta * x._2._2) + x._1._2, 0)))

// Actual counts keyed by (item, id).
val res86 = res2.map(x => ((x._2, x._1), x._3)).reduceByKey(_ + _)

// Left outer join: every (item, id) keeps its index; missing counts stay 0.
val res88 = res83.leftOuterJoin(res86)
val res91 = res88.map(x => x._2._2 match {
  case Some(x1) => (x._2._1._1, (x._1, x._2._1._2 + x1))
  case None     => (x._2._1._1, (x._1, x._2._1._2))
})

// Sort by composite index, then collect the counts per id in lookup order.
val res97 = res91.sortByKey(true).map(x => (x._2._1._2, List(x._2._2))).reduceByKey(_ ++ _)
res97.collect
// SOLUTION: Array((id1,List(1,3,4,0,0)),(id2,List(3,0,0,2,0)))
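The same pipeline can be traced on plain Scala collections, with no SparkContext, to see what each intermediate step holds (a sketch: `arr` and `cart` are filled in here with the sample and lookup data from the question, and `groupBy`/`sortBy` stand in for the RDD operations):

```scala
val arr  = Seq(("id1","item1",1), ("id1","item2",3), ("id1","item3",4),
               ("id2","item1",3), ("id2","item4",2))
val cart = Seq(("item1",0), ("item2",0), ("item3",0), ("item4",0), ("item5",0))
val delta = cart.length

// res83 analogue: ((item, id) -> (compositeIndex, 0)) for every pair.
val res83 = for {
  (item, i) <- cart.map(_._1).zipWithIndex
  (id, j)   <- arr.map(_._1).distinct.zipWithIndex
} yield ((item, id), (delta * j + i, 0))

// res86 analogue: actual counts keyed by (item, id).
val res86 = arr.groupBy(x => (x._2, x._1)).map { case (k, v) => (k, v.map(_._3).sum) }

// Left outer join analogue: fall back to the default 0 when no count exists.
val res91 = res83.map { case (k, (idx, zero)) =>
  (idx, (k._2, res86.getOrElse(k, zero)))
}

// Sort by composite index, then gather counts per id, preserving lookup order.
val result = res91.sortBy(_._1).map(_._2)
  .groupBy(_._1).map { case (id, v) => (id, v.map(_._2)) }
  .toSeq.sortBy(_._1)

result.foreach(println)
// (id1,List(1, 3, 4, 0, 0))
// (id2,List(3, 0, 0, 2, 0))
```

Because `groupBy` on a Scala `Seq` preserves the encounter order of the values in each group, sorting by the composite index before grouping is what keeps each id's counts in lookup order.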
