From a5d965ca5b883dd015cd166150ab2826156edc41 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Wed, 2 Dec 2015 10:05:37 -0800
Subject: [PATCH] Fix GenericAvroSerializer and address comments

---
 .../apache/spark/serializer/GenericAvroSerializer.scala   | 8 +++++---
 .../datasources/parquet/UnsafeRowParquetRecordReader.java | 2 +-
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index ce196e4aabbf5..8d6af9cae8927 100644
--- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.serializer
 
-import java.io.ByteArrayOutputStream
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.ByteBuffer
 
 import scala.collection.mutable
@@ -31,7 +31,6 @@ import org.apache.commons.io.IOUtils
 
 import org.apache.spark.{SparkException, SparkEnv}
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.ByteBufferInputStream
 
 /**
  * Custom serializer used for generic Avro records. If the user registers the schemas
@@ -82,7 +81,10 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
    * seen values so to limit the number of times that decompression has to be done.
    */
  def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
-    val bis = new ByteBufferInputStream(schemaBytes)
+    val bis = new ByteArrayInputStream(
+      schemaBytes.array(),
+      schemaBytes.arrayOffset() + schemaBytes.position(),
+      schemaBytes.remaining())
     val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis))
     new Schema.Parser().parse(new String(bytes, "UTF-8"))
   })
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 34edcb4ae62d5..0cc4566c9cdde 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -332,7 +332,7 @@ private void decodeBinaryBatch(int col, int num) throws IOException {
     for (int n = 0; n < num; ++n) {
       if (columnReaders[col].next()) {
         ByteBuffer bytes = columnReaders[col].nextBinary().toByteBuffer();
-        int len = bytes.limit() - bytes.position();
+        int len = bytes.remaining();
         if (originalTypes[col] == OriginalType.UTF8) {
           UTF8String str = UTF8String.fromBytes(bytes.array(), bytes.arrayOffset() + bytes.position(),
             len);
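
Note (reviewer sketch, not part of the patch): the Scala hunk appears to guard
against a stream wrapper consuming the ByteBuffer that decompressCache uses as
its map key. Since ByteBuffer's equals()/hashCode() are computed from the
remaining content between position() and limit(), a read that advances
position() silently changes the key. The snippet below is a minimal,
self-contained sketch of that hazard under the assumption of a heap-backed
buffer (which array()/arrayOffset() require); ByteBufferKeyDemo and its sample
string are illustrative names only.

  import java.io.ByteArrayInputStream
  import java.nio.ByteBuffer

  object ByteBufferKeyDemo {
    def main(args: Array[String]): Unit = {
      val key = ByteBuffer.wrap("compressed schema bytes".getBytes("UTF-8"))

      // A position-advancing read (what a stream wrapping the buffer does)
      // drains the buffer: remaining() drops to 0, so the buffer no longer
      // equals its original value and a cache keyed on it misbehaves.
      val drained = new Array[Byte](key.remaining())
      key.get(drained)
      assert(key.remaining() == 0)

      key.rewind() // restore the buffer for the second read

      // The patched approach reads the backing array directly, leaving
      // position()/limit() -- and hence the key's equality -- untouched.
      val bis = new ByteArrayInputStream(
        key.array(), key.arrayOffset() + key.position(), key.remaining())
      val copied = new Array[Byte](key.remaining())
      bis.read(copied)
      assert(key.remaining() == drained.length) // buffer state unchanged
      assert(copied.sameElements(drained))      // same bytes were read
    }
  }

The Java hunk is a pure readability cleanup: Buffer.remaining() is defined as
limit() - position(), so the two expressions are equivalent.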