Skip to content

Commit

Permalink
Fixed test
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Nov 29, 2016
1 parent c64632c commit 247ada6
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
extends StreamAction

/** Advance the trigger clock's time manually. */
case class AdvanceManualClock(
timeToAdd: Long, waitForStreamExecThreadToBlock: Boolean = true) extends StreamAction
case class AdvanceManualClock(timeToAdd: Long) extends StreamAction

/** Signals that a failure is expected and should not kill the test. */
case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
Expand Down Expand Up @@ -365,19 +364,19 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
}
})

case AdvanceManualClock(timeToAdd, waitForStreamExecThreadToBlock) =>
case AdvanceManualClock(timeToAdd) =>
verify(currentStream != null,
"can not advance manual clock when a stream is not running")
verify(currentStream.triggerClock.isInstanceOf[StreamManualClock],
s"can not advance clock of type ${currentStream.triggerClock.getClass}")
val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
assert(manualClockExpectedTime >= 0)
if (waitForStreamExecThreadToBlock) {
// Make sure we don't advance ManualClock too early. See SPARK-16002.
eventually("StreamManualClock has not yet entered the waiting state") {
assert(clock.isStreamWaitingAt(manualClockExpectedTime))
}

// Make sure we don't advance ManualClock too early. See SPARK-16002.
eventually("StreamManualClock has not yet entered the waiting state") {
assert(clock.isStreamWaitingAt(manualClockExpectedTime))
}

clock.advance(timeToAdd)
manualClockExpectedTime += timeToAdd
verify(clock.getTimeMillis() === manualClockExpectedTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

// Test status while batch processing has completed
AdvanceManualClock(500, waitForStreamExecThreadToBlock = false), // time = 1100 to unblock job
AdvanceManualClock(500), // time = 1100 to unblock job
AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
CheckAnswer(2),
AssertOnQuery(_.status.isDataAvailable === true),
Expand Down Expand Up @@ -240,7 +240,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertStreamExecThreadToWaitForClock(),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
AssertOnQuery(_.status.message === "Waiting for data to arrive")
AssertOnQuery(_.status.message === "Waiting for next trigger")
)
}

Expand Down

0 comments on commit 247ada6

Please sign in to comment.