[STREAMING] SPARK-2009 Key not found exception when slow receiver starts
I got a "java.util.NoSuchElementException: key not found: 1401756085000 ms" exception when using a Kafka stream with a 1-second batchPeriod.

Investigation showed that the reason is that ReceiverLauncher.startReceivers is asynchronous (started in a thread).
https://github.com/vchekan/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L206

A slow-starting receiver, such as Kafka, can easily take more than 2 seconds to start. As a result, "compute" is never called on ReceiverInputDStream before the first batch job is executed, and receivedBlockInfo remains empty. The batch job then calls ReceiverInputDStream.getReceivedBlockInfo, which throws the "key not found" exception.

The patch makes getReceivedBlockInfo more robust by tolerating missing values.
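
The difference between the two lookups can be reproduced outside Spark. The following is a minimal sketch, not the Spark code itself: `Time` and `ReceivedBlockInfo` are hypothetical stand-in case classes, `MissingKeyDemo`, `strictLookup` and `tolerantLookup` are illustrative names, and the map is assumed to be a mutable HashMap keyed by batch time.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's Time and ReceivedBlockInfo (assumptions for this sketch).
case class Time(millis: Long)
case class ReceivedBlockInfo(blockId: String)

object MissingKeyDemo {
  // Assumed shape of the map: one entry per batch time, filled in only
  // once compute has run for that batch. Here it is left empty on purpose,
  // as it would be while the receiver is still starting.
  val receivedBlockInfo = mutable.HashMap.empty[Time, Array[ReceivedBlockInfo]]

  // Lookup as before the patch: apply() throws NoSuchElementException
  // when the key is absent.
  def strictLookup(time: Time): Array[ReceivedBlockInfo] =
    receivedBlockInfo(time)

  // Lookup as after the patch: a missing key yields an empty array.
  def tolerantLookup(time: Time): Array[ReceivedBlockInfo] =
    receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])

  def main(args: Array[String]): Unit = {
    val firstBatch = Time(1401756085000L)
    val threw =
      try { strictLookup(firstBatch); false }
      catch { case _: NoSuchElementException => true }
    println(s"strict lookup threw: $threw")
    println(s"tolerant lookup size: ${tolerantLookup(firstBatch).length}")
  }
}
```

With the map empty, the strict lookup fails and the tolerant one returns an empty array, so a batch that runs before the receiver has started sees no blocks instead of crashing.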

Author: Vadim Chekan <[email protected]>

Closes apache#961 from vchekan/branch-1.0 and squashes the following commits:

e86f82b [Vadim Chekan] Fixed indentation
4609563 [Vadim Chekan] Key not found exception: if receiver is slow to start, it is possible that getReceivedBlockInfo will be called before compute has been called
vchekan authored and pwendell committed Jun 18, 2014
1 parent d1e22b3 commit 26f6b98
Showing 1 changed file with 1 addition and 1 deletion.
@@ -74,7 +74,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont

/** Get information on received blocks. */
private[streaming] def getReceivedBlockInfo(time: Time) = {
- receivedBlockInfo(time)
+ receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])
}

/**