Problem: I need to find the most common value for each key in Spark (using Scala). I have done it with an RDD, but I don't know how to do it efficiently with DataFrame/Dataset (Spark SQL).
The dataset looks like:
key1 = value_a
key1 = value_b
key1 = value_b
key2 = value_a
key2 = value_c
key2 = value_c
key3 = value_a

After the Spark transformation, the output should be each key with its most common value.
Output
key1 = value_b
key2 = value_c
key3 = value_a

Tried so far:
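The expected per-key semantics can be sanity-checked with plain Scala collections (a sketch only, no Spark; note the question does not specify how ties between equally common values should break, so maxBy keeps an arbitrary winner):

```scala
// Plain-Scala sketch of "most common value per key" on the sample data.
val data = List(
  "key1" -> "value_a", "key1" -> "value_b", "key1" -> "value_b",
  "key2" -> "value_a", "key2" -> "value_c", "key2" -> "value_c",
  "key3" -> "value_a"
)

val mostCommon: Map[String, String] =
  data.groupBy(_._1)                         // group rows by key
    .map { case (key, rows) =>
      val counts = rows.groupBy(_._2).map { case (v, vs) => (v, vs.size) }
      key -> counts.maxBy(_._2)._1           // value with the highest count
    }

// mostCommon == Map("key1" -> "value_b", "key2" -> "value_c", "key3" -> "value_a")
```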
RDD
I mapped to ((key, value), 1) pairs and reduced by key to get counts in an RDD, and the logic works, but I can't translate it into Spark SQL (DataFrame/Dataset), and I want minimal shuffle across the network.
Here is my RDD code:
import org.apache.spark.{SparkConf, SparkContext}

val data = List(
  "key1,value_a", "key1,value_b", "key1,value_b",
  "key2,value_a", "key2,value_c", "key2,value_c",
  "key3,value_a"
)
val sparkConf = new SparkConf().setMaster("local").setAppName("example")
val sc = new SparkContext(sparkConf)

val lineRDD = sc.parallelize(data)
val pairedRDD = lineRDD.map { line =>
  val fields = line.split(",")
  (fields(0), fields(1))                                   // (key, value)
}
val flatPairsRDD = pairedRDD.map { case (key, value) => ((key, value), 1) }
val sumRDD = flatPairsRDD.reduceByKey(_ + _)               // count per (key, value)
val resultsRDD = sumRDD
  .map { case ((key, value), count) => (key, (value, count)) }
  .groupByKey
  .map { case (key, valList) => (key, valList.maxBy(_._2)) } // most common value per key
resultsRDD.collect().foreach(println)

DataFrame, using windowing: I am trying Window.partitionBy("key", "value") to aggregate the count over the window, and then sorting and agg() respectively.
Accepted answer
According to what I understood from your question, here's what you can do.
First, read the data and convert it to a DataFrame:
val df = sc.textFile("path to the data file")   // read the file line by line
  .map(line => line.split("="))                 // split each line on "="
  .map(array => (array(0).trim, array(1).trim)) // build a (key, value) tuple
  .toDF("key", "value")                         // convert the RDD to a DataFrame
                                                // (requires import sqlContext.implicits._)

which would be:
+----+-------+
|key |value  |
+----+-------+
|key1|value_a|
|key1|value_b|
|key1|value_b|
|key2|value_a|
|key2|value_c|
|key2|value_c|
|key3|value_a|
+----+-------+

The next step is to count the repetitions of identical values for each key and pick the value that repeats the most for each key, which can be done with a Window function and aggregations as below:
import org.apache.spark.sql.expressions._ // Window
import org.apache.spark.sql.functions._   // built-in functions

def windowSpec = Window.partitionBy("key", "value") // window frame for the aggregation

df.withColumn("count", count("value").over(windowSpec)) // count of each (key, value) group
  .orderBy($"count".desc)                               // order by count, descending
  .groupBy("key")                                       // group by key
  .agg(first("value").as("value"))                      // take the first (highest-count) value

Thus the final output should be equal to:
+----+-------+
|key |value  |
+----+-------+
|key3|value_a|
|key1|value_b|
|key2|value_c|
+----+-------+
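One caveat worth noting: Spark does not guarantee that groupBy sees rows in the order produced by the preceding orderBy, so first("value") may not pick the highest-count row. A sketch of a variant that makes the choice explicit (assuming the same df as above; the ranked name is illustrative):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Count each (key, value) pair, then keep the top-ranked value per key.
// row_number over an explicit window pins down the ordering that
// groupBy + first leaves unspecified.
val ranked = df
  .groupBy("key", "value")
  .count()                                               // rows: (key, value, count)
  .withColumn("rn",
    row_number().over(
      Window.partitionBy("key").orderBy(desc("count")))) // rank values within each key
  .filter(col("rn") === 1)                               // keep the most common value
  .drop("rn", "count")
```

This also replaces the global orderBy with a per-key window sort, which avoids an extra full-dataset shuffle.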