Is it possible to register a UDF (or function) written in Scala to use in PySpark ? E.g.:
val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2
In Scala, the following is now possible:
val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3
I would like to use "UDFaddOne" in PySpark like
%pyspark
mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne")  # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam")))  # does not work
Background: We are a team of developers, some coding in Scala and some in Python, and we would like to share already-written functions. Saving the functions into a library and importing them would also be an option.
Answer:
As far as I know, PySpark doesn't provide any equivalent of the callUDF function, and because of that it is not possible to access a registered UDF directly.
The simplest solution here is to use a raw SQL expression:
from pyspark.sql.functions import expr

mytable.withColumn("moreSpam", expr("UDFaddOne({})".format("spam")))
## OR
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")
## OR
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")
This approach is rather limited, so if you need to support more complex workflows you should build a package and provide complete Python wrappers. You'll find an example UDAF wrapper in my answer to Spark: How to map Python with Scala or Java User Defined Functions?
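A minimal sketch of what such a Python wrapper could look like, assuming the Scala side has been packaged into a jar (passed to Spark via --jars) that exposes the UDF from an object — the package, object, and method names below (`com.example.udf.Functions`, `addOneUDF`) are hypothetical, not from the original question:

```python
# Hedged sketch: calling a Scala-defined UDF from PySpark through the
# py4j JVM gateway. Assumes the jar on the classpath contains roughly:
#
#   package com.example.udf
#   object Functions {
#     val addOneUDF = org.apache.spark.sql.functions.udf((m: Int) => m + 1)
#   }
#
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq

def add_one(col):
    """Python-side wrapper around the Scala UDF."""
    sc = SparkContext._active_spark_context
    # Fetch the Scala UserDefinedFunction from the JVM...
    judf = sc._jvm.com.example.udf.Functions.addOneUDF()
    # ...apply it to the Java column(s), and wrap the resulting
    # Java Column back into a Python Column.
    return Column(judf.apply(_to_seq(sc, [col], _to_java_column)))

# Usage (hypothetical):
# mybiggertable = mytable.withColumn("moreSpam", add_one(mytable["spam"]))
```

Shipping wrappers like this in a small Python package alongside the Scala jar is one way to let both halves of the team share a single implementation; note that `_to_java_column` and `_to_seq` are internal PySpark helpers, so the wrapper may need adjusting across Spark versions.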