Skip to content

Commit

Permalink
More tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
yhuai committed Feb 26, 2015
1 parent 50b6d0f commit ba543cd
Showing 1 changed file with 68 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
}

val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
jsonRDD(rdd).registerTempTable("jt")
val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
jsonRDD(rdd1).registerTempTable("jt")
val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
jsonRDD(rdd2).registerTempTable("jt_array")

setConf("spark.sql.hive.convertMetastoreParquet", "true")
}
Expand All @@ -104,6 +106,7 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
sql("DROP TABLE partitioned_parquet_with_key")
sql("DROP TABLE normal_parquet")
sql("DROP TABLE IF EXISTS jt")
sql("DROP TABLE IF EXISTS jt_array")
setConf("spark.sql.hive.convertMetastoreParquet", "false")
}

Expand Down Expand Up @@ -239,7 +242,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
sql("DROP TABLE IF EXISTS test_parquet_ctas")
}

test("MetastoreRelation in insert into will be converted") {
test("MetastoreRelation in InsertIntoTable will be converted") {
sql(
"""
|create table test_insert_parquet
Expand Down Expand Up @@ -270,6 +273,38 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {

sql("DROP TABLE IF EXISTS test_insert_parquet")
}

test("MetastoreRelation in InsertIntoHiveTable will be converted") {
sql(
"""
|create table test_insert_parquet
|(
| int_array array<int>
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
case ExecutedCommand(
InsertIntoDataSource(
LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
s"However, found a ${o.toString} ")
}

checkAnswer(
sql("SELECT int_array FROM test_insert_parquet"),
sql("SELECT a FROM jt_array").collect()
)

sql("DROP TABLE IF EXISTS test_insert_parquet")
}
}

class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
Expand All @@ -285,7 +320,7 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}

test("MetastoreRelation in insert into will not be converted") {
test("MetastoreRelation in InsertIntoTable will not be converted") {
sql(
"""
|create table test_insert_parquet
Expand All @@ -312,6 +347,35 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {

sql("DROP TABLE IF EXISTS test_insert_parquet")
}

// TODO: enable it after the fix of SPARK-5950.
ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") {
sql(
"""
|create table test_insert_parquet
|(
| int_array array<int>
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
case insert: InsertIntoHiveTable => // OK
case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
s"However, found ${o.toString}.")
}

checkAnswer(
sql("SELECT int_array FROM test_insert_parquet"),
sql("SELECT a FROM jt_array").collect()
)

sql("DROP TABLE IF EXISTS test_insert_parquet")
}
}

/**
Expand Down

0 comments on commit ba543cd

Please sign in to comment.