Skip to content

Commit

Permalink
Remove EasyMock usage in KinesisReceiverSuite.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Feb 12, 2015
1 parent 7cca486 commit fae1d8f
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.FunSuite
import org.scalatest.mock.MockitoSugar
import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ import java.nio.ByteBuffer

import scala.collection.JavaConversions.seqAsJavaList

import org.apache.spark.annotation.Experimental
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.TestSuiteBase
import org.apache.spark.streaming.util.Clock
import org.apache.spark.streaming.util.ManualClock

import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.scalatest.mock.EasyMockSugar
import org.scalatest.mock.MockitoSugar

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
Expand All @@ -42,10 +43,10 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record

/**
* Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
* Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
*/
class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
with EasyMockSugar {
with MockitoSugar {

val app = "TestKinesisReceiver"
val stream = "mySparkStream"
Expand Down Expand Up @@ -83,193 +84,174 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
}

test("process records including store and checkpoint") {
val expectedCheckpointIntervalMillis = 10
expecting {
receiverMock.isStopped().andReturn(false).once()
receiverMock.store(record1.getData().array()).once()
receiverMock.store(record2.getData().array()).once()
checkpointStateMock.shouldCheckpoint().andReturn(true).once()
checkpointerMock.checkpoint().once()
checkpointStateMock.advanceCheckpoint().once()
}
whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
checkpointStateMock)
recordProcessor.processRecords(batch, checkpointerMock)
}
when(receiverMock.isStopped()).thenReturn(false)
when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)

val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
recordProcessor.processRecords(batch, checkpointerMock)

verify(receiverMock, times(1)).isStopped()
verify(receiverMock, times(1)).store(record1.getData().array())
verify(receiverMock, times(1)).store(record2.getData().array())
verify(checkpointerMock, times(1)).checkpoint()
verify(checkpointStateMock, times(1)).advanceCheckpoint()
}

test("shouldn't store and checkpoint when receiver is stopped") {
expecting {
receiverMock.isStopped().andReturn(true).once()
}
whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
checkpointStateMock)
recordProcessor.processRecords(batch, checkpointerMock)
}
when(receiverMock.isStopped()).thenReturn(true)

val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
recordProcessor.processRecords(batch, checkpointerMock)

verify(receiverMock, times(1)).isStopped()
}

test("shouldn't checkpoint when exception occurs during store") {
expecting {
receiverMock.isStopped().andReturn(false).once()
receiverMock.store(record1.getData().array()).andThrow(new RuntimeException()).once()
}
whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
intercept[RuntimeException] {
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
checkpointStateMock)
recordProcessor.processRecords(batch, checkpointerMock)
}
when(receiverMock.isStopped()).thenReturn(false)
when(receiverMock.store(record1.getData().array())).thenThrow(new RuntimeException())

intercept[RuntimeException] {
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
recordProcessor.processRecords(batch, checkpointerMock)
}

verify(receiverMock, times(1)).isStopped()
verify(receiverMock, times(1)).store(record1.getData().array())
}

test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
expecting {
currentClockMock.currentTime().andReturn(0).once()
}
whenExecuting(currentClockMock) {
when(currentClockMock.currentTime()).thenReturn(0)

val checkpointIntervalMillis = 10
val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
val checkpointState =
new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
}

verify(currentClockMock, times(1)).currentTime()
}

test("should checkpoint if we have exceeded the checkpoint interval") {
expecting {
currentClockMock.currentTime().andReturn(0).once()
}
whenExecuting(currentClockMock) {
val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
assert(checkpointState.shouldCheckpoint())
}
when(currentClockMock.currentTime()).thenReturn(0)

val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
assert(checkpointState.shouldCheckpoint())

verify(currentClockMock, times(1)).currentTime()
}

test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
expecting {
currentClockMock.currentTime().andReturn(0).once()
}
whenExecuting(currentClockMock) {
val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
assert(!checkpointState.shouldCheckpoint())
}
when(currentClockMock.currentTime()).thenReturn(0)

val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
assert(!checkpointState.shouldCheckpoint())

verify(currentClockMock, times(1)).currentTime()
}

test("should add to time when advancing checkpoint") {
expecting {
currentClockMock.currentTime().andReturn(0).once()
}
whenExecuting(currentClockMock) {
val checkpointIntervalMillis = 10
val checkpointState = new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
checkpointState.advanceCheckpoint()
assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
}
when(currentClockMock.currentTime()).thenReturn(0)

val checkpointIntervalMillis = 10
val checkpointState =
new KinesisCheckpointState(Milliseconds(checkpointIntervalMillis), currentClockMock)
assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
checkpointState.advanceCheckpoint()
assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))

verify(currentClockMock, times(1)).currentTime()
}

test("shutdown should checkpoint if the reason is TERMINATE") {
expecting {
checkpointerMock.checkpoint().once()
}
whenExecuting(checkpointerMock, checkpointStateMock) {
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
checkpointStateMock)
val reason = ShutdownReason.TERMINATE
recordProcessor.shutdown(checkpointerMock, reason)
}
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
val reason = ShutdownReason.TERMINATE
recordProcessor.shutdown(checkpointerMock, reason)

verify(checkpointerMock, times(1)).checkpoint()
}

test("shutdown should not checkpoint if the reason is something other than TERMINATE") {
expecting {
}
whenExecuting(checkpointerMock, checkpointStateMock) {
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
checkpointStateMock)
recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
recordProcessor.shutdown(checkpointerMock, null)
}
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
recordProcessor.shutdown(checkpointerMock, null)

verify(checkpointerMock, never()).checkpoint()
}

test("retry success on first attempt") {
val expectedIsStopped = false
expecting {
receiverMock.isStopped().andReturn(expectedIsStopped).once()
}
whenExecuting(receiverMock) {
val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
assert(actualVal == expectedIsStopped)
}
when(receiverMock.isStopped()).thenReturn(expectedIsStopped)

val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
assert(actualVal == expectedIsStopped)

verify(receiverMock, times(1)).isStopped()
}

test("retry success on second attempt after a Kinesis throttling exception") {
val expectedIsStopped = false
expecting {
receiverMock.isStopped().andThrow(new ThrottlingException("error message"))
.andReturn(expectedIsStopped).once()
}
whenExecuting(receiverMock) {
val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
assert(actualVal == expectedIsStopped)
}
when(receiverMock.isStopped())
.thenThrow(new ThrottlingException("error message"))
.thenReturn(expectedIsStopped)

val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
assert(actualVal == expectedIsStopped)

verify(receiverMock, times(2)).isStopped()
}

test("retry success on second attempt after a Kinesis dependency exception") {
val expectedIsStopped = false
expecting {
receiverMock.isStopped().andThrow(new KinesisClientLibDependencyException("error message"))
.andReturn(expectedIsStopped).once()
}
whenExecuting(receiverMock) {
val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
assert(actualVal == expectedIsStopped)
}
when(receiverMock.isStopped())
.thenThrow(new KinesisClientLibDependencyException("error message"))
.thenReturn(expectedIsStopped)

val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
assert(actualVal == expectedIsStopped)

verify(receiverMock, times(2)).isStopped()
}

test("retry failed after a shutdown exception") {
expecting {
checkpointerMock.checkpoint().andThrow(new ShutdownException("error message")).once()
}
whenExecuting(checkpointerMock) {
intercept[ShutdownException] {
KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
when(checkpointerMock.checkpoint()).thenThrow(new ShutdownException("error message"))

intercept[ShutdownException] {
KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}

verify(checkpointerMock, times(1)).checkpoint()
}

test("retry failed after an invalid state exception") {
expecting {
checkpointerMock.checkpoint().andThrow(new InvalidStateException("error message")).once()
}
whenExecuting(checkpointerMock) {
intercept[InvalidStateException] {
KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
when(checkpointerMock.checkpoint()).thenThrow(new InvalidStateException("error message"))

intercept[InvalidStateException] {
KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}

verify(checkpointerMock, times(1)).checkpoint()
}

test("retry failed after unexpected exception") {
expecting {
checkpointerMock.checkpoint().andThrow(new RuntimeException("error message")).once()
}
whenExecuting(checkpointerMock) {
intercept[RuntimeException] {
KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
when(checkpointerMock.checkpoint()).thenThrow(new RuntimeException("error message"))

intercept[RuntimeException] {
KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}

verify(checkpointerMock, times(1)).checkpoint()
}

test("retry failed after exhausing all retries") {
val expectedErrorMessage = "final try error message"
expecting {
checkpointerMock.checkpoint().andThrow(new ThrottlingException("error message"))
.andThrow(new ThrottlingException(expectedErrorMessage)).once()
}
whenExecuting(checkpointerMock) {
val exception = intercept[RuntimeException] {
KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
exception.getMessage().shouldBe(expectedErrorMessage)
when(checkpointerMock.checkpoint())
.thenThrow(new ThrottlingException("error message"))
.thenThrow(new ThrottlingException(expectedErrorMessage))

val exception = intercept[RuntimeException] {
KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
}
exception.getMessage().shouldBe(expectedErrorMessage)

verify(checkpointerMock, times(2)).checkpoint()
}
}

0 comments on commit fae1d8f

Please sign in to comment.