diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala index cdf2bc68d9c5e..c76958b91a150 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala @@ -24,6 +24,7 @@ import scala.math._ import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral} import scala.reflect.ClassTag import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag} +import scala.util.Try import scala.util.parsing.combinator.RegexParsers import org.json4s._ @@ -39,6 +40,10 @@ import org.apache.spark.util.Utils object DataType { + private[sql] def fromString(raw: String) = { + Try(DataType.fromJson(raw)).getOrElse(DataType.fromCaseClassString(raw)) + } + def fromJson(json: String): DataType = parseDataType(parse(json)) private object JSortedObject { @@ -887,6 +892,11 @@ case class StructField( object StructType { + private[sql] def fromString(raw: String): StructType = DataType.fromString(raw) match { + case t: StructType => t + case _ => throw new RuntimeException(s"Failed parsing StructType: $raw") + } + protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType = StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index 43ca359b51735..02defc2a4dcc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -18,480 +18,406 @@ package org.apache.spark.sql.parquet import java.sql.Timestamp -import java.util.{TimeZone, Calendar} +import java.util.{Calendar, TimeZone} -import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap} +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import jodd.datetime.JDateTime import parquet.column.Dictionary -import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter} -import parquet.schema.MessageType +import parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} +import parquet.schema.{GroupType, PrimitiveType => ParquetPrimitiveType, Type} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.parquet.CatalystConverter.FieldType -import org.apache.spark.sql.types._ import org.apache.spark.sql.parquet.timestamp.NanoTime +import org.apache.spark.sql.types._ /** - * Collection of converters of Parquet types (group and primitive types) that - * model arrays and maps. The conversions are partly based on the AvroParquet - * converters that are part of Parquet in order to be able to process these - * types. - * - * There are several types of converters: - * + * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some + * corresponding parent container. For example, a converter for a `Struct` field may set converted + * values to a [[MutableRow]]; or a converter for array element may append converted values to an + * [[ArrayBuffer]]. */ - -private[sql] object CatalystConverter { - // The type internally used for fields - type FieldType = StructField - - // This is mostly Parquet convention (see, e.g., `ConversionPatterns`). 
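The new `DataType.fromString` helper tries the JSON schema representation first and only falls back to the legacy case-class string form when JSON parsing fails. A minimal sketch of the intended round-trip, assuming it runs inside the org.apache.spark.sql package (both fromString and fromCaseClassString are private[sql]); the legacy string literal below illustrates the old format rather than being copied from real metadata:

import org.apache.spark.sql.types._

// JSON form written by current Spark versions.
val jsonSchema =
  """{"type":"struct","fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}}]}"""

// Hypothetical legacy case-class form accepted by fromCaseClassString.
val legacySchema = "StructType(List(StructField(a,IntegerType,true)))"

// Both strings go through the same entry point; fromJson throws on the legacy string,
// so Try(...).getOrElse(...) routes it to fromCaseClassString instead.
val a = StructType.fromString(jsonSchema)
val b = StructType.fromString(legacySchema)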
- // Note that "array" for the array elements is chosen by ParquetAvro. - // Using a different value will result in Parquet silently dropping columns. - val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag" - val ARRAY_ELEMENTS_SCHEMA_NAME = "array" - // SPARK-4520: Thrift generated parquet files have different array element - // schema names than avro. Thrift parquet uses array_schema_name + "_tuple" - // as opposed to "array" used by default. For more information, check - // TestThriftSchemaConverter.java in parquet.thrift. - val THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX = "_tuple" - val MAP_KEY_SCHEMA_NAME = "key" - val MAP_VALUE_SCHEMA_NAME = "value" - val MAP_SCHEMA_NAME = "map" - - // TODO: consider using Array[T] for arrays to avoid boxing of primitive types - type ArrayScalaType[T] = Seq[T] - type StructScalaType[T] = Row - type MapScalaType[K, V] = Map[K, V] - - protected[parquet] def createConverter( - field: FieldType, - fieldIndex: Int, - parent: CatalystConverter): Converter = { - val fieldType: DataType = field.dataType - fieldType match { - case udt: UserDefinedType[_] => { - createConverter(field.copy(dataType = udt.sqlType), fieldIndex, parent) - } - // For native JVM types we use a converter with native arrays - case ArrayType(elementType: NativeType, false) => { - new CatalystNativeArrayConverter(elementType, fieldIndex, parent) - } - // This is for other types of arrays, including those with nested fields - case ArrayType(elementType: DataType, false) => { - new CatalystArrayConverter(elementType, fieldIndex, parent) - } - case ArrayType(elementType: DataType, true) => { - new CatalystArrayContainsNullConverter(elementType, fieldIndex, parent) - } - case StructType(fields: Array[StructField]) => { - new CatalystStructConverter(fields, fieldIndex, parent) - } - case MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) => { - new CatalystMapConverter( - Array( - new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false), - new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, valueContainsNull)), - fieldIndex, - parent) - } - // Strings, Shorts and Bytes do not have a corresponding type in Parquet - // so we need to treat them separately - case StringType => - new CatalystPrimitiveStringConverter(parent, fieldIndex) - case ShortType => { - new CatalystPrimitiveConverter(parent, fieldIndex) { - override def addInt(value: Int): Unit = - parent.updateShort(fieldIndex, value.asInstanceOf[ShortType.JvmType]) - } - } - case ByteType => { - new CatalystPrimitiveConverter(parent, fieldIndex) { - override def addInt(value: Int): Unit = - parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType]) - } - } - case DateType => { - new CatalystPrimitiveConverter(parent, fieldIndex) { - override def addInt(value: Int): Unit = - parent.updateDate(fieldIndex, value.asInstanceOf[DateType.JvmType]) - } - } - case d: DecimalType => { - new CatalystPrimitiveConverter(parent, fieldIndex) { - override def addBinary(value: Binary): Unit = - parent.updateDecimal(fieldIndex, value, d) - } - } - case TimestampType => { - new CatalystPrimitiveConverter(parent, fieldIndex) { - override def addBinary(value: Binary): Unit = - parent.updateTimestamp(fieldIndex, value) - } - } - // All other primitive types use the default converter - case ctype: PrimitiveType => { // note: need the type tag here! 
- new CatalystPrimitiveConverter(parent, fieldIndex) - } - case _ => throw new RuntimeException( - s"unable to convert datatype ${field.dataType.toString} in CatalystConverter") - } - } - - protected[parquet] def createRootConverter( - parquetSchema: MessageType, - attributes: Seq[Attribute]): CatalystConverter = { - // For non-nested types we use the optimized Row converter - if (attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))) { - new CatalystPrimitiveRowConverter(attributes.toArray) - } else { - new CatalystGroupConverter(attributes.toArray) - } - } +private[parquet] trait ParentContainerUpdater { + def set(value: Any): Unit = () + def setBoolean(value: Boolean): Unit = set(value) + def setByte(value: Byte): Unit = set(value) + def setShort(value: Short): Unit = set(value) + def setInt(value: Int): Unit = set(value) + def setLong(value: Long): Unit = set(value) + def setFloat(value: Float): Unit = set(value) + def setDouble(value: Double): Unit = set(value) } -private[parquet] abstract class CatalystConverter extends GroupConverter { - /** - * The number of fields this group has - */ - protected[parquet] val size: Int - - /** - * The index of this converter in the parent - */ - protected[parquet] val index: Int +/** A no-op updater used for root converter (who doesn't have a parent). */ +private[parquet] object NoopUpdater extends ParentContainerUpdater - /** - * The parent converter - */ - protected[parquet] val parent: CatalystConverter +/** + * This Parquet converter converts Parquet records to Spark SQL rows. + * + * @param parquetType Parquet type of Parquet records + * @param sparkType Spark SQL schema that corresponds to the Parquet record type + * @param updater An updater which takes care of the converted row object + */ +private[parquet] class CatalystRowConverter( + parquetType: GroupType, + sparkType: StructType, + updater: ParentContainerUpdater) + extends GroupConverter { /** - * Called by child converters to update their value in its parent (this). - * Note that if possible the more specific update methods below should be used - * to avoid auto-boxing of native JVM types. - * - * @param fieldIndex - * @param value + * Updater used together with [[CatalystRowConverter]]. It sets converted filed values to the + * `ordinal`-th cell in `row`. */ - protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit + private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater { + override def set(value: Any): Unit = row(ordinal) = value + override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value) + override def setByte(value: Byte): Unit = row.setByte(ordinal, value) + override def setShort(value: Short): Unit = row.setShort(ordinal, value) + override def setInt(value: Int): Unit = row.setInt(ordinal, value) + override def setLong(value: Long): Unit = row.setLong(ordinal, value) + override def setDouble(value: Double): Unit = row.setDouble(ordinal, value) + override def setFloat(value: Float): Unit = row.setFloat(ordinal, value) + } + + /** Represents the converted row object once an entire Parquet record is converted. */ + val currentRow = new SpecificMutableRow(sparkType.map(_.dataType)) + + // Converters for each field. 
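As the ParentContainerUpdater scaladoc above describes, every converter pushes its result into an updater instead of knowing its parent container concretely. A rough illustration of the idea (not part of the patch, and assuming it sits inside org.apache.spark.sql.parquet where the trait is visible):

import scala.collection.mutable.ArrayBuffer

// The same element converter can fill a row cell or grow an array, depending only on
// which updater it was constructed with.
val collected = ArrayBuffer.empty[Any]

val appendToBuffer = new ParentContainerUpdater {
  override def set(value: Any): Unit = collected += value   // append instead of writing a row cell
}

// e.g. newConverter(parquetElementType, elementSparkType, appendToBuffer) would accumulate
// converted values into `collected`, exactly as CatalystArrayConverter does further below.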
+ private val fieldConverters: Array[Converter] = { + parquetType.getFields.zip(sparkType).zipWithIndex.map { + case ((parquetFieldType, sparkField), ordinal) => + // Converted field value should be set to the `ordinal`-th cell of `currentRow` + newConverter(parquetFieldType, sparkField.dataType, new RowUpdater(currentRow, ordinal)) + }.toArray + } - protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = - updateField(fieldIndex, value) + override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex) - protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = - updateField(fieldIndex, value) + override def end(): Unit = updater.set(currentRow) - protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit = - updateField(fieldIndex, value) + override def start(): Unit = { + var i = 0 + while (i < currentRow.length) { + currentRow.setNullAt(i) + i += 1 + } + } - protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = - updateField(fieldIndex, value) + /** + * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type + * `sparkType`. Converted values are handled by `updater`. + */ + private def newConverter( + parquetType: Type, + sparkType: DataType, + updater: ParentContainerUpdater): Converter = { - protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit = - updateField(fieldIndex, value) + sparkType match { + case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => + new CatalystPrimitiveConverter(updater) - protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit = - updateField(fieldIndex, value) + case ByteType => + new PrimitiveConverter { + override def addInt(value: Int): Unit = + updater.setByte(value.asInstanceOf[ByteType#JvmType]) + } - protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = - updateField(fieldIndex, value) + case ShortType => + new PrimitiveConverter { + override def addInt(value: Int): Unit = + updater.setShort(value.asInstanceOf[ShortType#JvmType]) + } - protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = - updateField(fieldIndex, value) + case t: DecimalType => + new CatalystDecimalConverter(t, updater) - protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = - updateField(fieldIndex, value.getBytes) + case StringType => + new CatalystStringConverter(updater) - protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = - updateField(fieldIndex, value) + case TimestampType => + new PrimitiveConverter { + override def addBinary(value: Binary): Unit = { + updater.set(CatalystTimestampConverter.convertToTimestamp(value)) + } + } - protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit = - updateField(fieldIndex, readTimestamp(value)) + case DateType => + new PrimitiveConverter { + override def addInt(value: Int): Unit = { + // DateType is not specialized in `SpecificMutableRow`, have to box it here. 
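Parquet has no 8- or 16-bit physical type, so the ByteType and ShortType converters above receive values through addInt and narrow them afterwards. A tiny illustration of why the asInstanceOf narrowing is safe (hypothetical value, not from the patch):

// For primitive value types, asInstanceOf performs a numeric conversion (like toByte).
val fromParquet: Int = 42                             // a ByteType column is stored as INT32
val narrowed: Byte = fromParquet.asInstanceOf[Byte]   // 42 — lossless, because the writer only
                                                      // ever stored values that fit in a Byte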
+ updater.set(value.asInstanceOf[DateType#JvmType]) + } + } - protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = - updateField(fieldIndex, readDecimal(new Decimal(), value, ctype)) + case t: ArrayType => + new CatalystArrayConverter(parquetType.asGroupType(), t, updater) - protected[parquet] def isRootConverter: Boolean = parent == null + case t: MapType => + new CatalystMapConverter(parquetType.asGroupType(), t, updater) - protected[parquet] def clearBuffer(): Unit + case t: StructType => + new CatalystRowConverter(parquetType.asGroupType(), t, updater) - /** - * Should only be called in the root (group) converter! - * - * @return - */ - def getCurrentRecord: Row = throw new UnsupportedOperationException + case t: UserDefinedType[_] => + val sparkTypeForUDT = t.sqlType + val parquetTypeForUDT = ParquetTypesConverter.fromDataType(sparkTypeForUDT, "") + newConverter(parquetTypeForUDT, sparkTypeForUDT, updater) - /** - * Read a decimal value from a Parquet Binary into "dest". Only supports decimals that fit in - * a long (i.e. precision <= 18) - */ - protected[parquet] def readDecimal(dest: Decimal, value: Binary, ctype: DecimalType): Unit = { - val precision = ctype.precisionInfo.get.precision - val scale = ctype.precisionInfo.get.scale - val bytes = value.getBytes - require(bytes.length <= 16, "Decimal field too large to read") - var unscaled = 0L - var i = 0 - while (i < bytes.length) { - unscaled = (unscaled << 8) | (bytes(i) & 0xFF) - i += 1 + case _ => + throw new RuntimeException( + s"Unable to create Parquet converter for data type ${sparkType.json}") } - // Make sure unscaled has the right sign, by sign-extending the first bit - val numBits = 8 * bytes.length - unscaled = (unscaled << (64 - numBits)) >> (64 - numBits) - dest.set(unscaled, precision, scale) } /** - * Read a Timestamp value from a Parquet Int96Value + * Parquet converter for Parquet primitive types. Note that not all Spark SQL primitive types + * are handled by this converter. Parquet primitive types are only a subset of those of Spark + * SQL. For example, BYTE, SHORT and INT in Spark SQL are all covered by INT32 in Parquet. */ - protected[parquet] def readTimestamp(value: Binary): Timestamp = { - CatalystTimestampConverter.convertToTimestamp(value) - } -} + private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater) + extends PrimitiveConverter { -/** - * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record - * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. - * - * @param schema The corresponding Catalyst schema in the form of a list of attributes. 
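The CatalystPrimitiveConverter scaladoc above points out that Parquet's physical types are a strict subset of Spark SQL's types. A hedged summary of the mapping these converters rely on, as read from the cases above (not an exhaustive reference):

// Spark SQL type          Parquet physical type      handled by
// BooleanType             BOOLEAN                    CatalystPrimitiveConverter.addBoolean
// ByteType / ShortType    INT32 (narrowed)           dedicated PrimitiveConverter.addInt
// IntegerType / DateType  INT32                      addInt (DateType is boxed via set)
// LongType                INT64                      addLong
// FloatType / DoubleType  FLOAT / DOUBLE             addFloat / addDouble
// StringType              BINARY (UTF8)              CatalystStringConverter
// BinaryType              BINARY                     addBinary (raw bytes)
// DecimalType (<= 18)     FIXED_LEN_BYTE_ARRAY       CatalystDecimalConverter
// TimestampType           INT96                      CatalystTimestampConverter.convertToTimestamp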
- */ -private[parquet] class CatalystGroupConverter( - protected[parquet] val schema: Array[FieldType], - protected[parquet] val index: Int, - protected[parquet] val parent: CatalystConverter, - protected[parquet] var current: ArrayBuffer[Any], - protected[parquet] var buffer: ArrayBuffer[Row]) - extends CatalystConverter { - - def this(schema: Array[FieldType], index: Int, parent: CatalystConverter) = - this( - schema, - index, - parent, - current = null, - buffer = new ArrayBuffer[Row]( - CatalystArrayConverter.INITIAL_ARRAY_SIZE)) + override def addBoolean(value: Boolean): Unit = updater.setBoolean(value) + override def addInt(value: Int): Unit = updater.setInt(value) + override def addLong(value: Long): Unit = updater.setLong(value) + override def addFloat(value: Float): Unit = updater.setFloat(value) + override def addDouble(value: Double): Unit = updater.setDouble(value) + override def addBinary(value: Binary): Unit = updater.set(value.getBytes) + } /** - * This constructor is used for the root converter only! + * Parquet converter for strings. A dictionary is used to minimize string decoding cost. */ - def this(attributes: Array[Attribute]) = - this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null) - - protected [parquet] val converters: Array[Converter] = - schema.zipWithIndex.map { - case (field, idx) => CatalystConverter.createConverter(field, idx, this) - }.toArray - - override val size = schema.size + private final class CatalystStringConverter(updater: ParentContainerUpdater) + extends PrimitiveConverter { - override def getCurrentRecord: Row = { - assert(isRootConverter, "getCurrentRecord should only be called in root group converter!") - // TODO: use iterators if possible - // Note: this will ever only be called in the root converter when the record has been - // fully processed. Therefore it will be difficult to use mutable rows instead, since - // any non-root converter never would be sure when it would be safe to re-use the buffer. - new GenericRow(current.toArray) - } - - override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) - - // for child converters to update upstream values - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { - current.update(fieldIndex, value) - } + private var expandedDictionary: Array[String] = null - override protected[parquet] def clearBuffer(): Unit = buffer.clear() + override def hasDictionarySupport: Boolean = true - override def start(): Unit = { - current = ArrayBuffer.fill(size)(null) - converters.foreach { - converter => if (!converter.isPrimitive) { - converter.asInstanceOf[CatalystConverter].clearBuffer + override def setDictionary(dictionary: Dictionary): Unit = { + this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { + dictionary.decodeToBinary(_).toStringUsingUTF8 } } - } - override def end(): Unit = { - if (!isRootConverter) { - assert(current != null) // there should be no empty groups - buffer.append(new GenericRow(current.toArray)) - parent.updateField(index, new GenericRow(buffer.toArray.asInstanceOf[Array[Any]])) + override def addValueFromDictionary(dictionaryId: Int): Unit = { + updater.set(expandedDictionary(dictionaryId)) } - } -} -/** - * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record - * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. Note that his - * converter is optimized for rows of primitive types (non-nested records). 
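The new CatalystStringConverter above pre-decodes the whole Parquet dictionary once, so each dictionary-encoded value costs an array lookup instead of a fresh UTF-8 decode per row. A standalone sketch of the same idea, using only the parquet.column.Dictionary calls already used above:

import parquet.column.Dictionary

// Decode every dictionary entry exactly once...
def expand(dictionary: Dictionary): Array[String] =
  Array.tabulate(dictionary.getMaxId + 1)(dictionary.decodeToBinary(_).toStringUsingUTF8)

// ...after which addValueFromDictionary(id) reduces to updater.set(expanded(id)).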
- */ -private[parquet] class CatalystPrimitiveRowConverter( - protected[parquet] val schema: Array[FieldType], - protected[parquet] var current: MutableRow) - extends CatalystConverter { - - // This constructor is used for the root converter only - def this(attributes: Array[Attribute]) = - this( - attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), - new SpecificMutableRow(attributes.map(_.dataType))) - - protected [parquet] val converters: Array[Converter] = - schema.zipWithIndex.map { - case (field, idx) => CatalystConverter.createConverter(field, idx, this) - }.toArray - - override val size = schema.size + override def addBinary(value: Binary): Unit = updater.set(value.toStringUsingUTF8) + } - override val index = 0 + /** + * Parquet converter for fixed-precision decimals. + * + * @todo Handle fixed-precision decimals stored as INT32 and INT64 + */ + private final class CatalystDecimalConverter( + decimalType: DecimalType, + updater: ParentContainerUpdater) + extends PrimitiveConverter { - override val parent = null + override def addBinary(value: Binary): Unit = { + updater.set(toDecimal(value)) + } - // Should be only called in root group converter! - override def getCurrentRecord: Row = current + private def toDecimal(value: Binary): Decimal = { + val precision = decimalType.precision + val scale = decimalType.scale + val bytes = value.getBytes - override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) + require(bytes.length <= 16, "Decimal field too large to read") - // for child converters to update upstream values - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { - throw new UnsupportedOperationException // child converters should use the - // specific update methods below - } + var unscaled = 0L + var i = 0 - override protected[parquet] def clearBuffer(): Unit = {} + while (i < bytes.length) { + unscaled = (unscaled << 8) | (bytes(i) & 0xff) + i += 1 + } - override def start(): Unit = { - var i = 0 - while (i < size) { - current.setNullAt(i) - i = i + 1 + val bits = 8 * bytes.length + unscaled = (unscaled << (64 - bits)) >> (64 - bits) + Decimal(unscaled, precision, scale) } } - override def end(): Unit = {} - - // Overridden here to avoid auto-boxing for primitive types - override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = - current.setBoolean(fieldIndex, value) - - override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = - current.setInt(fieldIndex, value) + /** + * Parquet converter for arrays. Spark SQL arrays are represented as Parquet lists. Standard + * Parquet lists are represented as a 3-level group annotated by `LIST`: + * {{{ + * group (LIST) { <-- parquetSchema points here + * repeated group list { + * element; + * } + * } + * }}} + * The `parquetSchema` constructor argument points to the outermost group. + * + * However, before this representation is standardized, some Parquet libraries/tools also use some + * non-standard formats to represent list-like structures. Backwards-compatibility rules for + * handling these cases are described in Parquet format spec. + * + * @see https://github.com/apache/incubator-parquet-format/blob/master/LogicalTypes.md#lists + */ + private final class CatalystArrayConverter( + parquetSchema: GroupType, + sparkSchema: ArrayType, + updater: ParentContainerUpdater) + extends GroupConverter { + + // TODO This is slow! Needs specialization. 
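The toDecimal method above rebuilds the unscaled long from the big-endian two's-complement bytes that Parquet stores for a fixed-length decimal, then sign-extends it. A small worked example of that arithmetic (the value is made up for illustration):

// A decimal(5, 2) value of -1.23 has unscaled value -123, stored as the single
// two's-complement byte 0x85.
val bytes = Array(0x85.toByte)

var unscaled = 0L
var i = 0
while (i < bytes.length) {
  unscaled = (unscaled << 8) | (bytes(i) & 0xff)      // accumulate big-endian bytes: 0x85 == 133
  i += 1
}

val bits = 8 * bytes.length                            // 8 significant bits
unscaled = (unscaled << (64 - bits)) >> (64 - bits)    // sign-extend: 133 becomes -123

// Decimal(-123, precision = 5, scale = 2) represents -1.23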
+ private val currentArray = ArrayBuffer.empty[Any] + + private val elementConverter: Converter = { + val repeatedType = parquetSchema.getType(0) + val elementType = sparkSchema.elementType + + if (isElementType(repeatedType, elementType)) { + newConverter(repeatedType, elementType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentArray += value + }) + } else { + new ElementConverter(repeatedType.asGroupType().getType(0), elementType) + } + } - override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit = - current.update(fieldIndex, value) + override def getConverter(fieldIndex: Int): Converter = elementConverter + + override def end(): Unit = updater.set(currentArray) + + override def start(): Unit = currentArray.clear() + + // scalastyle:off + /** + * Returns whether the given type is the element type of a list or is a syntactic group with + * one field that is the element type. This is determined by checking whether the type can be + * a syntactic group and by checking whether a potential syntactic group matches the expected + * schema. + * {{{ + * group (LIST) { + * repeated group list { <-- repeatedType points here + * element; + * } + * } + * }}} + * In short, here we handle Parquet list backwards-compatibility rules on the read path. This + * method is based on `AvroIndexedRecordConverter.isElementType`. + * + * @see https://github.com/apache/incubator-parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules + */ + // scalastyle:on + private def isElementType(repeatedType: Type, elementType: DataType): Boolean = { + (repeatedType, elementType) match { + case (t: ParquetPrimitiveType, _) => true + case (t: GroupType, _) if t.getFieldCount > 1 => true + case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true + case _ => false + } + } - override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = - current.setLong(fieldIndex, value) + /** Array element converter */ + private final class ElementConverter(parquetType: Type, sparkType: DataType) + extends GroupConverter { - override protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit = - current.setShort(fieldIndex, value) + private var currentElement: Any = _ - override protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit = - current.setByte(fieldIndex, value) + private val converter = newConverter(parquetType, sparkType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentElement = value + }) - override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = - current.setDouble(fieldIndex, value) + override def getConverter(fieldIndex: Int): Converter = converter - override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = - current.setFloat(fieldIndex, value) + override def end(): Unit = currentArray += currentElement - override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = - current.update(fieldIndex, value.getBytes) + override def start(): Unit = currentElement = null + } + } - override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = - current.setString(fieldIndex, value) + /** Parquet converter for maps */ + private final class CatalystMapConverter( + parquetType: GroupType, + sparkType: MapType, + updater: ParentContainerUpdater) + extends GroupConverter { - override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit = - 
current.update(fieldIndex, readTimestamp(value)) + private val currentMap = mutable.Map.empty[Any, Any] - override protected[parquet] def updateDecimal( - fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = { - var decimal = current(fieldIndex).asInstanceOf[Decimal] - if (decimal == null) { - decimal = new Decimal - current(fieldIndex) = decimal + private val keyValueConverter = { + val repeatedType = parquetType.getType(0).asGroupType() + new KeyValueConverter( + repeatedType.getType(0), + repeatedType.getType(1), + sparkType.keyType, + sparkType.valueType) } - readDecimal(decimal, value, ctype) - } -} -/** - * A `parquet.io.api.PrimitiveConverter` that converts Parquet types to Catalyst types. - * - * @param parent The parent group converter. - * @param fieldIndex The index inside the record. - */ -private[parquet] class CatalystPrimitiveConverter( - parent: CatalystConverter, - fieldIndex: Int) extends PrimitiveConverter { - override def addBinary(value: Binary): Unit = - parent.updateBinary(fieldIndex, value) + override def getConverter(fieldIndex: Int): Converter = keyValueConverter - override def addBoolean(value: Boolean): Unit = - parent.updateBoolean(fieldIndex, value) + override def end(): Unit = updater.set(currentMap) - override def addDouble(value: Double): Unit = - parent.updateDouble(fieldIndex, value) + override def start(): Unit = currentMap.clear() - override def addFloat(value: Float): Unit = - parent.updateFloat(fieldIndex, value) + /** Parquet converter for key-value pairs within the map. */ + private final class KeyValueConverter( + parquetKeyType: Type, + parquetValueType: Type, + sparkKeyType: DataType, + sparkValueType: DataType) + extends GroupConverter { - override def addInt(value: Int): Unit = - parent.updateInt(fieldIndex, value) + private var currentKey: Any = _ - override def addLong(value: Long): Unit = - parent.updateLong(fieldIndex, value) -} + private var currentValue: Any = _ -/** - * A `parquet.io.api.PrimitiveConverter` that converts Parquet Binary to Catalyst String. - * Supports dictionaries to reduce Binary to String conversion overhead. - * - * Follows pattern in Parquet of using dictionaries, where supported, for String conversion. - * - * @param parent The parent group converter. - * @param fieldIndex The index inside the record. 
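The isElementType check in the array converter above is what distinguishes the standard three-level LIST encoding from the legacy two-level forms. The two schema shapes, written out with parquet's MessageTypeParser (illustrative schemas, not taken from the patch):

import parquet.schema.MessageTypeParser

// Standard LIST encoding: a repeated group `list` wrapping a single `element` field.
// `list` is not itself the element type, so conversion goes through ElementConverter.
val standardList = MessageTypeParser.parseMessageType(
  """message m {
    |  optional group f (LIST) {
    |    repeated group list {
    |      optional int32 element;
    |    }
    |  }
    |}""".stripMargin)

// Legacy two-level encoding (e.g. parquet-avro): the repeated field is the element itself,
// so isElementType returns true and elements are appended directly to currentArray.
val legacyList = MessageTypeParser.parseMessageType(
  """message m {
    |  optional group f (LIST) {
    |    repeated int32 array;
    |  }
    |}""".stripMargin)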
- */ -private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int) - extends CatalystPrimitiveConverter(parent, fieldIndex) { + private val converters = Array( + // Converter for keys + newConverter(parquetKeyType, sparkKeyType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentKey = value + }), - private[this] var dict: Array[String] = null + // Converter for values + newConverter(parquetValueType, sparkValueType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentValue = value + })) - override def hasDictionarySupport: Boolean = true + override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) - override def setDictionary(dictionary: Dictionary):Unit = - dict = Array.tabulate(dictionary.getMaxId + 1) {dictionary.decodeToBinary(_).toStringUsingUTF8} + override def end(): Unit = currentMap(currentKey) = currentValue + override def start(): Unit = { + currentKey = null + currentValue = null + } + } + } +} - override def addValueFromDictionary(dictionaryId: Int): Unit = - parent.updateString(fieldIndex, dict(dictionaryId)) +private[sql] object CatalystConverter { + // The type internally used for fields + type FieldType = StructField - override def addBinary(value: Binary): Unit = - parent.updateString(fieldIndex, value.toStringUsingUTF8) -} + // This is mostly Parquet convention (see, e.g., `ConversionPatterns`). + // Note that "array" for the array elements is chosen by ParquetAvro. + // Using a different value will result in Parquet silently dropping columns. + val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag" + val ARRAY_ELEMENTS_SCHEMA_NAME = "array" + // SPARK-4520: Thrift generated parquet files have different array element + // schema names than avro. Thrift parquet uses array_schema_name + "_tuple" + // as opposed to "array" used by default. For more information, check + // TestThriftSchemaConverter.java in parquet.thrift. + val THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX = "_tuple" + val MAP_KEY_SCHEMA_NAME = "key" + val MAP_VALUE_SCHEMA_NAME = "value" + val MAP_SCHEMA_NAME = "map" -private[parquet] object CatalystArrayConverter { - val INITIAL_ARRAY_SIZE = 20 + // TODO: consider using Array[T] for arrays to avoid boxing of primitive types + type ArrayScalaType[T] = Seq[T] + type StructScalaType[T] = Row + type MapScalaType[K, V] = Map[K, V] } private[parquet] object CatalystTimestampConverter { @@ -560,366 +486,3 @@ private[parquet] object CatalystTimestampConverter { NanoTime(julianDay, nanosOfDay).toBinary } } - -/** - * A `parquet.io.api.GroupConverter` that converts a single-element groups that - * match the characteristics of an array (see - * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an - * [[org.apache.spark.sql.types.ArrayType]]. 
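The new CatalystMapConverter above expects Parquet's MAP convention: a single repeated key/value group whose first field is the key and second is the value, matching the `key`/`value`/`map` names kept in the CatalystConverter object above. An illustrative map<string, int> schema (not taken from the patch):

import parquet.schema.MessageTypeParser

// parquetType.getType(0) is the repeated key/value group; its fields 0 and 1
// feed the KeyValueConverter.
val mapSchema = MessageTypeParser.parseMessageType(
  """message m {
    |  optional group f (MAP) {
    |    repeated group map (MAP_KEY_VALUE) {
    |      required binary key (UTF8);
    |      optional int32 value;
    |    }
    |  }
    |}""".stripMargin)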
- * - * @param elementType The type of the array elements (complex or primitive) - * @param index The position of this (array) field inside its parent converter - * @param parent The parent converter - * @param buffer A data buffer - */ -private[parquet] class CatalystArrayConverter( - val elementType: DataType, - val index: Int, - protected[parquet] val parent: CatalystConverter, - protected[parquet] var buffer: Buffer[Any]) - extends CatalystConverter { - - def this(elementType: DataType, index: Int, parent: CatalystConverter) = - this( - elementType, - index, - parent, - new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE)) - - protected[parquet] val converter: Converter = CatalystConverter.createConverter( - new CatalystConverter.FieldType( - CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, - elementType, - false), - fieldIndex=0, - parent=this) - - override def getConverter(fieldIndex: Int): Converter = converter - - // arrays have only one (repeated) field, which is its elements - override val size = 1 - - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { - // fieldIndex is ignored (assumed to be zero but not checked) - if(value == null) { - throw new IllegalArgumentException("Null values inside Parquet arrays are not supported!") - } - buffer += value - } - - override protected[parquet] def clearBuffer(): Unit = { - buffer.clear() - } - - override def start(): Unit = { - if (!converter.isPrimitive) { - converter.asInstanceOf[CatalystConverter].clearBuffer - } - } - - override def end(): Unit = { - assert(parent != null) - // here we need to make sure to use ArrayScalaType - parent.updateField(index, buffer.toArray.toSeq) - clearBuffer() - } -} - -/** - * A `parquet.io.api.GroupConverter` that converts a single-element groups that - * match the characteristics of an array (see - * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an - * [[org.apache.spark.sql.types.ArrayType]]. 
- * - * @param elementType The type of the array elements (native) - * @param index The position of this (array) field inside its parent converter - * @param parent The parent converter - * @param capacity The (initial) capacity of the buffer - */ -private[parquet] class CatalystNativeArrayConverter( - val elementType: NativeType, - val index: Int, - protected[parquet] val parent: CatalystConverter, - protected[parquet] var capacity: Int = CatalystArrayConverter.INITIAL_ARRAY_SIZE) - extends CatalystConverter { - - type NativeType = elementType.JvmType - - private var buffer: Array[NativeType] = elementType.classTag.newArray(capacity) - - private var elements: Int = 0 - - protected[parquet] val converter: Converter = CatalystConverter.createConverter( - new CatalystConverter.FieldType( - CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, - elementType, - false), - fieldIndex=0, - parent=this) - - override def getConverter(fieldIndex: Int): Converter = converter - - // arrays have only one (repeated) field, which is its elements - override val size = 1 - - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = - throw new UnsupportedOperationException - - // Overridden here to avoid auto-boxing for primitive types - override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = { - checkGrowBuffer() - buffer(elements) = value.getBytes.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def clearBuffer(): Unit = { - elements = 0 - } - - override def start(): Unit = {} - - override def end(): Unit = { - assert(parent != null) - // here we need to make sure to use ArrayScalaType - parent.updateField( - index, - buffer.slice(0, elements).toSeq) - clearBuffer() - } - - private def checkGrowBuffer(): Unit = { - if (elements >= capacity) { - val newCapacity = 2 * capacity - val tmp: Array[NativeType] = elementType.classTag.newArray(newCapacity) - Array.copy(buffer, 0, tmp, 0, capacity) - buffer = tmp - capacity = newCapacity - } - } -} - -/** - * A `parquet.io.api.GroupConverter` that converts a single-element 
groups that - * match the characteristics of an array contains null (see - * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an - * [[org.apache.spark.sql.types.ArrayType]]. - * - * @param elementType The type of the array elements (complex or primitive) - * @param index The position of this (array) field inside its parent converter - * @param parent The parent converter - * @param buffer A data buffer - */ -private[parquet] class CatalystArrayContainsNullConverter( - val elementType: DataType, - val index: Int, - protected[parquet] val parent: CatalystConverter, - protected[parquet] var buffer: Buffer[Any]) - extends CatalystConverter { - - def this(elementType: DataType, index: Int, parent: CatalystConverter) = - this( - elementType, - index, - parent, - new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE)) - - protected[parquet] val converter: Converter = new CatalystConverter { - - private var current: Any = null - - val converter = CatalystConverter.createConverter( - new CatalystConverter.FieldType( - CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, - elementType, - false), - fieldIndex = 0, - parent = this) - - override def getConverter(fieldIndex: Int): Converter = converter - - override def end(): Unit = parent.updateField(index, current) - - override def start(): Unit = { - current = null - } - - override protected[parquet] val size: Int = 1 - override protected[parquet] val index: Int = 0 - override protected[parquet] val parent = CatalystArrayContainsNullConverter.this - - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { - current = value - } - - override protected[parquet] def clearBuffer(): Unit = {} - } - - override def getConverter(fieldIndex: Int): Converter = converter - - // arrays have only one (repeated) field, which is its elements - override val size = 1 - - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { - buffer += value - } - - override protected[parquet] def clearBuffer(): Unit = { - buffer.clear() - } - - override def start(): Unit = {} - - override def end(): Unit = { - assert(parent != null) - // here we need to make sure to use ArrayScalaType - parent.updateField(index, buffer.toArray.toSeq) - clearBuffer() - } -} - -/** - * This converter is for multi-element groups of primitive or complex types - * that have repetition level optional or required (so struct fields). - * - * @param schema The corresponding Catalyst schema in the form of a list of - * attributes. - * @param index - * @param parent - */ -private[parquet] class CatalystStructConverter( - override protected[parquet] val schema: Array[FieldType], - override protected[parquet] val index: Int, - override protected[parquet] val parent: CatalystConverter) - extends CatalystGroupConverter(schema, index, parent) { - - override protected[parquet] def clearBuffer(): Unit = {} - - // TODO: think about reusing the buffer - override def end(): Unit = { - assert(!isRootConverter) - // here we need to make sure to use StructScalaType - // Note: we need to actually make a copy of the array since we - // may be in a nested field - parent.updateField(index, new GenericRow(current.toArray)) - } -} - -/** - * A `parquet.io.api.GroupConverter` that converts two-element groups that - * match the characteristics of a map (see - * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an - * [[org.apache.spark.sql.types.MapType]]. 
- * - * @param schema - * @param index - * @param parent - */ -private[parquet] class CatalystMapConverter( - protected[parquet] val schema: Array[FieldType], - override protected[parquet] val index: Int, - override protected[parquet] val parent: CatalystConverter) - extends CatalystConverter { - - private val map = new HashMap[Any, Any]() - - private val keyValueConverter = new CatalystConverter { - private var currentKey: Any = null - private var currentValue: Any = null - val keyConverter = CatalystConverter.createConverter(schema(0), 0, this) - val valueConverter = CatalystConverter.createConverter(schema(1), 1, this) - - override def getConverter(fieldIndex: Int): Converter = { - if (fieldIndex == 0) keyConverter else valueConverter - } - - override def end(): Unit = CatalystMapConverter.this.map += currentKey -> currentValue - - override def start(): Unit = { - currentKey = null - currentValue = null - } - - override protected[parquet] val size: Int = 2 - override protected[parquet] val index: Int = 0 - override protected[parquet] val parent: CatalystConverter = CatalystMapConverter.this - - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { - fieldIndex match { - case 0 => - currentKey = value - case 1 => - currentValue = value - case _ => - new RuntimePermission(s"trying to update Map with fieldIndex $fieldIndex") - } - } - - override protected[parquet] def clearBuffer(): Unit = {} - } - - override protected[parquet] val size: Int = 1 - - override protected[parquet] def clearBuffer(): Unit = {} - - override def start(): Unit = { - map.clear() - } - - override def end(): Unit = { - // here we need to make sure to use MapScalaType - parent.updateField(index, map.toMap) - } - - override def getConverter(fieldIndex: Int): Converter = keyValueConverter - - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = - throw new UnsupportedOperationException -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 5a1b15490d273..6c3329590e448 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -33,18 +33,16 @@ import org.apache.spark.sql.types._ /** * A `parquet.io.api.RecordMaterializer` for Rows. - * - *@param root The root group converter for the record. 
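The ParquetTableSupport hunk that follows replaces the old root-converter plumbing with a single CatalystRowConverter built with NoopUpdater (the root row has no parent container to update). Roughly how parquet-mr drives that root converter per record — a descriptive sketch, not actual parquet-mr code:

// For each record the library calls, in order:
//   rootConverter.start()               // null out every cell of currentRow
//   getConverter(i).addXxx(...)         // one call per non-null column value
//   rootConverter.end()                 // NoopUpdater.set(currentRow): a no-op at the root
//   materializer.getCurrentRecord       // hands rootConverter.currentRow back to the reader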
*/ -private[parquet] class RowRecordMaterializer(root: CatalystConverter) +private[parquet] class RowRecordMaterializer(parquetSchema: MessageType, attributes: Seq[Attribute]) extends RecordMaterializer[Row] { - def this(parquetSchema: MessageType, attributes: Seq[Attribute]) = - this(CatalystConverter.createRootConverter(parquetSchema, attributes)) + private val rootConverter = + new CatalystRowConverter(parquetSchema, StructType.fromAttributes(attributes), NoopUpdater) - override def getCurrentRecord: Row = root.getCurrentRecord + override def getCurrentRecord: Row = rootConverter.currentRow - override def getRootConverter: GroupConverter = root.asInstanceOf[GroupConverter] + override def getRootConverter: GroupConverter = rootConverter } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index 203bc79f153dd..4b6a80d325b04 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -28,7 +28,7 @@ import parquet.example.data.simple.SimpleGroup import parquet.example.data.{Group, GroupWriter} import parquet.hadoop.api.WriteSupport import parquet.hadoop.api.WriteSupport.WriteContext -import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName} +import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter} import parquet.io.api.RecordConsumer import parquet.schema.{MessageType, MessageTypeParser} @@ -101,10 +101,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { } test("fixed-length decimals") { - - def makeDecimalRDD(decimal: DecimalType): DataFrame = - sparkContext - .parallelize(0 to 1000) + def makeDecimalDataFrame(decimal: DecimalType): DataFrame = + (0 to 1000) .map(i => Tuple1(i / 100.0)) .toDF() // Parquet doesn't allow column names with spaces, have to add an alias here @@ -112,7 +110,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) { withTempPath { dir => - val data = makeDecimalRDD(DecimalType(precision, scale)) + val data = makeDecimalDataFrame(DecimalType(precision, scale)) data.saveAsParquetFile(dir.getCanonicalPath) checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq) } @@ -121,7 +119,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { // Decimals with precision above 18 are not yet supported intercept[RuntimeException] { withTempPath { dir => - makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath) + makeDecimalDataFrame(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath) parquetFile(dir.getCanonicalPath).collect() } } @@ -129,7 +127,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { // Unlimited-length decimals are not yet supported intercept[RuntimeException] { withTempPath { dir => - makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath) + makeDecimalDataFrame(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath) parquetFile(dir.getCanonicalPath).collect() } }
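The updated decimal test above builds its DataFrame from a local collection rather than an RDD. A hedged usage sketch of the same round-trip outside the test harness; the column name `dec`, the cast, and the output path are illustrative, not the test's exact code:

// Assumes a SQLContext named `sqlContext` with `import sqlContext.implicits._` in scope,
// as in the test suite.
import org.apache.spark.sql.types._

val df = (0 to 1000)
  .map(i => Tuple1(i / 100.0))
  .toDF()
  .select($"_1".cast(DecimalType(5, 2)).as("dec"))   // Parquet column names cannot contain spaces

df.saveAsParquetFile("/tmp/decimals.parquet")        // written as FIXED_LEN_BYTE_ARRAY decimals
val roundTripped = sqlContext.parquetFile("/tmp/decimals.parquet").collect()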