[SPARK-18617][CORE][STREAMING] Close "kryo auto pick" feature for Spark Streaming #16052
Conversation
cc @zsxwing

Test build #69300 has finished for PR 16052 at commit
Force-pushed from e4416bd to d205ebd.
Test build #69303 has started for PR 16052 at commit
```diff
@@ -77,8 +77,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
     primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
   }

-  def getSerializer(ct: ClassTag[_]): Serializer = {
+  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
     if (canUseKryo(ct)) {
```
document what this means
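As a hedged illustration of what such documentation might convey, here is a standalone sketch of the selection logic with the new `autoPick` flag. This is not the Spark source: the object name, the concrete tag set, and the string return value are stand-ins chosen so the snippet is self-contained.

```scala
import scala.reflect.ClassTag

// Standalone sketch; mirrors SerializerManager's selection logic in spirit only.
object SerializerPickSketch {
  // Illustrative subset of class tags for which Kryo is known to be safe.
  private val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = Set(
    ClassTag.Boolean, ClassTag.Int, ClassTag.Long, ClassTag.Double,
    ClassTag(classOf[Array[Int]]), ClassTag(classOf[Array[Byte]]))
  private val stringClassTag: ClassTag[_] = ClassTag(classOf[String])

  def canUseKryo(ct: ClassTag[_]): Boolean =
    primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag

  // `autoPick`: whether Kryo may be chosen automatically for this class tag.
  // Stream blocks would pass false, so receiver data always uses the default
  // serializer on both the write and the read path.
  def getSerializer(ct: ClassTag[_], autoPick: Boolean): String =
    if (autoPick && canUseKryo(ct)) "kryo" else "default"
}
```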
```diff
@@ -155,7 +155,12 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
       outputStream: OutputStream,
       values: Iterator[T]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val ser = blockId match {
```
Is this simpler as

```scala
val autoPick = blockId.isInstanceOf[StreamBlockId]
val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
```

to avoid duplication?
Ah, yours is better.
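For reference, the simplification above can be sketched standalone. The case classes below are stand-ins, not Spark's real `org.apache.spark.storage.BlockId` hierarchy; only the `isInstanceOf[StreamBlockId]` test mirrors the suggestion.

```scala
// Stand-in block-id hierarchy for illustration only.
sealed trait BlockId
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId

object BlockIdSketch {
  // Derive autoPick once from the block id: Kryo auto-pick is disabled for
  // receiver (stream) blocks and stays enabled for everything else.
  def autoPickFor(blockId: BlockId): Boolean =
    !blockId.isInstanceOf[StreamBlockId]
}
```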
Test build #69323 has finished for PR 16052 at commit
@uncleGen Could you add the following test to StreamingContextSuite? Otherwise, LGTM.

```scala
test("SPARK-18560 Receiver data should be deserialized properly.") {
  // Start a two nodes cluster, so receiver will use one node, and Spark jobs will use the
  // other one. Then Spark jobs need to fetch remote blocks and it will trigger SPARK-18560.
  val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
  sc = new SparkContext(conf)
  ssc = new StreamingContext(sc, Milliseconds(100))
  val input = ssc.receiverStream(new TestReceiver)
  input.count().foreachRDD { rdd =>
    // Make sure we can read from BlockRDD
    if (rdd.collect().headOption.getOrElse(0L) > 0) {
      // Stop StreamingContext to unblock "awaitTerminationOrTimeout"
      new Thread() {
        setDaemon(true)
        override def run(): Unit = {
          ssc.stop(stopSparkContext = true, stopGracefully = true)
        }
      }.start()
    }
  }
  ssc.start()
  ssc.awaitTerminationOrTimeout(60000)
}
```
OK

Test build #69380 has finished for PR 16052 at commit

Merging in master/branch-2.1.
[SPARK-18617][CORE][STREAMING] Close "kryo auto pick" feature for Spark Streaming

## What changes were proposed in this pull request?

#15992 provided a solution to fix the bug, i.e. **receiver data can not be deserialized properly**. As zsxwing said, it is a critical bug, but we should not break APIs between maintenance releases. It may be a rational choice to close the auto pick kryo serializer for Spark Streaming as a first step. I will continue #15992 to optimize the solution.

## How was this patch tested?

Existing unit tests.

Author: uncleGen <[email protected]>
Closes #16052 from uncleGen/SPARK-18617.
(cherry picked from commit 56c82ed)
Signed-off-by: Reynold Xin <[email protected]>
This doesn't merge cleanly into branch-2.0. @uncleGen can you submit a pull request for branch-2.0?
```diff
@@ -869,6 +891,31 @@ object TestReceiver {
   val counter = new AtomicInteger(1)
 }

+class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {
```
nit: Why create a new class? Is there any concern to just use TestReceiver?
@zsxwing Yes, the failure occurs when the receiver stores `Array[Byte]` data and the automatic serializer selection picks JavaSerializer. However, after the block is fetched from a remote executor, the input-stream data is deserialized with KryoSerializer, leading to `com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994`.
BTW, the existing unit tests could cover other cases besides the `Array[Byte]` type.
Any class in `primitiveAndPrimitiveArrayClassTags` can trigger this issue. That's why you can also use the existing TestReceiver; it's a `Receiver[Int]`.
@rxin OK, I will backport it to branch-2.0
…e. Receiver data should be deserialized properly

## What changes were proposed in this pull request?

Fixed the potential SparkContext leak in `StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly`, which was added in #16052. I also removed FakeByteArrayReceiver and used TestReceiver directly.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>
Closes #16091 from zsxwing/SPARK-18617-follow-up.
(cherry picked from commit 0a81121)
Signed-off-by: Reynold Xin <[email protected]>
…e for Spark Streaming

## What changes were proposed in this pull request?

This is a follow-up PR to backport #16052 to branch-2.0, with the incremental update in #16091.

## How was this patch tested?

New unit test.

cc zsxwing rxin

Author: uncleGen <[email protected]>
Closes #16096 from uncleGen/branch-2.0-SPARK-18617.
What changes were proposed in this pull request?

#15992 provided a solution to fix the bug, i.e. receiver data can not be deserialized properly. As @zsxwing said, it is a critical bug, but we should not break APIs between maintenance releases. It may be a rational choice to close the auto pick kryo serializer for Spark Streaming as a first step. I will continue #15992 to optimize the solution.

How was this patch tested?

Existing unit tests.
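The failure mode being closed off here can be illustrated with a small JDK-only sketch (no Spark; the object and method names are hypothetical): bytes written under one wire format cannot be read back by a deserializer expecting another, which is the essence of writing receiver data with one serializer and deserializing fetched blocks with a different one.

```scala
import java.io._

// Hypothetical JDK-only illustration: a serializer mismatch surfaces as a
// read failure, analogous to the KryoException seen in SPARK-18560.
object WireFormatMismatch {
  def mismatchedReadFails(): Boolean = {
    // Writer side: serialize an Array[Byte] with Java serialization.
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(Array[Byte](1, 2, 3))
    out.close()

    // Reader side: dropping the 4-byte Java stream header simulates a reader
    // that expects different framing; the read throws instead of recovering
    // the original array.
    val corrupted = buf.toByteArray.drop(4)
    try {
      new ObjectInputStream(new ByteArrayInputStream(corrupted)).readObject()
      false
    } catch {
      case _: IOException => true
    }
  }
}
```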