diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
index c0918a40d136f..3507f71f51fe9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.parquet
 
-import java.io.File
+import java.sql.{Date, Timestamp}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.sql.test.TestSQLContext
@@ -101,6 +101,17 @@ private[sql] object ParquetTestData {
     }
   """
 
+  val testFilterSchema2 =
+    """
+      message myrecord {
+        required int32 mybyte;
+        required int32 myshort;
+        required binary mydate (UTF8);
+        required binary mytimestamp (UTF8);
+        required int64 mydecimal;
+      }
+    """
+
   // field names for test assertion error messages
   val subTestSchemaFieldNames = Seq(
     "myboolean:Boolean",
@@ -109,6 +120,7 @@ private[sql] object ParquetTestData {
 
   val testDir = Utils.createTempDir()
   val testFilterDir = Utils.createTempDir()
+  val testFilterDir2 = Utils.createTempDir()
 
   lazy val testData = new ParquetRelation(testDir.toURI.toString, None, TestSQLContext)
 
@@ -273,7 +285,34 @@ private[sql] object ParquetTestData {
         record.add(10, i.toFloat + 0.5f)
         record.add(11, i.toDouble + 0.5d)
       }
-
+
+      writer.write(record)
+    }
+    writer.close()
+  }
+
+  def writeFilterFile2(records: Int = 200) = {
+    // for microbenchmark use: records = 300000000
+    testFilterDir2.delete
+    val path: Path = new Path(new Path(testFilterDir2.toURI), new Path("part-r-0.parquet"))
+    val schema: MessageType = MessageTypeParser.parseMessageType(testFilterSchema2)
+    val writeSupport = new TestGroupWriteSupport(schema)
+    val writer = new ParquetWriter[Group](path, writeSupport)
+
+    var nextDate = Date.valueOf("2014-11-23")
+    var nextTimestamp = Timestamp.valueOf("2014-11-23 00:04:00")
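+    // Layout note: record i carries mydate = 2014-11-23 plus i days and
+    // mytimestamp = 2014-11-23 00:04:00 plus i * 100 seconds, so the range
+    // tests in ParquetQuerySuite can derive expected row counts from i.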
+    for (i <- 0 to records) {
+      val record = new SimpleGroup(schema)
+      record.add(0, i.toByte)
+      record.add(1, i.toShort)
+      record.add(2, nextDate.toString)
+      record.add(3, nextTimestamp.toString)
+      record.add(4, BigDecimal(i).toLong)
+      nextDate = new Date(nextDate.getTime + 24 * 60 * 60 * 1000)
+      nextTimestamp = new Timestamp(nextTimestamp.getTime + 100 * 1000)
       writer.write(record)
     }
     writer.close()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 7ee4f3c1e93eb..b70fcb8ab41f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.parquet
 
+import java.sql.{Date, Timestamp}
+
 import _root_.parquet.filter2.predicate.{FilterPredicate, Operators}
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.Job
 import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
@@ -25,8 +28,10 @@ import parquet.hadoop.ParquetFileWriter
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.{Filter, Project}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.IntegerType
+import org.apache.spark.sql.catalyst.types.decimal.Decimal
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
@@ -85,10 +90,12 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   TestData // Load test data tables.
 
   var testRDD: SchemaRDD = null
+  var testFilterRDD2: SchemaRDD = null
 
   override def beforeAll() {
     ParquetTestData.writeFile()
     ParquetTestData.writeFilterFile()
+    ParquetTestData.writeFilterFile2()
     ParquetTestData.writeNestedFile1()
     ParquetTestData.writeNestedFile2()
     ParquetTestData.writeNestedFile3()
@@ -97,7 +104,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     testRDD.registerTempTable("testsource")
     parquetFile(ParquetTestData.testFilterDir.toString)
       .registerTempTable("testfiltersource")
-
+    testFilterRDD2 = parquetFile(ParquetTestData.testFilterDir2.toString)
+    testFilterRDD2.registerTempTable("testfiltersource2")
     setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
   }
 
@@ -655,6 +663,169 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     assert(result15(0).getString(2) == "0")
     assert(result15(46).getString(2) == "100")
     assert(result15(146).getString(2) == "200")
+
+    // In this case, the Literal will be cast to IntegerType because
+    // the type of mybyte is Int32 (Parquet does not really support byte type),
+    // so Parquet's filter for IntegerType can be applied.
+    val query16 = testFilterRDD2
+      .where('mybyte < Literal(80.asInstanceOf[Byte], ByteType) &&
+        'mybyte >= Literal(40.asInstanceOf[Byte], ByteType))
+      .select('mybyte)
+    assert(
+      query16.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+      "Top operator should be ParquetTableScan after pushdown")
+    val result16 = query16.collect()
+    assert(result16.size === 40)
+    assert(result16(0)(0) === 40)
+    assert(result16(39)(0) === 79)
+
+    // In this case, the pushdown filter is not applied because Parquet does not support ByteType.
+    val query17 = testFilterRDD2
+      .select(('mybyte cast ByteType) as 'mb)
+      .where((('mb cast ByteType) as 'm) < Literal(80.asInstanceOf[Byte], ByteType) &&
+        (('mb cast ByteType) as 'm) >= Literal(40.asInstanceOf[Byte], ByteType))
+
+    assert(
+      query17.queryExecution.executedPlan(0)(0).isInstanceOf[Project],
+      "Top operator should be Project")
+    assert(
+      query17.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
+      "The second operator from top should be Filter")
+    val result17 = query17.collect()
+    assert(result17.size === 40)
+    assert(result17(0)(0) === 40)
+    assert(result17(39)(0) === 79)
+
+    // In this case, the Literal will be cast to IntegerType because
+    // the type of mybyte is Int32 (Parquet does not really support byte type),
+    // so Parquet's filter for IntegerType can be applied.
+    val query18 = testFilterRDD2
+      .where('mybyte > Literal(80.asInstanceOf[Byte], ByteType) &&
+        'mybyte <= Literal(120.asInstanceOf[Byte], ByteType))
+      .select('mybyte)
+    assert(
+      query18.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+      "Top operator should be ParquetTableScan after pushdown")
+    val result18 = query18.collect()
+    assert(result18.size === 40)
+    assert(result18(0)(0) === 81)
+    assert(result18(39)(0) === 120)
+
+    // In this case, the pushdown filter is not applied because Parquet does not support ByteType.
+    val query19 = testFilterRDD2
+      .select(('mybyte cast ByteType) as 'mb)
+      .where((('mb cast ByteType) as 'm) > Literal(80.asInstanceOf[Byte], ByteType) &&
+        (('mb cast ByteType) as 'm) <= Literal(120.asInstanceOf[Byte], ByteType))
+
+    assert(
+      query19.queryExecution.executedPlan(0)(0).isInstanceOf[Project],
+      "Top operator should be Project")
+    assert(
+      query19.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
+      "The second operator from top should be Filter")
+    val result19 = query19.collect()
+    assert(result19.size === 40)
+    assert(result19(0)(0) === 81)
+    assert(result19(39)(0) === 120)
+
+    // In this case, the Literal will be cast to IntegerType because
+    // the type of myshort is Int32 (Parquet does not really support short type),
+    // so Parquet's filter for IntegerType can be applied.
+    val query20 = testFilterRDD2
+      .where('myshort < Literal(150.asInstanceOf[Short], ShortType) &&
+        'myshort >= Literal(100.asInstanceOf[Short], ShortType))
+      .select('myshort)
+    assert(
+      query20.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+      "Top operator should be ParquetTableScan after pushdown")
+    val result20 = query20.collect()
+    assert(result20.size === 50)
+    assert(result20(0)(0) === 100)
+    assert(result20(49)(0) === 149)
+
+    // In this case, the pushdown filter is not applied because Parquet does not support ShortType.
+    val query21 = testFilterRDD2
+      .select(('myshort cast ShortType) as 'ms)
+      .where(('ms as 'm) < Literal(150.asInstanceOf[Short], ShortType) &&
+        ('ms as 'm) >= Literal(100.asInstanceOf[Short], ShortType))
+    assert(
+      query21.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
+      "The second operator from top should be Filter")
+    val result21 = query21.collect()
+    assert(result21.size === 50)
+    assert(result21(0)(0) === 100)
+    assert(result21(49)(0) === 149)
+
+    // In this case, the Literal will be cast to IntegerType because
+    // the type of myshort is Int32 (Parquet does not really support short type),
+    // so Parquet's filter for IntegerType can be applied.
+    val query22 = testFilterRDD2
+      .where('myshort > Literal(150.asInstanceOf[Short], ShortType) &&
+        'myshort <= Literal(200.asInstanceOf[Short], ShortType))
+      .select('myshort)
+    assert(
+      query22.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+      "Top operator should be ParquetTableScan after pushdown")
+    val result22 = query22.collect()
+    assert(result22.size === 50)
+    assert(result22(0)(0) === 151)
+    assert(result22(49)(0) === 200)
+
+    // In this case, the pushdown filter is not applied because Parquet does not support ShortType.
+    val query23 = testFilterRDD2
+      .select(('myshort cast ShortType) as 'ms)
+      .where((('ms cast ShortType) as 'm) > Literal(150.asInstanceOf[Short], ShortType) &&
+        (('ms cast ShortType) as 'm) <= Literal(200.asInstanceOf[Short], ShortType))
+    assert(
+      query23.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
+      "The second operator from top should be Filter")
+    val result23 = query23.collect()
+    assert(result23.size === 50)
+    assert(result23(0)(0) === 151)
+    assert(result23(49)(0) === 200)
+
+    // Currently, we cannot apply a pushdown filter to DateType because it is not supported by Parquet.
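+    // Expected count: [2014-11-23, 2015-02-08) spans 8 + 31 + 31 + 7 = 77 days,
+    // at one record per day.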
+    val query24 = testFilterRDD2
+      .select(('mydate cast DateType) as 'md)
+      .where((('md cast DateType) as 'm) >= Literal(Date.valueOf("2014-11-23"), DateType)
+        && (('md cast DateType) as 'm) < Literal(Date.valueOf("2015-02-08"), DateType))
+    assert(
+      query24.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
+      "The second operator from top should be Filter. Pushdown is not supported for DateType")
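+    // Expected count: [00:04:00, 05:20:00) spans 18960 seconds at one record
+    // per 100 seconds, i.e. indices 0 to 189 inclusive = 190 rows.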
+    val result24 = query24.collect()
+    assert(result24.size === 77)
+    assert(result24(0)(0) === Date.valueOf("2014-11-23"))
+    assert(result24(76)(0) === Date.valueOf("2015-02-07"))
+
+    // Currently, we cannot apply a pushdown filter to TimestampType because it is not supported by Parquet.
+    val query25 = testFilterRDD2
+      .select(('mytimestamp cast TimestampType) as 'mt)
+      .where((('mt cast TimestampType) as 'm) >= Literal(Timestamp.valueOf("2014-11-23 00:04:00"), TimestampType)
+        && (('mt cast TimestampType) as 'm) < Literal(Timestamp.valueOf("2014-11-23 05:20:00"), TimestampType))
+    assert(
+      query25.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
+      "The second operator from top should be Filter. Pushdown is not supported for TimestampType")
+    val result25 = query25.collect()
+    assert(result25.size === 190)
+    assert(result25(0)(0) === Timestamp.valueOf("2014-11-23 00:04:00"))
+    assert(result25(189)(0) === Timestamp.valueOf("2014-11-23 05:19:00"))
+
+    // In this case, the pushdown filter is not applied because Parquet does not support DecimalType.
+    val query26 = testFilterRDD2
+      .select(('mydecimal cast DecimalType.Unlimited) as 'md)
+      .where(('md as 'm) < Literal(Decimal(150), DecimalType.Unlimited) &&
+        ('md as 'm) >= Literal(Decimal(100), DecimalType.Unlimited))
+    assert(
+      query26.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
+      "The second operator from top should be Filter")
+    val result26 = query26.collect()
+    assert(result26.size === 50)
+    assert(result26(0)(0) === 100)
+    assert(result26(49)(0) === 149)
   }
 
   test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {