[SPARK-18617][CORE][STREAMING] Close "kryo auto pick" feature for Spark Streaming #16052

Closed
uncleGen wants to merge 4 commits into apache:master from uncleGen:SPARK-18617

Conversation


@uncleGen uncleGen commented Nov 29, 2016

What changes were proposed in this pull request?

#15992 provided a solution for the bug that receiver data cannot be deserialized properly. As @zsxwing said, it is a critical bug, but we should not break APIs between maintenance releases. A reasonable first step is to disable the "kryo auto pick" feature for Spark Streaming; I will continue #15992 to refine the full solution.
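For context, here is a minimal self-contained sketch of the gating idea, using hypothetical stand-in types rather than Spark's real Serializer classes (the actual diff appears further down in this conversation):

  import scala.reflect.ClassTag

  // Hypothetical stand-ins for Spark's serializers, for illustration only.
  sealed trait Serializer
  case object KryoSerializer extends Serializer
  case object JavaSerializer extends Serializer

  object SerializerChoice {
    // Element types for which Kryo is known to be safe (simplified).
    private val kryoFriendly: Set[ClassTag[_]] =
      Set(ClassTag.Int, ClassTag(classOf[Array[Byte]]), ClassTag(classOf[String]))

    // With autoPick = false (as for streaming receiver blocks), the default
    // JavaSerializer is always chosen, so writers and readers always agree.
    def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer =
      if (autoPick && kryoFriendly.contains(ct)) KryoSerializer else JavaSerializer
  }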

How was this patch tested?

Existing unit tests.

@uncleGen
Contributor Author

cc @zsxwing

@SparkQA

SparkQA commented Nov 29, 2016

Test build #69300 has finished for PR 16052 at commit e4416bd.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 29, 2016

Test build #69303 has started for PR 16052 at commit d205ebd.

@@ -77,8 +77,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
     primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
   }
 
-  def getSerializer(ct: ClassTag[_]): Serializer = {
-    if (canUseKryo(ct)) {
+  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
Contributor

document what this means
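One way the requested documentation could read, sketched against the new signature (wording and body are a paraphrase of the surrounding diff, not the committed code):

  /**
   * Picks the serializer for the given element type.
   *
   * @param autoPick whether Kryo may be auto-selected when the element type is
   *                 known to be Kryo-safe. Callers pass false for stream blocks
   *                 (StreamBlockId), so receiver data is always written and read
   *                 with the default JavaSerializer.
   */
  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
    if (autoPick && canUseKryo(ct)) kryoSerializer else defaultSerializer
  }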

@@ -155,7 +155,12 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
       outputStream: OutputStream,
       values: Iterator[T]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val ser = blockId match {
Member
Is this simpler as ...

    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()

to avoid duplication?

Contributor Author
Ah, yours is better.
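For reference, a sketch of dataSerializeStream after adopting the suggestion; this paraphrases the merged change rather than quoting it verbatim (note the negation: auto-pick must be off exactly for stream blocks):

  def dataSerializeStream[T: ClassTag](
      blockId: BlockId,
      outputStream: OutputStream,
      values: Iterator[T]): Unit = {
    val byteStream = new BufferedOutputStream(outputStream)
    // Disable serializer auto-pick for receiver-generated stream blocks, so
    // the block is written and later read with the same default serializer.
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
    ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
  }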

@SparkQA

SparkQA commented Nov 29, 2016

Test build #69323 has finished for PR 16052 at commit cd7e595.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Nov 30, 2016

@uncleGen Could you add the following test to StreamingContextSuite? Otherwise, LGTM.

  test("SPARK-18560 Receiver data should be deserialized properly.") {
    // Start a two nodes cluster, so receiver will use one node, and Spark jobs will use the
    // other one. Then Spark jobs need to fetch remote blocks and it will trigger SPARK-18560.
    val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
    sc = new SparkContext(conf)
    ssc = new StreamingContext(sc, Milliseconds(100))
    val input = ssc.receiverStream(new TestReceiver)
    input.count().foreachRDD { rdd =>
      // Make sure we can read from BlockRDD
      if (rdd.collect().headOption.getOrElse(0L) > 0) {
        // Stop StreamingContext to unblock "awaitTerminationOrTimeout"
        new Thread() {
          setDaemon(true)
          override def run(): Unit = {
            ssc.stop(stopSparkContext = true, stopGracefully = true)
          }
        }.start()
      }
    }
    ssc.start()
    ssc.awaitTerminationOrTimeout(60000)
  }

@uncleGen
Contributor Author

OK

@SparkQA

SparkQA commented Nov 30, 2016

Test build #69380 has finished for PR 16052 at commit 39b4867.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging

@rxin
Contributor

rxin commented Nov 30, 2016

Merging in master/branch-2.1.

asfgit pushed a commit that referenced this pull request Nov 30, 2016
[SPARK-18617][CORE][STREAMING] Close "kryo auto pick" feature for Spark Streaming

## What changes were proposed in this pull request?

#15992 provided a solution for the bug that **receiver data cannot be deserialized properly**. As zsxwing said, it is a critical bug, but we should not break APIs between maintenance releases. A reasonable first step is to disable the "kryo auto pick" feature for Spark Streaming; I will continue #15992 to refine the full solution.

## How was this patch tested?

Existing unit tests.

Author: uncleGen <[email protected]>

Closes #16052 from uncleGen/SPARK-18617.

(cherry picked from commit 56c82ed)
Signed-off-by: Reynold Xin <[email protected]>
@rxin
Contributor

rxin commented Nov 30, 2016

This doesn't merge cleanly into branch-2.0. @uncleGen can you submit a pull request for branch-2.0?

@asfgit asfgit closed this in 56c82ed Nov 30, 2016
@@ -869,6 +891,31 @@ object TestReceiver {
   val counter = new AtomicInteger(1)
 }
 
+class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {
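The collapsed hunk shows only the declaration. A plausible shape of the receiver (a hypothetical paraphrase, not the committed code): a daemon thread repeatedly stores one Array[Byte] block until the receiver is stopped.

  class FakeByteArrayReceiver
    extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {

    private val data: Array[Byte] = "test".getBytes

    override def onStart(): Unit = {
      // Keep pushing the same byte-array block until the receiver stops.
      new Thread() {
        setDaemon(true)
        override def run(): Unit = {
          while (!isStopped) {
            store(data)
          }
        }
      }.start()
    }

    override def onStop(): Unit = {
      // Nothing to clean up; the storing thread exits once isStopped is true.
    }
  }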
Member

nit: Why create a new class? Is there any concern with just using TestReceiver?

Contributor Author

@uncleGen uncleGen Nov 30, 2016

@zsxwing yes, the failure occurs when the receiver stores Array[Byte] data: the automatic serializer selection picks JavaSerializer when the block is written, but after the block is fetched from a remote executor the stream is deserialized with KryoSerializer, leading to com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
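To see the mismatch in isolation, here is a minimal standalone sketch against the Spark 2.x serializer API (a hypothetical demo, not the actual receiver code path):

  import org.apache.spark.SparkConf
  import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

  object SerializerMismatchDemo extends App {
    val conf = new SparkConf()

    // Writer side: the receiver path loses the element type, so auto-pick
    // falls back to JavaSerializer when the block is stored.
    val stored = new JavaSerializer(conf).newInstance()
      .serialize[Any](Array[Byte](1, 2, 3))

    // Reader side: the job knows the element type is Array[Byte], auto-picks
    // KryoSerializer, and fails while decoding Java-serialized bytes with:
    //   com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: ...
    new KryoSerializer(conf).newInstance()
      .deserialize[Array[Byte]](stored)
  }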

Contributor Author

BTW, the existing unit tests already cover the cases other than the Array[Byte] type.

Member

Any class in primitiveAndPrimitiveArrayClassTags can trigger this issue. That's why you can also use the existing TestReceiver; it's a Receiver[Int].
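For reference, a paraphrase of how SerializerManager builds that set in Spark 2.x (not a verbatim quote):

  import scala.reflect.ClassTag

  object KryoSafeTags {
    // The nine primitive class tags plus their array counterparts: an element
    // type like Int or Array[Byte] lands in this set, which is why the plain
    // TestReceiver (a Receiver[Int]) triggers the bug just as well.
    private val primitiveClassTags = Set[ClassTag[_]](
      ClassTag.Boolean, ClassTag.Byte, ClassTag.Char, ClassTag.Double,
      ClassTag.Float, ClassTag.Int, ClassTag.Long, ClassTag.Null, ClassTag.Short)

    val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] =
      primitiveClassTags ++ primitiveClassTags.map(_.wrap)
  }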

@uncleGen
Contributor Author

@rxin OK, I will backport it to branch-2.0

asfgit pushed a commit that referenced this pull request Dec 1, 2016
…e. Receiver data should be deserialized properly

## What changes were proposed in this pull request?

Fixed the potential SparkContext leak in `StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly` which was added in #16052. I also removed FakeByteArrayReceiver and used TestReceiver directly.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #16091 from zsxwing/SPARK-18617-follow-up.

(cherry picked from commit 0a81121)
Signed-off-by: Reynold Xin <[email protected]>
asfgit pushed a commit that referenced this pull request Dec 1, 2016
…e. Receiver data should be deserialized properly
asfgit pushed a commit that referenced this pull request Dec 1, 2016
…e for Spark Streaming

## What changes were proposed in this pull request?

This is a follow-up PR to backport #16052 to branch-2.0, including the incremental update from #16091.

## How was this patch tested?

new unit test

cc zsxwing rxin

Author: uncleGen <[email protected]>

Closes #16096 from uncleGen/branch-2.0-SPARK-18617.
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
…rk Streaming

robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
…e. Receiver data should be deserialized properly

robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
…rk Streaming

robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
…e. Receiver data should be deserialized properly

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…rk Streaming

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…e. Receiver data should be deserialized properly