Concurrent job execution in Spark

Updated: 2024-10-27 12:24:42

Problem description

I have used input data with the below format:

0 1 2 3 4 5 … 14

Input Location: hdfs://localhost:9000/Input/datasource

I have used the following code snippet to save RDD as text file using multiple threads:

    package org.apache.spark.examples;

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.avro.ipc.specific.Person;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    import scala.Tuple2;

    class RunnableDemo implements Runnable {
        private Thread t;
        private String threadName;
        private String path;
        private JavaRDD<String> javaRDD;
        // private JavaSparkContext javaSparkContext;

        RunnableDemo(String threadName, JavaRDD<String> javaRDD, String path) {
            this.threadName = threadName;
            this.javaRDD = javaRDD;
            this.path = path;
            // this.javaSparkContext = javaSparkContext;
        }

        @Override
        public void run() {
            System.out.println("Running " + threadName);
            try {
                this.javaRDD.saveAsTextFile(path);
                // System.out.println(this.javaRDD.count());
                Thread.sleep(50);
            } catch (InterruptedException e) {
                System.out.println("Thread " + threadName + " interrupted.");
            }
            System.out.println("Thread " + threadName + " exiting.");
            // this.javaSparkContext.stop();
        }

        public void start() {
            System.out.println("Starting " + threadName);
            if (t == null) {
                t = new Thread(this, threadName);
                t.start();
            }
        }
    }

    public class SparkJavaTest {

        public static void main(String[] args) {
            // Spark Configurations:
            SparkConf sparkConf = new SparkConf().setAppName("SparkJavaTest");
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);
            SQLContext sqlContext = new SQLContext(ctx);

            JavaRDD<String> dataCollection = ctx.textFile("hdfs://yarncluster/Input/datasource");

            List<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType, true));

            JavaRDD<Row> rowRDD = dataCollection.map(
                new Function<String, Row>() {
                    @Override
                    public Row call(String record) throws Exception {
                        String[] fields = record.split("\u0001");
                        return RowFactory.create(Integer.parseInt(fields[0].trim()));
                    }
                });

            StructType schema = DataTypes.createStructType(fields);
            DataFrame dataFrame = sqlContext.createDataFrame(rowRDD, schema);
            dataFrame.registerTempTable("data");

            long recordsCount = dataFrame.count();
            long splitRecordsCount = 5;
            long splitCount = recordsCount / splitRecordsCount;

            List<JavaRDD<Row>> list1 = new ArrayList<JavaRDD<Row>>();
            for (int i = 0; i < splitCount; i++) {
                long start = i * splitRecordsCount;
                long end = (i + 1) * splitRecordsCount;
                DataFrame temp = sqlContext.sql(
                    "SELECT * FROM data WHERE Id >=" + start + " AND Id < " + end);
                list1.add(temp.toJavaRDD());
            }

            long length = list1.size();
            int split = 0;

            for (int i = 0; i < length; i++) {
                JavaRDD rdd1 = list1.get(i);
                JavaPairRDD rdd3 = rdd1.cartesian(rdd1);
                JavaPairRDD<Row, Row> rdd4 = rdd3.filter(
                    new Function<Tuple2<Row, Row>, Boolean>() {
                        public Boolean call(Tuple2<Row, Row> s) {
                            Row line1 = s._1;
                            Row line2 = s._2;
                            long app1 = Integer.parseInt(line1.get(0).toString());
                            long app2 = Integer.parseInt(line2.get(0).toString());
                            if (app1 < app2) {
                                return true;
                            }
                            return false;
                        }
                    });

                JavaRDD<String> test = rdd4.map(
                    new Function<Tuple2<Row, Row>, String>() {
                        @Override
                        public String call(Tuple2<Row, Row> s) throws Exception {
                            Row data1 = s._1;
                            Row data2 = s._2;
                            int x = Integer.parseInt(data1.get(0).toString());
                            int y = Integer.parseInt(data2.get(0).toString());
                            String result = x + "," + y + "," + (x + y);
                            return result;
                        }
                    });

                RunnableDemo R = new RunnableDemo(
                    "Thread-" + split, test, "hdfs://yarncluster/GettingStarted/Output/" + split);
                R.start();
                split++;
                R.start();

                int index = i;
                while (index < length) {
                    JavaRDD rdd2 = list1.get(index);
                    rdd3 = rdd1.cartesian(rdd2);
                    rdd4 = rdd3.filter(
                        new Function<Tuple2<Row, Row>, Boolean>() {
                            public Boolean call(Tuple2<Row, Row> s) {
                                Row line1 = s._1;
                                Row line2 = s._2;
                                long app1 = Integer.parseInt(line1.get(0).toString());
                                long app2 = Integer.parseInt(line2.get(0).toString());
                                if (app1 < app2) {
                                    return true;
                                }
                                return false;
                            }
                        });

                    test = rdd4.map(
                        new Function<Tuple2<Row, Row>, String>() {
                            @Override
                            public String call(Tuple2<Row, Row> s) throws Exception {
                                Row data1 = s._1;
                                Row data2 = s._2;
                                int x = Integer.parseInt(data1.get(0).toString());
                                int y = Integer.parseInt(data2.get(0).toString());
                                String result = x + "," + y + "," + (x + y);
                                return result;
                            }
                        });

                    R = new RunnableDemo(
                        "Thread-" + split, test, "hdfs://yarncluster/GettingStarted/Output/" + split);
                    R.start();
                    split++;
                    index++;
                }
            }
        }
    }

In this case I have faced the following exception

I have tried the solution provided in the following link

How to run concurrent jobs(actions) in Apache Spark using single spark context

But still, I can’t resolve this issue.

Could you please guide me to resolve this?

Solution

First of all, you're trying to execute all the work on the driver node using several threads. This is not really in the spirit of Spark, since each unit of work in your case is independent of the others and can be executed on different machines. You have a toy example here, but this will become really important with large volumes of data.

A better approach would be to use something like mapPartitions to send the range of keys to each worker and let them execute the corresponding SQL queries, then save the results, with one thread per worker. This will make the code cleaner and easier to reason about (once you get used to the way RDDs work). You'd obviously need to set the level of parallelism and number of partitions (talked about here) for your input data appropriately.
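The key-range idea can be sketched in plain Java, without Spark. The names `KeyRanges`, `split`, and `pairsFor` below are hypothetical and chosen only for illustration. The sketch divides the ids into half-open ranges in the same way as the `start`/`end` loop in the question, and shows the kind of per-range work that a function handed to something like mapPartitions might perform. It is not the answerer's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original code.
final class KeyRanges {

    // Splits [0, recordsCount) into half-open ranges {start, end}, each holding
    // at most splitSize ids. Assumes splitSize > 0.
    static List<long[]> split(long recordsCount, long splitSize) {
        List<long[]> ranges = new ArrayList<>();
        for (long start = 0; start < recordsCount; start += splitSize) {
            ranges.add(new long[] {start, Math.min(start + splitSize, recordsCount)});
        }
        return ranges;
    }

    // Work for one pair of ranges: every (x, y) with x in a, y in b and x < y,
    // formatted as "x,y,x+y" like the output lines in the question.
    static List<String> pairsFor(long[] a, long[] b) {
        List<String> out = new ArrayList<>();
        for (long x = a[0]; x < a[1]; x++) {
            for (long y = b[0]; y < b[1]; y++) {
                if (x < y) {
                    out.add(x + "," + y + "," + (x + y));
                }
            }
        }
        return out;
    }
}
```

Unlike the `recordsCount / splitRecordsCount` division in the question, this sketch keeps a shorter final range when the count is not a multiple of the split size.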

The immediate issue with your code is that the main thread starts other threads, but doesn't wait for them to finish. Normally this causes the spawned threads to terminate along with the parent (see the javadoc). Notice how in the answer to the linked question the main function does a get() on the spawned futures before returning.
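As a minimal sketch of waiting for the spawned work, the following plain-Java example submits each job to an `ExecutorService` and calls `get()` on every `Future` before returning. The class name `ConcurrentSaves`, the method `runAll`, and the `Runnable` standing in for a call such as `rdd.saveAsTextFile(path)` are assumptions for illustration, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: runs every job on a fixed thread pool and blocks
// until all of them have finished.
final class ConcurrentSaves {

    // Each Runnable stands in for a Spark action such as rdd.saveAsTextFile(path).
    // Returns the number of jobs that completed. Assumes threads >= 1.
    static int runAll(List<Runnable> jobs, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Runnable job : jobs) {
                futures.add(pool.submit(job));
            }
            int completed = 0;
            for (Future<?> future : futures) {
                // Blocks until this job is done; a failure inside the job
                // is rethrown here as an ExecutionException.
                future.get();
                completed++;
            }
            return completed;
        } finally {
            pool.shutdown();
        }
    }
}
```

In the question's `main`, each `R.start()` call would instead add a job to the list, and `main` would call `runAll` once after the loops, so it does not return until every save has finished.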

Published: 2023-11-24 03:13:13
Article link: https://www.elefans.com/category/jswz/34/1623763.html