Using a Spark UDF with a sequence of structs

This article describes how to use a Spark UDF with a column that holds a sequence of structs.

Question

Given a dataframe in which one column is a sequence of structs generated by the following sequence

import org.apache.spark.sql.functions._   // collect_list, struct, udf
import spark.implicits._                   // $-syntax and toDF

val df = spark
  .range(10)
  .map((i) => (i % 2, util.Random.nextInt(10), util.Random.nextInt(10)))
  .toDF("a", "b", "c")
  .groupBy("a")
  .agg(collect_list(struct($"b", $"c")).as("my_list"))

df.printSchema
df.show(false)

Output

root
 |-- a: long (nullable = false)
 |-- my_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- b: integer (nullable = false)
 |    |    |-- c: integer (nullable = false)

+---+-----------------------------------+
|a  |my_list                            |
+---+-----------------------------------+
|0  |[[0,3], [9,5], [3,1], [4,2], [3,3]]|
|1  |[[1,7], [4,6], [5,9], [6,4], [3,9]]|
+---+-----------------------------------+

I need to run a function over each struct list. The function prototype is similar to the function below

case class DataPoint(b: Int, c: Int)

def do_something_with_data(data: Seq[DataPoint]): Double = {
  // This is an example. I don't actually want the sum
  data.map(data_point => data_point.b + data_point.c).sum
}
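
For intuition, the helper behaves as expected on plain Scala collections; a quick sanity check with made-up values:

// Hypothetical values, just to illustrate what the helper computes
do_something_with_data(Seq(DataPoint(1, 2), DataPoint(3, 4)))  // (1 + 2) + (3 + 4) = 10.0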

I want to store the result of this function to another DataFrame column.

I tried running

val my_udf = udf(do_something_with_data(_))
val df_with_result = df.withColumn("result", my_udf($"my_list"))
df_with_result.show(false)

and got

17/07/13 12:33:42 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 225, REDACTED, executor 0): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<struct<b:int,c:int>>) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line27.$read$$iw$$iw$DataPoint
	at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$do_something_with_data$1.apply(<console>:29)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.do_something_with_data(<console>:29)
	at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
	at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)

Is it possible to use a UDF like this without first casting my rows to a container struct with the DataFrame API?

Doing something like:

case class MyRow(a: Long, my_list: Seq[DataPoint])

df.as[MyRow].map(row => (row.a, row.my_list, do_something_with_data(row.my_list)))

using the DataSet api works, but I'd prefer to stick with the DataFrame API if possible.

Answer

You cannot use a case-class as the input-argument of your UDF (but you can return case classes from the UDF). To map an array of structs, you can pass in a Seq[Row] to your UDF:

import org.apache.spark.sql.Row

val my_udf = udf((data: Seq[Row]) => {
  // This is an example. I don't actually want the sum
  data.map { case Row(x: Int, y: Int) => x + y }.sum
})

df.withColumn("result", my_udf($"my_list")).show

+---+--------------------+------+
|  a|             my_list|result|
+---+--------------------+------+
|  0|[[0,3], [5,5], [3...|    41|
|  1|[[0,9], [4,9], [6...|    54|
+---+--------------------+------+
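
If you would rather keep the computation expressed in terms of the DataPoint case class, one option is to convert each Row to a DataPoint inside the UDF and then reuse do_something_with_data. This is a minimal sketch, not part of the original answer; the name my_udf_typed is chosen here for illustration, and it assumes the struct fields arrive in the order (b, c) shown in the schema above:

// Sketch: rebuild DataPoint values from the Rows, then reuse the original helper
val my_udf_typed = udf((data: Seq[Row]) =>
  do_something_with_data(data.map { case Row(b: Int, c: Int) => DataPoint(b, c) })
)

df.withColumn("result", my_udf_typed($"my_list")).show

If relying on positional matching feels fragile, the fields can also be read by name, e.g. row.getAs[Int]("b"), which keeps working even if the struct's field order changes.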
