Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-5735] Replace uses of EasyMock with Mockito #4578

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -329,16 +329,6 @@
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymockclassextension</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
42 changes: 16 additions & 26 deletions core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@

package org.apache.spark

import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.mock.EasyMockSugar
import org.scalatest.mock.MockitoSugar

import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._

// TODO: Test the CacheManager's thread-safety aspects
class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
var sc : SparkContext = _
class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter
with MockitoSugar {

var blockManager: BlockManager = _
var cacheManager: CacheManager = _
var split: Partition = _
Expand Down Expand Up @@ -57,10 +59,6 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
}.cache()
}

after {
sc.stop()
}

test("get uncached rdd") {
// Do not mock this test, because attempting to match Array[Any], which is not covariant,
// in blockManager.put is a losing battle. You have been warned.
Expand All @@ -75,29 +73,21 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
}

test("get cached rdd") {
expecting {
val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
}
val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))

whenExecuting(blockManager) {
val context = new TaskContextImpl(0, 0, 0, 0)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(5, 6, 7))
}
val context = new TaskContextImpl(0, 0, 0, 0)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(5, 6, 7))
}

test("get uncached local rdd") {
expecting {
// Local computation should not persist the resulting value, so don't expect a put().
blockManager.get(RDDBlockId(0, 0)).andReturn(None)
}
// Local computation should not persist the resulting value, so don't expect a put().
when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None)

whenExecuting(blockManager) {
val context = new TaskContextImpl(0, 0, 0, 0, true)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}
val context = new TaskContextImpl(0, 0, 0, 0, true)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}

test("verify task metrics updated correctly") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,48 @@

package org.apache.spark.scheduler.mesos

import org.apache.spark.executor.MesosExecutorBackend
import org.scalatest.FunSuite
import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus,
TaskDescription, WorkerOffer, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend}
import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, _}
import org.apache.mesos.Protos.Value.Scalar
import org.easymock.{Capture, EasyMock}
import java.nio.ByteBuffer
import java.util.Collections
import java.util
import org.scalatest.mock.EasyMockSugar

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.Value.Scalar
import org.scalatest.FunSuite
import org.scalatest.mock.MockitoSugar
import org.mockito.Mockito._
import org.mockito.Matchers._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: sorting

import org.mockito.{Matchers, ArgumentCaptor}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you reorganize this?


import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus, TaskDescription,
WorkerOffer, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend}

class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with MockitoSugar {

test("check spark-class location correctly") {
val conf = new SparkConf
conf.set("spark.mesos.executor.home" , "/mesos-home")

val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)

val sc = EasyMock.createMock(classOf[SparkContext])
EasyMock.expect(sc.getSparkHome()).andReturn(Option("/spark-home")).anyTimes()
EasyMock.expect(sc.conf).andReturn(conf).anyTimes()
EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
EasyMock.replay(sc)
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
EasyMock.replay(taskScheduler)
val listenerBus = mock[LiveListenerBus]
listenerBus.post(
SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))

val sc = mock[SparkContext]
when(sc.getSparkHome()).thenReturn(Option("/spark-home"))

when(sc.conf).thenReturn(conf)
when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
when(sc.executorMemory).thenReturn(100)
when(sc.listenerBus).thenReturn(listenerBus)
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)

val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")

Expand Down Expand Up @@ -84,20 +87,19 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
.setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build()
}

val driver = EasyMock.createMock(classOf[SchedulerDriver])
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
val driver = mock[SchedulerDriver]
val taskScheduler = mock[TaskSchedulerImpl]

val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)
val listenerBus = mock[LiveListenerBus]
listenerBus.post(
SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))

val sc = EasyMock.createMock(classOf[SparkContext])
EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
EasyMock.replay(sc)
val sc = mock[SparkContext]
when(sc.executorMemory).thenReturn(100)
when(sc.getSparkHome()).thenReturn(Option("/path"))
when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
when(sc.conf).thenReturn(new SparkConf)
when(sc.listenerBus).thenReturn(listenerBus)

val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
val minCpu = 4
Expand All @@ -121,25 +123,29 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
2
))
val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc)))
EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
EasyMock.replay(taskScheduler)
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)

val capture = new Capture[util.Collection[TaskInfo]]
EasyMock.expect(
val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
when(
driver.launchTasks(
EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)),
EasyMock.capture(capture),
EasyMock.anyObject(classOf[Filters])
Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
capture.capture(),
any(classOf[Filters])
)
).andReturn(Status.valueOf(1)).once
EasyMock.expect(driver.declineOffer(mesosOffers.get(1).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.expect(driver.declineOffer(mesosOffers.get(2).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.replay(driver)
).thenReturn(Status.valueOf(1))
when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1))
when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1))

backend.resourceOffers(driver, mesosOffers)

EasyMock.verify(driver)
verify(driver, times(1)).launchTasks(
Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
capture.capture(),
any(classOf[Filters])
)
verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
assert(capture.getValue.size() == 1)
val taskInfo = capture.getValue.iterator().next()
assert(taskInfo.getName.equals("n1"))
Expand All @@ -151,15 +157,13 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
// Unwanted resources offered on an existing node. Make sure they are declined
val mesosOffers2 = new java.util.ArrayList[Offer]
mesosOffers2.add(createOffer(1, minMem, minCpu))
EasyMock.reset(taskScheduler)
EasyMock.reset(driver)
EasyMock.expect(taskScheduler.resourceOffers(EasyMock.anyObject(classOf[Seq[WorkerOffer]])).andReturn(Seq(Seq())))
EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
EasyMock.replay(taskScheduler)
EasyMock.expect(driver.declineOffer(mesosOffers2.get(0).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.replay(driver)
reset(taskScheduler)
reset(driver)
when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq()))
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))

backend.resourceOffers(driver, mesosOffers2)
EasyMock.verify(driver)
verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
}
}
5 changes: 0 additions & 5 deletions extras/kinesis-asl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymockclassextension</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
Expand Down
Loading