Commit

Fixed conflicts
sarutak committed Nov 18, 2014
1 parent ef7c464 commit 457c25d
Showing 2 changed files with 208 additions and 4 deletions.
ParquetTestData.scala

@@ -17,10 +17,10 @@

package org.apache.spark.sql.parquet

import java.io.File
import java.sql.{Date, Timestamp}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.test.TestSQLContext

@@ -101,6 +101,17 @@ private[sql] object ParquetTestData {
}
"""

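// Schema for the pushdown test data written by writeFilterFile2: byte and
// short values are stored as int32, date and timestamp values as UTF8
// strings, and decimal values as int64.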
val testFilterSchema2 =
"""
message myrecord {
required int32 mybyte;
required int32 myshort;
required binary mydate (UTF8);
required binary mytimestamp (UTF8);
required int64 mydecimal;
}
"""

// field names for test assertion error messages
val subTestSchemaFieldNames = Seq(
"myboolean:Boolean",
@@ -109,6 +120,7 @@

val testDir = Utils.createTempDir()
val testFilterDir = Utils.createTempDir()
val testFilterDir2 = Utils.createTempDir()

lazy val testData = new ParquetRelation(testDir.toURI.toString, None, TestSQLContext)

@@ -273,7 +285,31 @@ private[sql] object ParquetTestData {
record.add(10, i.toFloat + 0.5f)
record.add(11, i.toDouble + 0.5d)
}


writer.write(record)
}
writer.close()
}

def writeFilterFile2(records: Int = 200) = {
// for microbenchmark use: records = 300000000
testFilterDir2.delete
val path: Path = new Path(new Path(testFilterDir2.toURI), new Path("part-r-0.parquet"))
val schema: MessageType = MessageTypeParser.parseMessageType(testFilterSchema2)
val writeSupport = new TestGroupWriteSupport(schema)
val writer = new ParquetWriter[Group](path, writeSupport)

var nextDate = Date.valueOf("2014-11-23")
var nextTimestamp = Timestamp.valueOf("2014-11-23 00:04:00")
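// Each record advances mydate by one day and mytimestamp by 100 seconds
// (see the increments at the bottom of the loop below).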
for(i <- 0 to records) {
val record = new SimpleGroup(schema)
record.add(0, i.toByte)
record.add(1, i.toShort)
record.add(2, nextDate.toString)
record.add(3, nextTimestamp.toString)
record.add(4, BigDecimal(i).toLong)
nextDate = new Date(nextDate.getTime + 24 * 60 * 60 * 1000)
nextTimestamp = new Timestamp(nextTimestamp.getTime + 100 * 1000)
writer.write(record)
}
writer.close()
ParquetQuerySuite.scala

@@ -17,14 +17,20 @@

package org.apache.spark.sql.parquet

import java.sql.{Date, Timestamp}

import _root_.parquet.filter2.predicate.{FilterPredicate, Operators}

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
import parquet.hadoop.ParquetFileWriter
import parquet.hadoop.util.ContextUtil
import org.apache.spark.sql._
import org.apache.spark.sql.execution.{Filter, Project}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.IntegerType
import org.apache.spark.sql.catalyst.types.decimal.Decimal
import org.apache.spark.sql.catalyst.util.getTempFilePath
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
@@ -83,10 +89,12 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
TestData // Load test data tables.

var testRDD: SchemaRDD = null
var testFilterRDD2: SchemaRDD = null

override def beforeAll() {
ParquetTestData.writeFile()
ParquetTestData.writeFilterFile()
ParquetTestData.writeFilterFile2()
ParquetTestData.writeNestedFile1()
ParquetTestData.writeNestedFile2()
ParquetTestData.writeNestedFile3()
@@ -95,7 +103,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
testRDD.registerTempTable("testsource")
parquetFile(ParquetTestData.testFilterDir.toString)
.registerTempTable("testfiltersource")

testFilterRDD2 = parquetFile(ParquetTestData.testFilterDir2.toString)
testFilterRDD2.registerTempTable("testfiltersource2")
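// Enable Parquet filter pushdown so the predicates in the tests below can
// be pushed down into ParquetTableScan.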
setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
}

@@ -661,6 +670,165 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
assert(result15(0).getString(2) == "0")
assert(result15(46).getString(2) == "100")
assert(result15(146).getString(2) == "200")

// In this case, the Literal is cast to IntegerType because the type of
// mybyte is Int32 (Parquet has no native byte type), so Parquet's
// IntegerType filter can be applied.
val query16 = testFilterRDD2
.where('mybyte < Literal(80.asInstanceOf[Byte], ByteType) &&
'mybyte >= Literal(40.asInstanceOf[Byte], ByteType))
.select('mybyte)
assert(
query16.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
"Top operator should be ParquetTableScan after pushdown")
val result16 = query16.collect()
assert(result16.size === 40)
assert(result16(0)(0) === 40)
assert(result16(39)(0) === 79)

// In this case, the pushdown filter is not applied because Parquet does not support ByteType.
val query17 = testFilterRDD2
.select(('mybyte cast ByteType) as 'mb)
.where((('mb cast ByteType) as 'm) < Literal(80.asInstanceOf[Byte], ByteType) &&
(('mb cast ByteType) as 'm) >= Literal(40.asInstanceOf[Byte], ByteType))

assert(
query17.queryExecution.executedPlan(0)(0).isInstanceOf[Project],
"Top operator should be Project")
assert(
query17.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"The second operator from top should be Filter")
val result17 = query17.collect()
assert(result17.size === 40)
assert(result17(0)(0) === 40)
assert(result17(39)(0) === 79)

// In this case, the Literal is cast to IntegerType because the type of
// mybyte is Int32 (Parquet has no native byte type), so Parquet's
// IntegerType filter can be applied.
val query18 = testFilterRDD2
.where('mybyte > Literal(80.asInstanceOf[Byte], ByteType) &&
'mybyte <= Literal(120.asInstanceOf[Byte], ByteType))
.select('mybyte)
assert(
query18.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
"Top operator should be ParquetTableScan after pushdown")
val result18 = query18.collect()
assert(result18.size === 40)
assert(result18(0)(0) === 81)
assert(result18(39)(0) === 120)

// In this case, the pushdown filter is not applied because Parquet does not support ByteType.
val query19 = testFilterRDD2
.select(('mybyte cast ByteType) as 'mb)
.where((('mb cast ByteType) as 'm) > Literal(80.asInstanceOf[Byte], ByteType) &&
(('mb cast ByteType) as 'm) <= Literal(120.asInstanceOf[Byte], ByteType))

assert(
query19.queryExecution.executedPlan(0)(0).isInstanceOf[Project],
"Top operator should be Project")
assert(
query19.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"The Second operator from top should be Filter")
val result19 = query19.collect()
assert(result19.size === 40)
assert(result19(0)(0) === 81)
assert(result19(39)(0) === 120)

// In this case, the Literal is cast to IntegerType because the type of
// myshort is Int32 (Parquet has no native short type), so Parquet's
// IntegerType filter can be applied.
val query20 = testFilterRDD2
.where('myshort < Literal(150.asInstanceOf[Short], ShortType) &&
'myshort >= Literal(100.asInstanceOf[Short], ShortType))
.select('myshort)
assert(
query20.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
"Top operator should be ParquetTableScan after pushdown")
val result20 = query20.collect()
assert(result20.size === 50)
assert(result20(0)(0) === 100)
assert(result20(49)(0) === 149)

// In this case, the pushdown filter is not applied because Parquet does not support ShortType.
val query21 = testFilterRDD2
.select(('myshort cast ShortType) as 'ms)
.where(('ms as 'm) < Literal(150.asInstanceOf[Short], ShortType) &&
('ms as 'm) >= Literal(100.asInstanceOf[Short], ShortType))
assert(
query21.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"The Second operator from top should be Filter")
val result21 = query21.collect()
assert(result21.size === 50)
assert(result21(0)(0) === 100)
assert(result21(49)(0) === 149)

// In this case, the Literal is cast to IntegerType because the type of
// myshort is Int32 (Parquet has no native short type), so Parquet's
// IntegerType filter can be applied.
val query22 = testFilterRDD2
.where('myshort > Literal(150.asInstanceOf[Short], ShortType) &&
'myshort <= Literal(200.asInstanceOf[Short], ShortType))
.select('myshort)
assert(
query22.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
"Top operator should be ParquetTableScan after pushdown")
val result22 = query22.collect()
assert(result22.size === 50)
assert(result22(0)(0) === 151)
assert(result22(49)(0) === 200)

// In this case, the pushdown filter is not applied because Parquet does not support ShortType.
val query23 = testFilterRDD2
.select(('myshort cast ShortType) as 'ms)
.where((('ms cast ShortType) as 'm) > Literal(150.asInstanceOf[Short], ShortType) &&
(('ms cast ShortType) as 'm) <= Literal(200.asInstanceOf[Short], ShortType))
assert(
query23.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"Top operator should be ParquetTableScan after pushdown")
val result23 = query23.collect()
assert(result23.size === 50)
assert(result23(0)(0) === 151)
assert(result23(49)(0) === 200)

// Currently, the pushdown filter cannot be applied to DateType because it is not supported by Parquet.
val query24 = testFilterRDD2
.select(('mydate cast DateType) as 'md)
.where((('md cast DateType) as 'm) >= Literal(Date.valueOf("2014-11-23"), DateType)
&& (('md cast DateType) as 'm) < Literal(Date.valueOf("2015-02-08"), DateType))
assert(
query24.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"Top operator should be Filter. Pushdown is not supported for Date type")
val result24 = query24.collect()
assert(result24.size === 77)
assert(result24(0)(0) === Date.valueOf("2014-11-23"))
assert(result24(76)(0) === Date.valueOf("2015-02-07"))

// Currently, the pushdown filter cannot be applied to TimestampType because it is not supported by Parquet.
val query25 = testFilterRDD2
.select(('mytimestamp cast TimestampType) as 'mt)
.where((('mt cast TimestampType) as 'm) >= Literal(Timestamp.valueOf("2014-11-23 00:04:00"), TimestampType)
&& (('mt cast TimestampType) as 'm) < Literal(Timestamp.valueOf("2014-11-23 05:20:00"), TimestampType))
assert(
query25.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"Top operator should be Filter. Pushdown is not supported for Date type")
val result25 = query25.collect()
assert(result25.size === 190)
assert(result25(0)(0) === Timestamp.valueOf("2014-11-23 00:04:00"))
assert(result25(189)(0) === Timestamp.valueOf("2014-11-23 05:19:00"))

// In this case, the pushdown filter is not applied because Parquet does not support DecimalType.
val query26 = testFilterRDD2
.select(('mydecimal cast DecimalType.Unlimited) as 'md)
.where(('md as 'm) < Literal(Decimal(150), DecimalType.Unlimited) &&
('md as 'm) >= Literal(Decimal(100), DecimalType.Unlimited))
assert(
query26.queryExecution.executedPlan(0)(1).isInstanceOf[Filter],
"The Second operator from top should be Filter")
val result26 = query26.collect()
assert(result26.size === 50)
assert(result26(0)(0) === 100)
assert(result26(49)(0) === 149)
}

test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {

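As a usage note, here is a minimal sketch of the query pattern these tests exercise, assuming the Spark 1.2-era TestSQLContext and the testfiltersource2 table registered in beforeAll above (the table name and conf key come from this diff; the session itself is hypothetical):

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.test.TestSQLContext._

// With PARQUET_FILTER_PUSHDOWN_ENABLED set to "true", a comparison on mybyte
// (stored as int32) is cast to IntegerType and pushed into the Parquet scan.
setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
val pushed = sql("SELECT mybyte FROM testfiltersource2 WHERE mybyte >= 40 AND mybyte < 80")
println(pushed.queryExecution.executedPlan) // expect a ParquetTableScan at the top
println(pushed.collect().size)              // expect 40 rows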