Merge branch 'master' of https://github.com/apache/spark into SPARK-5592
scwf committed Feb 5, 2015
2 parents 9998177 + 9a7ce70 commit f24624f
Showing 37 changed files with 1,936 additions and 234 deletions.
57 changes: 54 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import scala.language.implicitConversions

import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger
@@ -387,9 +388,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
executorAllocationManager.foreach(_.start())

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

private[spark] val cleaner: Option[ContextCleaner] = {
if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
@@ -399,6 +397,7 @@ }
}
cleaner.foreach(_.start())

setupAndStartListenerBus()
postEnvironmentUpdate()
postApplicationStart()

@@ -1563,6 +1562,58 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

/**
* Registers listeners specified in spark.extraListeners, then starts the listener bus.
* This should be called after all internal listeners have been registered with the listener bus
* (e.g. after the web UI and event logging listeners have been registered).
*/
private def setupAndStartListenerBus(): Unit = {
// Use reflection to instantiate listeners specified via `spark.extraListeners`
try {
val listenerClassNames: Seq[String] =
conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")
for (className <- listenerClassNames) {
// Use reflection to find the right constructor
val constructors = {
val listenerClass = Class.forName(className)
listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
}
val constructorTakingSparkConf = constructors.find { c =>
c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
}
lazy val zeroArgumentConstructor = constructors.find { c =>
c.getParameterTypes.isEmpty
}
val listener: SparkListener = {
if (constructorTakingSparkConf.isDefined) {
constructorTakingSparkConf.get.newInstance(conf)
} else if (zeroArgumentConstructor.isDefined) {
zeroArgumentConstructor.get.newInstance()
} else {
throw new SparkException(
s"$className did not have a zero-argument constructor or a" +
" single-argument constructor that accepts SparkConf. Note: if the class is" +
" defined inside of another Scala class, then its constructors may accept an" +
" implicit parameter that references the enclosing class; in this case, you must" +
" define the listener as a top-level class in order to prevent this extra" +
" parameter from breaking Spark's ability to find a valid constructor.")
}
}
listenerBus.addListener(listener)
logInfo(s"Registered listener $className")
}
} catch {
case e: Exception =>
try {
stop()
} finally {
throw new SparkException(s"Exception when registering SparkListener", e)
}
}

listenerBus.start()
}
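Note (not part of the patch): a minimal sketch of how an application could hook into this via spark.extraListeners. The listener class and app name below are illustrative only; the listener must be top-level and expose either a zero-argument or a SparkConf-accepting constructor so the reflective lookup above can find it.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Top-level listener class with a single-argument constructor that accepts SparkConf,
// which setupAndStartListenerBus() can instantiate reflectively.
class MyJobCounter(conf: SparkConf) extends SparkListener {
  var jobsEnded = 0
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { jobsEnded += 1 }
}

object ExtraListenerExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("extra-listener-example")
      .set("spark.extraListeners", classOf[MyJobCounter].getName)
    val sc = new SparkContext(conf)  // the listener is registered before any events are posted
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}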

/** Post the application start event */
private def postApplicationStart() {
// Note: this code assumes that the task scheduler has been initialized and has contacted
core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -17,8 +17,10 @@

package org.apache.spark.api.python

import java.io.{File, InputStream, IOException, OutputStream}
import java.io.{File}
import java.util.{List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkContext
@@ -44,4 +46,11 @@ private[spark] object PythonUtils {
def generateRDDWithNull(sc: JavaSparkContext): JavaRDD[String] = {
sc.parallelize(List("a", null, "b"))
}

/**
* Convert a java.util.List of T into a Scala Seq of T (for calling APIs that take varargs)
*/
def toSeq[T](cols: JList[T]): Seq[T] = {
cols.toList.toSeq
}
}
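Note (not part of the patch): a sketch of the call pattern this helper is presumably meant for — passing a java.util.List built on the Python side through Py4J into a Scala varargs API. selectAll below is just a stand-in, not a real Spark method.

import java.util.{Arrays => JArrays, List => JList}
import scala.collection.JavaConversions._

object ToSeqSketch {
  // Same shape as PythonUtils.toSeq: a Java list in, a Scala Seq out.
  def toSeq[T](cols: JList[T]): Seq[T] = cols.toList.toSeq

  // A stand-in for a Scala API that takes varargs.
  def selectAll(columns: String*): String = columns.mkString("SELECT ", ", ", "")

  def main(args: Array[String]): Unit = {
    val fromPython: JList[String] = JArrays.asList("name", "age")  // what Py4J would hand over
    println(selectAll(toSeq(fromPython): _*))                      // SELECT name, age
  }
}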
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -506,13 +506,64 @@ private[spark] class TaskSetManager(
* Get the level we can launch tasks according to delay scheduling, based on current wait time.
*/
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
currentLocalityIndex < myLocalityLevels.length - 1)
{
// Jump to the next locality level, and remove our waiting time for the current one since
// we don't want to count it again on the next one
lastLaunchTime += localityWaits(currentLocalityIndex)
currentLocalityIndex += 1
// Lazily remove tasks that have already been scheduled or have finished
def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
var indexOffset = pendingTaskIds.size
while (indexOffset > 0) {
indexOffset -= 1
val index = pendingTaskIds(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return true
} else {
pendingTaskIds.remove(indexOffset)
}
}
false
}
// Walks through the list of tasks that can be scheduled at each location and returns true
// if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
// already been scheduled.
def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
val emptyKeys = new ArrayBuffer[String]
val hasTasks = pendingTasks.exists {
case (id: String, tasks: ArrayBuffer[Int]) =>
if (tasksNeedToBeScheduledFrom(tasks)) {
true
} else {
emptyKeys += id
false
}
}
// The key could be executorId, host or rackId
emptyKeys.foreach(id => pendingTasks.remove(id))
hasTasks
}

while (currentLocalityIndex < myLocalityLevels.length - 1) {
val moreTasks = myLocalityLevels(currentLocalityIndex) match {
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
}
if (!moreTasks) {
// This is a performance optimization: if there are no more tasks that can
// be scheduled at a particular locality level, there is no point in waiting
// for the locality wait timeout (SPARK-4939).
lastLaunchTime = curTime
logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
currentLocalityIndex += 1
} else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
// Jump to the next locality level, and reset lastLaunchTime so that the next locality
// wait timer doesn't immediately expire
lastLaunchTime += localityWaits(currentLocalityIndex)
currentLocalityIndex += 1
logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
s"${localityWaits(currentLocalityIndex)}ms")
} else {
return myLocalityLevels(currentLocalityIndex)
}
}
myLocalityLevels(currentLocalityIndex)
}
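Note (not part of the patch): a self-contained sketch of the lazy-cleanup idea used above, with plain arrays standing in for the TaskSetManager bookkeeping.

import scala.collection.mutable.ArrayBuffer

object LazyPendingCleanupSketch {
  // Task state indexed by task id: a task no longer needs scheduling once it is
  // running somewhere or has already finished successfully.
  val copiesRunning = Array(0, 0, 0, 1)
  val successful = Array(false, false, true, false)

  // Same shape as tasksNeedToBeScheduledFrom above: scan from the back, prune entries
  // that no longer need scheduling, and stop at the first runnable task.
  def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
    var indexOffset = pendingTaskIds.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = pendingTaskIds(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        return true
      } else {
        pendingTaskIds.remove(indexOffset)
      }
    }
    false
  }

  def main(args: Array[String]): Unit = {
    val pending = ArrayBuffer(0, 1, 2, 3)
    println(tasksNeedToBeScheduledFrom(pending))  // true: task 1 still needs to run
    println(pending)                              // ArrayBuffer(0, 1): tasks 2 and 3 pruned lazily
  }
}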
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -28,7 +28,8 @@ import org.apache.spark.Logging
*/
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

private val listeners = new CopyOnWriteArrayList[L]
// Marked `private[spark]` for access in tests.
private[spark] val listeners = new CopyOnWriteArrayList[L]

/**
* Add a listener to listen for events. This method is thread-safe and can be called from any thread.
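Note (not part of the patch): a stripped-down sketch of the pattern this trait relies on. TinyBus is illustrative only, not the real LiveListenerBus; the point is that CopyOnWriteArrayList copies on mutation, so posting can iterate a stable snapshot while listeners are being added from other threads.

import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.JavaConversions._

// Simplified bus: thread-safe registration, sequential delivery to a snapshot of listeners.
class TinyBus[L, E](deliver: (L, E) => Unit) {
  private val listeners = new CopyOnWriteArrayList[L]
  def addListener(listener: L): Unit = { listeners.add(listener) }
  def post(event: E): Unit = listeners.foreach(l => deliver(l, event))
}

object TinyBusSketch {
  def main(args: Array[String]): Unit = {
    val bus = new TinyBus[String => Unit, String]((listener, event) => listener(event))
    bus.addListener(msg => println(s"got: $msg"))
    bus.post("job end")  // prints: got: job end
  }
}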
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -20,26 +20,22 @@ package org.apache.spark.scheduler
import java.util.concurrent.Semaphore

import scala.collection.mutable
import scala.collection.JavaConversions._

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.Matchers
import org.scalatest.{FunSuite, Matchers}

import org.apache.spark.{LocalSparkContext, SparkContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.ResetSystemProperties
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}

class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers with BeforeAndAfter
with BeforeAndAfterAll with ResetSystemProperties {
class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
with ResetSystemProperties {

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000

val jobCompletionTime = 1421191296660L

before {
sc = new SparkContext("local", "SparkListenerSuite")
}

test("basic creation and shutdown of LiveListenerBus") {
val counter = new BasicJobCounter
val bus = new LiveListenerBus
@@ -127,6 +123,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("basic creation of StageInfo") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
@@ -148,6 +145,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("basic creation of StageInfo with shuffle") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
@@ -185,6 +183,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("StageInfo with fewer tasks than partitions") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
@@ -201,6 +200,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("local metrics") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
sc.addSparkListener(new StatsReportListener)
@@ -267,6 +267,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("onTaskGettingResult() called when result fetched remotely") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveTaskEvents
sc.addSparkListener(listener)

@@ -287,6 +288,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("onTaskGettingResult() not called when result sent directly") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveTaskEvents
sc.addSparkListener(listener)

@@ -302,6 +304,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("onTaskEnd() should be called for all started tasks, even after job has been killed") {
sc = new SparkContext("local", "SparkListenerSuite")
val WAIT_TIMEOUT_MILLIS = 10000
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
@@ -356,21 +359,24 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
assert(jobCounter2.count === 5)
}

test("registering listeners via spark.extraListeners") {
val conf = new SparkConf().setMaster("local").setAppName("test")
.set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," +
classOf[BasicJobCounter].getName)
sc = new SparkContext(conf)
sc.listenerBus.listeners.collect { case x: BasicJobCounter => x}.size should be (1)
sc.listenerBus.listeners.collect {
case x: ListenerThatAcceptsSparkConf => x
}.size should be (1)
}

/**
* Assert that the given list of numbers has an average that is greater than zero.
*/
private def checkNonZeroAvg(m: Traversable[Long], msg: String) {
assert(m.sum / m.size.toDouble > 0.0, msg)
}

/**
* A simple listener that counts the number of jobs observed.
*/
private class BasicJobCounter extends SparkListener {
var count = 0
override def onJobEnd(job: SparkListenerJobEnd) = count += 1
}

/**
* A simple listener that saves all task infos and task metrics.
*/
@@ -423,3 +429,19 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

}

// These classes can't be declared inside the SparkListenerSuite class because we don't want
// their constructors to contain references to SparkListenerSuite:

/**
* A simple listener that counts the number of jobs observed.
*/
private class BasicJobCounter extends SparkListener {
var count = 0
override def onJobEnd(job: SparkListenerJobEnd) = count += 1
}

private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
var count = 0
override def onJobEnd(job: SparkListenerJobEnd) = count += 1
}
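Note (not part of the patch): a small sketch of why these listeners have to live at the top level — a class nested inside another class compiles with a hidden constructor parameter for the enclosing instance, which is exactly what defeats the reflective constructor lookup in setupAndStartListenerBus(). Outer, Inner, and TopLevel are illustrative names.

// Inner uses a member of Outer, so the enclosing-instance reference is retained.
class Outer(val tag: String) {
  class Inner {
    def describe: String = tag  // needs the enclosing instance
  }
}

class TopLevel  // zero-argument constructor, usable via reflection

object ConstructorShapeSketch {
  def main(args: Array[String]): Unit = {
    println(classOf[Outer#Inner].getConstructors.head.getParameterTypes.toList)
    // List(class Outer) -- the hidden enclosing-instance parameter
    println(classOf[TopLevel].getConstructors.head.getParameterTypes.toList)
    // List() -- the kind of constructor the listener bus setup can call
  }
}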
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -314,7 +314,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

test("delay scheduling with failed hosts") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
("exec3", "host3"))
val taskSet = FakeTask.createTaskSet(3,
Seq(TaskLocation("host1")),
Seq(TaskLocation("host2")),
@@ -649,6 +650,47 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2)
}

test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
val taskSet = FakeTask.createTaskSet(4,
Seq(TaskLocation("host1")),
Seq(TaskLocation("host2")),
Seq(ExecutorCacheTaskLocation("host1", "execA")),
Seq(ExecutorCacheTaskLocation("host2", "execB")))
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

// process-local tasks are scheduled first
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2)
assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 3)
// node-local tasks are scheduled without delay
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 1)
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
assert(manager.resourceOffer("execB", "host2", NODE_LOCAL) == None)
}

test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
val taskSet = FakeTask.createTaskSet(3,
Seq(),
Seq(ExecutorCacheTaskLocation("host1", "execA")),
Seq(ExecutorCacheTaskLocation("host2", "execB")))
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

// process-local tasks are scheduled first
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1)
assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 2)
// no-pref tasks are scheduled without delay
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) == None)
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 0)
assert(manager.resourceOffer("execA", "host1", ANY) == None)
}

test("Ensure TaskSetManager is usable after addition of levels") {
// Regression test for SPARK-2931
sc = new SparkContext("local", "test")