第28课 Spark天堂之门解密

编程入门 行业动态 更新时间:2024-10-24 00:32:31

第28课 Spark天堂<a href=https://www.elefans.com/category/jswz/34/1761089.html style=之门解密"/>

第28课 Spark天堂之门解密

一:Spark天堂之门:SparkContext!

1,  Spark程序在运行的时候分为Driver和Executors两部分;

2,  Spark的程序编写是基于SparkContext的,具体来说包含两方面:

a)        Spark编程的核心基础---RDD,是由SparkContext来最初创建(第一个RDD一定是由SparkContext来创建的);

b)        Spark程序的调度优化也是基于SparkContext;

3, Spark程序的注册时通过SparkContext实例化时候生产的对象来完成的(其实是SchedulerBackend来注册程序)

4, Spark程序运行的时候要通过Cluster Manager获得具体的计算资源,计算资源的获取也是通过SparkContext产生的对象来申请的(其实是SchedulerBackend来获取计算资源的);

5, SparkContext崩溃或者结束的时候整个Spark程序也结束啦!

 

总结:

SparkContext开启天堂之门:Spark程序是通过SparkContext发布到Spark集群的;

         SparkContext导演天堂世界:Spark程序的运行都是在SparkContext为核心的调度器的指挥下进行的;

SparkContext关闭天堂之门:SparkContext崩溃或者结束的时候整个Spark程序也结束啦!

 

二:SparkContext使用案例鉴赏


三:SparkContext天堂内幕

1,   SparkContext构建的顶级三大核心对象:DAGScheduler、TaskScheduler、ShedulerBackend,其中:

a)        DAGScheduler是面向Job的Stage的高层调度器;

b)        TaskScheduler是一个接口,根据具体的ClusterManager的不同会有不同的实现,Standalone模式下具体的实现是TaskSchedulerImpl;

c)        SchedulerBackend是一个接口,根据具体的ClusterManager的不同会有不同的实现,Standalone模式下具体的实现是SparkDeploySchedulerBackend;

2,从整个程序运行的角度来讲,SparkContext包含四大核心对象:DAGScheduler、TaskScheduler、ShedulerBackend、MapOutputTrackerMaster。

 


// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference inDAGScheduler's
// constructor
_taskScheduler.start()

 

 

createTaskScheduler:

case   SPARK_REGEX(sparkUrl) =>
   val  scheduler =  new  TaskSchedulerImpl(sc)
   val  masterUrls = sparkUrl.split( "," ).map( "spark://"  + _)
   val  backend =  new  SparkDeploySchedulerBackend(scheduler ,  sc , masterUrls)
  scheduler.initialize(backend)
  (backend ,  scheduler)

 

在sheduler.initialize调用的时候会创建ShedulerPool

this . backend  = backend
// temporarily set rootPool name to empty
rootPool  new  Pool( "" ,  schedulingMode ,  0 ,  0 )
schedulableBuilder  = {
   schedulingMode  match  {
     case  SchedulingMode. FIFO  =>
       new  FIFOSchedulableBuilder( rootPool )
     case  SchedulingMode. FAIR  =>
       new  FairSchedulableBuilder( rootPool ,  conf )
  }
}
schedulableBuilder .buildPools()

 

         SparkDeploySchedulerBackend有三大核心功能:

负责与Master链接注册当前程序;

接收集群中为当前应用程序而分配的计算资源Executor的注册并管理Executors;

负责发送Task到具体的 Executor执行;

 

         补充说明的是:SparkDeploySchedulerBackend是被TaskSchedulerImpl来管理的!

 

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler .start()

 

 

 

val  command =Command( "org.apache.spark.executor.CoarseGrainedExecutorBackend" ,
   args ,  sc. executorEnvs ,  classPathEntries ++ testingClassPath , libraryPathEntries ,  javaOpts)

 

         当通过SparkDeploySchedulerBackend注册程序给Master的时候会把上述command提交给Master,Master发指令给Worker去启动Executor所在的进程的时候加载的main方法所在的入口类就是command中的CoarseGrainedExecutorBackend,当然你可以实现自己的ExecutorBackened,在CoarseGrainedExecutorBackend中启动Executor(Executor是先注册再实例化),Executor通过线程池并发执行Task

 

 

private [spark]  case class  ApplicationDescription(
    name:  String ,
     maxCores: Option[ Int ] ,
     memoryPerExecutorMB:  Int,
     command: Command ,
     appUiUrl:  String ,
     eventLogDir: Option[URI] = None ,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     eventLogCodec: Option[ String ] = None ,
     coresPerExecutor: Option[ Int ] = None ,
     user:  String  = System.getProperty( "user.name" ,  "" )) {

   override def  toString :  String  "ApplicationDescription("  + name + ")"
}  

DT大数据梦工厂

新浪微博: /

更多推荐

第28课 Spark天堂之门解密

本文发布于:2024-02-13 22:55:40,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1760972.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:之门   天堂   Spark

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!