From c0cf32988d5c77655c09e9c798bbb49cb8b68250 Mon Sep 17 00:00:00 2001
From: Joseph Batchik
Date: Thu, 16 Jul 2015 10:15:21 -0700
Subject: [PATCH] implemented @squito suggestion for SparkEnv

---
 .../spark/serializer/GenericAvroSerializer.scala      | 12 +++++-------
 .../serializer/GenericAvroSerializerSuite.scala       |  1 -
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index 5d90ff10ed9c8..3105bc7fbf386 100644
--- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -20,20 +20,17 @@ package org.apache.spark.serializer
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.ByteBuffer

-import org.apache.spark.SparkConf
-import org.apache.spark.io.CompressionCodec
-
 import scala.collection.mutable

-import org.apache.commons.io.IOUtils
 import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
-
 import org.apache.avro.{Schema, SchemaNormalization}
 import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.apache.avro.io._
+import org.apache.commons.io.IOUtils

+import org.apache.spark.SparkEnv
+import org.apache.spark.io.CompressionCodec

 /**
  * Custom serializer used for generic Avro records. If the user registers the schemas
@@ -57,7 +54,8 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
   private val fingerprintCache = new mutable.HashMap[Schema, Long]()
   private val schemaCache = new mutable.HashMap[Long, Schema]()

-  private val codec = CompressionCodec.createCodec(new SparkConf())
+  /** This needs to be lazy since SparkEnv is not initialized yet sometimes when this is called */
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)

   /**
    * Used to compress Schemas when they are being sent over the wire.
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
index b7f72da666fef..bc9f3708ed69d 100644
--- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer
 import com.esotericsoftware.kryo.io.{Output, Input}
 import org.apache.avro.{SchemaBuilder, Schema}
 import org.apache.avro.generic.GenericData.Record
-import org.apache.spark.io.CompressionCodec

 import org.apache.spark.{SparkFunSuite, SharedSparkContext}
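
A minimal, self-contained sketch of the initialization-order issue the `lazy val codec` change above addresses. The names `Env`, `SchemaCompressor`, and `LazyInitDemo` are hypothetical stand-ins (not Spark code) for SparkEnv and the serializer; the point is only that a strict `val` would dereference a not-yet-initialized environment at construction time, while `lazy val` defers the lookup to first use.

// Hypothetical stand-in for SparkEnv: a globally visible handle that the
// framework populates at some point after user objects may already exist.
class Env(val conf: Map[String, String])

object Env {
  // Set by the "framework"; null until then.
  var get: Env = _
}

// Hypothetical stand-in for GenericAvroSerializer's codec field.
class SchemaCompressor {
  // A strict `val` here would call Env.get during construction and throw a
  // NullPointerException if the environment is not set up yet. The lazy val
  // defers the lookup until the field is first read.
  lazy val codecName: String =
    Env.get.conf.getOrElse("spark.io.compression.codec", "lz4")
}

object LazyInitDemo extends App {
  val compressor = new SchemaCompressor() // safe: nothing reads Env.get yet
  Env.get = new Env(Map("spark.io.compression.codec" -> "snappy"))
  println(compressor.codecName)           // "snappy": resolved only now, after Env exists
}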