Adds a new set of Parquet test suites
liancheng committed Dec 9, 2014
1 parent 51b1fe1 commit 7f07af0
Showing 7 changed files with 928 additions and 3 deletions.
4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -181,6 +181,10 @@ private[sql] trait SQLConf {
*/
def getAllConfs: immutable.Map[String, String] = settings.synchronized { settings.toMap }

private[spark] def unsetConf(key: String) {
settings -= key
}

private[spark] def clear() {
settings.clear()
}
119 changes: 119 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.parquet

import java.io.File

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.util.Utils

/**
* A helper trait that provides convenient facilities for Parquet testing.
*
* NOTE: Since classes `Tuple1` through `Tuple22` all extend `Product`, it is usually more
* convenient to use tuples rather than dedicated case classes when writing test cases/suites.
* In particular, `Tuple1.apply` can be used to easily wrap a single type/value.
*/
trait ParquetTest {
protected val configuration = sparkContext.hadoopConfiguration

/**
* Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
* configurations to their original values.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
val (keys, values) = pairs.unzip
val currentValues = keys.map(key => Try(getConf(key)).toOption)
(keys, values).zipped.foreach(setConf)
try f finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => setConf(key, value)
case (key, None) => unsetConf(key)
}
}
}
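
// A minimal usage sketch of `withSQLConf` (illustrative; it mirrors how the Parquet filter
// suite below toggles pushdown). The override is visible only inside the block, and the
// previous value, or the unset state, is restored afterwards.
//
//   withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
//     // code here sees the overridden setting
//   }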

/**
* Generates a temporary path without creating the actual file/directory, then passes the
* path to `f`. The path is deleted recursively after `f` returns.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withTempPath(f: File => Unit): Unit = {
val file = util.getTempFilePath("parquetTest").getCanonicalFile
try f(file) finally Utils.deleteRecursively(file)
}

/**
* Creates a temporary directory, which is then passed to `f`.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
try f(dir) finally Utils.deleteRecursively(dir)
}
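
// Usage sketch for the two helpers above (illustrative, assuming `data` is a SchemaRDD):
// `withTempPath` hands `f` a path that does not exist yet, which suits writers that refuse
// to overwrite existing output, while `withTempDir` hands `f` an existing directory. Both
// delete the path recursively once `f` returns.
//
//   withTempPath { file =>
//     data.saveAsParquetFile(file.getCanonicalPath)  // the target must not pre-exist
//   }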

/**
* Writes `data` to a Parquet file, whose canonical path is then passed to `f`. The file is
* deleted after `f` returns.
*/
protected def withParquetFile[T <: Product: ClassTag: TypeTag]
(data: Seq[T])
(f: String => Unit): Unit = {
withTempPath { file =>
sparkContext.parallelize(data).toSchemaRDD.saveAsParquetFile(file.getCanonicalPath)
f(file.getCanonicalPath)
}
}
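
// Illustrative usage sketch: write a few tuples and read the resulting Parquet file back
// through the path handed to the closure (`parquetFile` comes from the TestSQLContext
// import above).
//
//   withParquetFile((1 to 4).map(i => (i, i.toString))) { path =>
//     assert(parquetFile(path).collect().length == 4)
//   }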

/**
* Writes `data` to a Parquet file and reads it back as a SchemaRDD, which is then passed to `f`.
*/
protected def withParquetRDD[T <: Product: ClassTag: TypeTag]
(data: Seq[T])
(f: SchemaRDD => Unit): Unit = {
withParquetFile(data)(path => f(parquetFile(path)))
}

/**
* Drops temporary table `tableName` after calling `f`.
*/
protected def withTempTable(tableName: String)(f: => Unit): Unit = {
try f finally dropTempTable(tableName)
}

/**
* Writes `data` to a Parquet file, reads it back as a SchemaRDD and registers it as a
* temporary table named `tableName`, then calls `f`. The temporary table is dropped after
* `f` returns.
*/
protected def withParquetTable[T <: Product: ClassTag: TypeTag]
(data: Seq[T], tableName: String)
(f: => Unit): Unit = {
withParquetRDD(data) { rdd =>
rdd.registerTempTable(tableName)
withTempTable(tableName)(f)
}
}
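
// Illustrative end-to-end sketch: `withParquetTable` composes `withParquetRDD` and
// `withTempTable`, so a test only needs the data and a table name ("t" here is an
// arbitrary name chosen for the example; `sql` comes from the TestSQLContext import).
//
//   withParquetTable((1 to 4).map(i => (i, i.toString)), "t") {
//     assert(sql("SELECT _1 FROM t WHERE _1 < 3").collect().length == 2)
//   }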
}
250 changes: 250 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.parquet

import parquet.filter2.predicate.Operators._
import parquet.filter2.predicate.{FilterPredicate, Operators}

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate, Row}
import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}

/**
* A test suite that exercises the Parquet filter pushdown optimization based on the
* filter2 API.
*
* Note that `!(a cmp b)` is always transformed into its negated form `a cmp' b` by the
* `BooleanSimplification` optimization rule whenever possible. As a result, the predicate
* `!(a < 1)` produces a `GtEq` filter predicate rather than a `Not`.
*
* @todo Add test cases for `IsNull` and `IsNotNull` after merging PR #3367
*/
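// For example, the Catalyst predicate !('_1 < 4) is rewritten to '_1 >= 4 before pushdown,
// so the generated Parquet predicate is an Operators.GtEq rather than an Operators.Not
// (see the integer, long, float, double, and string test cases below).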
class ParquetFilterSuite extends QueryTest with ParquetTest {
private def checkFilterPushdown(
rdd: SchemaRDD,
output: Seq[Symbol],
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
checker: (SchemaRDD, Any) => Unit,
expectedResult: => Any): Unit = {
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
val query = rdd.select(output.map(_.attr): _*).where(predicate)

val maybeAnalyzedPredicate = query.queryExecution.executedPlan.collect {
case plan: ParquetTableScan => plan.columnPruningPred
}.flatten.reduceOption(_ && _)

assert(maybeAnalyzedPredicate.isDefined)
maybeAnalyzedPredicate.foreach { pred =>
val maybeFilter = ParquetFilters.createFilter(pred)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
maybeFilter.foreach(f => assert(f.getClass === filterClass))
}

checker(query, expectedResult)
}
}

private def checkFilterPushdown
(rdd: SchemaRDD, output: Symbol*)
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
(expectedResult: => Any): Unit = {
checkFilterPushdown(rdd, output, predicate, filterClass, checkAnswer _, expectedResult)
}
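
// Reading the curried form (a sketch mirroring the calls in the test cases below):
//
//   checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[Integer]])(1)
//
// selects column `_1`, filters with the Catalyst predicate `'_1 === 1`, asserts that the
// predicate pushed into ParquetTableScan converts to the given filter2 class via
// ParquetFilters.createFilter, and finally checks the query answer against the expected
// result (here, a single row containing 1).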

def checkBinaryFilterPushdown
(rdd: SchemaRDD, output: Symbol*)
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate])
(expectedResult: => Any): Unit = {
def checkBinaryAnswer(rdd: SchemaRDD, result: Any): Unit = {
val actual = rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq
val expected = result match {
case s: Seq[_] => s.map(_.asInstanceOf[Row].getAs[Array[Byte]](0).mkString(","))
case s => Seq(s.asInstanceOf[Array[Byte]].mkString(","))
}
assert(actual.sameElements(expected))
}
checkFilterPushdown(rdd, output, predicate, filterClass, checkBinaryAnswer _, expectedResult)
}
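
// Note on the binary variant above: binary columns come back as Array[Byte], whose
// equals() is reference equality, so the generic checkAnswer comparison would not work.
// checkBinaryAnswer therefore renders each byte array as a comma-separated string before
// comparing the actual and expected rows.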

test("filter pushdown - boolean") {
withParquetRDD((true :: false :: Nil).map(Tuple1.apply)) { rdd =>
checkFilterPushdown(rdd, '_1)('_1 === true, classOf[Eq[java.lang.Boolean]])(true)
checkFilterPushdown(rdd, '_1)('_1 !== true, classOf[Operators.Not])(false)
}
}

test("filter pushdown - integer") {
withParquetRDD((1 to 4).map(Tuple1.apply)) { rdd =>
checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[Integer]])(1)
checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
(2 to 4).map(Row.apply(_))
}

checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [Integer]])(1)
checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [Integer]])(4)
checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[Integer]])(1)
checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[Integer]])(4)

checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1)
checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [Integer]])(1)
checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [Integer]])(4)
checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[Integer]])(1)
checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[Integer]])(4)

checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[Integer]])(4)
checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
Seq(Row(1), Row(4))
}
}
}

test("filter pushdown - long") {
withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toLong))) { rdd =>
checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Long]])(1)
checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
(2 to 4).map(Row.apply(_))
}

checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Long]])(1)
checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Long]])(4)
checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Long]])(1)
checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Long]])(4)

checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1)
checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Long]])(1)
checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Long]])(4)
checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Long]])(1)
checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Long]])(4)

checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Long]])(4)
checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
Seq(Row(1), Row(4))
}
}
}

test("filter pushdown - float") {
withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toFloat))) { rdd =>
checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Float]])(1)
checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
(2 to 4).map(Row.apply(_))
}

checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Float]])(1)
checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Float]])(4)
checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Float]])(1)
checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Float]])(4)

checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq [Integer]])(1)
checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Float]])(1)
checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Float]])(4)
checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Float]])(1)
checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Float]])(4)

checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Float]])(4)
checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
Seq(Row(1), Row(4))
}
}
}

test("filter pushdown - double") {
withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toDouble))) { rdd =>
checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Double]])(1)
checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.Not]) {
(2 to 4).map(Row.apply(_))
}

checkFilterPushdown(rdd, '_1)('_1 < 2, classOf[Lt [java.lang.Double]])(1)
checkFilterPushdown(rdd, '_1)('_1 > 3, classOf[Gt [java.lang.Double]])(4)
checkFilterPushdown(rdd, '_1)('_1 <= 1, classOf[LtEq[java.lang.Double]])(1)
checkFilterPushdown(rdd, '_1)('_1 >= 4, classOf[GtEq[java.lang.Double]])(4)

checkFilterPushdown(rdd, '_1)(Literal(1) === '_1, classOf[Eq[Integer]])(1)
checkFilterPushdown(rdd, '_1)(Literal(2) > '_1, classOf[Lt [java.lang.Double]])(1)
checkFilterPushdown(rdd, '_1)(Literal(3) < '_1, classOf[Gt [java.lang.Double]])(4)
checkFilterPushdown(rdd, '_1)(Literal(1) >= '_1, classOf[LtEq[java.lang.Double]])(1)
checkFilterPushdown(rdd, '_1)(Literal(4) <= '_1, classOf[GtEq[java.lang.Double]])(4)

checkFilterPushdown(rdd, '_1)(!('_1 < 4), classOf[Operators.GtEq[java.lang.Double]])(4)
checkFilterPushdown(rdd, '_1)('_1 > 2 && '_1 < 4, classOf[Operators.And])(3)
checkFilterPushdown(rdd, '_1)('_1 < 2 || '_1 > 3, classOf[Operators.Or]) {
Seq(Row(1), Row(4))
}
}
}

test("filter pushdown - string") {
withParquetRDD((1 to 4).map(i => Tuple1.apply(i.toString))) { rdd =>
checkFilterPushdown(rdd, '_1)('_1 === "1", classOf[Eq[String]])("1")
checkFilterPushdown(rdd, '_1)('_1 !== "1", classOf[Operators.Not]) {
(2 to 4).map(i => Row.apply(i.toString))
}

checkFilterPushdown(rdd, '_1)('_1 < "2", classOf[Lt [java.lang.String]])("1")
checkFilterPushdown(rdd, '_1)('_1 > "3", classOf[Gt [java.lang.String]])("4")
checkFilterPushdown(rdd, '_1)('_1 <= "1", classOf[LtEq[java.lang.String]])("1")
checkFilterPushdown(rdd, '_1)('_1 >= "4", classOf[GtEq[java.lang.String]])("4")

checkFilterPushdown(rdd, '_1)(Literal("1") === '_1, classOf[Eq [java.lang.String]])("1")
checkFilterPushdown(rdd, '_1)(Literal("2") > '_1, classOf[Lt [java.lang.String]])("1")
checkFilterPushdown(rdd, '_1)(Literal("3") < '_1, classOf[Gt [java.lang.String]])("4")
checkFilterPushdown(rdd, '_1)(Literal("1") >= '_1, classOf[LtEq[java.lang.String]])("1")
checkFilterPushdown(rdd, '_1)(Literal("4") <= '_1, classOf[GtEq[java.lang.String]])("4")

checkFilterPushdown(rdd, '_1)(!('_1 < "4"), classOf[Operators.GtEq[java.lang.String]])("4")
checkFilterPushdown(rdd, '_1)('_1 > "2" && '_1 < "4", classOf[Operators.And])("3")
checkFilterPushdown(rdd, '_1)('_1 < "2" || '_1 > "3", classOf[Operators.Or]) {
Seq(Row("1"), Row("4"))
}
}
}

test("filter pushdown - binary") {
implicit class IntToBinary(int: Int) {
def b: Array[Byte] = int.toString.getBytes("UTF-8")
}

withParquetRDD((1 to 4).map(i => Tuple1.apply(i.b))) { rdd =>
checkBinaryFilterPushdown(rdd, '_1)('_1 === 1.b, classOf[Eq[Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 !== 1.b, classOf[Operators.Not]) {
(2 to 4).map(i => Row.apply(i.b)).toSeq
}

checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b, classOf[Lt [Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 > 3.b, classOf[Gt [Array[Byte]]])(4.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 <= 1.b, classOf[LtEq[Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 >= 4.b, classOf[GtEq[Array[Byte]]])(4.b)

checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) === '_1, classOf[Eq [Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)(Literal(2.b) > '_1, classOf[Lt [Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)(Literal(3.b) < '_1, classOf[Gt [Array[Byte]]])(4.b)
checkBinaryFilterPushdown(rdd, '_1)(Literal(1.b) >= '_1, classOf[LtEq[Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)(Literal(4.b) <= '_1, classOf[GtEq[Array[Byte]]])(4.b)

checkBinaryFilterPushdown(rdd, '_1)(!('_1 < 4.b), classOf[Operators.GtEq[Array[Byte]]])(4.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 > 2.b && '_1 < 4.b, classOf[Operators.And])(3.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 < 2.b || '_1 > 3.b, classOf[Operators.Or]) {
Seq(Row(1.b), Row(4.b))
}
}
}
}