How to run independent transformations in parallel using PySpark?


Problem description


I am trying to use PySpark to run two functions that perform completely independent transformations on a single RDD in parallel. What are some ways to do this?

from multiprocessing import Process
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext

def doXTransforms(sampleRDD):
    ...  # (X transforms)

def doYTransforms(sampleRDD):
    ...  # (Y transforms)

if __name__ == "__main__":
    sc = SparkContext(appName="parallelTransforms")
    sqlContext = SQLContext(sc)
    hive_context = HiveContext(sc)

    rows_rdd = hive_context.sql("select * from tables.X_table")

    p1 = Process(target=doXTransforms , args=(rows_rdd,))
    p1.start()
    p2 = Process(target=doYTransforms, args=(rows_rdd,))  
    p2.start()
    p1.join()
    p2.join()
    sc.stop()

This does not work, and I now understand that it will not work. But is there any alternative way to make this work? Specifically, are there any Python/Spark-specific solutions?

Answer

Just use threads and make sure the cluster has enough resources to process both tasks at the same time.

from threading import Thread
import time

def process(rdd, f):
    # Artificial delay so that both jobs overlap in time.
    def delay(x):
        time.sleep(1)
        return f(x)
    return rdd.map(delay).sum()


rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))

# Each thread submits its own Spark job; the jobs run concurrently on the cluster.
t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2 = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()

Arguably this is not that often useful in practice, but otherwise it should work just fine.
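A plain Thread discards the value returned by process, so if you also need both results back on the driver, a minimal sketch using concurrent.futures.ThreadPoolExecutor (my addition, not part of the original answer) could look like this, reusing the process function and rdd defined above:

from concurrent.futures import ThreadPoolExecutor

# Submit both independent jobs from the driver and wait for their results.
with ThreadPoolExecutor(max_workers=2) as pool:
    fut_x = pool.submit(process, rdd, lambda x: x * 2)
    fut_y = pool.submit(process, rdd, lambda x: x + 1)
    x_total, y_total = fut_x.result(), fut_y.result()

print(x_total, y_total)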

You can further use in-application scheduling with the FAIR scheduler and scheduler pools for better control over the execution strategy.
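As a standalone sketch (the pool names and the run_in_pool helper below are illustrative, not part of the original answer), enabling the FAIR scheduler and assigning each submitting thread to its own pool might look like this; pool properties can additionally be configured through spark.scheduler.allocation.file:

from threading import Thread
from pyspark import SparkConf, SparkContext

# Enable the FAIR scheduler for the whole application.
conf = SparkConf().setAppName("fairPoolsDemo").set("spark.scheduler.mode", "FAIR")
sc = SparkContext(conf=conf)

def run_in_pool(pool_name, rdd, f):
    # The scheduler pool is a thread-local property, so set it inside the submitting thread.
    sc.setLocalProperty("spark.scheduler.pool", pool_name)
    return rdd.map(f).sum()

rdd = sc.parallelize(range(100))
t1 = Thread(target=run_in_pool, args=("pool_x", rdd, lambda x: x * 2))
t2 = Thread(target=run_in_pool, args=("pool_y", rdd, lambda x: x + 1))
t1.start(); t2.start()
t1.join(); t2.join()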

You can also try pyspark-asyncactions (disclaimer: the author of this answer is also the author of the package), which provides a set of wrappers around the Spark API and concurrent.futures:

import asyncactions  # adds the *Async wrappers (e.g. countAsync) used below
import concurrent.futures

f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()

[x.result() for x in concurrent.futures.as_completed([f1, f2])]
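Note that concurrent.futures.as_completed yields each future as soon as its count finishes, so the list comprehension gathers both results without making the faster job wait for the slower one; the resulting list follows completion order rather than submission order.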
