diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 5286f7b4c211a..e62f3415b4a5e 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -46,10 +46,11 @@ class KryoSerializer(conf: SparkConf) with Serializable { private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024 + private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024 private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrator = conf.getOption("spark.kryo.registrator") - def newKryoOutput() = new KryoOutput(bufferSize) + def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) def newKryo(): Kryo = { val instantiator = new EmptyScalaKryoInstantiator diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index cdd6b3d8feed7..5d10f10ad1f63 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -194,6 +194,36 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { } } +class KryoSerializerResizableOutputSuite extends FunSuite { + import org.apache.spark.SparkConf + import org.apache.spark.SparkContext + import org.apache.spark.LocalSparkContext + import org.apache.spark.SparkException + + // trial and error showed this will not serialize with 1mb buffer + val x = (1 to 400000).toArray + + test("kryo without resizable output buffer should fail on large array") { + val conf = new SparkConf(false) + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.kryoserializer.buffer.mb", "1") + conf.set("spark.kryoserializer.buffer.max.mb", "1") + val sc = new SparkContext("local", "test", conf) + intercept[SparkException](sc.parallelize(x).collect) + LocalSparkContext.stop(sc) + } + + test("kryo with resizable output buffer should succeed on large array") { + val conf = new SparkConf(false) + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.kryoserializer.buffer.mb", "1") + conf.set("spark.kryoserializer.buffer.max.mb", "2") + val sc = new SparkContext("local", "test", conf) + assert(sc.parallelize(x).collect === x) + LocalSparkContext.stop(sc) + } +} + object KryoTest { case class CaseClass(i: Int, s: String) {} diff --git a/docs/configuration.md b/docs/configuration.md index b84104cc7e653..c7949c0a115b9 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -385,10 +385,17 @@ Apart from these, the following properties are also available, and may be useful spark.kryoserializer.buffer.mb 2 - Maximum object size to allow within Kryo (the library needs to create a buffer at least as - large as the largest single object you'll serialize). Increase this if you get a "buffer limit - exceeded" exception inside Kryo. Note that there will be one buffer per core on each - worker. + Object size to allow within Kryo using default (pre-allocated) buffers (the library needs to create + a buffer at least as large as the largest single object you'll serialize). Note that there will be + one buffer per core on each worker. + + + + spark.kryoserializer.buffer.max.mb + 64 + + Maximum object size to allow within Kryo by resizing buffers as needed (which has some overhead). + Increase this if you get a "buffer limit exceeded" exception inside Kryo.