From 7c5fc28af42daaa6725af083d78c2372f3d0a338 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Wed, 30 Jul 2014 00:18:59 -0700
Subject: [PATCH] SPARK-2543: Allow user to set maximum Kryo buffer size

Author: Koert Kuipers

Closes #735 from koertkuipers/feat-kryo-max-buffersize and squashes the following commits:

15f6d81 [Koert Kuipers] change default for spark.kryoserializer.buffer.max.mb to 64mb and add some documentation
1bcc22c [Koert Kuipers] Merge branch 'master' into feat-kryo-max-buffersize
0c9f8eb [Koert Kuipers] make default for kryo max buffer size 16MB
143ec4d [Koert Kuipers] test resizable buffer in kryo Output
0732445 [Koert Kuipers] support setting maxCapacity to something different than capacity in kryo Output
---
 .../spark/serializer/KryoSerializer.scala      |  3 +-
 .../serializer/KryoSerializerSuite.scala       | 30 +++++++++++++++++++
 docs/configuration.md                          | 16 +++++++---
 3 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index fa79b25759153..e60b802a86a14 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -48,11 +48,12 @@ class KryoSerializer(conf: SparkConf)
   with Serializable {
 
   private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
+  private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
   private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
   private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
   private val registrator = conf.getOption("spark.kryo.registrator")
 
-  def newKryoOutput() = new KryoOutput(bufferSize)
+  def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 
   def newKryo(): Kryo = {
     val instantiator = new EmptyScalaKryoInstantiator
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 79280d1a06653..789b773bae316 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -209,6 +209,36 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   }
 }
 
+class KryoSerializerResizableOutputSuite extends FunSuite {
+  import org.apache.spark.SparkConf
+  import org.apache.spark.SparkContext
+  import org.apache.spark.LocalSparkContext
+  import org.apache.spark.SparkException
+
+  // trial and error showed this will not serialize with 1mb buffer
+  val x = (1 to 400000).toArray
+
+  test("kryo without resizable output buffer should fail on large array") {
+    val conf = new SparkConf(false)
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.kryoserializer.buffer.mb", "1")
+    conf.set("spark.kryoserializer.buffer.max.mb", "1")
+    val sc = new SparkContext("local", "test", conf)
+    intercept[SparkException](sc.parallelize(x).collect)
+    LocalSparkContext.stop(sc)
+  }
+
+  test("kryo with resizable output buffer should succeed on large array") {
+    val conf = new SparkConf(false)
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.kryoserializer.buffer.mb", "1")
+    conf.set("spark.kryoserializer.buffer.max.mb", "2")
+    val sc = new SparkContext("local", "test", conf)
+    assert(sc.parallelize(x).collect === x)
+    LocalSparkContext.stop(sc)
+  }
+}
+
 object KryoTest {
 
   case class CaseClass(i: Int, s: String) {}
diff --git a/docs/configuration.md b/docs/configuration.md
index 2e6c85cc2bcca..ea69057b5be10 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -414,10 +414,18 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.kryoserializer.buffer.mb</code></td>
   <td>2</td>
   <td>
-    Maximum object size to allow within Kryo (the library needs to create a buffer at least as
-    large as the largest single object you'll serialize). Increase this if you get a "buffer limit
-    exceeded" exception inside Kryo. Note that there will be one buffer per core on each
-    worker.
+    Initial size of Kryo's serialization buffer, in megabytes. Note that there will be one buffer
+    per core on each worker. This buffer will grow up to
+    <code>spark.kryoserializer.buffer.max.mb</code> if needed.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kryoserializer.buffer.max.mb</code></td>
+  <td>64</td>
+  <td>
+    Maximum allowable size of Kryo serialization buffer, in megabytes. This must be larger than any
+    object you attempt to serialize. Increase this if you get a "buffer limit exceeded" exception
+    inside Kryo.
   </td>
 </tr>