Skip to content

Commit

Permalink
Follow up of PR #1071 for Java API
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Jun 14, 2014
1 parent ac96d96 commit 22aec97
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,13 @@ class JavaSQLContext(val sqlContext: SQLContext) {
/**
* Executes a query expressed in SQL, returning the result as a JavaSchemaRDD
*/
def sql(sqlQuery: String): JavaSchemaRDD = {
val result = new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This is only
// generates the RDD lineage for DML queries, but do not perform any execution.
result.queryExecution.toRdd
result
}
def sql(sqlQuery: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))

/**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
* a table. This registered table can be used as the target of future insertInto` operations.
* a table. This registered table can be used as the target of future `insertInto` operations.
*
* {{{
* JavaSQLContext sqlCtx = new JavaSQLContext(...)
Expand All @@ -62,7 +56,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
* }}}
*
* @param beanClass A java bean class object that will be used to determine the schema of the
* parquet file. s
* parquet file.
* @param path The path where the directory containing parquet metadata should be created.
* Data inserted into this table will also be stored at this location.
* @param allowExisting When false, an exception will be thrown if this directory already exists.
Expand Down Expand Up @@ -100,14 +94,12 @@ class JavaSQLContext(val sqlContext: SQLContext) {
new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
}


/**
* Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
*/
def parquetFile(path: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, ParquetRelation(path))


/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
* during the lifetime of this instance of SQLContext.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(spa
/**
* Executes a query expressed in HiveQL, returning the result as a JavaSchemaRDD.
*/
def hql(hqlQuery: String): JavaSchemaRDD = {
val result = new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This is only
// generates the RDD lineage for DML queries, but do not perform any execution.
result.queryExecution.toRdd
result
}
def hql(hqlQuery: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.api.java

import scala.util.Try

import org.scalatest.FunSuite

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.execution.ExplainCommand
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.test.TestSQLContext

// Implicits
import scala.collection.JavaConversions._

class JavaHiveQLSuite extends FunSuite {
val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext)

// There is a little trickery here to avoid instantiating two HiveContexts in the same JVM
val javaHiveCtx = new JavaHiveContext(javaCtx) {
override val sqlContext = TestHive
}

test("SELECT * FROM src") {
assert(
javaHiveCtx.hql("SELECT * FROM src").collect().map(_.getInt(0)) ===
TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq)
}

private val explainCommandClassName =
classOf[ExplainCommand].getSimpleName.stripSuffix("$")

def isExplanation(result: JavaSchemaRDD) = {
val explanation = result.collect().map(_.getString(0))
explanation.size == 1 && explanation.head.startsWith(explainCommandClassName)
}

test("Query Hive native command execution result") {
val tableName = "test_native_commands"

assertResult(0) {
javaHiveCtx.hql(s"DROP TABLE IF EXISTS $tableName").count()
}

assertResult(0) {
javaHiveCtx.hql(s"CREATE TABLE $tableName(key INT, value STRING)").count()
}

javaHiveCtx.hql("SHOW TABLES").registerAsTable("show_tables")

assert(
javaHiveCtx
.hql("SELECT result FROM show_tables")
.collect()
.map(_.getString(0))
.contains(tableName))

assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
javaHiveCtx.hql(s"DESCRIBE $tableName").registerAsTable("describe_table")

javaHiveCtx
.hql("SELECT result FROM describe_table")
.collect()
.map(_.getString(0).split("\t").map(_.trim))
.toArray
}

assert(isExplanation(javaHiveCtx.hql(
s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))

TestHive.reset()
}

test("Exactly once semantics for DDL and command statements") {
val tableName = "test_exactly_once"
val q0 = javaHiveCtx.hql(s"CREATE TABLE $tableName(key INT, value STRING)")

// If the table was not created, the following assertion would fail
assert(Try(TestHive.table(tableName)).isSuccess)

// If the CREATE TABLE command got executed again, the following assertion would fail
assert(Try(q0.count()).isSuccess)
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -184,25 +184,29 @@ class HiveQuerySuite extends HiveComparisonTest {
test("Query Hive native command execution result") {
val tableName = "test_native_commands"

val q0 = hql(s"DROP TABLE IF EXISTS $tableName")
assert(q0.count() == 0)
assertResult(0) {
hql(s"DROP TABLE IF EXISTS $tableName").count()
}

val q1 = hql(s"CREATE TABLE $tableName(key INT, value STRING)")
assert(q1.count() == 0)
assertResult(0) {
hql(s"CREATE TABLE $tableName(key INT, value STRING)").count()
}

val q2 = hql("SHOW TABLES")
val tables = q2.select('result).collect().map { case Row(table: String) => table }
assert(tables.contains(tableName))
assert(
hql("SHOW TABLES")
.select('result)
.collect()
.map(_.getString(0))
.contains(tableName))

val q3 = hql(s"DESCRIBE $tableName")
assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
q3.select('result).collect().map { case Row(fieldDesc: String) =>
fieldDesc.split("\t").map(_.trim)
}
hql(s"DESCRIBE $tableName")
.select('result)
.collect()
.map(_.getString(0).split("\t").map(_.trim))
}

val q4 = hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")
assert(isExplanation(q4))
assert(isExplanation(hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))

TestHive.reset()
}
Expand Down

0 comments on commit 22aec97

Please sign in to comment.