admin管理员组文章数量:1612060
计算能力调度器算法
总结
- 为什么有计算能力调度器
克服现有单一队列fifo 性能低效问题,适合于多用户共享集群的环境的调度器
好比说部署一个hdfs有不同的用户在记录数据并进行分析,需要按用户级别进行集群资源使用的控制
也可以简单这么理解 有dfs 和mine 2帐号进行任务的提交,可以控制这2个用户的集群资源使用率
- Capacity Scheduler支持以下特性
1. 支持多个队列,某个作业可被提交到某一个队列中(默认只有一个命名为default的队列)。每个队列会配置一定比例的计算资源,且所有提交到队列中的作业共享该队列中的资源。
2. 空闲资源会被分配给那些未达到资源使用上限的队列,当某个未达到资源的队列需要资源时,一旦出现空闲资源资源,便会分配给他们
3.队列支持作业优先级调度(默认是FIFO) 默认不支持优先级,需要配置
mapred.capacity-scheduler.queue. queueName .supports-priority*************************************************************************** - 3. 计算能力调度器算法分析
在capacity中,存在三种粒度的对象,分别为:queue、job和task,它们均需要维护的一些信息:
(1) queue维护的信息
@ queueName:queue的名称
@ ulMin:每个用户的可用的最少资源量(所有用户均相同),需用户在配置文件中指定
@ capacityPercent:计算资源比例,需用户在配置文件中指定
@ numJobsByUser:每个用户的作业量,用以跟踪每个用户提交的作业量,并进行数量的上限限制。
该队列中map 或reduce task的属性:
@ capacity:实际的计算资源量,这个随着tasktracker中slot数目变化(用户可能在添加或减少机器节点)而动态变化,大小为:capacityPercent*mapClusterCapacity/100
@ numRunningTasks:正在running的task数目
@ numSlotsOccupied:正在running的task占用的slot总数,注意,在Capacity Scheduler中,running task与slot不一定是一一对应的,每个task可获取多个slot,这主要是因为该调度支持内存资源调度,某个task可能需要多个slot包含的内存量。
@ numSlotsOccupiedByUser:每个用户的作业占用slot总数,用以限制用户使用的资源量。
(2) job维护的信息
priority:作业优先级,分为五个等级,从大到小依次为:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW;
numMapTasks/ numReduceTasks :job的map/reduce task总数
runningMapTasks/ runningMapTasks:job正在运行的map/reduce task数
finishedMapTasks/finishedReduceTasks:job已完成的map/reduce task数
……
(3) task维护的信息
task开始运行时间,当前状态等
3.2 计算能力调度算法
当某个tasktracker上出现空闲slot时,调度器依次选择一个queue、(选中的queue中的)job、(选中的job中的)task,并将该slot分配给该task。下面介绍选择queue、job和task所采用的策略:
(1) 选择queue:将所有queue按照资源使用率(numSlotsOccupied/capacity)由小到大排序,依次进行处理,直到找到一个合适的job。
(2) 选择job:在当前queue中,所有作业按照作业提交时间和作业优先级进行排序(假设开启支持优先级调度功能,默认不支持,需要在配置文件中开启),调度依次考虑每个作业,选择符合两个条件的job: [1] 作业所在的用户未达到资源使用上限 [2] 该TaskTracker所在的节点剩余的内存足够该job的task使用。
(3) 选择task,同大部分调度器一样,考虑task的locality和资源使用情况。(即:调用JobInProgress中的obtainNewMapTask()/obtainNewReduceTask()方法)
算法分析部分取自网上先人的一些文章,对理解源代码有一定的思络帮助 - capacity 算法的相关类不多
- CapacitySchedulerConf.java:管理配置文件
- CapacityTaskScheduler.java:调度器的核心代码
- JobQueuesManager.java:管理作业队列
- MemoryMatcher.java:用于判断job与内存容量是否匹配
- JobInitializationPoller.java:作业初始化类,用户可同时启动多个线程,加快作业初始化速度。
- 相关的粒度的各种控制都在 CapacitySchedulerQueue 类进行控制
- CapacityTaskScheduler 分析
(a)CapacityTaskScheduler.start():调度器初始化,包括加载配置文件,初始化各种
对象和变量等。
(b)CapacityTaskScheduler.assignTasks():当有一个 TaskTracker 的 HeartBeat 到达
JobTracker 时,如果有空闲的 slot,JobTracker 会调用 Capacity Scheduler 中的 assignTasks
方法,该方法会为该 TaskTracker 需找若干个合适的 task。
(c)TaskSchedulingMgr.assignTasks():对外提供的最直接的调用函数,主要作用是为
TaskTracker 选择一个合适的 task,该函数会依次扫描系统中所有的 queue(queue 已经被
排好序,排序类为 TaskSchedulingMgr.QueueComparator,根据runningTasks/Capacity(indicates how much 'free space' the queue has)由小到大排列),对于每个 queue,调用getTaskFromQueue()。
(d)TaskSchedulingMgr.getTaskFromQueue():从队列中选择一个符合条件的作业,
包括用户的资源量上限,TaskTracker 空闲内存等。
计算能力调度算法将作业以队列为单位进行划分,适合于多用户共享集群的情况。当
TaskTracker 出现空闲时,会根据计算能力调度算法依次选择队列、作业和任务。这种多队
列并行执行的方式可以提高系统资源的利用率以及系统的执行效率。
默认只有一个default队列需要增加队列
<property>
<name>mapred.queue.names</name>
<value>default,secondqueue</value>
</description>
</property>
设置调度算法<property>
<name>mapred.jobtracker.taskScheduler</name>
<value>org.apache.hadoop.mapred.CapacityTaskScheduler</value>
<description>The class responsible for scheduling the tasks.</description>
</property>
设置capacity的细节参数
mapred.capacity-scheduler.queue.<queue-name>.property-name.
详见capacity-scheduler.xml
以capacity-scheduler.xml当前配置来说明算法的运行情况
这里设置了整个集群的承载量,及2队列的占用比率queue 占用85% secondqueue占用15%
<property>
<name>mapred.capacity-scheduler.maximum-system-jobs</name>
<value>10</value>
<description>Maximum number of jobs in the system which can be initialized, concurrently, by the CapacityScheduler. </description>
</property>
<property>
<name>mapred.capacity-scheduler.queue.default.capacity</name>
<value>85</value>
<description>Percentage of the number of slots in the cluster that are to be available for jobs in this queue. </description>
</property>
<property>
<name>mapred.capacity-scheduler.queue.secondqueue.capacity</name>
<value>15</value>
</property>
用于一个队列里的每个用户的平均量<property>
<name>mapred.capacity-scheduler.queue.secondqueue.minimum-user-limit-percent</name>
<value>25</value>
</property>
队列最大吞吐量,相当用一个最大队列容量同时能容量多少任务
<property>
<name>mapred.capacity-scheduler.queue.secondqueue.init-accept-jobs-factor</name>
<value>3</value>
</property>
以上几个参数实现运行的情况如下
2011-09-11 15:07:00,705 INFO org.apache.hadoop.mapred.CapacityTaskScheduler: Initializing 'default' queue with cap=85.0, maxCap=-1.0, ulMin=100, ulMinFactor=1.0, supportsPriorities=false, maxJobsToInit=9, maxJobsToAccept=90, maxActiveTasks=200000, maxJobsPerUserToInit=9, maxJobsPerUserToAccept=90, maxActiveTasksPerUser=100000
2011-09-11 15:07:00,706 INFO org.apache.hadoop.mapred.CapacityTaskScheduler: Initializing 'secondqueue' queue with cap=15.0, maxCap=-1.0, ulMin=25, ulMinFactor=1.0, supportsPriorities=true, maxJobsToInit=2, maxJobsToAccept=6, maxActiveTasks=200000, maxJobsPerUserToInit=1, maxJobsPerUserToAccept=3, maxActiveTasksPerUser=100000
当前default的各项值是如何计算得出的
maxJobsToInit 当前queue的最大并发运行任务数
int maxJobsToInit = (int)Math.ceil(maxSystemJobs * capacityPercent/100.0);
9= 10*85%
maxJobsPerUserToInit 当前queue里的用户最大并发运行任务数
int maxJobsPerUserToInit =
(int)Math.ceil(maxSystemJobs * capacityPercent/100.0 * ulMin/100.0);
以secondqueue来说 他的最大并发运行任务数是2 设了minimum-user-limit-percent =25 得出的
maxJobsPerUserToInit = 1
int maxJobsToAccept = maxJobsToInit * jobInitToAcceptFactor;
那么按照配置 secondqueue的情况就是 queue支持任务优先级排序,队伍最大支持并发运行2个任务,最大能容量6个任务,相当于会有4个任务在waiting 状态 ,maxActiveTasks,maxActiveTasksPerUser指定任务运行过程中的最大task 参数直接指定
secondqueue 队伍中的用户单个用户最多能执行1个任务,单用户最多容量3个任务
当任务数量提交超过maxJobsToAccept或用户的maxJobsPerUserToAccept 将直接提示队伍满,不能提交任务
以上总结以dfs 及mine帐户 在 static-1.space|app-25.space作测试得出的结论
相应的代码见 CapacitySchedulerQueue checkJobSubmissionLimits ()方法int queueWaitingJobs = getNumWaitingJobs();
int queueInitializingJobs = getNumInitializingJobs();
int queueRunningJobs = getNumRunningJobs();
if ((queueWaitingJobs + queueInitializingJobs + queueRunningJobs) >= maxJobsToAccept)
{ throw new IOException( "Job '" + job.getJobID() + "' from user '" + user + "' rejected since queue '" + queueName + "' already has " + queueWaitingJobs + " waiting jobs, " + queueInitializingJobs + " initializing jobs and " + queueRunningJobs + " running jobs - Exceeds limit of " + maxJobsToAccept + " jobs to accept"); } // Across all jobs of the user
int userWaitingJobs = getNumWaitingJobsByUser(user);
int userInitializingJobs = getNumInitializingJobsByUser(user);
int userRunningJobs = getNumRunningJobsByUser(user);
if ((userWaitingJobs + userInitializingJobs + userRunningJobs) >= maxJobsPerUserToAccept)
{ throw new IOException( "Job '" + job.getJobID() + "' rejected since user '" + user + "' already has " + userWaitingJobs + " waiting jobs, " + userInitializingJobs + " initializing jobs and " + userRunningJobs + " running jobs - " + " Exceeds limit of " + maxJobsPerUserToAccept + " jobs to accept" + " in queue '" + queueName + "' per user"); }
最终的总结
Queue Name State Scheduling Information
default running Queue configuration
Capacity Percentage: 85.0%
User Limit: 100%
Priority Supported: NO
-------------
Map tasks
Capacity: 17 slots
Used capacity: 0 (0.0% of Capacity)
Running tasks: 0
-------------
Reduce tasks
Capacity: 17 slots
Used capacity: 0 (0.0% of Capacity)
Running tasks: 0
-------------
Job info
Number of Waiting Jobs: 0
Number of Initializing Jobs: 0
Number of users who have submitted jobs: 0
secondqueue running Queue configuration
Capacity Percentage: 15.0%
User Limit: 25%
Priority Supported: YES
-------------
Map tasks
Capacity: 3 slots
Used capacity: 0 (0.0% of Capacity)
Running tasks: 0
-------------
Reduce tasks
Capacity: 3 slots
Used capacity: 0 (0.0% of Capacity)
Running tasks: 0
-------------
Job info
Number of Waiting Jobs: 0
Number of Initializing Jobs: 0
Number of users who have submitted jobs: 0
capacity-scheduler在多用户任务并行运行环境下,通过对现有集群的slots进行按queue 比率来进行划分
进而达到队列内不同用户的比率控制,
相比hadoop 默认的JobQueueTaskScheduler 单一队列fifo,传统无特点的调度算法 是一个极大的改进,推荐使用
Hadoop调度算法CapacityScheduler源码分析:http://blog.csdn/zhoujq/article/details/6737441
版权声明:本文标题:hadoop Capacity Scheduler解析 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/dianzi/1728621846a1166454.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论