diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index cdf2bc68d9c5e..c76958b91a150 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -24,6 +24,7 @@ import scala.math._
import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag}
+import scala.util.Try
import scala.util.parsing.combinator.RegexParsers
import org.json4s._
@@ -39,6 +40,10 @@ import org.apache.spark.util.Utils
object DataType {
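+  /**
+   * Parses `raw` into a [[DataType]]. The string is first interpreted as JSON; if that fails, it
+   * falls back to the legacy case class string format (e.g. `StructType(List(...))`).
+   */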
+  private[sql] def fromString(raw: String): DataType = {
+ Try(DataType.fromJson(raw)).getOrElse(DataType.fromCaseClassString(raw))
+ }
+
def fromJson(json: String): DataType = parseDataType(parse(json))
private object JSortedObject {
@@ -887,6 +892,11 @@ case class StructField(
object StructType {
+ private[sql] def fromString(raw: String): StructType = DataType.fromString(raw) match {
+ case t: StructType => t
+    case _ => throw new RuntimeException(s"Failed to parse StructType: $raw")
+ }
+
protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 43ca359b51735..02defc2a4dcc1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -18,480 +18,406 @@
package org.apache.spark.sql.parquet
import java.sql.Timestamp
-import java.util.{TimeZone, Calendar}
+import java.util.{Calendar, TimeZone}
-import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import jodd.datetime.JDateTime
import parquet.column.Dictionary
-import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
-import parquet.schema.MessageType
+import parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
+import parquet.schema.{GroupType, PrimitiveType => ParquetPrimitiveType, Type}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.parquet.CatalystConverter.FieldType
-import org.apache.spark.sql.types._
import org.apache.spark.sql.parquet.timestamp.NanoTime
+import org.apache.spark.sql.types._
/**
- * Collection of converters of Parquet types (group and primitive types) that
- * model arrays and maps. The conversions are partly based on the AvroParquet
- * converters that are part of Parquet in order to be able to process these
- * types.
- *
- * There are several types of converters:
- *
- * - [[org.apache.spark.sql.parquet.CatalystPrimitiveConverter]] for primitive
- * (numeric, boolean and String) types
- * - [[org.apache.spark.sql.parquet.CatalystNativeArrayConverter]] for arrays
- * of native JVM element types; note: currently null values are not supported!
- * - [[org.apache.spark.sql.parquet.CatalystArrayConverter]] for arrays of
- * arbitrary element types (including nested element types); note: currently
- * null values are not supported!
- * - [[org.apache.spark.sql.parquet.CatalystStructConverter]] for structs
- * - [[org.apache.spark.sql.parquet.CatalystMapConverter]] for maps; note:
- * currently null values are not supported!
- * - [[org.apache.spark.sql.parquet.CatalystPrimitiveRowConverter]] for rows
- * of only primitive element types
- * - [[org.apache.spark.sql.parquet.CatalystGroupConverter]] for other nested
- * records, including the top-level row record
- *
+ * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
+ * corresponding parent container. For example, a converter for a `Struct` field may set converted
+ * values to a [[MutableRow]]; a converter for an array element may append converted values to an
+ * [[ArrayBuffer]].
*/
-
-private[sql] object CatalystConverter {
- // The type internally used for fields
- type FieldType = StructField
-
- // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
- // Note that "array" for the array elements is chosen by ParquetAvro.
- // Using a different value will result in Parquet silently dropping columns.
- val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
- val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
- // SPARK-4520: Thrift generated parquet files have different array element
- // schema names than avro. Thrift parquet uses array_schema_name + "_tuple"
- // as opposed to "array" used by default. For more information, check
- // TestThriftSchemaConverter.java in parquet.thrift.
- val THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX = "_tuple"
- val MAP_KEY_SCHEMA_NAME = "key"
- val MAP_VALUE_SCHEMA_NAME = "value"
- val MAP_SCHEMA_NAME = "map"
-
- // TODO: consider using Array[T] for arrays to avoid boxing of primitive types
- type ArrayScalaType[T] = Seq[T]
- type StructScalaType[T] = Row
- type MapScalaType[K, V] = Map[K, V]
-
- protected[parquet] def createConverter(
- field: FieldType,
- fieldIndex: Int,
- parent: CatalystConverter): Converter = {
- val fieldType: DataType = field.dataType
- fieldType match {
- case udt: UserDefinedType[_] => {
- createConverter(field.copy(dataType = udt.sqlType), fieldIndex, parent)
- }
- // For native JVM types we use a converter with native arrays
- case ArrayType(elementType: NativeType, false) => {
- new CatalystNativeArrayConverter(elementType, fieldIndex, parent)
- }
- // This is for other types of arrays, including those with nested fields
- case ArrayType(elementType: DataType, false) => {
- new CatalystArrayConverter(elementType, fieldIndex, parent)
- }
- case ArrayType(elementType: DataType, true) => {
- new CatalystArrayContainsNullConverter(elementType, fieldIndex, parent)
- }
- case StructType(fields: Array[StructField]) => {
- new CatalystStructConverter(fields, fieldIndex, parent)
- }
- case MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) => {
- new CatalystMapConverter(
- Array(
- new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false),
- new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, valueContainsNull)),
- fieldIndex,
- parent)
- }
- // Strings, Shorts and Bytes do not have a corresponding type in Parquet
- // so we need to treat them separately
- case StringType =>
- new CatalystPrimitiveStringConverter(parent, fieldIndex)
- case ShortType => {
- new CatalystPrimitiveConverter(parent, fieldIndex) {
- override def addInt(value: Int): Unit =
- parent.updateShort(fieldIndex, value.asInstanceOf[ShortType.JvmType])
- }
- }
- case ByteType => {
- new CatalystPrimitiveConverter(parent, fieldIndex) {
- override def addInt(value: Int): Unit =
- parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType])
- }
- }
- case DateType => {
- new CatalystPrimitiveConverter(parent, fieldIndex) {
- override def addInt(value: Int): Unit =
- parent.updateDate(fieldIndex, value.asInstanceOf[DateType.JvmType])
- }
- }
- case d: DecimalType => {
- new CatalystPrimitiveConverter(parent, fieldIndex) {
- override def addBinary(value: Binary): Unit =
- parent.updateDecimal(fieldIndex, value, d)
- }
- }
- case TimestampType => {
- new CatalystPrimitiveConverter(parent, fieldIndex) {
- override def addBinary(value: Binary): Unit =
- parent.updateTimestamp(fieldIndex, value)
- }
- }
- // All other primitive types use the default converter
- case ctype: PrimitiveType => { // note: need the type tag here!
- new CatalystPrimitiveConverter(parent, fieldIndex)
- }
- case _ => throw new RuntimeException(
- s"unable to convert datatype ${field.dataType.toString} in CatalystConverter")
- }
- }
-
- protected[parquet] def createRootConverter(
- parquetSchema: MessageType,
- attributes: Seq[Attribute]): CatalystConverter = {
- // For non-nested types we use the optimized Row converter
- if (attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))) {
- new CatalystPrimitiveRowConverter(attributes.toArray)
- } else {
- new CatalystGroupConverter(attributes.toArray)
- }
- }
+private[parquet] trait ParentContainerUpdater {
+ def set(value: Any): Unit = ()
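+
+  // Type-specific setters avoid boxing primitive values; by default they all delegate to `set`.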
+ def setBoolean(value: Boolean): Unit = set(value)
+ def setByte(value: Byte): Unit = set(value)
+ def setShort(value: Short): Unit = set(value)
+ def setInt(value: Int): Unit = set(value)
+ def setLong(value: Long): Unit = set(value)
+ def setFloat(value: Float): Unit = set(value)
+ def setDouble(value: Double): Unit = set(value)
}
-private[parquet] abstract class CatalystConverter extends GroupConverter {
- /**
- * The number of fields this group has
- */
- protected[parquet] val size: Int
-
- /**
- * The index of this converter in the parent
- */
- protected[parquet] val index: Int
+/** A no-op updater used for the root converter (which doesn't have a parent). */
+private[parquet] object NoopUpdater extends ParentContainerUpdater
- /**
- * The parent converter
- */
- protected[parquet] val parent: CatalystConverter
+/**
+ * This Parquet converter converts Parquet records to Spark SQL rows.
+ *
+ * @param parquetType Parquet type of Parquet records
+ * @param sparkType Spark SQL schema that corresponds to the Parquet record type
+ * @param updater An updater which takes care of the converted row object
+ */
+private[parquet] class CatalystRowConverter(
+ parquetType: GroupType,
+ sparkType: StructType,
+ updater: ParentContainerUpdater)
+ extends GroupConverter {
/**
- * Called by child converters to update their value in its parent (this).
- * Note that if possible the more specific update methods below should be used
- * to avoid auto-boxing of native JVM types.
- *
- * @param fieldIndex
- * @param value
+   * Updater used together with [[CatalystRowConverter]]. It sets converted field values to the
+ * `ordinal`-th cell in `row`.
*/
- protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit
+ private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
+ override def set(value: Any): Unit = row(ordinal) = value
+ override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
+ override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
+ override def setShort(value: Short): Unit = row.setShort(ordinal, value)
+ override def setInt(value: Int): Unit = row.setInt(ordinal, value)
+ override def setLong(value: Long): Unit = row.setLong(ordinal, value)
+ override def setDouble(value: Double): Unit = row.setDouble(ordinal, value)
+ override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
+ }
+
+ /** Represents the converted row object once an entire Parquet record is converted. */
+ val currentRow = new SpecificMutableRow(sparkType.map(_.dataType))
+
+ // Converters for each field.
+ private val fieldConverters: Array[Converter] = {
+ parquetType.getFields.zip(sparkType).zipWithIndex.map {
+ case ((parquetFieldType, sparkField), ordinal) =>
+ // Converted field value should be set to the `ordinal`-th cell of `currentRow`
+ newConverter(parquetFieldType, sparkField.dataType, new RowUpdater(currentRow, ordinal))
+ }.toArray
+ }
- protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit =
- updateField(fieldIndex, value)
+ override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
- protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
- updateField(fieldIndex, value)
+ override def end(): Unit = updater.set(currentRow)
- protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
- updateField(fieldIndex, value)
+ override def start(): Unit = {
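+    // Clear all fields to null before reading a record, so that fields missing from the Parquet
+    // record remain null in the resulting row.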
+ var i = 0
+ while (i < currentRow.length) {
+ currentRow.setNullAt(i)
+ i += 1
+ }
+ }
- protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
- updateField(fieldIndex, value)
+ /**
+ * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type
+ * `sparkType`. Converted values are handled by `updater`.
+ */
+ private def newConverter(
+ parquetType: Type,
+ sparkType: DataType,
+ updater: ParentContainerUpdater): Converter = {
- protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit =
- updateField(fieldIndex, value)
+ sparkType match {
+ case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
+ new CatalystPrimitiveConverter(updater)
- protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit =
- updateField(fieldIndex, value)
+ case ByteType =>
+ new PrimitiveConverter {
+ override def addInt(value: Int): Unit =
+ updater.setByte(value.asInstanceOf[ByteType#JvmType])
+ }
- protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit =
- updateField(fieldIndex, value)
+ case ShortType =>
+ new PrimitiveConverter {
+ override def addInt(value: Int): Unit =
+ updater.setShort(value.asInstanceOf[ShortType#JvmType])
+ }
- protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit =
- updateField(fieldIndex, value)
+ case t: DecimalType =>
+ new CatalystDecimalConverter(t, updater)
- protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
- updateField(fieldIndex, value.getBytes)
+ case StringType =>
+ new CatalystStringConverter(updater)
- protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
- updateField(fieldIndex, value)
+ case TimestampType =>
+ new PrimitiveConverter {
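+          // Timestamps are stored as 12-byte INT96 binaries (Julian day + nanoseconds within the
+          // day); CatalystTimestampConverter decodes them into java.sql.Timestamp values.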
+ override def addBinary(value: Binary): Unit = {
+ updater.set(CatalystTimestampConverter.convertToTimestamp(value))
+ }
+ }
- protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
- updateField(fieldIndex, readTimestamp(value))
+ case DateType =>
+ new PrimitiveConverter {
+ override def addInt(value: Int): Unit = {
+ // DateType is not specialized in `SpecificMutableRow`, have to box it here.
+ updater.set(value.asInstanceOf[DateType#JvmType])
+ }
+ }
- protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit =
- updateField(fieldIndex, readDecimal(new Decimal(), value, ctype))
+ case t: ArrayType =>
+ new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
- protected[parquet] def isRootConverter: Boolean = parent == null
+ case t: MapType =>
+ new CatalystMapConverter(parquetType.asGroupType(), t, updater)
- protected[parquet] def clearBuffer(): Unit
+ case t: StructType =>
+ new CatalystRowConverter(parquetType.asGroupType(), t, updater)
- /**
- * Should only be called in the root (group) converter!
- *
- * @return
- */
- def getCurrentRecord: Row = throw new UnsupportedOperationException
+ case t: UserDefinedType[_] =>
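+        // UDT values are stored using the UDT's underlying SQL type, so reuse that type's
+        // converter here.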
+ val sparkTypeForUDT = t.sqlType
+ val parquetTypeForUDT = ParquetTypesConverter.fromDataType(sparkTypeForUDT, "")
+ newConverter(parquetTypeForUDT, sparkTypeForUDT, updater)
- /**
- * Read a decimal value from a Parquet Binary into "dest". Only supports decimals that fit in
- * a long (i.e. precision <= 18)
- */
- protected[parquet] def readDecimal(dest: Decimal, value: Binary, ctype: DecimalType): Unit = {
- val precision = ctype.precisionInfo.get.precision
- val scale = ctype.precisionInfo.get.scale
- val bytes = value.getBytes
- require(bytes.length <= 16, "Decimal field too large to read")
- var unscaled = 0L
- var i = 0
- while (i < bytes.length) {
- unscaled = (unscaled << 8) | (bytes(i) & 0xFF)
- i += 1
+ case _ =>
+ throw new RuntimeException(
+ s"Unable to create Parquet converter for data type ${sparkType.json}")
}
- // Make sure unscaled has the right sign, by sign-extending the first bit
- val numBits = 8 * bytes.length
- unscaled = (unscaled << (64 - numBits)) >> (64 - numBits)
- dest.set(unscaled, precision, scale)
}
/**
- * Read a Timestamp value from a Parquet Int96Value
+ * Parquet converter for Parquet primitive types. Note that not all Spark SQL primitive types
+ * are handled by this converter. Parquet primitive types are only a subset of those of Spark
+ * SQL. For example, BYTE, SHORT and INT in Spark SQL are all covered by INT32 in Parquet.
*/
- protected[parquet] def readTimestamp(value: Binary): Timestamp = {
- CatalystTimestampConverter.convertToTimestamp(value)
- }
-}
+ private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater)
+ extends PrimitiveConverter {
-/**
- * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record
- * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object.
- *
- * @param schema The corresponding Catalyst schema in the form of a list of attributes.
- */
-private[parquet] class CatalystGroupConverter(
- protected[parquet] val schema: Array[FieldType],
- protected[parquet] val index: Int,
- protected[parquet] val parent: CatalystConverter,
- protected[parquet] var current: ArrayBuffer[Any],
- protected[parquet] var buffer: ArrayBuffer[Row])
- extends CatalystConverter {
-
- def this(schema: Array[FieldType], index: Int, parent: CatalystConverter) =
- this(
- schema,
- index,
- parent,
- current = null,
- buffer = new ArrayBuffer[Row](
- CatalystArrayConverter.INITIAL_ARRAY_SIZE))
+ override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
+ override def addInt(value: Int): Unit = updater.setInt(value)
+ override def addLong(value: Long): Unit = updater.setLong(value)
+ override def addFloat(value: Float): Unit = updater.setFloat(value)
+ override def addDouble(value: Double): Unit = updater.setDouble(value)
+ override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
+ }
/**
- * This constructor is used for the root converter only!
+ * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
*/
- def this(attributes: Array[Attribute]) =
- this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)
-
- protected [parquet] val converters: Array[Converter] =
- schema.zipWithIndex.map {
- case (field, idx) => CatalystConverter.createConverter(field, idx, this)
- }.toArray
-
- override val size = schema.size
+ private final class CatalystStringConverter(updater: ParentContainerUpdater)
+ extends PrimitiveConverter {
- override def getCurrentRecord: Row = {
- assert(isRootConverter, "getCurrentRecord should only be called in root group converter!")
- // TODO: use iterators if possible
- // Note: this will ever only be called in the root converter when the record has been
- // fully processed. Therefore it will be difficult to use mutable rows instead, since
- // any non-root converter never would be sure when it would be safe to re-use the buffer.
- new GenericRow(current.toArray)
- }
-
- override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
-
- // for child converters to update upstream values
- override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
- current.update(fieldIndex, value)
- }
+ private var expandedDictionary: Array[String] = null
- override protected[parquet] def clearBuffer(): Unit = buffer.clear()
+ override def hasDictionarySupport: Boolean = true
- override def start(): Unit = {
- current = ArrayBuffer.fill(size)(null)
- converters.foreach {
- converter => if (!converter.isPrimitive) {
- converter.asInstanceOf[CatalystConverter].clearBuffer
+ override def setDictionary(dictionary: Dictionary): Unit = {
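+      // Eagerly decode the whole dictionary into Strings so that each distinct binary value is
+      // decoded from UTF-8 only once.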
+ this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) {
+ dictionary.decodeToBinary(_).toStringUsingUTF8
}
}
- }
- override def end(): Unit = {
- if (!isRootConverter) {
- assert(current != null) // there should be no empty groups
- buffer.append(new GenericRow(current.toArray))
- parent.updateField(index, new GenericRow(buffer.toArray.asInstanceOf[Array[Any]]))
+ override def addValueFromDictionary(dictionaryId: Int): Unit = {
+ updater.set(expandedDictionary(dictionaryId))
}
- }
-}
-/**
- * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record
- * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. Note that his
- * converter is optimized for rows of primitive types (non-nested records).
- */
-private[parquet] class CatalystPrimitiveRowConverter(
- protected[parquet] val schema: Array[FieldType],
- protected[parquet] var current: MutableRow)
- extends CatalystConverter {
-
- // This constructor is used for the root converter only
- def this(attributes: Array[Attribute]) =
- this(
- attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)),
- new SpecificMutableRow(attributes.map(_.dataType)))
-
- protected [parquet] val converters: Array[Converter] =
- schema.zipWithIndex.map {
- case (field, idx) => CatalystConverter.createConverter(field, idx, this)
- }.toArray
-
- override val size = schema.size
+ override def addBinary(value: Binary): Unit = updater.set(value.toStringUsingUTF8)
+ }
- override val index = 0
+ /**
+ * Parquet converter for fixed-precision decimals.
+ *
+ * @todo Handle fixed-precision decimals stored as INT32 and INT64
+ */
+ private final class CatalystDecimalConverter(
+ decimalType: DecimalType,
+ updater: ParentContainerUpdater)
+ extends PrimitiveConverter {
- override val parent = null
+ override def addBinary(value: Binary): Unit = {
+ updater.set(toDecimal(value))
+ }
- // Should be only called in root group converter!
- override def getCurrentRecord: Row = current
+ private def toDecimal(value: Binary): Decimal = {
+ val precision = decimalType.precision
+ val scale = decimalType.scale
+ val bytes = value.getBytes
- override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
+ require(bytes.length <= 16, "Decimal field too large to read")
- // for child converters to update upstream values
- override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
- throw new UnsupportedOperationException // child converters should use the
- // specific update methods below
- }
+ var unscaled = 0L
+ var i = 0
- override protected[parquet] def clearBuffer(): Unit = {}
+ while (i < bytes.length) {
+ unscaled = (unscaled << 8) | (bytes(i) & 0xff)
+ i += 1
+ }
- override def start(): Unit = {
- var i = 0
- while (i < size) {
- current.setNullAt(i)
- i = i + 1
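+      // The bytes hold the unscaled value as a big-endian two's complement integer;
+      // sign-extend it to 64 bits to recover the sign.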
+ val bits = 8 * bytes.length
+ unscaled = (unscaled << (64 - bits)) >> (64 - bits)
+ Decimal(unscaled, precision, scale)
}
}
- override def end(): Unit = {}
-
- // Overridden here to avoid auto-boxing for primitive types
- override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit =
- current.setBoolean(fieldIndex, value)
-
- override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
- current.setInt(fieldIndex, value)
+ /**
+ * Parquet converter for arrays. Spark SQL arrays are represented as Parquet lists. Standard
+ * Parquet lists are represented as a 3-level group annotated by `LIST`:
+ * {{{
+   *   group (LIST) {            <-- parquetSchema points here
+   *     repeated group list {
+   *       element;
+   *     }
+   *   }
+ * }}}
+ * The `parquetSchema` constructor argument points to the outermost group.
+ *
+   * However, before this representation was standardized, some Parquet libraries/tools also used
+   * non-standard formats to represent list-like structures. Backwards-compatibility rules for
+   * handling these cases are described in the Parquet format spec.
+   *
+   * @see https://github.com/apache/incubator-parquet-format/blob/master/LogicalTypes.md#lists
+   */
+ private final class CatalystArrayConverter(
+ parquetSchema: GroupType,
+ sparkSchema: ArrayType,
+ updater: ParentContainerUpdater)
+ extends GroupConverter {
+
+ // TODO This is slow! Needs specialization.
+ private val currentArray = ArrayBuffer.empty[Any]
+
+ private val elementConverter: Converter = {
+ val repeatedType = parquetSchema.getType(0)
+ val elementType = sparkSchema.elementType
+
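+    // If the repeated field is itself the element (a non-standard 2-level layout), convert it
+    // directly; otherwise this is the standard 3-level layout and the element is nested inside
+    // the repeated group, so wrap it in an ElementConverter.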
+ if (isElementType(repeatedType, elementType)) {
+ newConverter(repeatedType, elementType, new ParentContainerUpdater {
+ override def set(value: Any): Unit = currentArray += value
+ })
+ } else {
+ new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
+ }
+ }
- override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
- current.update(fieldIndex, value)
+ override def getConverter(fieldIndex: Int): Converter = elementConverter
+
+ override def end(): Unit = updater.set(currentArray)
+
+ override def start(): Unit = currentArray.clear()
+
+ // scalastyle:off
+ /**
+ * Returns whether the given type is the element type of a list or is a syntactic group with
+ * one field that is the element type. This is determined by checking whether the type can be
+ * a syntactic group and by checking whether a potential syntactic group matches the expected
+ * schema.
+ * {{{
+     *   group (LIST) {
+     *     repeated group list {    <-- repeatedType points here
+     *       element;
+     *     }
+     *   }
+ * }}}
+ * In short, here we handle Parquet list backwards-compatibility rules on the read path. This
+ * method is based on `AvroIndexedRecordConverter.isElementType`.
+ *
+ * @see https://github.com/apache/incubator-parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+ */
+ // scalastyle:on
+ private def isElementType(repeatedType: Type, elementType: DataType): Boolean = {
+ (repeatedType, elementType) match {
+        // A repeated primitive type can only be the element itself.
+        case (t: ParquetPrimitiveType, _) => true
+        // A repeated group with more than one field must itself be the (struct) element.
+        case (t: GroupType, _) if t.getFieldCount > 1 => true
+        // A repeated single-field group whose field name matches the expected element struct's
+        // only field name is the element itself, not a synthetic "list" wrapper.
+        case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
+ case _ => false
+ }
+ }
- override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
- current.setLong(fieldIndex, value)
+ /** Array element converter */
+ private final class ElementConverter(parquetType: Type, sparkType: DataType)
+ extends GroupConverter {
- override protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit =
- current.setShort(fieldIndex, value)
+ private var currentElement: Any = _
- override protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit =
- current.setByte(fieldIndex, value)
+ private val converter = newConverter(parquetType, sparkType, new ParentContainerUpdater {
+ override def set(value: Any): Unit = currentElement = value
+ })
- override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit =
- current.setDouble(fieldIndex, value)
+ override def getConverter(fieldIndex: Int): Converter = converter
- override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit =
- current.setFloat(fieldIndex, value)
+ override def end(): Unit = currentArray += currentElement
- override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
- current.update(fieldIndex, value.getBytes)
+ override def start(): Unit = currentElement = null
+ }
+ }
- override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
- current.setString(fieldIndex, value)
+ /** Parquet converter for maps */
+ private final class CatalystMapConverter(
+ parquetType: GroupType,
+ sparkType: MapType,
+ updater: ParentContainerUpdater)
+ extends GroupConverter {
- override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
- current.update(fieldIndex, readTimestamp(value))
+ private val currentMap = mutable.Map.empty[Any, Any]
- override protected[parquet] def updateDecimal(
- fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
- var decimal = current(fieldIndex).asInstanceOf[Decimal]
- if (decimal == null) {
- decimal = new Decimal
- current(fieldIndex) = decimal
+ private val keyValueConverter = {
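+    // The repeated group inside a Parquet map has exactly two fields: the key and the value.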
+ val repeatedType = parquetType.getType(0).asGroupType()
+ new KeyValueConverter(
+ repeatedType.getType(0),
+ repeatedType.getType(1),
+ sparkType.keyType,
+ sparkType.valueType)
}
- readDecimal(decimal, value, ctype)
- }
-}
-/**
- * A `parquet.io.api.PrimitiveConverter` that converts Parquet types to Catalyst types.
- *
- * @param parent The parent group converter.
- * @param fieldIndex The index inside the record.
- */
-private[parquet] class CatalystPrimitiveConverter(
- parent: CatalystConverter,
- fieldIndex: Int) extends PrimitiveConverter {
- override def addBinary(value: Binary): Unit =
- parent.updateBinary(fieldIndex, value)
+ override def getConverter(fieldIndex: Int): Converter = keyValueConverter
- override def addBoolean(value: Boolean): Unit =
- parent.updateBoolean(fieldIndex, value)
+ override def end(): Unit = updater.set(currentMap)
- override def addDouble(value: Double): Unit =
- parent.updateDouble(fieldIndex, value)
+ override def start(): Unit = currentMap.clear()
- override def addFloat(value: Float): Unit =
- parent.updateFloat(fieldIndex, value)
+ /** Parquet converter for key-value pairs within the map. */
+ private final class KeyValueConverter(
+ parquetKeyType: Type,
+ parquetValueType: Type,
+ sparkKeyType: DataType,
+ sparkValueType: DataType)
+ extends GroupConverter {
- override def addInt(value: Int): Unit =
- parent.updateInt(fieldIndex, value)
+ private var currentKey: Any = _
- override def addLong(value: Long): Unit =
- parent.updateLong(fieldIndex, value)
-}
+ private var currentValue: Any = _
-/**
- * A `parquet.io.api.PrimitiveConverter` that converts Parquet Binary to Catalyst String.
- * Supports dictionaries to reduce Binary to String conversion overhead.
- *
- * Follows pattern in Parquet of using dictionaries, where supported, for String conversion.
- *
- * @param parent The parent group converter.
- * @param fieldIndex The index inside the record.
- */
-private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int)
- extends CatalystPrimitiveConverter(parent, fieldIndex) {
+ private val converters = Array(
+ // Converter for keys
+ newConverter(parquetKeyType, sparkKeyType, new ParentContainerUpdater {
+ override def set(value: Any): Unit = currentKey = value
+ }),
- private[this] var dict: Array[String] = null
+ // Converter for values
+ newConverter(parquetValueType, sparkValueType, new ParentContainerUpdater {
+ override def set(value: Any): Unit = currentValue = value
+ }))
- override def hasDictionarySupport: Boolean = true
+ override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
- override def setDictionary(dictionary: Dictionary):Unit =
- dict = Array.tabulate(dictionary.getMaxId + 1) {dictionary.decodeToBinary(_).toStringUsingUTF8}
+ override def end(): Unit = currentMap(currentKey) = currentValue
+ override def start(): Unit = {
+ currentKey = null
+ currentValue = null
+ }
+ }
+ }
+}
- override def addValueFromDictionary(dictionaryId: Int): Unit =
- parent.updateString(fieldIndex, dict(dictionaryId))
+private[sql] object CatalystConverter {
+ // The type internally used for fields
+ type FieldType = StructField
- override def addBinary(value: Binary): Unit =
- parent.updateString(fieldIndex, value.toStringUsingUTF8)
-}
+ // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
+ // Note that "array" for the array elements is chosen by ParquetAvro.
+ // Using a different value will result in Parquet silently dropping columns.
+ val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
+ val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
+ // SPARK-4520: Thrift generated parquet files have different array element
+ // schema names than avro. Thrift parquet uses array_schema_name + "_tuple"
+ // as opposed to "array" used by default. For more information, check
+ // TestThriftSchemaConverter.java in parquet.thrift.
+ val THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX = "_tuple"
+ val MAP_KEY_SCHEMA_NAME = "key"
+ val MAP_VALUE_SCHEMA_NAME = "value"
+ val MAP_SCHEMA_NAME = "map"
-private[parquet] object CatalystArrayConverter {
- val INITIAL_ARRAY_SIZE = 20
+ // TODO: consider using Array[T] for arrays to avoid boxing of primitive types
+ type ArrayScalaType[T] = Seq[T]
+ type StructScalaType[T] = Row
+ type MapScalaType[K, V] = Map[K, V]
}
private[parquet] object CatalystTimestampConverter {
@@ -560,366 +486,3 @@ private[parquet] object CatalystTimestampConverter {
NanoTime(julianDay, nanosOfDay).toBinary
}
}
-
-/**
- * A `parquet.io.api.GroupConverter` that converts a single-element groups that
- * match the characteristics of an array (see
- * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
- * [[org.apache.spark.sql.types.ArrayType]].
- *
- * @param elementType The type of the array elements (complex or primitive)
- * @param index The position of this (array) field inside its parent converter
- * @param parent The parent converter
- * @param buffer A data buffer
- */
-private[parquet] class CatalystArrayConverter(
- val elementType: DataType,
- val index: Int,
- protected[parquet] val parent: CatalystConverter,
- protected[parquet] var buffer: Buffer[Any])
- extends CatalystConverter {
-
- def this(elementType: DataType, index: Int, parent: CatalystConverter) =
- this(
- elementType,
- index,
- parent,
- new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE))
-
- protected[parquet] val converter: Converter = CatalystConverter.createConverter(
- new CatalystConverter.FieldType(
- CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
- elementType,
- false),
- fieldIndex=0,
- parent=this)
-
- override def getConverter(fieldIndex: Int): Converter = converter
-
- // arrays have only one (repeated) field, which is its elements
- override val size = 1
-
- override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
- // fieldIndex is ignored (assumed to be zero but not checked)
- if(value == null) {
- throw new IllegalArgumentException("Null values inside Parquet arrays are not supported!")
- }
- buffer += value
- }
-
- override protected[parquet] def clearBuffer(): Unit = {
- buffer.clear()
- }
-
- override def start(): Unit = {
- if (!converter.isPrimitive) {
- converter.asInstanceOf[CatalystConverter].clearBuffer
- }
- }
-
- override def end(): Unit = {
- assert(parent != null)
- // here we need to make sure to use ArrayScalaType
- parent.updateField(index, buffer.toArray.toSeq)
- clearBuffer()
- }
-}
-
-/**
- * A `parquet.io.api.GroupConverter` that converts a single-element groups that
- * match the characteristics of an array (see
- * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
- * [[org.apache.spark.sql.types.ArrayType]].
- *
- * @param elementType The type of the array elements (native)
- * @param index The position of this (array) field inside its parent converter
- * @param parent The parent converter
- * @param capacity The (initial) capacity of the buffer
- */
-private[parquet] class CatalystNativeArrayConverter(
- val elementType: NativeType,
- val index: Int,
- protected[parquet] val parent: CatalystConverter,
- protected[parquet] var capacity: Int = CatalystArrayConverter.INITIAL_ARRAY_SIZE)
- extends CatalystConverter {
-
- type NativeType = elementType.JvmType
-
- private var buffer: Array[NativeType] = elementType.classTag.newArray(capacity)
-
- private var elements: Int = 0
-
- protected[parquet] val converter: Converter = CatalystConverter.createConverter(
- new CatalystConverter.FieldType(
- CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
- elementType,
- false),
- fieldIndex=0,
- parent=this)
-
- override def getConverter(fieldIndex: Int): Converter = converter
-
- // arrays have only one (repeated) field, which is its elements
- override val size = 1
-
- override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit =
- throw new UnsupportedOperationException
-
- // Overridden here to avoid auto-boxing for primitive types
- override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = {
- checkGrowBuffer()
- buffer(elements) = value.asInstanceOf[NativeType]
- elements += 1
- }
-
- override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = {
- checkGrowBuffer()
- buffer(elements) = value.asInstanceOf[NativeType]
- elements += 1
- }
-
- override protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit = {
- checkGrowBuffer()
- buffer(elements) = value.asInstanceOf[NativeType]
- elements += 1
- }
-
- override protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit = {
- checkGrowBuffer()
- buffer(elements) = value.asInstanceOf[NativeType]
- elements += 1
- }
-
- override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = {
- checkGrowBuffer()
- buffer(elements) = value.asInstanceOf[NativeType]
- elements += 1
- }
-
- override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = {
- checkGrowBuffer()
- buffer(elements) = value.asInstanceOf[NativeType]
- elements += 1
- }
-
- override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = {
- checkGrowBuffer()
- buffer(elements) = value.asInstanceOf[NativeType]
- elements += 1
- }
-
- override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = {
- checkGrowBuffer()
- buffer(elements) = value.getBytes.asInstanceOf[NativeType]
- elements += 1
- }
-
- override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = {
- checkGrowBuffer()
- buffer(elements) = value.asInstanceOf[NativeType]
- elements += 1
- }
-
- override protected[parquet] def clearBuffer(): Unit = {
- elements = 0
- }
-
- override def start(): Unit = {}
-
- override def end(): Unit = {
- assert(parent != null)
- // here we need to make sure to use ArrayScalaType
- parent.updateField(
- index,
- buffer.slice(0, elements).toSeq)
- clearBuffer()
- }
-
- private def checkGrowBuffer(): Unit = {
- if (elements >= capacity) {
- val newCapacity = 2 * capacity
- val tmp: Array[NativeType] = elementType.classTag.newArray(newCapacity)
- Array.copy(buffer, 0, tmp, 0, capacity)
- buffer = tmp
- capacity = newCapacity
- }
- }
-}
-
-/**
- * A `parquet.io.api.GroupConverter` that converts a single-element groups that
- * match the characteristics of an array contains null (see
- * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
- * [[org.apache.spark.sql.types.ArrayType]].
- *
- * @param elementType The type of the array elements (complex or primitive)
- * @param index The position of this (array) field inside its parent converter
- * @param parent The parent converter
- * @param buffer A data buffer
- */
-private[parquet] class CatalystArrayContainsNullConverter(
- val elementType: DataType,
- val index: Int,
- protected[parquet] val parent: CatalystConverter,
- protected[parquet] var buffer: Buffer[Any])
- extends CatalystConverter {
-
- def this(elementType: DataType, index: Int, parent: CatalystConverter) =
- this(
- elementType,
- index,
- parent,
- new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE))
-
- protected[parquet] val converter: Converter = new CatalystConverter {
-
- private var current: Any = null
-
- val converter = CatalystConverter.createConverter(
- new CatalystConverter.FieldType(
- CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
- elementType,
- false),
- fieldIndex = 0,
- parent = this)
-
- override def getConverter(fieldIndex: Int): Converter = converter
-
- override def end(): Unit = parent.updateField(index, current)
-
- override def start(): Unit = {
- current = null
- }
-
- override protected[parquet] val size: Int = 1
- override protected[parquet] val index: Int = 0
- override protected[parquet] val parent = CatalystArrayContainsNullConverter.this
-
- override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
- current = value
- }
-
- override protected[parquet] def clearBuffer(): Unit = {}
- }
-
- override def getConverter(fieldIndex: Int): Converter = converter
-
- // arrays have only one (repeated) field, which is its elements
- override val size = 1
-
- override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
- buffer += value
- }
-
- override protected[parquet] def clearBuffer(): Unit = {
- buffer.clear()
- }
-
- override def start(): Unit = {}
-
- override def end(): Unit = {
- assert(parent != null)
- // here we need to make sure to use ArrayScalaType
- parent.updateField(index, buffer.toArray.toSeq)
- clearBuffer()
- }
-}
-
-/**
- * This converter is for multi-element groups of primitive or complex types
- * that have repetition level optional or required (so struct fields).
- *
- * @param schema The corresponding Catalyst schema in the form of a list of
- * attributes.
- * @param index
- * @param parent
- */
-private[parquet] class CatalystStructConverter(
- override protected[parquet] val schema: Array[FieldType],
- override protected[parquet] val index: Int,
- override protected[parquet] val parent: CatalystConverter)
- extends CatalystGroupConverter(schema, index, parent) {
-
- override protected[parquet] def clearBuffer(): Unit = {}
-
- // TODO: think about reusing the buffer
- override def end(): Unit = {
- assert(!isRootConverter)
- // here we need to make sure to use StructScalaType
- // Note: we need to actually make a copy of the array since we
- // may be in a nested field
- parent.updateField(index, new GenericRow(current.toArray))
- }
-}
-
-/**
- * A `parquet.io.api.GroupConverter` that converts two-element groups that
- * match the characteristics of a map (see
- * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
- * [[org.apache.spark.sql.types.MapType]].
- *
- * @param schema
- * @param index
- * @param parent
- */
-private[parquet] class CatalystMapConverter(
- protected[parquet] val schema: Array[FieldType],
- override protected[parquet] val index: Int,
- override protected[parquet] val parent: CatalystConverter)
- extends CatalystConverter {
-
- private val map = new HashMap[Any, Any]()
-
- private val keyValueConverter = new CatalystConverter {
- private var currentKey: Any = null
- private var currentValue: Any = null
- val keyConverter = CatalystConverter.createConverter(schema(0), 0, this)
- val valueConverter = CatalystConverter.createConverter(schema(1), 1, this)
-
- override def getConverter(fieldIndex: Int): Converter = {
- if (fieldIndex == 0) keyConverter else valueConverter
- }
-
- override def end(): Unit = CatalystMapConverter.this.map += currentKey -> currentValue
-
- override def start(): Unit = {
- currentKey = null
- currentValue = null
- }
-
- override protected[parquet] val size: Int = 2
- override protected[parquet] val index: Int = 0
- override protected[parquet] val parent: CatalystConverter = CatalystMapConverter.this
-
- override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
- fieldIndex match {
- case 0 =>
- currentKey = value
- case 1 =>
- currentValue = value
- case _ =>
- new RuntimePermission(s"trying to update Map with fieldIndex $fieldIndex")
- }
- }
-
- override protected[parquet] def clearBuffer(): Unit = {}
- }
-
- override protected[parquet] val size: Int = 1
-
- override protected[parquet] def clearBuffer(): Unit = {}
-
- override def start(): Unit = {
- map.clear()
- }
-
- override def end(): Unit = {
- // here we need to make sure to use MapScalaType
- parent.updateField(index, map.toMap)
- }
-
- override def getConverter(fieldIndex: Int): Converter = keyValueConverter
-
- override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit =
- throw new UnsupportedOperationException
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 5a1b15490d273..6c3329590e448 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -33,18 +33,16 @@ import org.apache.spark.sql.types._
/**
* A `parquet.io.api.RecordMaterializer` for Rows.
- *
- *@param root The root group converter for the record.
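+ *
+ * @param parquetSchema Parquet schema of the records to be materialized
+ * @param attributes Spark SQL attributes describing the schema of the materialized rows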
*/
-private[parquet] class RowRecordMaterializer(root: CatalystConverter)
+private[parquet] class RowRecordMaterializer(parquetSchema: MessageType, attributes: Seq[Attribute])
extends RecordMaterializer[Row] {
- def this(parquetSchema: MessageType, attributes: Seq[Attribute]) =
- this(CatalystConverter.createRootConverter(parquetSchema, attributes))
+ private val rootConverter =
+ new CatalystRowConverter(parquetSchema, StructType.fromAttributes(attributes), NoopUpdater)
- override def getCurrentRecord: Row = root.getCurrentRecord
+ override def getCurrentRecord: Row = rootConverter.currentRow
- override def getRootConverter: GroupConverter = root.asInstanceOf[GroupConverter]
+ override def getRootConverter: GroupConverter = rootConverter
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 203bc79f153dd..4b6a80d325b04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -28,7 +28,7 @@ import parquet.example.data.simple.SimpleGroup
import parquet.example.data.{Group, GroupWriter}
import parquet.hadoop.api.WriteSupport
import parquet.hadoop.api.WriteSupport.WriteContext
-import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
+import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
import parquet.io.api.RecordConsumer
import parquet.schema.{MessageType, MessageTypeParser}
@@ -101,10 +101,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
test("fixed-length decimals") {
-
- def makeDecimalRDD(decimal: DecimalType): DataFrame =
- sparkContext
- .parallelize(0 to 1000)
+ def makeDecimalDataFrame(decimal: DecimalType): DataFrame =
+ (0 to 1000)
.map(i => Tuple1(i / 100.0))
.toDF()
// Parquet doesn't allow column names with spaces, have to add an alias here
@@ -112,7 +110,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
withTempPath { dir =>
- val data = makeDecimalRDD(DecimalType(precision, scale))
+ val data = makeDecimalDataFrame(DecimalType(precision, scale))
data.saveAsParquetFile(dir.getCanonicalPath)
checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
}
@@ -121,7 +119,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
// Decimals with precision above 18 are not yet supported
intercept[RuntimeException] {
withTempPath { dir =>
- makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath)
+ makeDecimalDataFrame(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath)
parquetFile(dir.getCanonicalPath).collect()
}
}
@@ -129,7 +127,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
// Unlimited-length decimals are not yet supported
intercept[RuntimeException] {
withTempPath { dir =>
- makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
+ makeDecimalDataFrame(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
parquetFile(dir.getCanonicalPath).collect()
}
}