使用python单元测试pyspark代码

编程入门 行业动态 更新时间:2024-10-12 14:25:12
本文介绍了使用python单元测试pyspark代码的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我在 pyspark 中有脚本,如下所示.我想在这个脚本中对 function 进行单元测试.

I have script in pyspark like below. I want to unit test a function in this script.

def rename_chars(column_name): chars = ((' ', '_&'), ('.', '_$')) new_cols = reduce(lambda a, kv: a.replace(*kv), chars, column_name) return new_cols def column_names(df): changed_col_names = df.schema.names for cols in changed_col_names: df = df.withColumnRenamed(cols, rename_chars(cols)) return df

我写了一个像下面这样的unittest来测试这个功能.

I wrote a unittest like below to test the function.

但是我不知道如何提交unittest.我已经完成了 spark-submit 没有做任何事情.

But I don't know how to submit the unittest. I have done spark-submit which doesn't do anything.

import unittest from my_script import column_names from pyspark import SparkContext, SparkConf from pyspark.sql import HiveContext conf = SparkConf() sc = SparkContext(conf=conf) sqlContext = HiveContext(sc) cols = ['ID', 'NAME', 'last.name', 'abc test'] val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')] df = sqlContext.createDataFrame(val, cols) class RenameColumnNames(unittest.TestCase): def test_column_names(self): df1 = column_names(df) result = df1.schema.names expected = ['ID', 'NAME', 'last_$name', 'abc_&test'] self.assertEqual(result, expected)

如何集成此脚本以用作 unittest

How can I integrate this script to work as a unittest

我可以在安装了 pyspark 的节点上运行什么?

what can I run this on a node where I have pyspark installed?

推荐答案

Pyspark 单元测试指南

1.您需要从站点下载 Spark 发行版并解压.或者,如果您已经有 Spark 和 Python 的工作分发版,只需安装 pyspark:pip install pyspark

1.You need to download Spark distribution from site and unpack it. Or if you already have a working distribution of Spark and Python just install pyspark: pip install pyspark

2.如果需要,像这样设置系统变量:

2.Set system variables like this if needed:

export SPARK_HOME="/home/eugene/spark-1.6.0-bin-hadoop2.6" export PYTHONPATH="$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH" export PATH="SPARK_HOME/bin:$PATH"

我在主目录的 .profile 中添加了这个.如果您已经有 Spark 的工作分发版,则可以设置此变量.

I added this in .profile in my home directory. If you already have an working distribution of Spark this variables may be set.

3.另外你可能需要设置:

3.Additionally you may need to setup:

PYSPARK_SUBMIT_ARGS="--jars path/to/hive/jars/jar.jar,path/to/other/jars/jar.jar --conf spark.driver.userClassPathFirst=true --master local[*] pyspark-shell" PYSPARK_PYTHON="/home/eugene/anaconda3/envs/ste/bin/python3"

Python 和 jars? Pyspark使用py4j与java部分进行通信的火花.如果您想解决更复杂的情况,例如使用 Python 中的测试运行 Kafka 服务器,或者像示例中那样使用 Scala 中的 TestHiveContext你应该指定罐子.我是通过Idea运行配置环境变量做到的.

Python and jars? Yes. Pyspark uses py4j to communicate with java part of Spark. And if you want to solve more complicated situation like run Kafka server with tests in Python or use TestHiveContext from Scala like in the example you should specify jars. I did it through Idea run configuration environment variables.

4.你可以使用pyspark/tests.py、pyspark/streaming/tests.py、pyspark/sql/tests.pycode>、pyspark/ml/tests.py、pyspark/mllib/tests.py 脚本,其中包含用于测试 pyspark 应用程序的各种 TestCase 类和示例.在您的情况下,您可以这样做(来自 pyspark/sql/tests.py 的示例):

4.And you could to use pyspark/tests.py, pyspark/streaming/tests.py, pyspark/sql/tests.py, pyspark/ml/tests.py, pyspark/mllib/tests.pyscripts wich contain various TestCase classes and examples for testing pyspark apps. In your case you could do (example from pyspark/sql/tests.py):

class HiveContextSQLTests(ReusedPySparkTestCase): @classmethod def setUpClass(cls): ReusedPySparkTestCase.setUpClass() cls.tempdir = tempfile.NamedTemporaryFile(delete=False) try: cls.sc._jvm.apache.hadoop.hive.conf.HiveConf() except py4j.protocol.Py4JError: cls.tearDownClass() raise unittest.SkipTest("Hive is not available") except TypeError: cls.tearDownClass() raise unittest.SkipTest("Hive is not available") os.unlink(cls.tempdir.name) _scala_HiveContext =\ cls.sc._jvm.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc()) cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext) cls.testData = [Row(key=i, value=str(i)) for i in range(100)] cls.df = cls.sc.parallelize(cls.testData).toDF() @classmethod def tearDownClass(cls): ReusedPySparkTestCase.tearDownClass() shutil.rmtree(cls.tempdir.name, ignore_errors=True)

但您需要在 PYSPARK_SUBMIT_ARGS 中指定 --jars 和 Hive 库,如前所述

but you need to specify --jars with Hive libs in PYSPARK_SUBMIT_ARGS as described earlier

或不使用 Hive:

class SQLContextTests(ReusedPySparkTestCase): def test_get_or_create(self): sqlCtx = SQLContext.getOrCreate(self.sc) self.assertTrue(SQLContext.getOrCreate(self.sc) is sqlCtx)

据我所知,如果 pyspark 是通过 pip 安装的,您还没有在示例中描述的 tests.py. 在这种情况下,只需从 Spark 站点下载分发版并复制代码示例.

As I know if pyspark have been installed through pip, you haven't tests.py described in example. In this case just download the distribution from Spark site and copy code examples.

现在您可以正常运行您的测试用例:python -m unittest test.py

Now you could run your TestCase as a normal: python -m unittest test.py

更新:由于不推荐使用 HiveContext 和 SqlContext 的 spark 2.3.您可以使用 SparkSession Hive API.

update: Since spark 2.3 using of HiveContext and SqlContext is deprecated. You could use SparkSession Hive API.

更多推荐

使用python单元测试pyspark代码

本文发布于:2023-11-24 02:28:04,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1623636.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:单元测试   代码   python   pyspark

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!