
[STREAMING] SPARK-2009 Key not found exception when slow receiver starts #961

Closed · wants to merge 2 commits from vchekan:branch-1.0

Conversation

vchekan
Contributor

@vchekan vchekan commented Jun 3, 2014

I got "java.util.NoSuchElementException: key not found: 1401756085000 ms" exception when using kafka stream and 1 sec batchPeriod.

Investigation showed that the reason is that ReceiverLauncher.startReceivers is asynchronous (started in a thread).
https://github.com/vchekan/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L206

In case of slow starting receiver, such as Kafka, it easily takes more than 2sec to start. In result, no single "compute" will be called on ReceiverInputDStream before first batch job is executed and receivedBlockInfo remains empty (obviously). Batch job will cause ReceiverInputDStream.getReceivedBlockInfo call and "key not found" exception.

The patch makes getReceivedBlockInfo more robust by tolerating missing values.
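To make the failure mode concrete, here is a minimal, self-contained sketch. The Long timestamps and String block names are stand-ins of my own for Spark's Time and ReceivedBlockInfo types; it illustrates the one-line change, not the actual Spark code:

```scala
import scala.collection.mutable

object KeyNotFoundSketch {
  def main(args: Array[String]): Unit = {
    // Map from batch time to block metadata; empty because the receiver
    // started slowly and compute() never ran for this batch.
    val receivedBlockInfo = new mutable.HashMap[Long, Array[String]]()

    val batchTime = 1401756085000L

    // Before the patch: apply() on a mutable.HashMap throws
    // java.util.NoSuchElementException ("key not found") for a missing key.
    // receivedBlockInfo(batchTime)

    // After the patch: tolerate the missing entry and fall back to empty.
    val info = receivedBlockInfo.get(batchTime).getOrElse(Array.empty[String])
    println(s"blocks recorded for $batchTime: ${info.length}") // prints 0
  }
}
```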

@vchekan vchekan changed the title Key not found exception when slow receiver starts [STREAMING] SPARK-2009 Key not found exception when slow receiver starts Jun 3, 2014
@@ -74,7 +74,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont

   /** Get information on received blocks. */
   private[streaming] def getReceivedBlockInfo(time: Time) = {
-    receivedBlockInfo(time)
+    receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])
@pwendell
Contributor

Mind using the two-space indentation like before?

@vchekan
Contributor Author

Fixed, thanks!


@vchekan
Contributor Author

vchekan commented Jun 4, 2014

Perhaps my initial interpretation of what led to this exception was wrong.
When JobGenerator generates a job, it calls DStreamGraph, which calls the output streams, each of which calls DStream.getOrCompute.
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L290
As you can see, "compute" won't be called if the time is not valid. For a windowed DStream, for example, only times matching the window's slide duration are valid; any others are ignored.
But ReceiverInputDStream.receivedBlockInfo is updated only when "compute" is called.
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala#L67
Thus, receivedBlockInfo may be missing entries for some times when a windowed DStream is used.

Next, after graph.generateJobs, JobGenerator calls getReceivedBlockInfo() on every receiver input stream, and it does not check whether the time is valid.
@pwendell, could you please advise whether the way I've fixed the bug is the right one, or whether it would be better to check DStream.isTimeValid before calling the receiver input stream's getReceivedBlockInfo? (A simplified sketch of that check follows.)
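For reference, the validity check in question boils down to a modulo test on the batch time's offset from zeroTime. This is a simplified sketch in plain Scala, not the actual DStream.isTimeValid source, and the parameter names are my own:

```scala
// Simplified model of the time-validity test described above: a batch time
// is valid only if it comes after zeroTime and its offset from zeroTime is
// an exact multiple of the stream's slide duration.
def isTimeValid(timeMs: Long, zeroTimeMs: Long, slideDurationMs: Long): Boolean =
  timeMs > zeroTimeMs && (timeMs - zeroTimeMs) % slideDurationMs == 0

// With a 1 s batch interval, every whole-second batch is accepted:
isTimeValid(1401756085000L, 1401756080000L, 1000L)   // true
// With a 5 min slide, a time between slide boundaries is rejected:
isTimeValid(1401756085000L, 1401756080000L, 300000L) // false (offset is 5000 ms)
```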

And by the way, here is the full stack trace of the exception:

java.util.NoSuchElementException: key not found: 1401754908000 ms
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
    at org.apache.spark.streaming.dstream.ReceiverInputDStream.getReceivedBlockInfo(ReceiverInputDStream.scala:77)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:225)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:223)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:223)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

@tdas
Contributor

tdas commented Jun 5, 2014

Jenkins, test this please.

@tdas
Contributor

tdas commented Jun 5, 2014

Thanks for the patch. I will take a look shortly. In the meantime, could you merge with master? The Jenkins build is failing because your branch is a little outdated.

@vchekan
Contributor Author

vchekan commented Jun 5, 2014

Hi Tathagata,

I've tried to merge branch-1.0 with master, but the conflict in
ReceiverInputDStream is "this file was created in both branches". This
conflict wasn't created by my patch, so I think it would be better for
somebody else to merge branch-1.0 and master, and I will follow up with
my patch.

BTW, I thought that branch-1.0 is the maintenance branch for version 1, so I
created the patch there. Am I right?
Also, could you take a look at my comment and advise which way is better
to fix this bug: tolerate "invalid" times, or prevent calling the DStream at
invalid times?
#961 (comment)

Thanks,
Vadim.


@pwendell
Contributor

Hey Vadim - thanks for this fix! I'll defer to @tdas on the optimal architecture of the fix, but this one seems good to me.

We usually ask that people submit fixes against master and then, when merging the fix, we'll backport it into one or more maintenance branches. The easiest way to do this might be for you to just open a new pull request against master (it's only a one-line change).

@pwendell
Contributor

Jenkins, retest this please.

Okay, I think we can merge this purely on the grounds of more defensive coding (it avoids a potential unhandled missing-key lookup).

asfgit pushed a commit that referenced this pull request Jun 18, 2014
I got "java.util.NoSuchElementException: key not found: 1401756085000 ms" exception when using kafka stream and 1 sec batchPeriod.

Investigation showed that the reason is that ReceiverLauncher.startReceivers is asynchronous (started in a thread).
https://github.com/vchekan/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L206

In case of slow starting receiver, such as Kafka, it easily takes more than 2sec to start. In result, no single "compute" will be called on ReceiverInputDStream before first batch job is executed and receivedBlockInfo remains empty (obviously). Batch job will cause ReceiverInputDStream.getReceivedBlockInfo call and "key not found" exception.

The patch makes getReceivedBlockInfo more robust by tolerating missing values.

Author: Vadim Chekan <[email protected]>

Closes #961 from vchekan/branch-1.0 and squashes the following commits:

e86f82b [Vadim Chekan] Fixed indentation
4609563 [Vadim Chekan] Key not found exception: if receiver is slow to start, it is possible that getReceivedBlockInfo will be called before compute has been called
@asfgit asfgit closed this in 889f7b7 Jun 18, 2014
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
I got "java.util.NoSuchElementException: key not found: 1401756085000 ms" exception when using kafka stream and 1 sec batchPeriod.

Investigation showed that the reason is that ReceiverLauncher.startReceivers is asynchronous (started in a thread).
https://github.com/vchekan/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L206

In case of slow starting receiver, such as Kafka, it easily takes more than 2sec to start. In result, no single "compute" will be called on ReceiverInputDStream before first batch job is executed and receivedBlockInfo remains empty (obviously). Batch job will cause ReceiverInputDStream.getReceivedBlockInfo call and "key not found" exception.

The patch makes getReceivedBlockInfo more robust by tolerating missing values.

Author: Vadim Chekan <[email protected]>

Closes apache#961 from vchekan/branch-1.0 and squashes the following commits:

e86f82b [Vadim Chekan] Fixed indentation
4609563 [Vadim Chekan] Key not found exception: if receiver is slow to start, it is possible that getReceivedBlockInfo will be called before compute has been called
(cherry picked from commit 26f6b98)

Signed-off-by: Patrick Wendell <[email protected]>
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
I got "java.util.NoSuchElementException: key not found: 1401756085000 ms" exception when using kafka stream and 1 sec batchPeriod.

Investigation showed that the reason is that ReceiverLauncher.startReceivers is asynchronous (started in a thread).
https://github.com/vchekan/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L206

In case of slow starting receiver, such as Kafka, it easily takes more than 2sec to start. In result, no single "compute" will be called on ReceiverInputDStream before first batch job is executed and receivedBlockInfo remains empty (obviously). Batch job will cause ReceiverInputDStream.getReceivedBlockInfo call and "key not found" exception.

The patch makes getReceivedBlockInfo more robust by tolerating missing values.

Author: Vadim Chekan <[email protected]>

Closes apache#961 from vchekan/branch-1.0 and squashes the following commits:

e86f82b [Vadim Chekan] Fixed indentation
4609563 [Vadim Chekan] Key not found exception: if receiver is slow to start, it is possible that getReceivedBlockInfo will be called before compute has been called
(cherry picked from commit 26f6b98)

Signed-off-by: Patrick Wendell <[email protected]>
@ouyangshourui

I use Spark 1.5.2 on YARN and am also hitting the same problem:
16/04/12 14:36:45 ERROR scheduler.DAGScheduler: Failed to update accumulators for ShuffleMapTask(32566, 41)
java.util.NoSuchElementException: key not found: 45448
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:940)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:936)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:936)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

@andrewor14
Contributor

@ouyangshourui this is not at all the same issue. Please file a JIRA with your stack trace and mark the affected version as 1.5.2.

@tianwanpeng

tianwanpeng commented Sep 2, 2016

I use version 1.6.2 and it is still not fixed; I found warnings like this:
Time 1472747100000 ms is invalid as zeroTime is 1472746980000 ms and slideDuration is 300000 ms and difference is 120000 ms

and my code looks like this:

WXADSparkStreamingContext.tdbankBytesStreamSS(context, tubeConf, msgP, parr)
  .map { case realMsg => (realMsg.rki, realMsg.value) }
  .reduceByKeyAndWindow((x: Long, y: Long) => x + y, Seconds(stat_interval), Seconds(stat_interval))
  .map { case (rki, value) => (rki.rk, (rki.idx, value)) }
  .groupByKeyAndWindow(Seconds(stat_interval), Seconds(stat_interval))
  .map { case (rk, itr) => (rk, itr.toMap) }
  .foreachRDD { (rdd, time) =>
    ..........
  }

where the stream interval is 60 s and stat_interval is 300 s.
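The warning above is the same time-validity check firing instead of crashing. A small sketch of the arithmetic, with the values taken from the quoted log line:

```scala
// Reproduce the arithmetic behind the quoted warning.
val zeroTime      = 1472746980000L // ms, from the log line
val time          = 1472747100000L // ms
val slideDuration = 300000L        // ms, i.e. the 300 s stat_interval

val difference = time - zeroTime                 // 120000 ms
val valid      = difference % slideDuration == 0 // false

println(s"difference = $difference ms, valid = $valid")
// difference = 120000 ms, valid = false -> the batch is skipped with the warning above
```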

dongjoon-hyun pushed a commit that referenced this pull request Apr 14, 2021
### What changes were proposed in this pull request?

This PR bumps up the version of pycodestyle from 2.6.0 to 2.7.0 released a month ago.

### Why are the changes needed?

2.7.0 includes three major fixes below (see https://readthedocs.org/projects/pycodestyle/downloads/pdf/latest/):

- Fix physical checks (such as W191) at end of file. PR #961.
- Add --indent-size option (defaulting to 4). PR #970.
- W605: fix escaped crlf false positive on windows. PR #976

The first and third fixes could be useful for developers when detecting style issues.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually tested locally.

Closes #32160 from HyukjinKwon/SPARK-35061.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
wangyum pushed a commit that referenced this pull request May 26, 2023
…bled by planner (#961)

* [CARMEL-5986] Make Coalesce/Rebucketing effective when bucketing disabled by planner

* fix ut