Skip to content

Commit

Permalink
Remove EasyMock usage in MesosSchedulerBackendSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Feb 12, 2015
1 parent fc5e94d commit 7cca486
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 67 deletions.
11 changes: 4 additions & 7 deletions core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.mock.MockitoSugar

import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._

// TODO: Test the CacheManager's thread-safety aspects
class CacheManagerSuite extends FunSuite with BeforeAndAfter with MockitoSugar {
var sc : SparkContext = _
class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter
with MockitoSugar {

var blockManager: BlockManager = _
var cacheManager: CacheManager = _
var split: Partition = _
Expand Down Expand Up @@ -58,10 +59,6 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with MockitoSugar {
}.cache()
}

after {
sc.stop()
}

test("get uncached rdd") {
// Do not mock this test, because attempting to match Array[Any], which is not covariant,
// in blockManager.put is a losing battle. You have been warned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,48 @@

package org.apache.spark.scheduler.mesos

import org.apache.spark.executor.MesosExecutorBackend
import org.scalatest.FunSuite
import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus,
TaskDescription, WorkerOffer, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend}
import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, _}
import org.apache.mesos.Protos.Value.Scalar
import org.easymock.{Capture, EasyMock}
import java.nio.ByteBuffer
import java.util.Collections
import java.util
import org.scalatest.mock.EasyMockSugar

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.mock.MockitoSugar
import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.Value.Scalar
import org.mockito.Mockito._
import org.mockito.Matchers._
import org.mockito.{Matchers, ArgumentCaptor}

import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus, TaskDescription,
WorkerOffer, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend}

class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with MockitoSugar {

test("check spark-class location correctly") {
val conf = new SparkConf
conf.set("spark.mesos.executor.home" , "/mesos-home")

val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)

val sc = EasyMock.createMock(classOf[SparkContext])
EasyMock.expect(sc.getSparkHome()).andReturn(Option("/spark-home")).anyTimes()
EasyMock.expect(sc.conf).andReturn(conf).anyTimes()
EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
EasyMock.replay(sc)
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
EasyMock.replay(taskScheduler)
val listenerBus = mock[LiveListenerBus]
listenerBus.post(
SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))

val sc = mock[SparkContext]
when(sc.getSparkHome()).thenReturn(Option("/spark-home"))

when(sc.conf).thenReturn(conf)
when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
when(sc.executorMemory).thenReturn(100)
when(sc.listenerBus).thenReturn(listenerBus)
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)

val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")

Expand Down Expand Up @@ -84,20 +87,19 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
.setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build()
}

val driver = EasyMock.createMock(classOf[SchedulerDriver])
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
val driver = mock[SchedulerDriver]
val taskScheduler = mock[TaskSchedulerImpl]

val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)
val listenerBus = mock[LiveListenerBus]
listenerBus.post(
SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))

val sc = EasyMock.createMock(classOf[SparkContext])
EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
EasyMock.replay(sc)
val sc = mock[SparkContext]
when(sc.executorMemory).thenReturn(100)
when(sc.getSparkHome()).thenReturn(Option("/path"))
when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
when(sc.conf).thenReturn(new SparkConf)
when(sc.listenerBus).thenReturn(listenerBus)

val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
val minCpu = 4
Expand All @@ -121,25 +123,24 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
2
))
val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc)))
EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
EasyMock.replay(taskScheduler)
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)

val capture = new Capture[util.Collection[TaskInfo]]
EasyMock.expect(
val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
when(
driver.launchTasks(
EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)),
EasyMock.capture(capture),
EasyMock.anyObject(classOf[Filters])
Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
capture.capture(),
any(classOf[Filters])
)
).andReturn(Status.valueOf(1)).once
EasyMock.expect(driver.declineOffer(mesosOffers.get(1).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.expect(driver.declineOffer(mesosOffers.get(2).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.replay(driver)
).thenReturn(Status.valueOf(1))
when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1))
when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1))

backend.resourceOffers(driver, mesosOffers)

EasyMock.verify(driver)
verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
assert(capture.getValue.size() == 1)
val taskInfo = capture.getValue.iterator().next()
assert(taskInfo.getName.equals("n1"))
Expand All @@ -151,15 +152,13 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
// Unwanted resources offered on an existing node. Make sure they are declined
val mesosOffers2 = new java.util.ArrayList[Offer]
mesosOffers2.add(createOffer(1, minMem, minCpu))
EasyMock.reset(taskScheduler)
EasyMock.reset(driver)
EasyMock.expect(taskScheduler.resourceOffers(EasyMock.anyObject(classOf[Seq[WorkerOffer]])).andReturn(Seq(Seq())))
EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
EasyMock.replay(taskScheduler)
EasyMock.expect(driver.declineOffer(mesosOffers2.get(0).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.replay(driver)
reset(taskScheduler)
reset(driver)
when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq()))
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))

backend.resourceOffers(driver, mesosOffers2)
EasyMock.verify(driver)
verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
}
}

0 comments on commit 7cca486

Please sign in to comment.