@@ -81,6 +81,10 @@ private[spark] class TaskSchedulerImpl(
   private val speculationScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

+  // whether to prefer assigning tasks to executors that contain shuffle files
+  val shuffleBiasedTaskSchedulingEnabled =
+    conf.getBoolean("spark.scheduler.shuffleBiasedTaskScheduling.enabled", false)
+
   // Threshold above which we warn user initial TaskSet may be starved
   val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")

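Not part of the patch, but for context: the new flag is read straight from the application's SparkConf with a default of false, so (assuming this change is applied) it can be enabled per job. A minimal, purely illustrative sketch; the application name is made up:

// Illustrative only: enabling the flag added above for one application.
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("shuffle-bias-demo")  // hypothetical app name
  .set("spark.scheduler.shuffleBiasedTaskScheduling.enabled", "true")
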
@@ -377,11 +381,7 @@ private[spark] class TaskSchedulerImpl(
       }
     }.getOrElse(offers)

-    val shuffledOffers = shuffleOffers(filteredOffers)
-    // Build a list of tasks to assign to each worker.
-    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
-    val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
+    var tasks: Seq[Seq[TaskDescription]] = Nil
     val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -391,11 +391,36 @@ private[spark] class TaskSchedulerImpl(
       }
     }

+    // If shuffle-biased task scheduling is enabled, then first assign as many tasks as possible to
+    // executors containing active shuffle files, followed by assigning to executors with inactive
+    // shuffle files, and then finally to those without shuffle files. This bin packing allows for
+    // more efficient dynamic allocation in the absence of an external shuffle service.
+    val partitionedAndShuffledOffers = partitionAndShuffleOffers(filteredOffers)
+    for (shuffledOffers <- partitionedAndShuffledOffers.map(_._2)) {
+      tasks ++= doResourceOffers(shuffledOffers, sortedTaskSets)
+    }
+
+    // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
+    // launched within a configured time.
+    if (tasks.size > 0) {
+      hasLaunchedTask = true
+    }
+    return tasks
+  }
+
+  private def doResourceOffers(
+      shuffledOffers: IndexedSeq[WorkerOffer],
+      sortedTaskSets: IndexedSeq[TaskSetManager]): Seq[Seq[TaskDescription]] = {
+    // Build a list of tasks to assign to each worker.
+    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
+    val availableCpus = shuffledOffers.map(o => o.cores).toArray
+    val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
+
     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
     for (taskSet <- sortedTaskSets) {
-      // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
+      // Skip the barrier taskSet if the available slots are less than the number of pending tasks
       if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
         // Skip the launch process.
         // TODO SPARK-24819 If the job requires more slots than available (both busy and free
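To illustrate the slot accounting behind the barrier check above (outside the patch): each offer contributes cores / CPUS_PER_TASK slots, and a barrier stage is only launched when the total covers all of its pending tasks. A standalone sketch; the offer sizes and CPUS_PER_TASK value are made up:

// Simplified model of the slot computation in doResourceOffers (illustrative only).
case class Offer(executorId: String, cores: Int)

val CPUS_PER_TASK = 1                                           // assumed value of spark.task.cpus
val offers = Seq(Offer("exec-1", 4), Offer("exec-2", 2))
val availableSlots = offers.map(_.cores / CPUS_PER_TASK).sum    // 6 slots
val barrierStageTasks = 8
val skipBarrierStage = availableSlots < barrierStageTasks       // true: not enough slots, skip launch
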
@@ -439,25 +464,40 @@ private[spark] class TaskSchedulerImpl(
             .mkString(",")
           addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))

-          logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
-            s"stage ${taskSet.stageId}.")
+          logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for " +
+            s"barrier stage ${taskSet.stageId}.")
         }
       }
     }
+    tasks
+  }

-    // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
-    // launched within a configured time.
-    if (tasks.size > 0) {
-      hasLaunchedTask = true
+  /**
+   * Shuffle offers around to avoid always placing tasks on the same workers.
+   * If shuffle-biased task scheduling is enabled, this function partitions the offers based on
+   * whether they have active/inactive/no shuffle files present.
+   */
+  def partitionAndShuffleOffers(offers: IndexedSeq[WorkerOffer])
+    : IndexedSeq[(ExecutorShuffleStatus.Value, IndexedSeq[WorkerOffer])] = {
+    if (shuffleBiasedTaskSchedulingEnabled && offers.length > 1) {
+      // bias towards executors that have active shuffle outputs
+      val execShuffles = mapOutputTracker.getExecutorShuffleStatus
+      offers
+        .groupBy(offer => execShuffles.getOrElse(offer.executorId, ExecutorShuffleStatus.Unknown))
+        .mapValues(doShuffleOffers)
+        .toStream
+        .sortBy(_._1) // order: Active, Inactive, Unknown
+        .toIndexedSeq
+    } else {
+      IndexedSeq((ExecutorShuffleStatus.Unknown, doShuffleOffers(offers)))
     }
-    return tasks
   }

   /**
-   * Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow
-   * overriding in tests, so it can be deterministic.
+   * Does the shuffling for [[partitionAndShuffleOffers()]]. Exposed to allow overriding in tests,
+   * so that it can be deterministic.
    */
-  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
+  protected def doShuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
     Random.shuffle(offers)
   }

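The overall effect of partitionAndShuffleOffers can be sketched outside the scheduler: offers are grouped by their executor's shuffle status, shuffled within each group, and the groups are then consumed in Active, Inactive, Unknown order so that executors already holding shuffle files are packed first. The enum, the stand-in WorkerOffer type, and the status map below are simplified assumptions for illustration, not the actual Spark types:

import scala.util.Random

// Simplified stand-ins for the scheduler types (illustrative only).
object ExecutorShuffleStatus extends Enumeration {
  val Active, Inactive, Unknown = Value    // declaration order gives the sort order: Active first
}
case class WorkerOffer(executorId: String, host: String, cores: Int)

// Status per executor, as the MapOutputTracker would report it (made-up data).
val execShuffles = Map(
  "exec-1" -> ExecutorShuffleStatus.Active,
  "exec-2" -> ExecutorShuffleStatus.Inactive)

val offers = IndexedSeq(
  WorkerOffer("exec-1", "host-a", 4),
  WorkerOffer("exec-2", "host-b", 4),
  WorkerOffer("exec-3", "host-c", 4))      // exec-3 has no known shuffle files -> Unknown

val partitioned = offers
  .groupBy(o => execShuffles.getOrElse(o.executorId, ExecutorShuffleStatus.Unknown))
  .mapValues(group => Random.shuffle(group))
  .toSeq
  .sortBy(_._1)                            // Active, then Inactive, then Unknown
// Tasks would then be offered to each group of executors in that order.
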