From 7f07af04bb22323a68662bb58a243f64d1d2a226 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 9 Dec 2014 21:00:09 +0800 Subject: [PATCH] Adds a new set of Parquet test suites --- .../scala/org/apache/spark/sql/SQLConf.scala | 4 + .../spark/sql/parquet/ParquetTest.scala | 119 ++++++++ .../sql/parquet/ParquetFilterSuite.scala | 250 +++++++++++++++ .../spark/sql/parquet/ParquetIOSuite.scala | 284 ++++++++++++++++++ .../sql/parquet/ParquetQuerySuite2.scala | 107 +++++++ .../sql/parquet/ParquetSchemaSuite.scala | 161 ++++++++++ .../spark/sql/parquet/parquetSuites.scala | 6 +- 7 files changed, 928 insertions(+), 3 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 9697beb132fbb..8731c035a74ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -181,6 +181,10 @@ private[sql] trait SQLConf { */ def getAllConfs: immutable.Map[String, String] = settings.synchronized { settings.toMap } + private[spark] def unsetConf(key: String) { + settings -= key + } + private[spark] def clear() { settings.clear() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala new file mode 100644 index 0000000000000..4c72fdfca0124 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import java.io.File + +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag +import scala.util.Try + +import org.apache.spark.sql.SchemaRDD +import org.apache.spark.sql.catalyst.util +import org.apache.spark.sql.test.TestSQLContext._ +import org.apache.spark.util.Utils + +/** + * A helper trait that provides convenient facilities for Parquet testing. + * + * NOTE: Considering classes `Tuple1` ... `Tuple22` all extend `Product`, it would be more + * convenient to use tuples rather than special case classes when writing test cases/suites. + * Especially, `Tuple1.apply` can be used to easily wrap a single type/value. 
+ */ +trait ParquetTest { + protected val configuration = sparkContext.hadoopConfiguration + + /** + * Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL + * configurations. + * + * @todo Probably this method should be moved to a more general place + */ + protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { + val (keys, values) = pairs.unzip + val currentValues = keys.map(key => Try(getConf(key)).toOption) + (keys, values).zipped.foreach(setConf) + try f finally { + keys.zip(currentValues).foreach { + case (key, Some(value)) => setConf(key, value) + case (key, None) => unsetConf(key) + } + } + } + + /** + * Generates a temporary path without creating the actual file/directory, then pass it to `f`. + * + * @todo Probably this method should be moved to a more general place + */ + protected def withTempPath(f: File => Unit): Unit = { + val file = util.getTempFilePath("parquetTest").getCanonicalFile + try f(file) finally Utils.deleteRecursively(file) + } + + /** + * Creates a temporary directory, which is then passed to `f`. + * + * @todo Probably this method should be moved to a more general place + */ + protected def withTempDir(f: File => Unit): Unit = { + val dir = Utils.createTempDir().getCanonicalFile + try f(dir) finally Utils.deleteRecursively(dir) + } + + /** + * Writes `data` to a Parquet file, which is then passed to `f`. + */ + protected def withParquetFile[T <: Product: ClassTag: TypeTag] + (data: Seq[T]) + (f: String => Unit): Unit = { + withTempPath { file => + sparkContext.parallelize(data).toSchemaRDD.saveAsParquetFile(file.getCanonicalPath) + f(file.getCanonicalPath) + } + } + + /** + * Writes `data` to a Parquet file and reads it back as a SchemaRDD, which is then passed to `f`. + */ + protected def withParquetRDD[T <: Product: ClassTag: TypeTag] + (data: Seq[T]) + (f: SchemaRDD => Unit): Unit = { + withParquetFile(data)(path => f(parquetFile(path))) + } + + /** + * Drops temporary table `tableName` after calling `f`. + */ + protected def withTempTable(tableName: String)(f: => Unit): Unit = { + try f finally dropTempTable(tableName) + } + + /** + * Writes `data` to a Parquet file, reads it back as a SchemaRDD and registers it as a temporary + * table named `tableName`, then call `f`. + */ + protected def withParquetTable[T <: Product: ClassTag: TypeTag] + (data: Seq[T], tableName: String) + (f: => Unit): Unit = { + withParquetRDD(data) { rdd => + rdd.registerTempTable(tableName) + withTempTable(tableName)(f) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala new file mode 100644 index 0000000000000..54341166ca95b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import parquet.filter2.predicate.Operators._ +import parquet.filter2.predicate.{FilterPredicate, Operators} + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate, Row} +import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD} + +/** + * A test suite that tests Parquet filter2 API based filter pushdown optimization. + * + * Notice that `!(a cmp b)` are always transformed to its negated form `a cmp' b` by the + * `BooleanSimplification` optimization rule whenever possible. As a result, predicate `!(a < 1)` + * results a `GtEq` filter predicate rather than a `Not`. + * + * @todo Add test cases for `IsNull` and `IsNotNull` after merging PR #3367 + */ +class ParquetFilterSuite extends QueryTest with ParquetTest { + private def checkFilterPushdown( + rdd: SchemaRDD, + output: Seq[Symbol], + predicate: Predicate, + filterClass: Class[_ <: FilterPredicate], + checker: (SchemaRDD, Any) => Unit, + expectedResult: => Any): Unit = { + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") { + val query = rdd.select(output.map(_.attr): _*).where(predicate) + + val maybeAnalyzedPredicate = query.queryExecution.executedPlan.collect { + case plan: ParquetTableScan => plan.columnPruningPred + }.flatten.reduceOption(_ && _) + + assert(maybeAnalyzedPredicate.isDefined) + maybeAnalyzedPredicate.foreach { pred => + val maybeFilter = ParquetFilters.createFilter(pred) + assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") + maybeFilter.foreach(f => assert(f.getClass === filterClass)) + } + + checker(query, expectedResult) + } + } + + private def checkFilterPushdown + (rdd: SchemaRDD, output: Symbol*) + (predicate: Predicate, filterClass: Class[_ <: FilterPredicate]) + (expectedResult: => Any): Unit = { + checkFilterPushdown(rdd, output, predicate, filterClass, checkAnswer _, expectedResult) + } + + def checkBinaryFilterPushdown + (rdd: SchemaRDD, output: Symbol*) + (predicate: Predicate, filterClass: Class[_ <: FilterPredicate]) + (expectedResult: => Any): Unit = { + def checkBinaryAnswer(rdd: SchemaRDD, result: Any): Unit = { + val actual = rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq + val expected = result match { + case s: Seq[_] => s.map(_.asInstanceOf[Row].getAs[Array[Byte]](0).mkString(",")) + case s => Seq(s.asInstanceOf[Array[Byte]].mkString(",")) + } + assert(actual.sameElements(expected)) + } + checkFilterPushdown(rdd, output, predicate, filterClass, checkBinaryAnswer _, expectedResult) + } + + test("filter pushdown - boolean") { + withParquetRDD((true :: false :: Nil).map(Tuple1.apply)) { rdd => + checkFilterPushdown(rdd, '_1)('_1 === true, classOf[Eq[java.lang.Boolean]])(true) + checkFilterPushdown(rdd, '_1)('_1 !== true, classOf[Operators.Not])(false) + } + } + + test("filter pushdown - integer") { + withParquetRDD((1 to 4).map(Tuple1.apply)) { rdd => + checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[Integer]])(1) + checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) { + (2 to 
4).map(Row.apply(_)) + } + + checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [Integer]])(1) + checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [Integer]])(4) + checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[Integer]])(1) + checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[Integer]])(4) + + checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1) + checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [Integer]])(1) + checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [Integer]])(4) + checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[Integer]])(1) + checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[Integer]])(4) + + checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[Integer]])(4) + checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3) + checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) { + Seq(Row(1), Row(4)) + } + } + } + + test("filter pushdown - long") { + withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toLong))) { rdd => + checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Long]])(1) + checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) { + (2 to 4).map(Row.apply(_)) + } + + checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Long]])(1) + checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Long]])(4) + checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Long]])(1) + checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Long]])(4) + + checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1) + checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Long]])(1) + checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Long]])(4) + checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Long]])(1) + checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Long]])(4) + + checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Long]])(4) + checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3) + checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) { + Seq(Row(1), Row(4)) + } + } + } + + test("filter pushdown - float") { + withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toFloat))) { rdd => + checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Float]])(1) + checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) { + (2 to 4).map(Row.apply(_)) + } + + checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Float]])(1) + checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Float]])(4) + checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Float]])(1) + checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Float]])(4) + + checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1) + checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Float]])(1) + checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Float]])(4) + checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Float]])(1) + checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Float]])(4) + + checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Float]])(4) + checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3) + checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) { + Seq(Row(1), Row(4)) + } + } + } + + test("filter 
pushdown - double") { + withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toDouble))) { rdd => + checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Double]])(1) + checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) { + (2 to 4).map(Row.apply(_)) + } + + checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Double]])(1) + checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Double]])(4) + checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Double]])(1) + checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Double]])(4) + + checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq[Integer]])(1) + checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Double]])(1) + checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Double]])(4) + checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Double]])(1) + checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Double]])(4) + + checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Double]])(4) + checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3) + checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) { + Seq(Row(1), Row(4)) + } + } + } + + test("filter pushdown - string") { + withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toString))) { rdd => + checkFilterPushdown(rdd, '_1)('_1 === "1", classOf[Eq[String]])("1") + checkFilterPushdown(rdd, '_1)('_1 !== "1", classOf[Operators.Not]) { + (2 to 4).map(i => Row.apply(i.toString)) + } + + checkFilterPushdown(rdd, '_1)('_1 < "2", classOf[Lt [java.lang.String]])("1") + checkFilterPushdown(rdd, '_1)('_1 > "3", classOf[Gt [java.lang.String]])("4") + checkFilterPushdown(rdd, '_1)('_1 <= "1", classOf[LtEq[java.lang.String]])("1") + checkFilterPushdown(rdd, '_1)('_1 >= "4", classOf[GtEq[java.lang.String]])("4") + + checkFilterPushdown(rdd, '_1)(Literal("1") === '_1, classOf[Eq [java.lang.String]])("1") + checkFilterPushdown(rdd, '_1)(Literal("2") > '_1, classOf[Lt [java.lang.String]])("1") + checkFilterPushdown(rdd, '_1)(Literal("3") < '_1, classOf[Gt [java.lang.String]])("4") + checkFilterPushdown(rdd, '_1)(Literal("1") >= '_1, classOf[LtEq[java.lang.String]])("1") + checkFilterPushdown(rdd, '_1)(Literal("4") <= '_1, classOf[GtEq[java.lang.String]])("4") + + checkFilterPushdown(rdd, '_1)(!('_1 < "4"), classOf[Operators.GtEq[java.lang.String]])("4") + checkFilterPushdown(rdd, '_1)('_1 > "2" && '_1 < "4", classOf[Operators.And])("3") + checkFilterPushdown(rdd, '_1)('_1 < "2" || '_1 > "3", classOf[Operators.Or]) { + Seq(Row("1"), Row("4")) + } + } + } + + test("filter pushdown - binary") { + implicit class IntToBinary(int: Int) { + def b: Array[Byte] = int.toString.getBytes("UTF-8") + } + + withParquetRDD((1 to 4).map(i => Tuple1.apply(i.b))) { rdd => + checkBinaryFilterPushdown(rdd, '_1)('_1 === 1.b, classOf[Eq[Array[Byte]]])(1.b) + checkBinaryFilterPushdown(rdd, '_1)('_1 !== 1.b, classOf[Operators.Not]) { + (2 to 4).map(i => Row.apply(i.b)).toSeq + } + + checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b, classOf[Lt [Array[Byte]]])(1.b) + checkBinaryFilterPushdown(rdd, '_1)('_1 > 3.b, classOf[Gt [Array[Byte]]])(4.b) + checkBinaryFilterPushdown(rdd, '_1)('_1 <= 1.b, classOf[LtEq[Array[Byte]]])(1.b) + checkBinaryFilterPushdown(rdd, '_1)('_1 >= 4.b, classOf[GtEq[Array[Byte]]])(4.b) + + checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) === '_1, classOf[Eq [Array[Byte]]])(1.b) + checkBinaryFilterPushdown(rdd, '_1)(Literal(2.b) > 
'_1, classOf[Lt [Array[Byte]]])(1.b) + checkBinaryFilterPushdown(rdd, '_1)(Literal(3.b) < '_1, classOf[Gt [Array[Byte]]])(4.b) + checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) >= '_1, classOf[LtEq[Array[Byte]]])(1.b) + checkBinaryFilterPushdown(rdd, '_1)(Literal(4.b) <= '_1, classOf[GtEq[Array[Byte]]])(4.b) + + checkBinaryFilterPushdown(rdd, '_1)(!('_1 < 4.b), classOf[Operators.GtEq[Array[Byte]]])(4.b) + checkBinaryFilterPushdown(rdd, '_1)('_1 > 2.b && '_1 < 4.b, classOf[Operators.And])(3.b) + checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b || '_1 > 3.b, classOf[Operators.Or]) { + Seq(Row(1.b), Row(4.b)) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala new file mode 100644 index 0000000000000..7973847e0faa5 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import parquet.example.data.simple.SimpleGroup +import parquet.example.data.{Group, GroupWriter} +import parquet.hadoop.api.WriteSupport +import parquet.hadoop.api.WriteSupport.WriteContext +import parquet.hadoop.metadata.CompressionCodecName +import parquet.hadoop.{ParquetFileWriter, ParquetWriter} +import parquet.io.api.RecordConsumer +import parquet.schema.{MessageType, MessageTypeParser} + +import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.catalyst.types.DecimalType +import org.apache.spark.sql.test.TestSQLContext._ +import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD} + +// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport +// with an empty configuration (it is after all not intended to be used in this way?) +// and members are private so we need to make our own in order to pass the schema +// to the writer. +private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] { + var groupWriter: GroupWriter = null + + override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { + groupWriter = new GroupWriter(recordConsumer, schema) + } + + override def init(configuration: Configuration): WriteContext = { + new WriteContext(schema, new java.util.HashMap[String, String]()) + } + + override def write(record: Group) { + groupWriter.write(record) + } +} + +/** + * A test suite that tests basic Parquet I/O. 
+ */ +class ParquetIOSuite extends QueryTest with ParquetTest { + /** + * Writes `data` to a Parquet file, reads it back and check file contents. + */ + protected def checkParquetFile[T <: Product: ClassTag: TypeTag](data: Seq[T]): Unit = { + withParquetRDD(data)(checkAnswer(_, data)) + } + + test("basic data types (without binary)") { + val data = (1 to 4).map { i => + (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble) + } + checkParquetFile(data) + } + + test("raw binary") { + val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte))) + withParquetRDD(data) { rdd => + assertResult(data.map(_._1.mkString(","))) { + rdd.collect().map(_.getAs[Array[Byte]](0).mkString(",")) + } + } + } + + test("string") { + val data = (1 to 4).map(i => Tuple1(i.toString)) + // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL + // as we store Spark SQL schema in the extra metadata. + withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "false")(checkParquetFile(data)) + withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "true")(checkParquetFile(data)) + } + + test("fixed-length decimals") { + def makeDecimalRDD(decimal: DecimalType): SchemaRDD = + sparkContext + .parallelize(0 to 1000) + .map(i => Tuple1(i / 100.0)) + .select('_1 cast decimal) + + for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) { + withTempPath { dir => + val data = makeDecimalRDD(DecimalType(precision, scale)) + data.saveAsParquetFile(dir.getCanonicalPath) + checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq) + } + } + + // Decimals with precision above 18 are not yet supported + intercept[RuntimeException] { + withTempPath { dir => + makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath) + parquetFile(dir.getCanonicalPath).collect() + } + } + + // Unlimited-length decimals are not yet supported + intercept[RuntimeException] { + withTempPath { dir => + makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath) + parquetFile(dir.getCanonicalPath).collect() + } + } + } + + test("map") { + val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i"))) + checkParquetFile(data) + } + + test("array") { + val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1))) + checkParquetFile(data) + } + + test("struct") { + val data = (1 to 4).map(i => Tuple1((i, s"val_$i"))) + withParquetRDD(data) { rdd => + // Structs are converted to `Row`s + checkAnswer(rdd, data.map { case Tuple1(struct) => + Tuple1(Row(struct.productIterator.toSeq: _*)) + }) + } + } + + test("nested struct with array of array as field") { + val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i"))))) + withParquetRDD(data) { rdd => + // Structs are converted to `Row`s + checkAnswer(rdd, data.map { case Tuple1(struct) => + Tuple1(Row(struct.productIterator.toSeq: _*)) + }) + } + } + + test("nested map with struct as value type") { + val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i")))) + withParquetRDD(data) { rdd => + checkAnswer(rdd, data.map { case Tuple1(m) => + Tuple1(m.mapValues(struct => Row(struct.productIterator.toSeq: _*))) + }) + } + } + + test("nulls") { + val allNulls = ( + null.asInstanceOf[java.lang.Boolean], + null.asInstanceOf[Integer], + null.asInstanceOf[java.lang.Long], + null.asInstanceOf[java.lang.Float], + null.asInstanceOf[java.lang.Double]) + + withParquetRDD(allNulls :: Nil) { rdd => + val rows = rdd.collect() + assert(rows.size === 1) + assert(rows.head === Row(Seq.fill(5)(null): _*)) + } + } + + test("nones") { + val allNones = ( + 
None.asInstanceOf[Option[Int]], + None.asInstanceOf[Option[Long]], + None.asInstanceOf[Option[String]]) + + withParquetRDD(allNones :: Nil) { rdd => + val rows = rdd.collect() + assert(rows.size === 1) + assert(rows.head === Row(Seq.fill(3)(null): _*)) + } + } + + test("compression codec") { + def compressionCodecFor(path: String) = { + val codecs = ParquetTypesConverter + .readMetaData(new Path(path), Some(configuration)) + .getBlocks + .flatMap(_.getColumns) + .map(_.getCodec.name()) + .distinct + + assert(codecs.size === 1) + codecs.head + } + + val data = (0 until 10).map(i => (i, i.toString)) + + def checkCompressionCodec(codec: CompressionCodecName): Unit = { + withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) { + withParquetFile(data) { path => + assertResult(parquetCompressionCodec.toUpperCase) { + compressionCodecFor(path) + } + } + } + } + + // Checks default compression codec + checkCompressionCodec(CompressionCodecName.fromConf(parquetCompressionCodec)) + + checkCompressionCodec(CompressionCodecName.UNCOMPRESSED) + checkCompressionCodec(CompressionCodecName.GZIP) + checkCompressionCodec(CompressionCodecName.SNAPPY) + } + + test("read raw Parquet file") { + def makeRawParquetFile(path: Path): Unit = { + val schema = MessageTypeParser.parseMessageType( + """ + |message root { + | required boolean _1; + | required int32 _2; + | required int64 _3; + | required float _4; + | required double _5; + |} + """.stripMargin) + + val writeSupport = new TestGroupWriteSupport(schema) + val writer = new ParquetWriter[Group](path, writeSupport) + + (0 until 10).foreach { i => + val record = new SimpleGroup(schema) + record.add(0, i % 2 == 0) + record.add(1, i) + record.add(2, i.toLong) + record.add(3, i.toFloat) + record.add(4, i.toDouble) + writer.write(record) + } + + writer.close() + } + + withTempDir { dir => + val path = new Path(dir.toURI.toString, "part-r-0.parquet") + makeRawParquetFile(path) + checkAnswer(parquetFile(path.toString), (0 until 10).map { i => + (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble) + }) + } + } + + test("write metadata") { + withTempPath { file => + val path = new Path(file.toURI.toString) + val fs = FileSystem.getLocal(configuration) + val attributes = ScalaReflection.attributesFor[(Int, String)] + ParquetTypesConverter.writeMetaData(attributes, path, configuration) + + assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))) + assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE))) + + val metaData = ParquetTypesConverter.readMetaData(path, Some(configuration)) + val actualSchema = metaData.getFileMetaData.getSchema + val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes) + + actualSchema.checkContains(expectedSchema) + expectedSchema.checkContains(actualSchema) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala new file mode 100644 index 0000000000000..734cca4852f11 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.test.TestSQLContext._ + +/** + * A test suite that tests various Parquet queries. + */ +class ParquetQuerySuite2 extends QueryTest with ParquetTest { + test("simple projection") { + withParquetTable((0 until 10).map(i => (i, i.toString)), "t") { + checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_))) + } + } + + test("insertion") { + withTempDir { dir => + val data = (0 until 10).map(i => (i, i.toString)) + withParquetTable(data, "t") { + createParquetFile[(Int, String)](dir.toString).registerTempTable("dest") + withTempTable("dest") { + sql("INSERT OVERWRITE INTO dest SELECT * FROM t") + checkAnswer(table("dest"), data) + } + } + } + } + + test("appending") { + val data = (0 until 10).map(i => (i, i.toString)) + withParquetTable(data, "t") { + sql("INSERT INTO t SELECT * FROM t") + checkAnswer(table("t"), data ++ data) + } + } + + test("self-join") { + // 4 rows, cells of column 1 of row 2 and row 4 are null + val data = (1 to 4).map { i => + val maybeInt = if (i % 2 == 0) None else Some(i) + (maybeInt, i.toString) + } + + withParquetTable(data, "t") { + val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1") + val queryOutput = selfJoin.queryExecution.analyzed.output + + assertResult(4, s"Field count mismatches")(queryOutput.size) + assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") { + queryOutput.filter(_.name == "_1").map(_.exprId).size + } + + checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3"))) + } + } + + test("nested data - struct with array field") { + val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i")))) + withParquetTable(data, "t") { + checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map { + case Tuple1((_, Seq(string))) => Row(string) + }) + } + } + + test("nested data - array of struct") { + val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i"))) + withParquetTable(data, "t") { + checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map { + case Tuple1(Seq((_, string))) => Row(string) + }) + } + } + + test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") { + withParquetTable((1 to 10).map(Tuple1.apply), "t") { + checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_))) + } + } + + test("SPARK-3536 regression: query empty Parquet file shouldn't throw") { + withTempDir { dir => + createParquetFile[(Int, String)](dir.toString).registerTempTable("t") + withTempTable("t") { + checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row]) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala new file mode 100644 index 0000000000000..c868bda8a2289 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag + +import org.scalatest.FunSuite +import parquet.schema.MessageTypeParser + +import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.catalyst.types.{BinaryType, IntegerType, StructField, StructType} + +class ParquetSchemaSuite extends FunSuite with ParquetTest { + /** + * Checks whether the reflected Parquet message type for product type `T` conforms `messageType`. + */ + private def testSchema[T <: Product: ClassTag: TypeTag]( + testName: String, messageType: String): Unit = { + test(testName) { + val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T]) + val expected = MessageTypeParser.parseMessageType(messageType) + actual.checkContains(expected) + expected.checkContains(actual) + } + } + + testSchema[(Boolean, Int, Long, Float, Double, Array[Byte])]( + "basic types", + """ + |message root { + | required boolean _1; + | required int32 _2; + | required int64 _3; + | required float _4; + | required double _5; + | optional binary _6; + |} + """.stripMargin) + + testSchema[(Byte, Short, Int, Long)]( + "logical integral types", + """ + |message root { + | required int32 _1 (INT_8); + | required int32 _2 (INT_16); + | required int32 _3 (INT_32); + | required int64 _4 (INT_64); + |} + """.stripMargin) + + // Currently String is the only supported logical binary type. 
+ testSchema[Tuple1[String]]( + "binary logical types", + """ + |message root { + | optional binary _1 (UTF8); + |} + """.stripMargin) + + testSchema[Tuple1[Seq[Int]]]( + "array", + """ + |message root { + | optional group _1 (LIST) { + | repeated int32 array; + | } + |} + """.stripMargin) + + testSchema[Tuple1[Map[Int, String]]]( + "map", + """ + |message root { + | optional group _1 (MAP) { + | repeated group map (MAP_KEY_VALUE) { + | required int32 key; + | optional binary value (UTF8); + | } + | } + |} + """.stripMargin) + + testSchema[Tuple1[Pair[Int, String]]]( + "struct", + """ + |message root { + | optional group _1 { + | required int32 _1; + | optional binary _2 (UTF8); + | } + |} + """.stripMargin) + + testSchema[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]]( + "deeply nested type", + """ + |message root { + | optional group _1 (MAP) { + | repeated group map (MAP_KEY_VALUE) { + | required int32 key; + | optional group value { + | optional binary _1 (UTF8); + | optional group _2 (LIST) { + | repeated group bag { + | optional group array { + | required int32 _1; + | required double _2; + | } + | } + | } + | } + | } + | } + |} + """.stripMargin) + + testSchema[(Option[Int], Map[Int, Option[Double]])]( + "optional types", + """ + |message root { + | optional int32 _1; + | optional group _2 (MAP) { + | repeated group map (MAP_KEY_VALUE) { + | required int32 key; + | optional double value; + | } + | } + |} + """.stripMargin) + + test("DataType string parser compatibility") { + val schema = StructType(List( + StructField("c1", IntegerType, false), + StructField("c2", BinaryType, true))) + + val fromCaseClassString = ParquetTypesConverter.convertFromString(schema.toString) + val fromJson = ParquetTypesConverter.convertFromString(schema.json) + + (fromCaseClassString, fromJson).zipped.foreach { (a, b) => + assert(a.name == b.name) + assert(a.dataType === b.dataType) + assert(a.nullable === b.nullable) + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index 488ebba043794..fc0e42c201d56 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -37,7 +37,7 @@ case class ParquetDataWithKey(p: Int, intField: Int, stringField: String) * A suite to test the automatic conversion of metastore tables with parquet data to use the * built in parquet support. */ -class ParquetMetastoreSuite extends ParquetTest { +class ParquetMetastoreSuite extends ParquetPartitioningTest { override def beforeAll(): Unit = { super.beforeAll() @@ -112,7 +112,7 @@ class ParquetMetastoreSuite extends ParquetTest { /** * A suite of tests for the Parquet support through the data sources API. */ -class ParquetSourceSuite extends ParquetTest { +class ParquetSourceSuite extends ParquetPartitioningTest { override def beforeAll(): Unit = { super.beforeAll() @@ -145,7 +145,7 @@ class ParquetSourceSuite extends ParquetTest { /** * A collection of tests for parquet data with various forms of partitioning. */ -abstract class ParquetTest extends QueryTest with BeforeAndAfterAll { +abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll { var partitionedTableDir: File = null var partitionedTableDirWithKey: File = null
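Reviewer note (not part of the patch): the stand-alone sketch below shows how the new `ParquetTest` helpers are intended to compose in a suite. `withSQLConf` scopes SQL configuration changes and restores (or unsets) the previous values when the block finishes, while `withParquetTable` writes the data to a temporary Parquet file, reads it back, registers it as a temporary table, and drops the table afterwards. The suite name `ExampleParquetSuite` and the test body are hypothetical; the imports and helper signatures mirror the code added above.

package org.apache.spark.sql.parquet

import org.apache.spark.sql.{QueryTest, SQLConf}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.test.TestSQLContext._

// Hypothetical example suite; it only illustrates how the ParquetTest trait is used.
class ExampleParquetSuite extends QueryTest with ParquetTest {
  test("round-trips a small table with filter pushdown enabled") {
    // Enable filter pushdown for this block only; the previous value is restored afterwards.
    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
      // Write (Int, String) pairs to a temporary Parquet file and expose them as table `t`.
      withParquetTable((1 to 4).map(i => (i, i.toString)), "t") {
        checkAnswer(sql("SELECT _2 FROM t WHERE _1 > 2"), Seq(Row("3"), Row("4")))
      }
    }
  }
}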