Merge remote-tracking branch 'upstream/master' into expr_bin
viirya committed Jun 10, 2015
2 parents c0c3197 + 57c60c5 commit 0f78682
Showing 14 changed files with 143 additions and 120 deletions.
@@ -235,7 +235,7 @@ function renderDagVizForJob(svgContainer) {
// them separately later. Note that we cannot draw them now because we need to
// put these edges in a separate container that is on top of all stage graphs.
metadata.selectAll(".incoming-edge").each(function(v) {
var edge = d3.select(this).text().split(","); // e.g. 3,4 => [3, 4]
var edge = d3.select(this).text().trim().split(","); // e.g. 3,4 => [3, 4]
crossStageEdges.push(edge);
});
});
26 changes: 26 additions & 0 deletions docs/hadoop-provided.md
@@ -0,0 +1,26 @@
---
layout: global
displayTitle: Using Spark's "Hadoop Free" Build
title: Using Spark's "Hadoop Free" Build
---

Spark uses Hadoop client libraries for HDFS and YARN. Starting with version 1.4, the project packages "Hadoop free" builds that let you more easily connect a single Spark binary to any Hadoop version. To use these builds, you need to modify `SPARK_DIST_CLASSPATH` to include Hadoop's package jars. The most convenient way to do this is to add an entry in `conf/spark-env.sh`.

This page describes how to connect Spark to Hadoop for different types of distributions.

# Apache Hadoop
For Apache distributions, you can use Hadoop's 'classpath' command. For instance:

{% highlight bash %}
### in conf/spark-env.sh ###

# If 'hadoop' binary is on your PATH
export SPARK_DIST_CLASSPATH=$(hadoop classpath)

# With explicit path to 'hadoop' binary
export SPARK_DIST_CLASSPATH=$(/path/to/hadoop/bin/hadoop classpath)

# Passing a Hadoop configuration directory
export SPARK_DIST_CLASSPATH=$(hadoop --config /path/to/configs classpath)

{% endhighlight %}
10 changes: 7 additions & 3 deletions docs/index.md
@@ -12,9 +12,13 @@ It also supports a rich set of higher-level tools including [Spark SQL](sql-prog

# Downloading

Get Spark from the [downloads page](http://spark.apache.org/downloads.html) of the project website. This documentation is for Spark version {{site.SPARK_VERSION}}. The downloads page
contains Spark packages for many popular HDFS versions. If you'd like to build Spark from
scratch, visit [Building Spark](building-spark.html).
Get Spark from the [downloads page](http://spark.apache.org/downloads.html) of the project website. This documentation is for Spark version {{site.SPARK_VERSION}}. Spark uses Hadoop's client libraries for HDFS and YARN. Downloads are pre-packaged for a handful of popular Hadoop versions.
Users can also download a "Hadoop free" binary and run Spark with any Hadoop version
[by augmenting Spark's classpath](hadoop-provided.html).

If you'd like to build Spark from
source, visit [Building Spark](building-spark.html).


Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy to run
locally on one machine --- all you need is to have `java` installed on your system `PATH`,
@@ -68,7 +68,6 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
protected val HAVING = Keyword("HAVING")
protected val IF = Keyword("IF")
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
protected val INSERT = Keyword("INSERT")
@@ -277,6 +276,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
lexical.normalizeKeyword(udfName) match {
case "sum" => SumDistinct(exprs.head)
case "count" => CountDistinct(exprs)
case _ => throw new AnalysisException(s"function $udfName does not support DISTINCT")
}
}
| APPROXIMATE ~> ident ~ ("(" ~ DISTINCT ~> expression <~ ")") ^^ { case udfName ~ exp =>
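The new `case _` alternative above turns an unsupported `DISTINCT` call into an explicit `AnalysisException` instead of letting the match fail with a `MatchError`. A minimal standalone sketch of the same fallback pattern (illustrative names only, and a plain `IllegalArgumentException` so it compiles without Spark):

```scala
// Sketch: resolving DISTINCT aggregates by name, with a descriptive error for
// anything unsupported instead of a non-exhaustive-match failure.
def resolveDistinct(udfName: String): String = udfName.toLowerCase match {
  case "sum"   => "SumDistinct"
  case "count" => "CountDistinct"
  case other   => throw new IllegalArgumentException(
    s"function $other does not support DISTINCT")
}
```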
@@ -460,7 +460,7 @@ class Analyzer(
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan =>
q transformExpressions {
case u @ UnresolvedFunction(name, children) if u.childrenResolved =>
case u @ UnresolvedFunction(name, children) =>
withPosition(u) {
registry.lookupFunction(name, children)
}
@@ -494,20 +494,21 @@
object UnresolvedHavingClauseAttributes extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case filter @ Filter(havingCondition, aggregate @ Aggregate(_, originalAggExprs, _))
if aggregate.resolved && containsAggregate(havingCondition) => {
if aggregate.resolved && containsAggregate(havingCondition) =>

val evaluatedCondition = Alias(havingCondition, "havingCondition")()
val aggExprsWithHaving = evaluatedCondition +: originalAggExprs

Project(aggregate.output,
Filter(evaluatedCondition.toAttribute,
aggregate.copy(aggregateExpressions = aggExprsWithHaving)))
}
}

protected def containsAggregate(condition: Expression): Boolean =
protected def containsAggregate(condition: Expression): Boolean = {
condition
.collect { case ae: AggregateExpression => ae }
.nonEmpty
}
}

/**
@@ -17,7 +17,11 @@

package org.apache.spark.sql.catalyst.analysis

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.EmptyConf
@@ -81,18 +85,18 @@ trait Catalog {
}

class SimpleCatalog(val conf: CatalystConf) extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()
val tables = new ConcurrentHashMap[String, LogicalPlan]

override def registerTable(
tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = {
val tableIdent = processTableIdentifier(tableIdentifier)
tables += ((getDbTableName(tableIdent), plan))
tables.put(getDbTableName(tableIdent), plan)
}

override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
val tableIdent = processTableIdentifier(tableIdentifier)
tables -= getDbTableName(tableIdent)
tables.remove(getDbTableName(tableIdent))
}

override def unregisterAllTables(): Unit = {
@@ -101,18 +105,18 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {

override def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier)
tables.get(getDbTableName(tableIdent)) match {
case Some(_) => true
case None => false
}
tables.containsKey(getDbTableName(tableIdent))
}

override def lookupRelation(
tableIdentifier: Seq[String],
alias: Option[String] = None): LogicalPlan = {
val tableIdent = processTableIdentifier(tableIdentifier)
val tableFullName = getDbTableName(tableIdent)
val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
val table = tables.get(tableFullName)
if (table == null) {
sys.error(s"Table Not Found: $tableFullName")
}
val tableWithQualifiers = Subquery(tableIdent.last, table)

// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
@@ -121,9 +125,11 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
}

override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
tables.map {
case (name, _) => (name, true)
}.toSeq
val result = ArrayBuffer.empty[(String, Boolean)]
for (name <- tables.keySet()) {
result += ((name, true))
}
result
}

override def refreshTable(databaseName: String, tableName: String): Unit = {
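Replacing `mutable.HashMap` with `java.util.concurrent.ConcurrentHashMap` above makes table registration, lookup, and removal safe to call from multiple threads without external locking, at the cost of using the Java map API (`put`/`remove`/`containsKey`, and a `null` check rather than `Option`). A standalone sketch of the same pattern, with illustrative names that are not Spark's `Catalog` API:

```scala
import java.util.concurrent.ConcurrentHashMap

// Thread-safe table registry mirroring the ConcurrentHashMap usage above.
class TableRegistry[P <: AnyRef] {
  private val tables = new ConcurrentHashMap[String, P]

  def register(name: String, plan: P): Unit = tables.put(name, plan)

  def unregister(name: String): Unit = tables.remove(name)

  def exists(name: String): Boolean = tables.containsKey(name)

  def lookup(name: String): P = {
    val plan = tables.get(name) // null when absent, unlike a Scala Map's Option
    if (plan == null) sys.error(s"Table Not Found: $name")
    plan
  }
}
```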
@@ -35,16 +35,16 @@ trait FunctionRegistry {
def lookupFunction(name: String, children: Seq[Expression]): Expression
}

trait OverrideFunctionRegistry extends FunctionRegistry {
class OverrideFunctionRegistry(underlying: FunctionRegistry) extends FunctionRegistry {

private val functionBuilders = StringKeyHashMap[FunctionBuilder](caseSensitive = false)

override def registerFunction(name: String, builder: FunctionBuilder): Unit = {
functionBuilders.put(name, builder)
}

abstract override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
functionBuilders.get(name).map(_(children)).getOrElse(super.lookupFunction(name, children))
override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
functionBuilders.get(name).map(_(children)).getOrElse(underlying.lookupFunction(name, children))
}
}

@@ -134,6 +134,12 @@ object FunctionRegistry {
expression[Sum]("sum")
)

val builtin: FunctionRegistry = {
val fr = new SimpleFunctionRegistry
expressions.foreach { case (name, builder) => fr.registerFunction(name, builder) }
fr
}

/** See usage above. */
private def expression[T <: Expression](name: String)
(implicit tag: ClassTag[T]): (String, FunctionBuilder) = {
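The registry is now composed explicitly: `OverrideFunctionRegistry` is a plain class that delegates lookup misses to the `underlying` registry passed to its constructor, rather than a stackable trait relying on `abstract override` and `super`, and the built-in functions are registered once into the shared `builtin` instance. A standalone sketch of this delegation pattern, with simplified types standing in for Spark's `FunctionRegistry`:

```scala
import scala.collection.mutable

// Simplified stand-in for a function registry (not Spark's API).
trait Registry[V] {
  def registerFunction(name: String, value: V): Unit
  def lookupFunction(name: String): V
}

class SimpleRegistry[V] extends Registry[V] {
  private val entries = mutable.Map.empty[String, V]
  override def registerFunction(name: String, value: V): Unit = entries(name) = value
  override def lookupFunction(name: String): V =
    entries.getOrElse(name, sys.error(s"undefined function $name"))
}

// Entries registered here shadow the underlying (e.g. built-in) registry;
// lookups fall through to it on a miss.
class OverrideRegistry[V](underlying: Registry[V]) extends Registry[V] {
  private val overrides = mutable.Map.empty[String, V]
  override def registerFunction(name: String, value: V): Unit = overrides(name) = value
  override def lookupFunction(name: String): V =
    overrides.get(name).getOrElse(underlying.lookupFunction(name))
}
```

This mirrors how `SQLContext` and `HiveContext` below wrap `FunctionRegistry.builtin` instead of mixing registries together.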
@@ -25,10 +25,10 @@ import org.apache.spark.sql.types._


/**
* For Catalyst to work correctly, concrete implementations of [[Expression]]s must be case classes
* whose constructor arguments are all Expressions types. In addition, if we want to support more
* than one constructor, define those constructors explicitly as apply methods in the companion
* object.
* If an expression wants to be exposed in the function registry (so users can call it with
* "name(arguments...)", the concrete implementation must be a case class whose constructor
* arguments are all Expressions types. In addition, if it needs to support more than one
* constructor, define those constructors explicitly as apply methods in the companion object.
*
* See [[Substring]] for an example.
*/
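The revised comment narrows the case-class requirement to expressions exposed through the function registry: the primary constructor should take only `Expression` arguments, and additional arities belong on the companion object as `apply` methods. A self-contained sketch of that shape, using stand-in types rather than Spark's `Expression` hierarchy:

```scala
// Stand-in expression hierarchy for illustration only.
sealed trait Expr
case class Literal(value: Any) extends Expr

// All constructor arguments are themselves expressions, so a registry can
// build the node directly from a parsed argument list.
case class Substr(str: Expr, pos: Expr, len: Expr) extends Expr

object Substr {
  // A second "constructor" is expressed as a companion apply method.
  def apply(str: Expr, pos: Expr): Substr = Substr(str, pos, Literal(Int.MaxValue))
}
```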
7 changes: 2 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -120,11 +120,8 @@ class SQLContext(@transient val sparkContext: SparkContext)

// TODO how to handle the temp function per user session?
@transient
protected[sql] lazy val functionRegistry: FunctionRegistry = {
val fr = new SimpleFunctionRegistry
FunctionRegistry.expressions.foreach { case (name, func) => fr.registerFunction(name, func) }
fr
}
protected[sql] lazy val functionRegistry: FunctionRegistry =
new OverrideFunctionRegistry(FunctionRegistry.builtin)

@transient
protected[sql] lazy val analyzer: Analyzer =
@@ -57,7 +57,7 @@ private[spark] case class PythonUDF(
def nullable: Boolean = true

override def eval(input: Row): Any = {
sys.error("PythonUDFs can not be directly evaluated.")
throw new UnsupportedOperationException("PythonUDFs can not be directly evaluated.")
}
}

@@ -71,43 +71,49 @@ private[spark] case class PythonUDF(
private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Skip EvaluatePython nodes.
case p: EvaluatePython => p
case plan: EvaluatePython => plan

case l: LogicalPlan =>
case plan: LogicalPlan =>
// Extract any PythonUDFs from the current operator.
val udfs = l.expressions.flatMap(_.collect { case udf: PythonUDF => udf})
val udfs = plan.expressions.flatMap(_.collect { case udf: PythonUDF => udf })
if (udfs.isEmpty) {
// If there aren't any, we are done.
l
plan
} else {
// Pick the UDF we are going to evaluate (TODO: Support evaluating multiple UDFs at a time)
// If there is more than one, we will add another evaluation operator in a subsequent pass.
val udf = udfs.head

var evaluation: EvaluatePython = null

// Rewrite the child that has the input required for the UDF
val newChildren = l.children.map { child =>
// Check to make sure that the UDF can be evaluated with only the input of this child.
// Other cases are disallowed as they are ambiguous or would require a cartisian product.
if (udf.references.subsetOf(child.outputSet)) {
evaluation = EvaluatePython(udf, child)
evaluation
} else if (udf.references.intersect(child.outputSet).nonEmpty) {
sys.error(s"Invalid PythonUDF $udf, requires attributes from more than one child.")
} else {
child
}
udfs.find(_.resolved) match {
case Some(udf) =>
var evaluation: EvaluatePython = null

// Rewrite the child that has the input required for the UDF
val newChildren = plan.children.map { child =>
// Check to make sure that the UDF can be evaluated with only the input of this child.
// Other cases are disallowed as they are ambiguous or would require a cartesian
// product.
if (udf.references.subsetOf(child.outputSet)) {
evaluation = EvaluatePython(udf, child)
evaluation
} else if (udf.references.intersect(child.outputSet).nonEmpty) {
sys.error(s"Invalid PythonUDF $udf, requires attributes from more than one child.")
} else {
child
}
}

assert(evaluation != null, "Unable to evaluate PythonUDF. Missing input attributes.")

// Trim away the new UDF value if it was only used for filtering or something.
logical.Project(
plan.output,
plan.transformExpressions {
case p: PythonUDF if p.fastEquals(udf) => evaluation.resultAttribute
}.withNewChildren(newChildren))

case None =>
// If there is no Python UDF that is resolved, skip this round.
plan
}

assert(evaluation != null, "Unable to evaluate PythonUDF. Missing input attributes.")

// Trim away the new UDF value if it was only used for filtering or something.
logical.Project(
l.output,
l.transformExpressions {
case p: PythonUDF if p.fastEquals(udf) => evaluation.resultAttribute
}.withNewChildren(newChildren))
}
}
}
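The rewritten rule now selects the first *resolved* Python UDF with `find(_.resolved)` and, when none is resolved yet, returns the plan unchanged so a later analyzer pass can retry, instead of assuming `udfs.head` is always evaluable. A minimal standalone sketch of that control flow (illustrative types, not Spark's `LogicalPlan` or `PythonUDF`):

```scala
// Stand-ins for the rule's inputs.
case class Udf(name: String, resolved: Boolean)

// Rewrite around the first resolved UDF, or leave the plan alone for this round.
def extractOne(udfs: Seq[Udf], plan: String): String =
  udfs.find(_.resolved) match {
    case Some(udf) => s"Project(EvaluatePython(${udf.name}, $plan))"
    case None      => plan // nothing resolved yet; skip and retry in a later pass
  }
```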
@@ -817,19 +817,19 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"udf2",
"udf5",
"udf6",
"udf7",
// "udf7", turn this on after we figure out null vs nan vs infinity
"udf8",
"udf9",
"udf_10_trims",
"udf_E",
"udf_PI",
"udf_abs",
"udf_acos",
// "udf_acos", turn this on after we figure out null vs nan vs infinity
"udf_add",
"udf_array",
"udf_array_contains",
"udf_ascii",
"udf_asin",
// "udf_asin", turn this on after we figure out null vs nan vs infinity
"udf_atan",
"udf_avg",
"udf_bigint",
@@ -917,7 +917,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"udf_repeat",
"udf_rlike",
"udf_round",
"udf_round_3",
// "udf_round_3", TODO: FIX THIS failed due to cast exception
"udf_rpad",
"udf_rtrim",
"udf_second",
@@ -931,7 +931,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"udf_stddev_pop",
"udf_stddev_samp",
"udf_string",
"udf_struct",
// "udf_struct", TODO: FIX THIS and enable it.
"udf_substring",
"udf_subtract",
"udf_sum",
@@ -374,7 +374,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
// Note that HiveUDFs will be overridden by functions registered in this context.
@transient
override protected[sql] lazy val functionRegistry: FunctionRegistry =
new HiveFunctionRegistry with OverrideFunctionRegistry
new OverrideFunctionRegistry(new HiveFunctionRegistry(FunctionRegistry.builtin))

/* An analyzer that uses the Hive metastore. */
@transient