From 5d4bfb5fc06f3dcc68f2eb862679592029f98e77 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Sun, 3 Aug 2014 16:45:59 -0700
Subject: [PATCH] Make objectStreamReset counter count the last object written too

This makes it precise -- before we'd only reset after (reset + 1) writes
---
 .../scala/org/apache/spark/serializer/JavaSerializer.scala | 5 ++---
 .../spark/util/collection/ExternalAppendOnlyMapSuite.scala | 2 +-
 .../apache/spark/util/collection/ExternalSorterSuite.scala | 2 +-
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index eec953966d5bb..34bc3124097bb 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -40,11 +40,10 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
    */
   def writeObject[T: ClassTag](t: T): SerializationStream = {
     objOut.writeObject(t)
-    if (counterReset >= 0 && counter >= counterReset) {
+    counter += 1
+    if (counterReset > 0 && counter >= counterReset) {
       objOut.reset()
       counter = 0
-    } else {
-      counter += 1
     }
     this
   }
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 67fbd337341ca..04d7338488628 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -34,7 +34,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     val conf = new SparkConf(loadDefaults)
     // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
     // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
-    conf.set("spark.serializer.objectStreamReset", "0")
+    conf.set("spark.serializer.objectStreamReset", "1")
     conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
     // Ensure that we actually have multiple batches per spill file
     conf.set("spark.shuffle.spill.batchSize", "10")
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 3a7a19bf1f321..57dcb4ffabac1 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -29,7 +29,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
     val conf = new SparkConf(loadDefaults)
     // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
     // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
-    conf.set("spark.serializer.objectStreamReset", "0")
+    conf.set("spark.serializer.objectStreamReset", "1")
     conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
     // Ensure that we actually have multiple batches per spill file
     conf.set("spark.shuffle.spill.batchSize", "10")
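
Note on the off-by-one this patch fixes: the old code checked the counter before
incrementing it, so with spark.serializer.objectStreamReset = N the stream was
only reset after N + 1 writes. A minimal standalone sketch of the two counting
orders (not the Spark class itself; the object and method names here are
illustrative):

    object ResetCounting {
      // Returns the write on which the first reset would fire, or -1 if never.
      def firstReset(counterReset: Int, oldBehavior: Boolean): Int = {
        var counter = 0
        for (write <- 1 to 100) {        // each iteration models one objOut.writeObject(t)
          if (oldBehavior) {
            // Old order: check, then increment -- the object just written is
            // not counted, so the reset lands one write late.
            if (counterReset >= 0 && counter >= counterReset) return write
            counter += 1
          } else {
            // New order: count the object just written, then check.
            counter += 1
            if (counterReset > 0 && counter >= counterReset) {
              counter = 0
              return write
            }
          }
        }
        -1
      }

      def main(args: Array[String]): Unit = {
        println(firstReset(2, oldBehavior = true))   // 3: reset after (reset + 1) writes
        println(firstReset(2, oldBehavior = false))  // 2: reset after exactly `reset` writes
      }
    }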
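
On the test changes: the guard also moves from counterReset >= 0 to
counterReset > 0, so a setting of "0" (which previously reset after every
object) now disables resets entirely. The suites therefore switch to "1",
which under the new counting resets after each object and so still emits a
TC_RESET marker per object. A hedged sketch of the resulting configuration,
written standalone rather than inside the suites:

    import org.apache.spark.SparkConf

    val conf = new SparkConf(false)
      // With the patched counter, "1" resets (and thus writes TC_RESET) after
      // every serialized object, exercising the SPARK-2792 batch-boundary path.
      .set("spark.serializer.objectStreamReset", "1")
      .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
      // Keep batches small so each spill file contains multiple batches.
      .set("spark.shuffle.spill.batchSize", "10")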