Addressed all comments by @pwendell
liancheng committed Jul 24, 2014
1 parent 7755062 commit 061880f
Showing 16 changed files with 73 additions and 128 deletions.
5 changes: 5 additions & 0 deletions assembly/pom.xml
@@ -163,6 +163,11 @@
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive-thriftserver</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
54 changes: 3 additions & 51 deletions bin/spark-sql
@@ -17,15 +17,8 @@
# limitations under the License.
#

SCALA_VERSION=2.10

cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
esac

# Enter posix mode for bash
set -o posix
#
# Shell script for starting the Spark SQL CLI

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"
@@ -36,46 +29,5 @@ if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
exit 0
fi

ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"

if [ -n "$JAVA_HOME" ]; then
JAR_CMD="$JAVA_HOME/bin/jar"
else
JAR_CMD="jar"
fi

# Use spark-assembly jar from either RELEASE or assembly directory
if [ -f "$FWDIR/RELEASE" ]; then
assembly_folder="$FWDIR"/lib
else
assembly_folder="$ASSEMBLY_DIR"
fi

num_jars=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)
if [ "$num_jars" -eq "0" ]; then
echo "Failed to find Spark assembly in $assembly_folder"
echo "You need to build Spark before running this program."
exit 1
fi
if [ "$num_jars" -gt "1" ]; then
jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar")
echo "Found multiple Spark assembly jars in $assembly_folder:"
echo "$jars_list"
echo "Please remove all but one jar."
exit 1
fi

ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)

# Verify that versions of java used to build the jars and run Spark are compatible
jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2
echo "This is likely because Spark was compiled with Java 7 and run " 1>&2
echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2
echo "or build Spark with Java 6." 1>&2
exit 1
fi

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
exec "$FWDIR"/bin/spark-submit --class $CLASS $@ $ASSEMBLY_JAR
exec "$FWDIR"/bin/spark-submit --class $CLASS $@ spark-internal
14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -46,6 +46,10 @@ object SparkSubmit {
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER

// A special jar name that indicates the class being run is inside of Spark itself, and therefore
// no user jar is needed.
private val SPARK_INTERNAL = "spark-internal"

// Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
@@ -257,7 +261,9 @@
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (clusterManager == YARN && deployMode == CLUSTER) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
childArgs += ("--jar", args.primaryResource)
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
@@ -332,7 +338,7 @@
* Return whether the given primary resource represents a user jar.
*/
private def isUserJar(primaryResource: String): Boolean = {
!isShell(primaryResource) && !isPython(primaryResource)
!isShell(primaryResource) && !isPython(primaryResource) && !isInternal(primaryResource)
}

/**
@@ -349,6 +355,10 @@
primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
}

private[spark] def isInternal(primaryResource: String): Boolean = {
primaryResource == SPARK_INTERNAL
}

/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
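To summarize the effect of the SparkSubmit hunks above, here is a minimal, self-contained Scala sketch of the primary-resource classification once the `spark-internal` marker is recognized. The standalone `ResourceClassifier` object is illustrative only; the constant names follow the diff, but this is not the actual SparkSubmit code.

```scala
// Hypothetical, stripped-down version of the classification logic added in this commit.
object ResourceClassifier {
  private val SPARK_SHELL    = "spark-shell"
  private val PYSPARK_SHELL  = "pyspark-shell"
  private val SPARK_INTERNAL = "spark-internal"

  def isShell(resource: String): Boolean =
    resource == SPARK_SHELL || resource == PYSPARK_SHELL

  def isPython(resource: String): Boolean =
    resource.endsWith(".py") || resource == PYSPARK_SHELL

  def isInternal(resource: String): Boolean =
    resource == SPARK_INTERNAL

  // A user jar is anything that is neither a shell, nor a Python script, nor Spark-internal.
  def isUserJar(resource: String): Boolean =
    !isShell(resource) && !isPython(resource) && !isInternal(resource)

  def main(args: Array[String]): Unit = {
    println(isUserJar("app.jar"))        // true  -> yarn.Client receives --jar app.jar
    println(isUserJar("spark-internal")) // false -> no --jar argument is added
  }
}
```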
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -318,7 +318,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
SparkSubmit.printErrorAndExit(errMessage)
case v =>
primaryResource =
if (!SparkSubmit.isShell(v)) {
if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
Utils.resolveURI(v).toString
} else {
v
8 changes: 5 additions & 3 deletions docs/sql-programming-guide.md
@@ -605,7 +605,7 @@ You may also use the beeline script that comes with Hive.

#### Reducer number

In Shark, default reducer number is 1, and can be tuned by property `mapred.reduce.tasks`. In Spark SQL, reducer number is default to 200, and can be customized by the `spark.sql.shuffle.partitions` property:
In Shark, default reducer number is 1 and is controlled by the property `mapred.reduce.tasks`. Spark SQL deprecates this property in favor of a new property, `spark.sql.shuffle.partitions`, whose default value is 200. Users may customize this property via `SET`:

```
SET spark.sql.shuffle.partitions=10;
@@ -615,6 +615,8 @@ GROUP BY page ORDER BY c DESC LIMIT 10;

You may also put this property in `hive-site.xml` to override the default value.

For now, the `mapred.reduce.tasks` property is still recognized, and is converted to `spark.sql.shuffle.partitions` automatically.

#### Caching

The `shark.cache` table property no longer exists, and tables whose names end with `_cached` are no longer automatically cached. Instead, we provide `CACHE TABLE` and `UNCACHE TABLE` statements to let users control table caching explicitly:
@@ -697,7 +699,7 @@ Spark SQL supports the vast majority of Hive features, such as:

#### Unsupported Hive Functionality

Below is a list of Hive features that we don't support yet. Most of these features are rarely used in Hive deployments.

**Major Hive Features**

@@ -723,7 +725,7 @@ A handful of Hive optimizations are not yet included in Spark. Some of these (su

* Block level bitmap indexes and virtual columns (used to build indexes)
* Automatically convert a join to map join: For joining a large table with multiple small tables, Hive automatically converts the join into a map join. We are adding this auto conversion in the next release.
* Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using "set mapred.reduce.tasks=[num_tasks];". We are going to add auto-setting of parallelism in the next release.
* Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using "SET spark.sql.shuffle.partitions=[num_tasks];". We are going to add auto-setting of parallelism in the next release.
* Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still launches tasks to compute the result.
* Skew data flag: Spark SQL does not follow the skew data flags in Hive.
* `STREAMTABLE` hint in join: Spark SQL does not follow the `STREAMTABLE` hint.
56 changes: 4 additions & 52 deletions sbin/start-thriftserver.sh
@@ -20,65 +20,17 @@
#
# Shell script for starting the Spark SQL Thrift server

SCALA_VERSION=2.10

cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
esac

# Enter posix mode for bash
set -o posix

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/start-thriftserver [options]"
$FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit 0
fi

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"

if [ -n "$JAVA_HOME" ]; then
JAR_CMD="$JAVA_HOME/bin/jar"
else
JAR_CMD="jar"
fi

# Use spark-assembly jar from either RELEASE or assembly directory
if [ -f "$FWDIR/RELEASE" ]; then
assembly_folder="$FWDIR"/lib
else
assembly_folder="$ASSEMBLY_DIR"
fi

num_jars=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)
if [ "$num_jars" -eq "0" ]; then
echo "Failed to find Spark assembly in $assembly_folder"
echo "You need to build Spark before running this program."
exit 1
fi
if [ "$num_jars" -gt "1" ]; then
jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar")
echo "Found multiple Spark assembly jars in $assembly_folder:"
echo "$jars_list"
echo "Please remove all but one jar."
exit 1
fi

ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)

# Verify that versions of java used to build the jars and run Spark are compatible
jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2
echo "This is likely because Spark was compiled with Java 7 and run " 1>&2
echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2
echo "or build Spark with Java 6." 1>&2
exit 1
fi

CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
exec "$FWDIR"/bin/spark-submit --class $CLASS $@ $ASSEMBLY_JAR
exec "$FWDIR"/bin/spark-submit --class $CLASS $@ spark-internal
20 changes: 15 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -30,24 +30,24 @@ import scala.collection.JavaConverters._
* SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
*/
trait SQLConf {
import SQLConf._

/** ************************ Spark SQL Params/Hints ******************* */
// TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?

/** Number of partitions to use for shuffle operators. */
private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt
private[spark] def numShufflePartitions: Int = get(SHUFFLE_PARTITIONS, "200").toInt

/**
* Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
* a broadcast value during the physical executions of join operations. Setting this to 0
* effectively disables auto conversion.
* Hive setting: hive.auto.convert.join.noconditionaltask.size.
*/
private[spark] def autoConvertJoinSize: Int =
get("spark.sql.auto.convert.join.size", "10000").toInt
private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt

/** A comma-separated list of table names marked to be broadcasted during joins. */
private[spark] def joinBroadcastTables: String = get("spark.sql.join.broadcastTables", "")
private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")

/** ********************** SQLConf functionality methods ************ */

@@ -61,7 +61,7 @@ trait SQLConf {

def set(key: String, value: String): Unit = {
require(key != null, "key cannot be null")
require(value != null, s"value cannot be null for ${key}")
require(value != null, s"value cannot be null for $key")
settings.put(key, value)
}

@@ -90,3 +90,13 @@
}

}

object SQLConf {
val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
}
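As a rough illustration of the pattern introduced above (configuration keys centralized in a companion object and read through a synchronized map with defaults), here is a hedged, self-contained sketch. `MiniSQLConf` is a hypothetical stand-in, not the real `SQLConf` trait.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical miniature of the SQLConf pattern shown in the diff above.
object MiniSQLConf {
  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
  object Deprecated {
    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
  }
}

class MiniSQLConf {
  import MiniSQLConf._

  // Thread-safe storage, analogous to the settings map in the real trait.
  private val settings = new ConcurrentHashMap[String, String]()

  def set(key: String, value: String): Unit = {
    require(key != null, "key cannot be null")
    require(value != null, s"value cannot be null for $key")
    settings.put(key, value)
  }

  def get(key: String, defaultValue: String): String =
    Option(settings.get(key)).getOrElse(defaultValue)

  // Keyed lookup with a "200" default, mirroring numShufflePartitions.
  def numShufflePartitions: Int = get(SHUFFLE_PARTITIONS, "200").toInt
}
```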
sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -17,12 +17,13 @@

package org.apache.spark.sql.execution

import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.{Row, SQLConf, SQLContext}

trait Command {
/**
@@ -44,13 +45,20 @@ trait Command {
case class SetCommand(
key: Option[String], value: Option[String], output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {
extends LeafNode with Command with Logging {

override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
context.set(k, v)
Array(s"$k=$v")
if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
context.set(SQLConf.SHUFFLE_PARTITIONS, v)
Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
} else {
context.set(k, v)
Array(s"$k=$v")
}

// Query the value bound to key k.
case (Some(k), _) =>
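The conversion performed by `SetCommand` above can be summarized with a short sketch: when the deprecated `mapred.reduce.tasks` key is set, the value is redirected to `spark.sql.shuffle.partitions` and a warning is emitted. `SetKeyTranslation` below is a hypothetical standalone object, not the actual command class.

```scala
// Hypothetical sketch of the deprecated-key handling added to SetCommand in this commit.
object SetKeyTranslation {
  val SHUFFLE_PARTITIONS  = "spark.sql.shuffle.partitions"
  val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"

  // Returns the (key, value) pair that would actually be stored in the SQL configuration.
  def translate(key: String, value: String): (String, String) =
    if (key == MAPRED_REDUCE_TASKS) {
      Console.err.println(
        s"Property $MAPRED_REDUCE_TASKS is deprecated, " +
          s"automatically converted to $SHUFFLE_PARTITIONS instead.")
      (SHUFFLE_PARTITIONS, value)
    } else {
      (key, value)
    }

  def main(args: Array[String]): Unit = {
    println(translate("mapred.reduce.tasks", "10")) // (spark.sql.shuffle.partitions,10)
    println(translate("some.other.property", "x"))  // stored unchanged
  }
}
```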
7 changes: 7 additions & 0 deletions sql/hive-thriftserver/pom.xml
@@ -70,6 +70,13 @@
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
* The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
* `HiveThriftServer2` thrift server.
*/
object HiveThriftServer2 extends Logging {
private[hive] object HiveThriftServer2 extends Logging {
var LOG = LogFactory.getLog(classOf[HiveServer2])

def main(args: Array[String]) {
@@ -73,7 +73,7 @@
}
}

class HiveThriftServer2(hiveContext: HiveContext)
private[hive] class HiveThriftServer2(hiveContext: HiveContext)
extends HiveServer2
with ReflectedCompositeService {

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ReflectionUtils.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hive.thriftserver

object ReflectionUtils {
private[hive] object ReflectionUtils {
def setSuperField(obj : Object, fieldName: String, fieldValue: Object) {
setAncestorField(obj, 1, fieldName, fieldValue)
}
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -39,7 +39,7 @@ import org.apache.thrift.transport.TSocket

import org.apache.spark.sql.Logging

object SparkSQLCLIDriver {
private[hive] object SparkSQLCLIDriver {
private var prompt = "spark-sql"
private var continuedPrompt = "".padTo(prompt.length, ' ')
private var transport:TSocket = _
@@ -240,8 +240,7 @@ object SparkSQLCLIDriver {
}
}


class SparkSQLCLIDriver extends CliDriver with Logging {
private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
private val sessionState = SessionState.get().asInstanceOf[CliSessionState]

private val LOG = LogFactory.getLog("CliDriver")
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
@@ -34,7 +34,7 @@ import org.apache.hive.service.{AbstractService, Service, ServiceException}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._

class SparkSQLCLIService(hiveContext: HiveContext)
private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
extends CLIService
with ReflectedCompositeService {
