From afd757a241f41d7f8c458ef8f1f9ce8ed12986e5 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Fri, 25 Jul 2014 15:36:57 -0700 Subject: [PATCH] Revert "[SPARK-2410][SQL] Merging Hive Thrift/JDBC server" This reverts commit 06dc0d2c6b69c5d59b4d194ced2ac85bfe2e05e2. #1399 is making Jenkins fail. We should investigate and put this back after its passing tests. Author: Michael Armbrust Closes #1594 from marmbrus/revertJDBC and squashes the following commits: 59748da [Michael Armbrust] Revert "[SPARK-2410][SQL] Merging Hive Thrift/JDBC server" --- .gitignore | 1 - assembly/pom.xml | 10 - bagel/pom.xml | 2 +- bin/beeline | 45 --- bin/compute-classpath.sh | 1 - bin/spark-shell | 4 +- bin/spark-shell.cmd | 2 +- bin/spark-sql | 36 -- core/pom.xml | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 14 +- .../spark/deploy/SparkSubmitArguments.scala | 5 +- dev/create-release/create-release.sh | 10 +- dev/run-tests | 2 +- dev/scalastyle | 2 +- docs/sql-programming-guide.md | 200 +--------- examples/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml | 2 +- graphx/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 7 +- project/SparkBuild.scala | 14 +- sbin/start-thriftserver.sh | 36 -- sql/catalyst/pom.xml | 2 +- .../sql/catalyst/plans/logical/commands.scala | 3 +- sql/core/pom.xml | 2 +- .../scala/org/apache/spark/sql/SQLConf.scala | 20 +- .../apache/spark/sql/execution/commands.scala | 42 +-- .../org/apache/spark/sql/SQLConfSuite.scala | 13 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 10 +- sql/hive-thriftserver/pom.xml | 82 ----- .../hive/thriftserver/HiveThriftServer2.scala | 97 ----- .../hive/thriftserver/ReflectionUtils.scala | 58 --- .../hive/thriftserver/SparkSQLCLIDriver.scala | 344 ------------------ .../thriftserver/SparkSQLCLIService.scala | 74 ---- .../hive/thriftserver/SparkSQLDriver.scala | 93 ----- .../sql/hive/thriftserver/SparkSQLEnv.scala | 58 --- .../thriftserver/SparkSQLSessionManager.scala | 49 --- .../server/SparkSQLOperationManager.scala | 151 -------- .../test/resources/data/files/small_kv.txt | 5 - .../sql/hive/thriftserver/CliSuite.scala | 59 --- .../thriftserver/HiveThriftServer2Suite.scala | 125 ------- .../sql/hive/thriftserver/TestUtils.scala | 108 ------ sql/hive/pom.xml | 2 +- .../apache/spark/sql/hive/HiveContext.scala | 2 +- .../sql/hive/execution/HiveQuerySuite.scala | 50 +-- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/alpha/pom.xml | 2 +- yarn/pom.xml | 2 +- yarn/stable/pom.xml | 2 +- 54 files changed, 96 insertions(+), 1772 deletions(-) delete mode 100755 bin/beeline delete mode 100755 bin/spark-sql delete mode 100755 sbin/start-thriftserver.sh delete mode 100644 sql/hive-thriftserver/pom.xml delete mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala delete mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ReflectionUtils.scala delete mode 100755 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala delete mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala delete mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala delete mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala delete mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala delete mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala delete mode 100644 sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt delete mode 100644 sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala delete mode 100644 sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala delete mode 100644 sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala diff --git a/.gitignore b/.gitignore index 5b56a67c883e6..061c8946d23c1 100644 --- a/.gitignore +++ b/.gitignore @@ -57,4 +57,3 @@ metastore_db/ metastore/ warehouse/ TempStatsStore/ -sql/hive-thriftserver/test_warehouses diff --git a/assembly/pom.xml b/assembly/pom.xml index 703f15925bc44..567a8dd2a0d94 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -165,16 +165,6 @@ - - hive-thriftserver - - - org.apache.spark - spark-hive-thriftserver_${scala.binary.version} - ${project.version} - - - spark-ganglia-lgpl diff --git a/bagel/pom.xml b/bagel/pom.xml index bd51b112e26fa..90c4b095bb611 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -28,7 +28,7 @@ org.apache.spark spark-bagel_2.10 - bagel + bagel jar Spark Project Bagel diff --git a/bin/beeline b/bin/beeline deleted file mode 100755 index 09fe366c609fa..0000000000000 --- a/bin/beeline +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Figure out where Spark is installed -FWDIR="$(cd `dirname $0`/..; pwd)" - -# Find the java binary -if [ -n "${JAVA_HOME}" ]; then - RUNNER="${JAVA_HOME}/bin/java" -else - if [ `command -v java` ]; then - RUNNER="java" - else - echo "JAVA_HOME is not set" >&2 - exit 1 - fi -fi - -# Compute classpath using external script -classpath_output=$($FWDIR/bin/compute-classpath.sh) -if [[ "$?" != "0" ]]; then - echo "$classpath_output" - exit 1 -else - CLASSPATH=$classpath_output -fi - -CLASS="org.apache.hive.beeline.BeeLine" -exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@" diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 16b794a1592e8..e81e8c060cb98 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -52,7 +52,6 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes" fi diff --git a/bin/spark-shell b/bin/spark-shell index 756c8179d12b6..850e9507ec38f 100755 --- a/bin/spark-shell +++ b/bin/spark-shell @@ -46,11 +46,11 @@ function main(){ # (see https://github.com/sbt/sbt/issues/562). stty -icanon min 1 -echo > /dev/null 2>&1 export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix" - $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@" + $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main stty icanon echo > /dev/null 2>&1 else export SPARK_SUBMIT_OPTS - $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@" + $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main fi } diff --git a/bin/spark-shell.cmd b/bin/spark-shell.cmd index b56d69801171c..4b9708a8c03f3 100755 --- a/bin/spark-shell.cmd +++ b/bin/spark-shell.cmd @@ -19,4 +19,4 @@ rem set SPARK_HOME=%~dp0.. -cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell --class org.apache.spark.repl.Main %* +cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main diff --git a/bin/spark-sql b/bin/spark-sql deleted file mode 100755 index bba7f897b19bc..0000000000000 --- a/bin/spark-sql +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# -# Shell script for starting the Spark SQL CLI - -# Enter posix mode for bash -set -o posix - -# Figure out where Spark is installed -FWDIR="$(cd `dirname $0`/..; pwd)" - -if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then - echo "Usage: ./sbin/spark-sql [options]" - $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 - exit 0 -fi - -CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver" -exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@ diff --git a/core/pom.xml b/core/pom.xml index a24743495b0e1..1054cec4d77bb 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -28,7 +28,7 @@ org.apache.spark spark-core_2.10 - core + core jar Spark Project Core diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index c9cec33ebaa66..3b5642b6caa36 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -46,10 +46,6 @@ object SparkSubmit { private val CLUSTER = 2 private val ALL_DEPLOY_MODES = CLIENT | CLUSTER - // A special jar name that indicates the class being run is inside of Spark itself, and therefore - // no user jar is needed. - private val SPARK_INTERNAL = "spark-internal" - // Special primary resource names that represent shells rather than application jars. private val SPARK_SHELL = "spark-shell" private val PYSPARK_SHELL = "pyspark-shell" @@ -261,9 +257,7 @@ object SparkSubmit { // In yarn-cluster mode, use yarn.Client as a wrapper around the user class if (clusterManager == YARN && deployMode == CLUSTER) { childMainClass = "org.apache.spark.deploy.yarn.Client" - if (args.primaryResource != SPARK_INTERNAL) { - childArgs += ("--jar", args.primaryResource) - } + childArgs += ("--jar", args.primaryResource) childArgs += ("--class", args.mainClass) if (args.childArgs != null) { args.childArgs.foreach { arg => childArgs += ("--arg", arg) } @@ -338,7 +332,7 @@ object SparkSubmit { * Return whether the given primary resource represents a user jar. */ private def isUserJar(primaryResource: String): Boolean = { - !isShell(primaryResource) && !isPython(primaryResource) && !isInternal(primaryResource) + !isShell(primaryResource) && !isPython(primaryResource) } /** @@ -355,10 +349,6 @@ object SparkSubmit { primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL } - private[spark] def isInternal(primaryResource: String): Boolean = { - primaryResource == SPARK_INTERNAL - } - /** * Merge a sequence of comma-separated file lists, some of which may be null to indicate * no files, into a single comma-separated string. diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 01d0ae541a66b..3ab67a43a3b55 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -204,9 +204,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { /** Fill in values by parsing user options. */ private def parseOpts(opts: Seq[String]): Unit = { - var inSparkOpts = true - // Delineates parsing of Spark options from parsing of user options. + var inSparkOpts = true parse(opts) def parse(opts: Seq[String]): Unit = opts match { @@ -319,7 +318,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { SparkSubmit.printErrorAndExit(errMessage) case v => primaryResource = - if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) { + if (!SparkSubmit.isShell(v)) { Utils.resolveURI(v).toString } else { v diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index 33de24d1ae6d7..38830103d1e8d 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -53,7 +53,7 @@ if [[ ! "$@" =~ --package-only ]]; then -Dusername=$GIT_USERNAME -Dpassword=$GIT_PASSWORD \ -Dmaven.javadoc.skip=true \ -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \ - -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\ + -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl\ -Dtag=$GIT_TAG -DautoVersionSubmodules=true \ --batch-mode release:prepare @@ -61,7 +61,7 @@ if [[ ! "$@" =~ --package-only ]]; then -Darguments="-DskipTests=true -Dmaven.javadoc.skip=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \ -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \ -Dmaven.javadoc.skip=true \ - -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\ + -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl\ release:perform cd .. @@ -111,10 +111,10 @@ make_binary_release() { spark-$RELEASE_VERSION-bin-$NAME.tgz.sha } -make_binary_release "hadoop1" "-Phive -Phive-thriftserver -Dhadoop.version=1.0.4" -make_binary_release "cdh4" "-Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0" +make_binary_release "hadoop1" "-Phive -Dhadoop.version=1.0.4" +make_binary_release "cdh4" "-Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0" make_binary_release "hadoop2" \ - "-Phive -Phive-thriftserver -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0" + "-Phive -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0" # Copy data echo "Copying release tarballs" diff --git a/dev/run-tests b/dev/run-tests index 98ec969dc1b37..51e4def0f835a 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -65,7 +65,7 @@ echo "=========================================================================" # (either resolution or compilation) prompts the user for input either q, r, # etc to quit or retry. This echo is there to make it not block. if [ -n "$_RUN_SQL_TESTS" ]; then - echo -e "q\n" | SBT_MAVEN_PROFILES="$SBT_MAVEN_PROFILES -Phive -Phive-thriftserver" sbt/sbt clean package \ + echo -e "q\n" | SBT_MAVEN_PROFILES="$SBT_MAVEN_PROFILES -Phive" sbt/sbt clean package \ assembly/assembly test | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" else echo -e "q\n" | sbt/sbt clean package assembly/assembly test | \ diff --git a/dev/scalastyle b/dev/scalastyle index d9f2b91a3a091..a02d06912f238 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -17,7 +17,7 @@ # limitations under the License. # -echo -e "q\n" | sbt/sbt -Phive -Phive-thriftserver scalastyle > scalastyle.txt +echo -e "q\n" | sbt/sbt -Phive scalastyle > scalastyle.txt # Check style with YARN alpha built too echo -e "q\n" | sbt/sbt -Pyarn -Phadoop-0.23 -Dhadoop.version=0.23.9 yarn-alpha/scalastyle \ >> scalastyle.txt diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 36d642f2923b2..38728534a46e0 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -136,7 +136,7 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.createSchemaRDD // Define the schema using a case class. -// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, +// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, // you can use custom classes that implement the Product interface. case class Person(name: String, age: Int) @@ -548,6 +548,7 @@ results = hiveContext.hql("FROM src SELECT key, value").collect() + # Writing Language-Integrated Relational Queries **Language-Integrated queries are currently only supported in Scala.** @@ -572,199 +573,4 @@ prefixed with a tick (`'`). Implicit conversions turn these symbols into expres evaluated by the SQL execution engine. A full list of the functions supported can be found in the [ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD). - - -## Running the Thrift JDBC server - -The Thrift JDBC server implemented here corresponds to the [`HiveServer2`] -(https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2) in Hive 0.12. You can test -the JDBC server with the beeline script comes with either Spark or Hive 0.12. In order to use Hive -you must first run '`sbt/sbt -Phive-thriftserver assembly/assembly`' (or use `-Phive-thriftserver` -for maven). - -To start the JDBC server, run the following in the Spark directory: - - ./sbin/start-thriftserver.sh - -The default port the server listens on is 10000. You may run -`./sbin/start-thriftserver.sh --help` for a complete list of all available -options. Now you can use beeline to test the Thrift JDBC server: - - ./bin/beeline - -Connect to the JDBC server in beeline with: - - beeline> !connect jdbc:hive2://localhost:10000 - -Beeline will ask you for a username and password. In non-secure mode, simply enter the username on -your machine and a blank password. For secure mode, please follow the instructions given in the -[beeline documentation](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients) - -Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. - -You may also use the beeline script comes with Hive. - -### Migration Guide for Shark Users - -#### Reducer number - -In Shark, default reducer number is 1 and is controlled by the property `mapred.reduce.tasks`. Spark -SQL deprecates this property by a new property `spark.sql.shuffle.partitions`, whose default value -is 200. Users may customize this property via `SET`: - -``` -SET spark.sql.shuffle.partitions=10; -SELECT page, count(*) c FROM logs_last_month_cached -GROUP BY page ORDER BY c DESC LIMIT 10; -``` - -You may also put this property in `hive-site.xml` to override the default value. - -For now, the `mapred.reduce.tasks` property is still recognized, and is converted to -`spark.sql.shuffle.partitions` automatically. - -#### Caching - -The `shark.cache` table property no longer exists, and tables whose name end with `_cached` are no -longer automcatically cached. Instead, we provide `CACHE TABLE` and `UNCACHE TABLE` statements to -let user control table caching explicitly: - -``` -CACHE TABLE logs_last_month; -UNCACHE TABLE logs_last_month; -``` - -**NOTE** `CACHE TABLE tbl` is lazy, it only marks table `tbl` as "need to by cached if necessary", -but doesn't actually cache it until a query that touches `tbl` is executed. To force the table to be -cached, you may simply count the table immediately after executing `CACHE TABLE`: - -``` -CACHE TABLE logs_last_month; -SELECT COUNT(1) FROM logs_last_month; -``` - -Several caching related features are not supported yet: - -* User defined partition level cache eviction policy -* RDD reloading -* In-memory cache write through policy - -### Compatibility with Apache Hive - -#### Deploying in Exising Hive Warehouses - -Spark SQL Thrift JDBC server is designed to be "out of the box" compatible with existing Hive -installations. You do not need to modify your existing Hive Metastore or change the data placement -or partitioning of your tables. - -#### Supported Hive Features - -Spark SQL supports the vast majority of Hive features, such as: - -* Hive query statements, including: - * `SELECT` - * `GROUP BY - * `ORDER BY` - * `CLUSTER BY` - * `SORT BY` -* All Hive operators, including: - * Relational operators (`=`, `⇔`, `==`, `<>`, `<`, `>`, `>=`, `<=`, etc) - * Arthimatic operators (`+`, `-`, `*`, `/`, `%`, etc) - * Logical operators (`AND`, `&&`, `OR`, `||`, etc) - * Complex type constructors - * Mathemtatical functions (`sign`, `ln`, `cos`, etc) - * String functions (`instr`, `length`, `printf`, etc) -* User defined functions (UDF) -* User defined aggregation functions (UDAF) -* User defined serialization formats (SerDe's) -* Joins - * `JOIN` - * `{LEFT|RIGHT|FULL} OUTER JOIN` - * `LEFT SEMI JOIN` - * `CROSS JOIN` -* Unions -* Sub queries - * `SELECT col FROM ( SELECT a + b AS col from t1) t2` -* Sampling -* Explain -* Partitioned tables -* All Hive DDL Functions, including: - * `CREATE TABLE` - * `CREATE TABLE AS SELECT` - * `ALTER TABLE` -* Most Hive Data types, including: - * `TINYINT` - * `SMALLINT` - * `INT` - * `BIGINT` - * `BOOLEAN` - * `FLOAT` - * `DOUBLE` - * `STRING` - * `BINARY` - * `TIMESTAMP` - * `ARRAY<>` - * `MAP<>` - * `STRUCT<>` - -#### Unsupported Hive Functionality - -Below is a list of Hive features that we don't support yet. Most of these features are rarely used -in Hive deployments. - -**Major Hive Features** - -* Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL - doesn't support buckets yet. - -**Esoteric Hive Features** - -* Tables with partitions using different input formats: In Spark SQL, all table partitions need to - have the same input format. -* Non-equi outer join: For the uncommon use case of using outer joins with non-equi join conditions - (e.g. condition "`key < 10`"), Spark SQL will output wrong result for the `NULL` tuple. -* `UNIONTYPE` -* Unique join -* Single query multi insert -* Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at - the moment. - -**Hive Input/Output Formats** - -* File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat. -* Hadoop archive - -**Hive Optimizations** - -A handful of Hive optimizations are not yet included in Spark. Some of these (such as indexes) are -not necessary due to Spark SQL's in-memory computational model. Others are slotted for future -releases of Spark SQL. - -* Block level bitmap indexes and virtual columns (used to build indexes) -* Automatically convert a join to map join: For joining a large table with multiple small tables, - Hive automatically converts the join into a map join. We are adding this auto conversion in the - next release. -* Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you - need to control the degree of parallelism post-shuffle using "SET - spark.sql.shuffle.partitions=[num_tasks];". We are going to add auto-setting of parallelism in the - next release. -* Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still - launches tasks to compute the result. -* Skew data flag: Spark SQL does not follow the skew data flags in Hive. -* `STREAMTABLE` hint in join: Spark SQL does not follow the `STREAMTABLE` hint. -* Merge multiple small files for query results: if the result output contains multiple small files, - Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS - metadata. Spark SQL does not support that. - -## Running the Spark SQL CLI - -The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute -queries input from command line. Note: the Spark SQL CLI cannot talk to the Thrift JDBC server. - -To start the Spark SQL CLI, run the following in the Spark directory: - - ./bin/spark-sql - -Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. -You may run `./bin/spark-sql --help` for a complete list of all available -options. + \ No newline at end of file diff --git a/examples/pom.xml b/examples/pom.xml index c4ed0f5a6a02b..bd1c387c2eb91 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ org.apache.spark spark-examples_2.10 - examples + examples jar Spark Project Examples diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 874b8a7959bb6..61a6aff543aed 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -28,7 +28,7 @@ org.apache.spark spark-streaming-flume_2.10 - streaming-flume + streaming-flume jar Spark Project External Flume diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index 25a5c0a4d7d77..4762c50685a93 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -28,7 +28,7 @@ org.apache.spark spark-streaming-kafka_2.10 - streaming-kafka + streaming-kafka jar Spark Project External Kafka diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index f31ed655f6779..32c530e600ce0 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -28,7 +28,7 @@ org.apache.spark spark-streaming-mqtt_2.10 - streaming-mqtt + streaming-mqtt jar Spark Project External MQTT diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index 56bb24c2a072e..637adb0f00da0 100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -28,7 +28,7 @@ org.apache.spark spark-streaming-twitter_2.10 - streaming-twitter + streaming-twitter jar Spark Project External Twitter diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index 54b0242c54e78..e4d758a04a4cd 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -28,7 +28,7 @@ org.apache.spark spark-streaming-zeromq_2.10 - streaming-zeromq + streaming-zeromq jar Spark Project External ZeroMQ diff --git a/graphx/pom.xml b/graphx/pom.xml index 6dd52fc618b1e..7e3bcf29dcfbc 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -28,7 +28,7 @@ org.apache.spark spark-graphx_2.10 - graphx + graphx jar Spark Project GraphX diff --git a/mllib/pom.xml b/mllib/pom.xml index f27cf520dc9fa..92b07e2357db1 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -28,7 +28,7 @@ org.apache.spark spark-mllib_2.10 - mllib + mllib jar Spark Project ML Library diff --git a/pom.xml b/pom.xml index 3e9d388180d8e..4e2d64a833640 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,6 @@ sql/catalyst sql/core sql/hive - sql/hive-thriftserver repl assembly external/twitter @@ -253,9 +252,9 @@ 3.3.2 - commons-codec - commons-codec - 1.5 + commons-codec + commons-codec + 1.5 com.google.code.findbugs diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 1629bc2cba8ba..62576f84dd031 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -30,11 +30,11 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile - val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, spark, sql, - streaming, streamingFlume, streamingKafka, streamingMqtt, streamingTwitter, streamingZeromq) = - Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl", - "spark", "sql", "streaming", "streaming-flume", "streaming-kafka", "streaming-mqtt", - "streaming-twitter", "streaming-zeromq").map(ProjectRef(buildLocation, _)) + val allProjects@Seq(bagel, catalyst, core, graphx, hive, mllib, repl, spark, sql, streaming, + streamingFlume, streamingKafka, streamingMqtt, streamingTwitter, streamingZeromq) = + Seq("bagel", "catalyst", "core", "graphx", "hive", "mllib", "repl", "spark", "sql", + "streaming", "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter", + "streaming-zeromq").map(ProjectRef(buildLocation, _)) val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl) = Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl") @@ -100,7 +100,7 @@ object SparkBuild extends PomBuild { Properties.envOrNone("SBT_MAVEN_PROPERTIES") match { case Some(v) => v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.split("=")).foreach(x => System.setProperty(x(0), x(1))) - case _ => + case _ => } override val userPropertiesMap = System.getProperties.toMap @@ -158,7 +158,7 @@ object SparkBuild extends PomBuild { /* Enable Mima for all projects except spark, hive, catalyst, sql and repl */ // TODO: Add Sql to mima checks - allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl).contains(x)). + allProjects.filterNot(y => Seq(spark, sql, hive, catalyst, repl).exists(x => x == y)). foreach (x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)) /* Enable Assembly for all assembly projects */ diff --git a/sbin/start-thriftserver.sh b/sbin/start-thriftserver.sh deleted file mode 100755 index 8398e6f19b511..0000000000000 --- a/sbin/start-thriftserver.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# -# Shell script for starting the Spark SQL Thrift server - -# Enter posix mode for bash -set -o posix - -# Figure out where Spark is installed -FWDIR="$(cd `dirname $0`/..; pwd)" - -if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then - echo "Usage: ./sbin/start-thriftserver [options]" - $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 - exit 0 -fi - -CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2" -exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@ diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 531bfddbf237b..6decde3fcd62d 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -32,7 +32,7 @@ Spark Project Catalyst http://spark.apache.org/ - catalyst + catalyst diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala index a357c6ffb8977..1d5f033f0d274 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala @@ -43,7 +43,8 @@ case class NativeCommand(cmd: String) extends Command { */ case class SetCommand(key: Option[String], value: Option[String]) extends Command { override def output = Seq( - BoundReference(1, AttributeReference("", StringType, nullable = false)())) + BoundReference(0, AttributeReference("key", StringType, nullable = false)()), + BoundReference(1, AttributeReference("value", StringType, nullable = false)())) } /** diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 3a038a2db6173..c309c43804d97 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -32,7 +32,7 @@ Spark Project SQL http://spark.apache.org/ - sql + sql diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 41920c00b5a2c..2b787e14f3f15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -30,13 +30,12 @@ import scala.collection.JavaConverters._ * SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads). */ trait SQLConf { - import SQLConf._ /** ************************ Spark SQL Params/Hints ******************* */ // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext? /** Number of partitions to use for shuffle operators. */ - private[spark] def numShufflePartitions: Int = get(SHUFFLE_PARTITIONS, "200").toInt + private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt /** * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to @@ -44,10 +43,11 @@ trait SQLConf { * effectively disables auto conversion. * Hive setting: hive.auto.convert.join.noconditionaltask.size. */ - private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt + private[spark] def autoConvertJoinSize: Int = + get("spark.sql.auto.convert.join.size", "10000").toInt /** A comma-separated list of table names marked to be broadcasted during joins. */ - private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "") + private[spark] def joinBroadcastTables: String = get("spark.sql.join.broadcastTables", "") /** ********************** SQLConf functionality methods ************ */ @@ -61,7 +61,7 @@ trait SQLConf { def set(key: String, value: String): Unit = { require(key != null, "key cannot be null") - require(value != null, s"value cannot be null for $key") + require(value != null, s"value cannot be null for ${key}") settings.put(key, value) } @@ -90,13 +90,3 @@ trait SQLConf { } } - -object SQLConf { - val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size" - val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" - val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables" - - object Deprecated { - val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 9293239131d52..98d2f89c8ae71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -17,13 +17,12 @@ package org.apache.spark.sql.execution -import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.{Row, SQLConf, SQLContext} +import org.apache.spark.sql.{Row, SQLContext} trait Command { /** @@ -45,53 +44,28 @@ trait Command { case class SetCommand( key: Option[String], value: Option[String], output: Seq[Attribute])( @transient context: SQLContext) - extends LeafNode with Command with Logging { + extends LeafNode with Command { - override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match { + override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match { // Set value for key k. case (Some(k), Some(v)) => - if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) { - logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + - s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.") - context.set(SQLConf.SHUFFLE_PARTITIONS, v) - Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v") - } else { - context.set(k, v) - Array(s"$k=$v") - } + context.set(k, v) + Array(k -> v) // Query the value bound to key k. case (Some(k), _) => - // TODO (lian) This is just a workaround to make the Simba ODBC driver work. - // Should remove this once we get the ODBC driver updated. - if (k == "-v") { - val hiveJars = Seq( - "hive-exec-0.12.0.jar", - "hive-service-0.12.0.jar", - "hive-common-0.12.0.jar", - "hive-hwi-0.12.0.jar", - "hive-0.12.0.jar").mkString(":") - - Array( - "system:java.class.path=" + hiveJars, - "system:sun.java.command=shark.SharkServer2") - } - else { - Array(s"$k=${context.getOption(k).getOrElse("")}") - } + Array(k -> context.getOption(k).getOrElse("")) // Query all key-value pairs that are set in the SQLConf of the context. case (None, None) => - context.getAll.map { case (k, v) => - s"$k=$v" - } + context.getAll case _ => throw new IllegalArgumentException() } def execute(): RDD[Row] = { - val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) } + val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) } context.sparkContext.parallelize(rows, 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala index 1a58d73d9e7f4..08293f7f0ca30 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala @@ -54,10 +54,10 @@ class SQLConfSuite extends QueryTest { assert(get(testKey, testVal + "_") == testVal) assert(TestSQLContext.get(testKey, testVal + "_") == testVal) - sql("set some.property=20") - assert(get("some.property", "0") == "20") - sql("set some.property = 40") - assert(get("some.property", "0") == "40") + sql("set mapred.reduce.tasks=20") + assert(get("mapred.reduce.tasks", "0") == "20") + sql("set mapred.reduce.tasks = 40") + assert(get("mapred.reduce.tasks", "0") == "40") val key = "spark.sql.key" val vs = "val0,val_1,val2.3,my_table" @@ -70,9 +70,4 @@ class SQLConfSuite extends QueryTest { clear() } - test("deprecated property") { - clear() - sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10") - assert(get(SQLConf.SHUFFLE_PARTITIONS) == "10") - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index de9e8aa4f62ed..6736189c96d4b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -424,25 +424,25 @@ class SQLQuerySuite extends QueryTest { sql(s"SET $testKey=$testVal") checkAnswer( sql("SET"), - Seq(Seq(s"$testKey=$testVal")) + Seq(Seq(testKey, testVal)) ) sql(s"SET ${testKey + testKey}=${testVal + testVal}") checkAnswer( sql("set"), Seq( - Seq(s"$testKey=$testVal"), - Seq(s"${testKey + testKey}=${testVal + testVal}")) + Seq(testKey, testVal), + Seq(testKey + testKey, testVal + testVal)) ) // "set key" checkAnswer( sql(s"SET $testKey"), - Seq(Seq(s"$testKey=$testVal")) + Seq(Seq(testKey, testVal)) ) checkAnswer( sql(s"SET $nonexistentKey"), - Seq(Seq(s"$nonexistentKey=")) + Seq(Seq(nonexistentKey, "")) ) clear() } diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml deleted file mode 100644 index 7fac90fdc596d..0000000000000 --- a/sql/hive-thriftserver/pom.xml +++ /dev/null @@ -1,82 +0,0 @@ - - - - - 4.0.0 - - org.apache.spark - spark-parent - 1.1.0-SNAPSHOT - ../../pom.xml - - - org.apache.spark - spark-hive-thriftserver_2.10 - jar - Spark Project Hive - http://spark.apache.org/ - - hive-thriftserver - - - - - org.apache.spark - spark-hive_${scala.binary.version} - ${project.version} - - - org.spark-project.hive - hive-cli - ${hive.version} - - - org.spark-project.hive - hive-jdbc - ${hive.version} - - - org.spark-project.hive - hive-beeline - ${hive.version} - - - org.scalatest - scalatest_${scala.binary.version} - test - - - - target/scala-${scala.binary.version}/classes - target/scala-${scala.binary.version}/test-classes - - - org.scalatest - scalatest-maven-plugin - - - org.apache.maven.plugins - maven-deploy-plugin - - true - - - - - diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala deleted file mode 100644 index ddbc2a79fb512..0000000000000 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import scala.collection.JavaConversions._ - -import org.apache.commons.logging.LogFactory -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService -import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor} - -import org.apache.spark.sql.Logging -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ - -/** - * The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a - * `HiveThriftServer2` thrift server. - */ -private[hive] object HiveThriftServer2 extends Logging { - var LOG = LogFactory.getLog(classOf[HiveServer2]) - - def main(args: Array[String]) { - val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2") - - if (!optionsProcessor.process(args)) { - logger.warn("Error starting HiveThriftServer2 with given arguments") - System.exit(-1) - } - - val ss = new SessionState(new HiveConf(classOf[SessionState])) - - // Set all properties specified via command line. - val hiveConf: HiveConf = ss.getConf - hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) => - logger.debug(s"HiveConf var: $k=$v") - } - - SessionState.start(ss) - - logger.info("Starting SparkContext") - SparkSQLEnv.init() - SessionState.start(ss) - - Runtime.getRuntime.addShutdownHook( - new Thread() { - override def run() { - SparkSQLEnv.sparkContext.stop() - } - } - ) - - try { - val server = new HiveThriftServer2(SparkSQLEnv.hiveContext) - server.init(hiveConf) - server.start() - logger.info("HiveThriftServer2 started") - } catch { - case e: Exception => - logger.error("Error starting HiveThriftServer2", e) - System.exit(-1) - } - } -} - -private[hive] class HiveThriftServer2(hiveContext: HiveContext) - extends HiveServer2 - with ReflectedCompositeService { - - override def init(hiveConf: HiveConf) { - val sparkSqlCliService = new SparkSQLCLIService(hiveContext) - setSuperField(this, "cliService", sparkSqlCliService) - addService(sparkSqlCliService) - - val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService) - setSuperField(this, "thriftCLIService", thriftCliService) - addService(thriftCliService) - - initCompositeService(hiveConf) - } -} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ReflectionUtils.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ReflectionUtils.scala deleted file mode 100644 index 599294dfbb7d7..0000000000000 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ReflectionUtils.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -private[hive] object ReflectionUtils { - def setSuperField(obj : Object, fieldName: String, fieldValue: Object) { - setAncestorField(obj, 1, fieldName, fieldValue) - } - - def setAncestorField(obj: AnyRef, level: Int, fieldName: String, fieldValue: AnyRef) { - val ancestor = Iterator.iterate[Class[_]](obj.getClass)(_.getSuperclass).drop(level).next() - val field = ancestor.getDeclaredField(fieldName) - field.setAccessible(true) - field.set(obj, fieldValue) - } - - def getSuperField[T](obj: AnyRef, fieldName: String): T = { - getAncestorField[T](obj, 1, fieldName) - } - - def getAncestorField[T](clazz: Object, level: Int, fieldName: String): T = { - val ancestor = Iterator.iterate[Class[_]](clazz.getClass)(_.getSuperclass).drop(level).next() - val field = ancestor.getDeclaredField(fieldName) - field.setAccessible(true) - field.get(clazz).asInstanceOf[T] - } - - def invokeStatic(clazz: Class[_], methodName: String, args: (Class[_], AnyRef)*): AnyRef = { - invoke(clazz, null, methodName, args: _*) - } - - def invoke( - clazz: Class[_], - obj: AnyRef, - methodName: String, - args: (Class[_], AnyRef)*): AnyRef = { - - val (types, values) = args.unzip - val method = clazz.getDeclaredMethod(methodName, types: _*) - method.setAccessible(true) - method.invoke(obj, values.toSeq: _*) - } -} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala deleted file mode 100755 index 27268ecb923e9..0000000000000 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import scala.collection.JavaConversions._ - -import java.io._ -import java.util.{ArrayList => JArrayList} - -import jline.{ConsoleReader, History} -import org.apache.commons.lang.StringUtils -import org.apache.commons.logging.LogFactory -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor} -import org.apache.hadoop.hive.common.LogUtils.LogInitializationException -import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils, LogUtils} -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.Driver -import org.apache.hadoop.hive.ql.exec.Utilities -import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory} -import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hadoop.hive.shims.ShimLoader -import org.apache.thrift.transport.TSocket - -import org.apache.spark.sql.Logging - -private[hive] object SparkSQLCLIDriver { - private var prompt = "spark-sql" - private var continuedPrompt = "".padTo(prompt.length, ' ') - private var transport:TSocket = _ - - installSignalHandler() - - /** - * Install an interrupt callback to cancel all Spark jobs. In Hive's CliDriver#processLine(), - * a signal handler will invoke this registered callback if a Ctrl+C signal is detected while - * a command is being processed by the current thread. - */ - def installSignalHandler() { - HiveInterruptUtils.add(new HiveInterruptCallback { - override def interrupt() { - // Handle remote execution mode - if (SparkSQLEnv.sparkContext != null) { - SparkSQLEnv.sparkContext.cancelAllJobs() - } else { - if (transport != null) { - // Force closing of TCP connection upon session termination - transport.getSocket.close() - } - } - } - }) - } - - def main(args: Array[String]) { - val oproc = new OptionsProcessor() - if (!oproc.process_stage1(args)) { - System.exit(1) - } - - // NOTE: It is critical to do this here so that log4j is reinitialized - // before any of the other core hive classes are loaded - var logInitFailed = false - var logInitDetailMessage: String = null - try { - logInitDetailMessage = LogUtils.initHiveLog4j() - } catch { - case e: LogInitializationException => - logInitFailed = true - logInitDetailMessage = e.getMessage - } - - val sessionState = new CliSessionState(new HiveConf(classOf[SessionState])) - - sessionState.in = System.in - try { - sessionState.out = new PrintStream(System.out, true, "UTF-8") - sessionState.info = new PrintStream(System.err, true, "UTF-8") - sessionState.err = new PrintStream(System.err, true, "UTF-8") - } catch { - case e: UnsupportedEncodingException => System.exit(3) - } - - if (!oproc.process_stage2(sessionState)) { - System.exit(2) - } - - if (!sessionState.getIsSilent) { - if (logInitFailed) System.err.println(logInitDetailMessage) - else SessionState.getConsole.printInfo(logInitDetailMessage) - } - - // Set all properties specified via command line. - val conf: HiveConf = sessionState.getConf - sessionState.cmdProperties.entrySet().foreach { item: java.util.Map.Entry[Object, Object] => - conf.set(item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String]) - sessionState.getOverriddenConfigurations.put( - item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String]) - } - - SessionState.start(sessionState) - - // Clean up after we exit - Runtime.getRuntime.addShutdownHook( - new Thread() { - override def run() { - SparkSQLEnv.stop() - } - } - ) - - // "-h" option has been passed, so connect to Hive thrift server. - if (sessionState.getHost != null) { - sessionState.connect() - if (sessionState.isRemoteMode) { - prompt = s"[${sessionState.getHost}:${sessionState.getPort}]" + prompt - continuedPrompt = "".padTo(prompt.length, ' ') - } - } - - if (!sessionState.isRemoteMode && !ShimLoader.getHadoopShims.usesJobShell()) { - // Hadoop-20 and above - we need to augment classpath using hiveconf - // components. - // See also: code in ExecDriver.java - var loader = conf.getClassLoader - val auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS) - if (StringUtils.isNotBlank(auxJars)) { - loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ",")) - } - conf.setClassLoader(loader) - Thread.currentThread().setContextClassLoader(loader) - } - - val cli = new SparkSQLCLIDriver - cli.setHiveVariables(oproc.getHiveVariables) - - // TODO work around for set the log output to console, because the HiveContext - // will set the output into an invalid buffer. - sessionState.in = System.in - try { - sessionState.out = new PrintStream(System.out, true, "UTF-8") - sessionState.info = new PrintStream(System.err, true, "UTF-8") - sessionState.err = new PrintStream(System.err, true, "UTF-8") - } catch { - case e: UnsupportedEncodingException => System.exit(3) - } - - // Execute -i init files (always in silent mode) - cli.processInitFiles(sessionState) - - if (sessionState.execString != null) { - System.exit(cli.processLine(sessionState.execString)) - } - - try { - if (sessionState.fileName != null) { - System.exit(cli.processFile(sessionState.fileName)) - } - } catch { - case e: FileNotFoundException => - System.err.println(s"Could not open input file for reading. (${e.getMessage})") - System.exit(3) - } - - val reader = new ConsoleReader() - reader.setBellEnabled(false) - // reader.setDebug(new PrintWriter(new FileWriter("writer.debug", true))) - CliDriver.getCommandCompletor.foreach((e) => reader.addCompletor(e)) - - val historyDirectory = System.getProperty("user.home") - - try { - if (new File(historyDirectory).exists()) { - val historyFile = historyDirectory + File.separator + ".hivehistory" - reader.setHistory(new History(new File(historyFile))) - } else { - System.err.println("WARNING: Directory for Hive history file: " + historyDirectory + - " does not exist. History will not be available during this session.") - } - } catch { - case e: Exception => - System.err.println("WARNING: Encountered an error while trying to initialize Hive's " + - "history file. History will not be available during this session.") - System.err.println(e.getMessage) - } - - val clientTransportTSocketField = classOf[CliSessionState].getDeclaredField("transport") - clientTransportTSocketField.setAccessible(true) - - transport = clientTransportTSocketField.get(sessionState).asInstanceOf[TSocket] - - var ret = 0 - var prefix = "" - val currentDB = ReflectionUtils.invokeStatic(classOf[CliDriver], "getFormattedDb", - classOf[HiveConf] -> conf, classOf[CliSessionState] -> sessionState) - - def promptWithCurrentDB = s"$prompt$currentDB" - def continuedPromptWithDBSpaces = continuedPrompt + ReflectionUtils.invokeStatic( - classOf[CliDriver], "spacesForString", classOf[String] -> currentDB) - - var currentPrompt = promptWithCurrentDB - var line = reader.readLine(currentPrompt + "> ") - - while (line != null) { - if (prefix.nonEmpty) { - prefix += '\n' - } - - if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) { - line = prefix + line - ret = cli.processLine(line, true) - prefix = "" - currentPrompt = promptWithCurrentDB - } else { - prefix = prefix + line - currentPrompt = continuedPromptWithDBSpaces - } - - line = reader.readLine(currentPrompt + "> ") - } - - sessionState.close() - - System.exit(ret) - } -} - -private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { - private val sessionState = SessionState.get().asInstanceOf[CliSessionState] - - private val LOG = LogFactory.getLog("CliDriver") - - private val console = new SessionState.LogHelper(LOG) - - private val conf: Configuration = - if (sessionState != null) sessionState.getConf else new Configuration() - - // Force initializing SparkSQLEnv. This is put here but not object SparkSQLCliDriver - // because the Hive unit tests do not go through the main() code path. - if (!sessionState.isRemoteMode) { - SparkSQLEnv.init() - } - - override def processCmd(cmd: String): Int = { - val cmd_trimmed: String = cmd.trim() - val tokens: Array[String] = cmd_trimmed.split("\\s+") - val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() - if (cmd_trimmed.toLowerCase.equals("quit") || - cmd_trimmed.toLowerCase.equals("exit") || - tokens(0).equalsIgnoreCase("source") || - cmd_trimmed.startsWith("!") || - tokens(0).toLowerCase.equals("list") || - sessionState.isRemoteMode) { - val start = System.currentTimeMillis() - super.processCmd(cmd) - val end = System.currentTimeMillis() - val timeTaken: Double = (end - start) / 1000.0 - console.printInfo(s"Time taken: $timeTaken seconds") - 0 - } else { - var ret = 0 - val hconf = conf.asInstanceOf[HiveConf] - val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hconf) - - if (proc != null) { - if (proc.isInstanceOf[Driver]) { - val driver = new SparkSQLDriver - - driver.init() - val out = sessionState.out - val start:Long = System.currentTimeMillis() - if (sessionState.getIsVerbose) { - out.println(cmd) - } - - ret = driver.run(cmd).getResponseCode - if (ret != 0) { - driver.close() - return ret - } - - val res = new JArrayList[String]() - - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER)) { - // Print the column names. - Option(driver.getSchema.getFieldSchemas).map { fields => - out.println(fields.map(_.getName).mkString("\t")) - } - } - - try { - while (!out.checkError() && driver.getResults(res)) { - res.foreach(out.println) - res.clear() - } - } catch { - case e:IOException => - console.printError( - s"""Failed with exception ${e.getClass.getName}: ${e.getMessage} - |${org.apache.hadoop.util.StringUtils.stringifyException(e)} - """.stripMargin) - ret = 1 - } - - val cret = driver.close() - if (ret == 0) { - ret = cret - } - - val end = System.currentTimeMillis() - if (end > start) { - val timeTaken:Double = (end - start) / 1000.0 - console.printInfo(s"Time taken: $timeTaken seconds", null) - } - - // Destroy the driver to release all the locks. - driver.destroy() - } else { - if (sessionState.getIsVerbose) { - sessionState.out.println(tokens(0) + " " + cmd_1) - } - ret = proc.run(cmd_1).getResponseCode - } - } - ret - } - } -} - diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala deleted file mode 100644 index 42cbf363b274f..0000000000000 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import scala.collection.JavaConversions._ - -import java.io.IOException -import java.util.{List => JList} -import javax.security.auth.login.LoginException - -import org.apache.commons.logging.Log -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.shims.ShimLoader -import org.apache.hive.service.Service.STATE -import org.apache.hive.service.auth.HiveAuthFactory -import org.apache.hive.service.cli.CLIService -import org.apache.hive.service.{AbstractService, Service, ServiceException} - -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ - -private[hive] class SparkSQLCLIService(hiveContext: HiveContext) - extends CLIService - with ReflectedCompositeService { - - override def init(hiveConf: HiveConf) { - setSuperField(this, "hiveConf", hiveConf) - - val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext) - setSuperField(this, "sessionManager", sparkSqlSessionManager) - addService(sparkSqlSessionManager) - - try { - HiveAuthFactory.loginFromKeytab(hiveConf) - val serverUserName = ShimLoader.getHadoopShims - .getShortUserName(ShimLoader.getHadoopShims.getUGIForConf(hiveConf)) - setSuperField(this, "serverUserName", serverUserName) - } catch { - case e @ (_: IOException | _: LoginException) => - throw new ServiceException("Unable to login to kerberos with given principal/keytab", e) - } - - initCompositeService(hiveConf) - } -} - -private[thriftserver] trait ReflectedCompositeService { this: AbstractService => - def initCompositeService(hiveConf: HiveConf) { - // Emulating `CompositeService.init(hiveConf)` - val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList") - serviceList.foreach(_.init(hiveConf)) - - // Emulating `AbstractService.init(hiveConf)` - invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED) - setAncestorField(this, 3, "hiveConf", hiveConf) - invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED) - getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.") - } -} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala deleted file mode 100644 index 5202aa9903e03..0000000000000 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import scala.collection.JavaConversions._ - -import java.util.{ArrayList => JArrayList} - -import org.apache.commons.lang.exception.ExceptionUtils -import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema} -import org.apache.hadoop.hive.ql.Driver -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse - -import org.apache.spark.sql.Logging -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} - -private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext) - extends Driver with Logging { - - private var tableSchema: Schema = _ - private var hiveResponse: Seq[String] = _ - - override def init(): Unit = { - } - - private def getResultSetSchema(query: context.QueryExecution): Schema = { - val analyzed = query.analyzed - logger.debug(s"Result Schema: ${analyzed.output}") - if (analyzed.output.size == 0) { - new Schema(new FieldSchema("Response code", "string", "") :: Nil, null) - } else { - val fieldSchemas = analyzed.output.map { attr => - new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") - } - - new Schema(fieldSchemas, null) - } - } - - override def run(command: String): CommandProcessorResponse = { - val execution = context.executePlan(context.hql(command).logicalPlan) - - // TODO unify the error code - try { - hiveResponse = execution.stringResult() - tableSchema = getResultSetSchema(execution) - new CommandProcessorResponse(0) - } catch { - case cause: Throwable => - logger.error(s"Failed in [$command]", cause) - new CommandProcessorResponse(-3, ExceptionUtils.getFullStackTrace(cause), null) - } - } - - override def close(): Int = { - hiveResponse = null - tableSchema = null - 0 - } - - override def getSchema: Schema = tableSchema - - override def getResults(res: JArrayList[String]): Boolean = { - if (hiveResponse == null) { - false - } else { - res.addAll(hiveResponse) - hiveResponse = null - true - } - } - - override def destroy() { - super.destroy() - hiveResponse = null - tableSchema = null - } -} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala deleted file mode 100644 index 451c3bd7b9352..0000000000000 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import org.apache.hadoop.hive.ql.session.SessionState - -import org.apache.spark.scheduler.{SplitInfo, StatsReportListener} -import org.apache.spark.sql.Logging -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.{SparkConf, SparkContext} - -/** A singleton object for the master program. The slaves should not access this. */ -private[hive] object SparkSQLEnv extends Logging { - logger.debug("Initializing SparkSQLEnv") - - var hiveContext: HiveContext = _ - var sparkContext: SparkContext = _ - - def init() { - if (hiveContext == null) { - sparkContext = new SparkContext(new SparkConf() - .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}")) - - sparkContext.addSparkListener(new StatsReportListener()) - - hiveContext = new HiveContext(sparkContext) { - @transient override lazy val sessionState = SessionState.get() - @transient override lazy val hiveconf = sessionState.getConf - } - } - } - - /** Cleans up and shuts down the Spark SQL environments. */ - def stop() { - logger.debug("Shutting down Spark SQL Environment") - // Stop the SparkContext - if (SparkSQLEnv.sparkContext != null) { - sparkContext.stop() - sparkContext = null - hiveContext = null - } - } -} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala deleted file mode 100644 index 6b3275b4eaf04..0000000000000 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import java.util.concurrent.Executors - -import org.apache.commons.logging.Log -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hive.service.cli.session.SessionManager - -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ -import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager - -private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) - extends SessionManager - with ReflectedCompositeService { - - override def init(hiveConf: HiveConf) { - setSuperField(this, "hiveConf", hiveConf) - - val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS) - setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize)) - getAncestorField[Log](this, 3, "LOG").info( - s"HiveServer2: Async execution pool size $backgroundPoolSize") - - val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext) - setSuperField(this, "operationManager", sparkSqlOperationManager) - addService(sparkSqlOperationManager) - - initCompositeService(hiveConf) - } -} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala deleted file mode 100644 index a4e1f3e762e89..0000000000000 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver.server - -import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer -import scala.math.{random, round} - -import java.sql.Timestamp -import java.util.{Map => JMap} - -import org.apache.hadoop.hive.common.`type`.HiveDecimal -import org.apache.hadoop.hive.metastore.api.FieldSchema -import org.apache.hive.service.cli._ -import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager} -import org.apache.hive.service.cli.session.HiveSession - -import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.hive.thriftserver.ReflectionUtils -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} -import org.apache.spark.sql.{Logging, SchemaRDD, Row => SparkRow} - -/** - * Executes queries using Spark SQL, and maintains a list of handles to active queries. - */ -class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManager with Logging { - val handleToOperation = ReflectionUtils - .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") - - override def newExecuteStatementOperation( - parentSession: HiveSession, - statement: String, - confOverlay: JMap[String, String], - async: Boolean): ExecuteStatementOperation = synchronized { - - val operation = new ExecuteStatementOperation(parentSession, statement, confOverlay) { - private var result: SchemaRDD = _ - private var iter: Iterator[SparkRow] = _ - private var dataTypes: Array[DataType] = _ - - def close(): Unit = { - // RDDs will be cleaned automatically upon garbage collection. - logger.debug("CLOSING") - } - - def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = { - if (!iter.hasNext) { - new RowSet() - } else { - val maxRows = maxRowsL.toInt // Do you really want a row batch larger than Int Max? No. - var curRow = 0 - var rowSet = new ArrayBuffer[Row](maxRows) - - while (curRow < maxRows && iter.hasNext) { - val sparkRow = iter.next() - val row = new Row() - var curCol = 0 - - while (curCol < sparkRow.length) { - dataTypes(curCol) match { - case StringType => - row.addString(sparkRow(curCol).asInstanceOf[String]) - case IntegerType => - row.addColumnValue(ColumnValue.intValue(sparkRow.getInt(curCol))) - case BooleanType => - row.addColumnValue(ColumnValue.booleanValue(sparkRow.getBoolean(curCol))) - case DoubleType => - row.addColumnValue(ColumnValue.doubleValue(sparkRow.getDouble(curCol))) - case FloatType => - row.addColumnValue(ColumnValue.floatValue(sparkRow.getFloat(curCol))) - case DecimalType => - val hiveDecimal = sparkRow.get(curCol).asInstanceOf[BigDecimal].bigDecimal - row.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal))) - case LongType => - row.addColumnValue(ColumnValue.longValue(sparkRow.getLong(curCol))) - case ByteType => - row.addColumnValue(ColumnValue.byteValue(sparkRow.getByte(curCol))) - case ShortType => - row.addColumnValue(ColumnValue.intValue(sparkRow.getShort(curCol))) - case TimestampType => - row.addColumnValue( - ColumnValue.timestampValue(sparkRow.get(curCol).asInstanceOf[Timestamp])) - case BinaryType | _: ArrayType | _: StructType | _: MapType => - val hiveString = result - .queryExecution - .asInstanceOf[HiveContext#QueryExecution] - .toHiveString((sparkRow.get(curCol), dataTypes(curCol))) - row.addColumnValue(ColumnValue.stringValue(hiveString)) - } - curCol += 1 - } - rowSet += row - curRow += 1 - } - new RowSet(rowSet, 0) - } - } - - def getResultSetSchema: TableSchema = { - logger.warn(s"Result Schema: ${result.queryExecution.analyzed.output}") - if (result.queryExecution.analyzed.output.size == 0) { - new TableSchema(new FieldSchema("Result", "string", "") :: Nil) - } else { - val schema = result.queryExecution.analyzed.output.map { attr => - new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") - } - new TableSchema(schema) - } - } - - def run(): Unit = { - logger.info(s"Running query '$statement'") - setState(OperationState.RUNNING) - try { - result = hiveContext.hql(statement) - logger.debug(result.queryExecution.toString()) - val groupId = round(random * 1000000).toString - hiveContext.sparkContext.setJobGroup(groupId, statement) - iter = result.queryExecution.toRdd.toLocalIterator - dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray - setHasResultSet(true) - } catch { - // Actually do need to catch Throwable as some failures don't inherit from Exception and - // HiveServer will silently swallow them. - case e: Throwable => - logger.error("Error executing query:",e) - throw new HiveSQLException(e.toString) - } - setState(OperationState.FINISHED) - } - } - - handleToOperation.put(operation.getHandle, operation) - operation - } -} diff --git a/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt b/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt deleted file mode 100644 index 850f8014b6f05..0000000000000 --- a/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt +++ /dev/null @@ -1,5 +0,0 @@ -238val_238 -86val_86 -311val_311 -27val_27 -165val_165 diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala deleted file mode 100644 index b90670a796b81..0000000000000 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import java.io.{BufferedReader, InputStreamReader, PrintWriter} - -import org.scalatest.{BeforeAndAfterAll, FunSuite} - -import org.apache.spark.sql.hive.test.TestHive - -class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils { - val WAREHOUSE_PATH = TestUtils.getWarehousePath("cli") - val METASTORE_PATH = TestUtils.getMetastorePath("cli") - - override def beforeAll() { - val pb = new ProcessBuilder( - "../../bin/spark-sql", - "--master", - "local", - "--hiveconf", - s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true", - "--hiveconf", - "hive.metastore.warehouse.dir=" + WAREHOUSE_PATH) - - process = pb.start() - outputWriter = new PrintWriter(process.getOutputStream, true) - inputReader = new BufferedReader(new InputStreamReader(process.getInputStream)) - errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream)) - waitForOutput(inputReader, "spark-sql>") - } - - override def afterAll() { - process.destroy() - process.waitFor() - } - - test("simple commands") { - val dataFilePath = getDataFile("data/files/small_kv.txt") - executeQuery("create table hive_test1(key int, val string);") - executeQuery("load data local inpath '" + dataFilePath+ "' overwrite into table hive_test1;") - executeQuery("cache table hive_test1", "Time taken") - } -} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala deleted file mode 100644 index 59f4952b78bc6..0000000000000 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import scala.collection.JavaConversions._ -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent._ - -import java.io.{BufferedReader, InputStreamReader} -import java.sql.{Connection, DriverManager, Statement} - -import org.scalatest.{BeforeAndAfterAll, FunSuite} - -import org.apache.spark.sql.Logging -import org.apache.spark.sql.catalyst.util.getTempFilePath - -/** - * Test for the HiveThriftServer2 using JDBC. - */ -class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with TestUtils with Logging { - - val WAREHOUSE_PATH = getTempFilePath("warehouse") - val METASTORE_PATH = getTempFilePath("metastore") - - val DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver" - val TABLE = "test" - // use a different port, than the hive standard 10000, - // for tests to avoid issues with the port being taken on some machines - val PORT = "10000" - - // If verbose is true, the test program will print all outputs coming from the Hive Thrift server. - val VERBOSE = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).getOrElse("false").toBoolean - - Class.forName(DRIVER_NAME) - - override def beforeAll() { launchServer() } - - override def afterAll() { stopServer() } - - private def launchServer(args: Seq[String] = Seq.empty) { - // Forking a new process to start the Hive Thrift server. The reason to do this is it is - // hard to clean up Hive resources entirely, so we just start a new process and kill - // that process for cleanup. - val defaultArgs = Seq( - "../../sbin/start-thriftserver.sh", - "--master local", - "--hiveconf", - "hive.root.logger=INFO,console", - "--hiveconf", - s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true", - "--hiveconf", - s"hive.metastore.warehouse.dir=$WAREHOUSE_PATH") - val pb = new ProcessBuilder(defaultArgs ++ args) - process = pb.start() - inputReader = new BufferedReader(new InputStreamReader(process.getInputStream)) - errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream)) - waitForOutput(inputReader, "ThriftBinaryCLIService listening on") - - // Spawn a thread to read the output from the forked process. - // Note that this is necessary since in some configurations, log4j could be blocked - // if its output to stderr are not read, and eventually blocking the entire test suite. - future { - while (true) { - val stdout = readFrom(inputReader) - val stderr = readFrom(errorReader) - if (VERBOSE && stdout.length > 0) { - println(stdout) - } - if (VERBOSE && stderr.length > 0) { - println(stderr) - } - Thread.sleep(50) - } - } - } - - private def stopServer() { - process.destroy() - process.waitFor() - } - - test("test query execution against a Hive Thrift server") { - Thread.sleep(5 * 1000) - val dataFilePath = getDataFile("data/files/small_kv.txt") - val stmt = createStatement() - stmt.execute("DROP TABLE IF EXISTS test") - stmt.execute("DROP TABLE IF EXISTS test_cached") - stmt.execute("CREATE TABLE test(key int, val string)") - stmt.execute(s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test") - stmt.execute("CREATE TABLE test_cached as select * from test limit 4") - stmt.execute("CACHE TABLE test_cached") - - var rs = stmt.executeQuery("select count(*) from test") - rs.next() - assert(rs.getInt(1) === 5) - - rs = stmt.executeQuery("select count(*) from test_cached") - rs.next() - assert(rs.getInt(1) === 4) - - stmt.close() - } - - def getConnection: Connection = { - val connectURI = s"jdbc:hive2://localhost:$PORT/" - DriverManager.getConnection(connectURI, System.getProperty("user.name"), "") - } - - def createStatement(): Statement = getConnection.createStatement() -} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala deleted file mode 100644 index bb2242618fbef..0000000000000 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import java.io.{BufferedReader, PrintWriter} -import java.text.SimpleDateFormat -import java.util.Date - -import org.apache.hadoop.hive.common.LogUtils -import org.apache.hadoop.hive.common.LogUtils.LogInitializationException - -object TestUtils { - val timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss") - - def getWarehousePath(prefix: String): String = { - System.getProperty("user.dir") + "/test_warehouses/" + prefix + "-warehouse-" + - timestamp.format(new Date) - } - - def getMetastorePath(prefix: String): String = { - System.getProperty("user.dir") + "/test_warehouses/" + prefix + "-metastore-" + - timestamp.format(new Date) - } - - // Dummy function for initialize the log4j properties. - def init() { } - - // initialize log4j - try { - LogUtils.initHiveLog4j() - } catch { - case e: LogInitializationException => // Ignore the error. - } -} - -trait TestUtils { - var process : Process = null - var outputWriter : PrintWriter = null - var inputReader : BufferedReader = null - var errorReader : BufferedReader = null - - def executeQuery( - cmd: String, outputMessage: String = "OK", timeout: Long = 15000): String = { - println("Executing: " + cmd + ", expecting output: " + outputMessage) - outputWriter.write(cmd + "\n") - outputWriter.flush() - waitForQuery(timeout, outputMessage) - } - - protected def waitForQuery(timeout: Long, message: String): String = { - if (waitForOutput(errorReader, message, timeout)) { - Thread.sleep(500) - readOutput() - } else { - assert(false, "Didn't find \"" + message + "\" in the output:\n" + readOutput()) - null - } - } - - // Wait for the specified str to appear in the output. - protected def waitForOutput( - reader: BufferedReader, str: String, timeout: Long = 10000): Boolean = { - val startTime = System.currentTimeMillis - var out = "" - while (!out.contains(str) && System.currentTimeMillis < (startTime + timeout)) { - out += readFrom(reader) - } - out.contains(str) - } - - // Read stdout output and filter out garbage collection messages. - protected def readOutput(): String = { - val output = readFrom(inputReader) - // Remove GC Messages - val filteredOutput = output.lines.filterNot(x => x.contains("[GC") || x.contains("[Full GC")) - .mkString("\n") - filteredOutput - } - - protected def readFrom(reader: BufferedReader): String = { - var out = "" - var c = 0 - while (reader.ready) { - c = reader.read() - out += c.asInstanceOf[Char] - } - out - } - - protected def getDataFile(name: String) = { - Thread.currentThread().getContextClassLoader.getResource(name) - } -} diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 93d00f7c37c9b..1699ffe06ce15 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -32,7 +32,7 @@ Spark Project Hive http://spark.apache.org/ - hive + hive diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 84d43eaeea51d..201c85f3d501e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -255,7 +255,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType, ShortType, DecimalType, TimestampType, BinaryType) - protected[sql] def toHiveString(a: (Any, DataType)): String = a match { + protected def toHiveString(a: (Any, DataType)): String = a match { case (struct: Row, StructType(fields)) => struct.zip(fields).map { case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}""" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 8489f2a34e63c..6f36a4f8cb905 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -416,10 +416,10 @@ class HiveQuerySuite extends HiveComparisonTest { hql(s"set $testKey=$testVal") assert(get(testKey, testVal + "_") == testVal) - hql("set some.property=20") - assert(get("some.property", "0") == "20") - hql("set some.property = 40") - assert(get("some.property", "0") == "40") + hql("set mapred.reduce.tasks=20") + assert(get("mapred.reduce.tasks", "0") == "20") + hql("set mapred.reduce.tasks = 40") + assert(get("mapred.reduce.tasks", "0") == "40") hql(s"set $testKey=$testVal") assert(get(testKey, "0") == testVal) @@ -433,61 +433,63 @@ class HiveQuerySuite extends HiveComparisonTest { val testKey = "spark.sql.key.usedfortestonly" val testVal = "test.val.0" val nonexistentKey = "nonexistent" + def collectResults(rdd: SchemaRDD): Set[(String, String)] = + rdd.collect().map { case Row(key: String, value: String) => key -> value }.toSet clear() // "set" itself returns all config variables currently specified in SQLConf. assert(hql("SET").collect().size == 0) - assertResult(Array(s"$testKey=$testVal")) { - hql(s"SET $testKey=$testVal").collect().map(_.getString(0)) + assertResult(Set(testKey -> testVal)) { + collectResults(hql(s"SET $testKey=$testVal")) } assert(hiveconf.get(testKey, "") == testVal) - assertResult(Array(s"$testKey=$testVal")) { - hql(s"SET $testKey=$testVal").collect().map(_.getString(0)) + assertResult(Set(testKey -> testVal)) { + collectResults(hql("SET")) } hql(s"SET ${testKey + testKey}=${testVal + testVal}") assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) - assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { - hql(s"SET").collect().map(_.getString(0)) + assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) { + collectResults(hql("SET")) } // "set key" - assertResult(Array(s"$testKey=$testVal")) { - hql(s"SET $testKey").collect().map(_.getString(0)) + assertResult(Set(testKey -> testVal)) { + collectResults(hql(s"SET $testKey")) } - assertResult(Array(s"$nonexistentKey=")) { - hql(s"SET $nonexistentKey").collect().map(_.getString(0)) + assertResult(Set(nonexistentKey -> "")) { + collectResults(hql(s"SET $nonexistentKey")) } // Assert that sql() should have the same effects as hql() by repeating the above using sql(). clear() assert(sql("SET").collect().size == 0) - assertResult(Array(s"$testKey=$testVal")) { - sql(s"SET $testKey=$testVal").collect().map(_.getString(0)) + assertResult(Set(testKey -> testVal)) { + collectResults(sql(s"SET $testKey=$testVal")) } assert(hiveconf.get(testKey, "") == testVal) - assertResult(Array(s"$testKey=$testVal")) { - sql("SET").collect().map(_.getString(0)) + assertResult(Set(testKey -> testVal)) { + collectResults(sql("SET")) } sql(s"SET ${testKey + testKey}=${testVal + testVal}") assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) - assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { - sql("SET").collect().map(_.getString(0)) + assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) { + collectResults(sql("SET")) } - assertResult(Array(s"$testKey=$testVal")) { - sql(s"SET $testKey").collect().map(_.getString(0)) + assertResult(Set(testKey -> testVal)) { + collectResults(sql(s"SET $testKey")) } - assertResult(Array(s"$nonexistentKey=")) { - sql(s"SET $nonexistentKey").collect().map(_.getString(0)) + assertResult(Set(nonexistentKey -> "")) { + collectResults(sql(s"SET $nonexistentKey")) } clear() diff --git a/streaming/pom.xml b/streaming/pom.xml index b99f306b8f2cc..f60697ce745b7 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -28,7 +28,7 @@ org.apache.spark spark-streaming_2.10 - streaming + streaming jar Spark Project Streaming diff --git a/tools/pom.xml b/tools/pom.xml index 97abb6b2b63e0..c0ee8faa7a615 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -27,7 +27,7 @@ org.apache.spark spark-tools_2.10 - tools + tools jar Spark Project Tools diff --git a/yarn/alpha/pom.xml b/yarn/alpha/pom.xml index 51744ece0412d..5b13a1f002d6e 100644 --- a/yarn/alpha/pom.xml +++ b/yarn/alpha/pom.xml @@ -24,7 +24,7 @@ ../pom.xml - yarn-alpha + yarn-alpha org.apache.spark diff --git a/yarn/pom.xml b/yarn/pom.xml index 3faaf053634d6..efb473aa1b261 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -29,7 +29,7 @@ pom Spark Project YARN Parent POM - yarn + yarn diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml index b6c8456d06684..ceaf9f9d71001 100644 --- a/yarn/stable/pom.xml +++ b/yarn/stable/pom.xml @@ -24,7 +24,7 @@ ../pom.xml - yarn-stable + yarn-stable org.apache.spark