[SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry.

## What changes were proposed in this pull request?

`ReceiverSupervisorImpl` on the executor side reports a block's metadata back to `ReceiverTracker` on the driver side. The current code uses `askWithRetry` for this. However, `ReceiverTracker` is not idempotent for `AddBlock`, so the same message may be processed multiple times.
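
For intuition, the following is a small, self-contained toy sketch (plain Scala, not Spark code; all names are made up for illustration) of the failure mode: a retrying caller times out on a slow first attempt and resends, and a non-idempotent handler processes the same logical message more than once.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object RetryDuplicationDemo {
  def main(args: Array[String]): Unit = {
    val processed = new AtomicInteger(0)          // how often "AddBlock" was handled
    val requests  = new LinkedBlockingQueue[String]()
    val replies   = new LinkedBlockingQueue[Boolean]()

    // "Driver" side: a non-idempotent handler whose first message is slow.
    val handler = new Thread(() => {
      var first = true
      while (true) {
        requests.take()
        if (first) { first = false; Thread.sleep(1500) } // stand-in for the 200s sleep
        processed.incrementAndGet()                      // handled again for every resent copy
        replies.put(true)
      }
    })
    handler.setDaemon(true)
    handler.start()

    // "Executor" side: an askWithRetry-style loop with a 1-second timeout per attempt.
    var attempts = 0
    var acked = false
    while (!acked && attempts < 3) {
      attempts += 1
      requests.put("AddBlock")
      acked = replies.poll(1000, TimeUnit.MILLISECONDS) != null
    }

    Thread.sleep(500) // let the handler drain the duplicate copy as well
    println(s"attempts = $attempts, times processed = ${processed.get()}") // processed > 1
  }
}
```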

**To reproduce**:

1. Check whether it is the first time `ReceiverTracker` receives `AddBlock`; if so, sleep long enough (say 200 seconds) so that the first RPC call times out in `askWithRetry` and `AddBlock` is resent (a sketch of this hack follows the code block in step 2).
2. Rebuild Spark and run the following job:
```scala
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  def streamProcessing(): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingTest")
      .setMaster(masterUrl)  // masterUrl is provided elsewhere by the test setup
    val ssc = new StreamingContext(conf, Seconds(200))
    val stream = ssc.socketTextStream("localhost", 1234)
    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
```
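A rough sketch of the hack described in step 1, assuming it is injected into the `AddBlock` case of `ReceiverTracker`'s RPC endpoint on the driver. The real `receiveAndReply` in `ReceiverTracker` handles more messages and has WAL-batching logic, so this is illustrative only, and the `sawFirstAddBlock` field is made up:

```scala
// Illustrative only, not part of this patch: stall the first AddBlock past the
// ask timeout so that askWithRetry on the executor side resends it.
@volatile private var sawFirstAddBlock = false

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case AddBlock(receivedBlockInfo) =>
    if (!sawFirstAddBlock) {
      sawFirstAddBlock = true
      Thread.sleep(200 * 1000L) // longer than the RPC ask timeout
    }
    context.reply(addBlock(receivedBlockInfo))
}
```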
**To fix**:

It makes sense to provide a blocking version of `ask` in `RpcEndpointRef`, as mentioned in SPARK-18113 (apache#16503 (comment)), because the Netty RPC layer will not drop messages. `askWithRetry` is a leftover from the Akka days; it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay much attention to when using it.
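
For illustration, a hypothetical call-site fragment (not what this patch does; `trackerEndpoint`, `AddBlock`, `blockInfo`, and the logging helpers are borrowed from the `ReceiverSupervisorImpl` context) showing the changed contract: `askSync` sends the message exactly once, so the handler no longer needs to be idempotent, and any retry or failure handling becomes an explicit decision in the caller.

```scala
import scala.util.control.NonFatal

// Hypothetical caller-side handling: a failure (e.g. a timeout) now reaches the
// caller instead of triggering a hidden resend inside the RPC layer.
try {
  val added = trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))
  logDebug(s"Driver acknowledged AddBlock: $added")
} catch {
  case NonFatal(e) =>
    // The block was reported at most once; retrying is now an explicit choice.
    logError("Failed to report block to the driver", e)
    throw e
}
```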

## How was this patch tested?
Tested manually. The scenario described above no longer happens with this patch.

Author: jinxing <[email protected]>

Closes apache#16690 from jinxing64/SPARK-19347.
jinxing authored and Marcelo Vanzin committed Feb 1, 2017
1 parent df4a27c commit c5fcb7f
Showing 2 changed files with 36 additions and 4 deletions.
38 changes: 35 additions & 3 deletions core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -63,8 +63,38 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

/**
* Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
* timeout, or throw a SparkException if this fails even after the default number of retries.
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
* default timeout, throw an exception if this fails.
*
* Note: this is a blocking action which may cost a lot of time, so don't call it in a message
* loop of [[RpcEndpoint]].
* @param message the message to send
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)

/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
* specified timeout, throw an exception if this fails.
*
* Note: this is a blocking action which may cost a lot of time, so don't call it in a message
* loop of [[RpcEndpoint]].
*
* @param message the message to send
* @param timeout the timeout duration
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
val future = ask[T](message, timeout)
timeout.awaitResult(future)
}

/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
* default timeout, throw a SparkException if this fails even after the default number of retries.
* The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
* method retries, the message handling in the receiver side should be idempotent.
*
@@ -75,10 +105,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
@deprecated("use 'askSync' instead.", "2.2.0")
def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)

/**
* Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
* specified timeout, throw a SparkException if this fails even after the specified number of
* retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
* retries, the message handling in the receiver side should be idempotent.
@@ -91,6 +122,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
@deprecated("use 'askSync' instead.", "2.2.0")
def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
// TODO: Consider removing multiple attempts
var attempts = 0
2 changes: 1 addition & 1 deletion streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -159,7 +159,7 @@ private[streaming] class ReceiverSupervisorImpl(
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
