[SPARK-4453][SPARK-4213][SQL] Additional test cases for Pushdown Filter for Parquet #3333

Closed
@@ -17,10 +17,10 @@

package org.apache.spark.sql.parquet

import java.io.File
import java.sql.{Date, Timestamp}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.test.TestSQLContext

@@ -101,6 +101,17 @@ private[sql] object ParquetTestData {
}
"""

val testFilterSchema2 =
"""
message myrecord {
required int32 mybyte;
required int32 myshort;
required binary mydate (UTF8);
required binary mytimestamp (UTF8);
required int64 mydecimal;
}
"""

// field names for test assertion error messages
val subTestSchemaFieldNames = Seq(
"myboolean:Boolean",
@@ -109,6 +120,7 @@ private[sql] object ParquetTestData {

val testDir = Utils.createTempDir()
val testFilterDir = Utils.createTempDir()
val testFilterDir2 = Utils.createTempDir()

lazy val testData = new ParquetRelation(testDir.toURI.toString, None, TestSQLContext)

@@ -273,7 +285,31 @@ private[sql] object ParquetTestData {
record.add(10, i.toFloat + 0.5f)
record.add(11, i.toDouble + 0.5d)
}


writer.write(record)
}
writer.close()
}

def writeFilterFile2(records: Int = 200) = {
// for microbenchmark use: records = 300000000
testFilterDir2.delete
val path: Path = new Path(new Path(testFilterDir2.toURI), new Path("part-r-0.parquet"))
val schema: MessageType = MessageTypeParser.parseMessageType(testFilterSchema2)
val writeSupport = new TestGroupWriteSupport(schema)
val writer = new ParquetWriter[Group](path, writeSupport)

var nextDate = Date.valueOf("2014-11-23")
var nextTimestamp = Timestamp.valueOf("2014-11-23 00:04:00")
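// The loop below is inclusive, so records + 1 rows are written (201 by
// default); each row advances the date by one day and the timestamp by
// 100 seconds.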
for(i <- 0 to records) {
val record = new SimpleGroup(schema)
record.add(0, i.toByte)
record.add(1, i.toShort)
record.add(2, nextDate.toString)
record.add(3, nextTimestamp.toString)
record.add(4, BigDecimal(i).toLong)
nextDate = new Date(nextDate.getTime + 24 * 60 * 60 * 1000)
nextTimestamp = new Timestamp(nextTimestamp.getTime + 100 * 1000)
writer.write(record)
}
writer.close()
@@ -17,16 +17,21 @@

package org.apache.spark.sql.parquet

import java.sql.{Date, Timestamp}

import _root_.parquet.filter2.predicate.{FilterPredicate, Operators}

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
import parquet.hadoop.ParquetFileWriter
import parquet.hadoop.util.ContextUtil

import org.apache.spark.sql._
import org.apache.spark.sql.execution.{Filter, Project}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.IntegerType
import org.apache.spark.sql.catalyst.types.decimal.Decimal
import org.apache.spark.sql.catalyst.util.getTempFilePath
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
@@ -85,10 +90,12 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
TestData // Load test data tables.

var testRDD: SchemaRDD = null
var testFilterRDD2: SchemaRDD = null

override def beforeAll() {
ParquetTestData.writeFile()
ParquetTestData.writeFilterFile()
ParquetTestData.writeFilterFile2()
ParquetTestData.writeNestedFile1()
ParquetTestData.writeNestedFile2()
ParquetTestData.writeNestedFile3()
@@ -97,7 +104,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
testRDD.registerTempTable("testsource")
parquetFile(ParquetTestData.testFilterDir.toString)
.registerTempTable("testfiltersource")

testFilterRDD2 = parquetFile(ParquetTestData.testFilterDir2.toString)
testFilterRDD2.registerTempTable("testfiltersource2")
setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
}

@@ -655,6 +663,165 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
assert(result15(0).getString(2) == "0")
assert(result15(46).getString(2) == "100")
assert(result15(146).getString(2) == "200")

// In this case, the Literal will be cast to IntegerType because
// the type of mybyte is int32 (Parquet has no native byte type),
// so Parquet's filter for IntegerType can be applied.
val query16 = testFilterRDD2
.where('mybyte < Literal(80.asInstanceOf[Byte], ByteType) &&
'mybyte >= Literal(40.asInstanceOf[Byte], ByteType))
.select('mybyte)
assert(
query16.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
"Top operator should be ParquetTableScan after pushdown")
val result16 = query16.collect()
assert(result16.size === 40)
assert(result16(0)(0) === 40)
assert(result16(39)(0) === 79)
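
// For reference, the predicate pushed down for query16 is roughly
// equivalent to the following sketch built with Parquet's filter2 API
// (the actual construction is done by Spark's ParquetFilters):
//   import parquet.filter2.predicate.FilterApi._
//   and(gtEq(intColumn("mybyte"), Int.box(40)), lt(intColumn("mybyte"), Int.box(80)))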

// In this case, the pushdown filter is not applied because Parquet does not support ByteType.
val query17 = testFilterRDD2
.select(('mybyte cast ByteType) as 'mb)
.where((('mb cast ByteType) as 'm) < Literal(80.asInstanceOf[Byte], ByteType) &&
(('mb cast ByteType) as 'm) >= Literal(40.asInstanceOf[Byte], ByteType))

assert(
query17.queryExecution.executedPlan(0)(0).isInstanceOf[Project],
"The top operator should be Project")
assert(
query17.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"The Second operator from top should be Filter")
val result17 = query16.collect()
assert(result17.size === 40)
assert(result17(0)(0) === 40)
assert(result17(39)(0) === 79)

// In this case, the Literal will be cast to IntegerType because
// the type of mybyte is int32 (Parquet has no native byte type),
// so Parquet's filter for IntegerType can be applied.
val query18 = testFilterRDD2
.where('mybyte > Literal(80.asInstanceOf[Byte], ByteType) &&
'mybyte <= Literal(120.asInstanceOf[Byte], ByteType))
.select('mybyte)
assert(
query18.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
"Top operator should be ParquetTableScan after pushdown")
val result18 = query18.collect()
assert(result18.size === 40)
assert(result18(0)(0) === 81)
assert(result18(39)(0) === 120)

// In this case, the pushdown filter is not applied because Parquet does not support ByteType.
val query19 = testFilterRDD2
.select(('mybyte cast ByteType) as 'mb)
.where((('mb cast ByteType) as 'm) > Literal(80.asInstanceOf[Byte], ByteType) &&
(('mb cast ByteType) as 'm) <= Literal(120.asInstanceOf[Byte], ByteType))

assert(
query19.queryExecution.executedPlan(0)(0).isInstanceOf[Project],
"Top operator should be Project")
assert(
query19.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"The Second operator from top should be Filter")
val result19 = query19.collect()
assert(result19.size === 40)
assert(result19(0)(0) === 81)
assert(result19(39)(0) === 120)
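
// In the non-pushdown cases above, the executed plan keeps the shape
// Project -> Filter, with the Parquet scan underneath: the Filter stays
// in Spark because the ByteType cast cannot be turned into a Parquet
// predicate.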

// In this case, the Literal will be cast to IntegerType because
// the type of myshort is int32 (Parquet has no native short type),
// so Parquet's filter for IntegerType can be applied.
val query20 = testFilterRDD2
.where('myshort < Literal(150.asInstanceOf[Short], ShortType) &&
'myshort >= Literal(100.asInstanceOf[Short], ShortType))
.select('myshort)
assert(
query20.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
"Top operator should be ParquetTableScan after pushdown")
val result20 = query20.collect()
assert(result20.size === 50)
assert(result20(0)(0) === 100)
assert(result20(49)(0) === 149)

// In this case, the pushdown filter is not applied because Parquet does not support ShortType.
val query21 = testFilterRDD2
.select(('myshort cast ShortType) as 'ms)
.where(('ms as 'm) < Literal(150.asInstanceOf[Short], ShortType) &&
('ms as 'm) >= Literal(100.asInstanceOf[Short], ShortType))
assert(
query21.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"The Second operator from top should be Filter")
val result21 = query21.collect()
assert(result21.size === 50)
assert(result21(0)(0) === 100)
assert(result21(49)(0) === 149)

// In this case, the Literal will be cast to IntegerType because
// the type of myshort is int32 (Parquet has no native short type),
// so Parquet's filter for IntegerType can be applied.
val query22 = testFilterRDD2
.where('myshort > Literal(150.asInstanceOf[Short], ShortType) &&
'myshort <= Literal(200.asInstanceOf[Short], ShortType))
.select('myshort)
assert(
query22.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
"Top operator should be ParquetTableScan after pushdown")
val result22 = query22.collect()
assert(result22.size === 50)
assert(result22(0)(0) === 151)
assert(result22(49)(0) === 200)

// In this case, the pushdown filter is not applied because Parquet does not support ShortType.
val query23 = testFilterRDD2
.select(('myshort cast ShortType) as 'ms)
.where((('ms cast ShortType) as 'm) > Literal(150.asInstanceOf[Short], ShortType) &&
(('ms cast ShortType) as 'm) <= Literal(200.asInstanceOf[Short], ShortType))
assert(
query23.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"Top operator should be ParquetTableScan after pushdown")
val result23 = query23.collect()
assert(result23.size === 50)
assert(result23(0)(0) === 151)
assert(result23(49)(0) === 200)

// Currently, we cannot apply a pushdown filter to DateType because it is not supported by Parquet.
val query24 = testFilterRDD2
.select(('mydate cast DateType) as 'md)
.where((('md cast DateType) as 'm) >= Literal(Date.valueOf("2014-11-23"), DateType)
&& (('md cast DateType) as 'm) < Literal(Date.valueOf("2015-02-08"), DateType))
assert(
query24.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"Top operator should be Filter. Pushdown is not supported for Date type")
val result24 = query24.collect()
assert(result24.size === 77)
assert(result24(0)(0) === Date.valueOf("2014-11-23"))
assert(result24(76)(0) === Date.valueOf("2015-02-07"))

// Currently, we cannot apply a pushdown filter to TimestampType because it is not supported by Parquet.
val query25 = testFilterRDD2
.select(('mytimestamp cast TimestampType) as 'mt)
.where((('mt cast TimestampType) as 'm) >= Literal(Timestamp.valueOf("2014-11-23 00:04:00"), TimestampType)
&& (('mt cast TimestampType) as 'm) < Literal(Timestamp.valueOf("2014-11-23 05:20:00"), TimestampType))
assert(
query25.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"Top operator should be Filter. Pushdown is not supported for Date type")
val result25 = query25.collect()
assert(result25.size === 190)
assert(result25(0)(0) === Timestamp.valueOf("2014-11-23 00:04:00"))
assert(result25(189)(0) === Timestamp.valueOf("2014-11-23 05:19:00"))
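
// 190 rows: timestamps advance in 100-second steps, and
// [00:04:00, 05:20:00) spans 18,960 seconds, i.e. steps 0 through 189;
// the last match is 00:04:00 + 189 * 100s = 05:19:00.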

// In this case, the pushdown filter is not applied because Parquet does not support DecimalType.
val query26 = testFilterRDD2
.select(('mydecimal cast DecimalType.Unlimited) as 'md)
.where(('md as 'm) < Literal(Decimal(150), DecimalType.Unlimited) &&
('md as 'm) >= Literal(Decimal(100), DecimalType.Unlimited))
assert(
query26.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"The Second operator from top should be Filter")
val result26 = query26.collect()
assert(result26.size === 50)
assert(result26(0)(0) === 100)
assert(result26(49)(0) === 149)
}

test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {