Spark DataFrame/Dataset: find the most common value for each key efficiently



Problem: I need to map each key to its most common value in Spark (using Scala). I have done it with an RDD, but I don't know how to do it efficiently with a DataFrame/Dataset (Spark SQL).

The dataset looks like:

key1 = value_a
key1 = value_b
key1 = value_b
key2 = value_a
key2 = value_c
key2 = value_c
key3 = value_a

After the Spark transformation, the output should be each key paired with its most common value.

Output

key1 = value_b
key2 = value_c
key3 = value_a

Tried so far:

RDD

I have tried mapping to ((key, value), count) pairs and reducing by key in an RDD, and the logic works, but I can't translate this into Spark SQL (DataFrame/Dataset), and I want to minimize shuffling across the network.

Here is my RDD code:

import org.apache.spark.{SparkConf, SparkContext}

val data = List(
  "key1,value_a", "key1,value_b", "key1,value_b",
  "key2,value_a", "key2,value_c", "key2,value_c",
  "key3,value_a"
)
val sparkConf = new SparkConf().setMaster("local").setAppName("example")
val sc = new SparkContext(sparkConf)
val lineRDD = sc.parallelize(data)
val pairedRDD = lineRDD.map { line =>
  val fields = line.split(",")
  (fields(0), fields(1))                               // (key, value)
}
val pairCountRDD = pairedRDD.map { case (key, value) => ((key, value), 1) }
val sumRDD = pairCountRDD.reduceByKey((a, b) => a + b) // count per (key, value)
val resultsRDD = sumRDD
  .map { case ((key, value), count) => (key, (value, count)) }
  .groupByKey
  .map { case (key, valList) => (key, valList.toList.sortBy(_._2).reverse.head) }
resultsRDD.collect().foreach(println)

DataFrame, using windowing: I am trying Window.partitionBy("key", "value") to aggregate the count over the window, and then sorting and agg() respectively.
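A minimal sketch of what that windowing approach could look like (this is an assumption, not code from the question; it presumes a two-column DataFrame df with columns key and value, and uses row_number rather than agg to pick the top value per key):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// count how often each (key, value) pair occurs
val countWindow = Window.partitionBy("key", "value")
// rank each key's values by that count, highest first
val rankWindow = Window.partitionBy("key").orderBy(col("count").desc)

val mostCommon = df
  .withColumn("count", count("value").over(countWindow))
  .withColumn("rn", row_number().over(rankWindow))
  .filter(col("rn") === 1)   // keep one row per key: its most common value
  .select("key", "value")

mostCommon.show(false)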

Accepted answer


According to what I understood from your question, here's what you can do.

First you have to read the data and convert it to a DataFrame:

val df = sc.textFile("path to the data file")    // reading the file line by line
  .map(line => line.split("="))                  // splitting each line on =
  .map(array => (array(0).trim, array(1).trim))  // Tuple2(key, value) created
  .toDF("key", "value")                          // RDD converted to DataFrame; requires import sqlContext.implicits._

which would be

+----+-------+
|key |value  |
+----+-------+
|key1|value_a|
|key1|value_b|
|key1|value_b|
|key2|value_a|
|key2|value_c|
|key2|value_c|
|key3|value_a|
+----+-------+

The next step is to count the repetitions of identical values for each key and select the value that is repeated the most for each key, which can be done with a Window function and aggregations as below:

import org.apache.spark.sql.expressions._  // import the Window library
import org.apache.spark.sql.functions._    // import the built-in functions

def windowSpec = Window.partitionBy("key", "value")  // window frame for the aggregation

df.withColumn("count", count("value").over(windowSpec)) // count repetitions of value for each (key, value) group, in a new column called count
  .orderBy($"count".desc)                                // order the DataFrame by count, descending
  .groupBy("key")                                        // group by key
  .agg(first("value").as("value"))                       // take the first row of each key, i.e. the one with the highest count

Thus the final output should be:

+----+-------+
|key |value  |
+----+-------+
|key3|value_a|
|key1|value_b|
|key2|value_c|
+----+-------+
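Since the question also asks about minimizing shuffle, it may be worth noting that roughly the same result can be produced without a window, by aggregating twice and letting a struct ordering pick the value with the highest count. The following is only a sketch under the assumption of the same df as above, not part of the original answer (ties between equally frequent values are broken by the struct's secondary ordering on value):

import org.apache.spark.sql.functions._

val mostCommon = df
  .groupBy("key", "value")
  .count()                                                 // occurrences of each (key, value) pair
  .groupBy("key")
  .agg(max(struct(col("count"), col("value"))).as("top"))  // struct comparison orders by count first
  .select(col("key"), col("top.value").as("value"))

mostCommon.show(false)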
