I want to perform 2 operations on a single RDD concurrently. I have written code like this:
```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Foo")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
val inputPath = path
val rdd = sc.textFile(inputPath).cache()

val f1 = Future {
  val schema1 = StructType(List(
    StructField("a", StringType, true),
    StructField("b", StringType, true),
    StructField("c", LongType, true)))
  val rdd1 = rdd.map(func1).filter(_.isDefined).flatMap(x => x)
  val df1 = sqlSc.createDataFrame(rdd1, schema1) // was: createDataFrame(rdd, schema)
  df1.save("/foo/", "com.databricks.spark.avro") // was: formSubmissionDataFrame.save(...)
  0
}

val f2 = Future {
  val schema2 = StructType(List(
    StructField("d", StringType, true),
    StructField("e", StringType, true)))
  val rdd2 = rdd.map(func2).filter(_.isDefined).flatMap(x => x)
  val df2 = sqlSc.createDataFrame(rdd2, schema2)
  df2.save("/bar/", "com.databricks.spark.avro") // was: pageViewDataFrame.save(...)
  0
}

val result = for {
  r1 <- f1
  r2 <- f2
} yield (r1 + r2)

result onSuccess { case r => println("done") }
Await.result(result, Duration.Inf)
```
When I run this code, I don't see the desired effect. The directory bar has lots of temporary files, etc., but foo has nothing, so it seems the two datasets are not being created in parallel.
Is it a good idea to use a Future inside the Spark driver? Am I doing it correctly? Should I do anything differently?
Answer:
To execute two or more Spark jobs (actions) in parallel, the SparkContext needs to be running in FAIR scheduler mode.
In the driver program, transformations only build the dependency graph; actual execution happens only when an action is called. Typically the driver waits while execution happens across the nodes managed by the Spark workers. In your case, the Spark master doesn't start executing the second job until the first one is over, because Spark scheduling is FIFO by default.
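Note that the Future-based structure itself is sound: nothing in `scala.concurrent` serializes the two blocks; it is the Spark scheduler that runs the submitted jobs one after another. A minimal, Spark-free sketch of the same driver-side pattern (the computations are placeholders standing in for the two jobs):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object DriverPattern {
  // Two independent "jobs" launched concurrently, then joined,
  // mirroring the question's f1 / f2 / for-comprehension structure.
  def run(): Int = {
    val f1 = Future { (1 to 100).sum }     // placeholder for job 1
    val f2 = Future { (1 to 10).product }  // placeholder for job 2

    // Both futures are already running when the for-comprehension is
    // built, so nothing here forces them to execute sequentially.
    val combined = for {
      r1 <- f1
      r2 <- f2
    } yield r1 + r2

    Await.result(combined, Duration.Inf)
  }
}
```

With a real SparkContext, each Future's body would trigger a job on its own driver thread; whether those jobs then share cluster resources is decided by the scheduler mode, not by this code.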
You can set the conf as follows to enable parallel execution:
```scala
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
```
For details, see "Scheduling Within an Application" in the Spark job scheduling documentation.
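If you also want to control how the two jobs share resources, FAIR mode additionally supports named pools configured in `conf/fairscheduler.xml`; a sketch (the pool name `pool1` and the weights here are just illustrative values, not anything from the question):

```xml
<?xml version="1.0"?>
<allocations>
  <!-- Each pool gets a scheduling mode, a relative weight,
       and a minimum number of cores it is guaranteed. -->
  <pool name="pool1">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
```

A job is assigned to a pool from the thread that triggers it, e.g. by calling `sc.setLocalProperty("spark.scheduler.pool", "pool1")` at the start of each Future's body; without this, jobs go to the default pool.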