I am trying to understand how to submit a Spark job to Apache Livy.
I added the following dependencies to my pom.xml:
```xml
<dependency>
  <groupId>com.cloudera.livy</groupId>
  <artifactId>livy-api</artifactId>
  <version>0.3.0</version>
</dependency>
<dependency>
  <groupId>com.cloudera.livy</groupId>
  <artifactId>livy-scala-api_2.11</artifactId>
  <version>0.3.0</version>
</dependency>
```
Then I have the following code in Spark that I want to submit to Livy on request.
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object Test {
  def main(args: Array[String]) {
    val spark = SparkSession.builder()
      .appName("Test")
      .master("local[*]")
      .getOrCreate()

    import spark.sqlContext.implicits._
    implicit val sparkContext = spark.sparkContext

    // ...
  }
}
```
I want to use the following code to create a LivyClient instance and upload the application code to the Spark context:
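```scala
import java.io.File
import java.net.URI

import com.cloudera.livy.LivyClientBuilder

// livyUrl and testJarPath are defined elsewhere
val client = new LivyClientBuilder()
  .setURI(new URI(livyUrl))
  .build()

try {
  client.uploadJar(new File(testJarPath)).get()
  client.submit(new Test())
} finally {
  client.stop(true)
}
```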
However, the code of Test is not written to work with Apache Livy.
How can I adapt the Test object so that `client.submit(new Test())` works?
Recommended answer
Your Test class needs to implement Livy's `Job` interface and override its `call` method. `call` receives a `JobContext`, which gives you access to the SparkContext/SparkSession. Test has to be a class rather than an object, because `submit` takes an instance. You can then pass an instance of Test to the `submit` method.
You don't have to create the SparkSession yourself. Livy creates it on the cluster, and you can access it from your `call` method.
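On the client side, the question's snippet also needs to wait for the job. `submit` returns a `JobHandle`, which extends `java.util.concurrent.Future`. Without a blocking call, `client.stop(true)` in the `finally` block can run before the job has finished.

The driver below is a minimal sketch. It assumes Livy 0.3.0's `com.cloudera.livy` package and a Test class that implements `Job[Int]`. The object name `SubmitTest`, the command-line arguments, and the 60-second timeout are illustrative choices, not part of the original post.

```scala
import java.io.File
import java.net.URI
import java.util.concurrent.TimeUnit

import com.cloudera.livy.LivyClientBuilder

// Hypothetical driver object; Test is assumed to extend Job[Int]
object SubmitTest {
  def main(args: Array[String]): Unit = {
    // Hypothetical arguments: Livy URL (e.g. http://<host>:8998) and the jar containing Test
    val Array(livyUrl, testJarPath) = args

    val client = new LivyClientBuilder()
      .setURI(new URI(livyUrl))
      .build()

    try {
      // Make the Test class available to the remote Spark context
      client.uploadJar(new File(testJarPath)).get()
      // submit() serializes the Job and returns a JobHandle (a java.util.concurrent.Future)
      val handle = client.submit(new Test())
      // Block until call() has finished on the cluster and fetch its return value
      val result = handle.get(60, TimeUnit.SECONDS)
      println(s"Test returned: $result")
    } finally {
      // true also shuts down the remote Spark context
      client.stop(true)
    }
  }
}
```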
You can find more detailed information on Livy's programmatic API here: https://livy.incubator.apache.org/docs/latest/programmatic-api.html
Here's a sample implementation of the Test class:
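```scala
import com.cloudera.livy.{Job, JobContext}
import org.apache.spark.sql.SparkSession

class Test extends Job[Int] {
  override def call(jc: JobContext): Int = {
    // sparkSession() is generic in the Java API, so pass the type explicitly;
    // otherwise Scala infers Nothing and spark is not usable as a SparkSession
    val spark = jc.sparkSession[SparkSession]()
    // Do anything with SparkSession
    1 // Return value
  }
}
```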
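The question's POM already pulls in `livy-scala-api_2.11`, so there is also a closure-based alternative to writing a `Job` class. The Scala API wraps the Java client, and its `submit` takes a `ScalaJobContext => T` function instead of a `Job` instance.

The sketch below assumes the following, and is not verified against 0.3.0 specifically:

- Livy 0.3.0 exposes this API in `com.cloudera.livy.scalaapi`. Later Apache releases moved it to `org.apache.livy.scalaapi`.
- The remote context also needs the livy-scala-api jar uploaded.

The object name and arguments are hypothetical.

```scala
import java.io.File
import java.net.URI

import scala.concurrent.Await
import scala.concurrent.duration._

import com.cloudera.livy.LivyClientBuilder
import com.cloudera.livy.scalaapi._ // assumed to provide the asScalaClient wrapper

// Hypothetical driver object
object ScalaApiSubmit {
  def main(args: Array[String]): Unit = {
    // Hypothetical arguments: Livy URL, application jar, livy-scala-api jar
    val Array(livyUrl, appJarPath, scalaApiJarPath) = args

    val scalaClient = new LivyClientBuilder()
      .setURI(new URI(livyUrl))
      .build()
      .asScalaClient

    try {
      // Upload the application classes and (assumed necessary) the Scala API classes
      Await.result(scalaClient.uploadJar(new File(appJarPath)), 60.seconds)
      Await.result(scalaClient.uploadJar(new File(scalaApiJarPath)), 60.seconds)

      // The closure is serialized and run against the context Livy created on the cluster
      val handle = scalaClient.submit { context =>
        context.sc.parallelize(1 to 100).sum()
      }
      val total = Await.result(handle, 60.seconds)
      println(s"Sum computed on the cluster: $total")
    } finally {
      // true also shuts down the remote Spark context
      scalaClient.stop(true)
    }
  }
}
```

Implementing `Job`, as in the answer, works with the plain `livy-api` dependency alone. The closure style avoids a separate class but relies on the extra Scala API jar.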