Commit
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
	sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
	sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
	sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
yhuai committed Jul 30, 2014
2 parents c712fbf + 8446746 commit a6e08b4
Showing 73 changed files with 2,268 additions and 417 deletions.
4 changes: 2 additions & 2 deletions LICENSE
@@ -272,7 +272,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


========================================================================
-For Py4J (python/lib/py4j0.7.egg and files in assembly/lib/net/sf/py4j):
+For Py4J (python/lib/py4j-0.8.2.1-src.zip)
========================================================================

Copyright (c) 2009-2011, Barthelemy Dagenais All rights reserved.
@@ -532,7 +532,7 @@ The following components are provided under a BSD-style license. See project lin
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-(The New BSD License) Py4J (net.sf.py4j:py4j:0.8.1 - http://py4j.sourceforge.net/)
+(The New BSD License) Py4J (net.sf.py4j:py4j:0.8.2.1 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(ISC/BSD License) jbcrypt (org.mindrot:jbcrypt:0.3m - http://www.mindrot.org/)

2 changes: 1 addition & 1 deletion bin/pyspark
@@ -52,7 +52,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
-export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -45,7 +45,7 @@ rem Figure out which Python to use.
if [%PYSPARK_PYTHON%] == [] set PYSPARK_PYTHON=python

set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
-set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%
+set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -275,7 +275,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
-<version>0.8.1</version>
+<version>0.8.2.1</version>
</dependency>
</dependencies>
<build>
@@ -29,7 +29,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.1-src.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
10 changes: 10 additions & 0 deletions pom.xml
@@ -114,6 +114,7 @@
<sbt.project.name>spark</sbt.project.name>
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
+<scala.macros.version>2.0.1</scala.macros.version>
<mesos.version>0.18.1</mesos.version>
<mesos.classifier>shaded-protobuf</mesos.classifier>
<akka.group>org.spark-project.akka</akka.group>
@@ -825,6 +826,15 @@
<javacArg>-target</javacArg>
<javacArg>${java.version}</javacArg>
</javacArgs>
+<!-- The following plugin is required to use quasiquotes in Scala 2.10 and is used
+by Spark SQL for code generation. -->
+<compilerPlugins>
+<compilerPlugin>
+<groupId>org.scalamacros</groupId>
+<artifactId>paradise_${scala.version}</artifactId>
+<version>${scala.macros.version}</version>
+</compilerPlugin>
+</compilerPlugins>
</configuration>
</plugin>
<plugin>
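
Aside: a minimal, self-contained sketch (illustrative, not Spark code) of the quasiquotes this plugin enables, which Catalyst's code generation builds on. On Scala 2.10 this compiles only with the paradise plugin and quasiquotes artifact configured here; from Scala 2.11 on, quasiquotes ship with scala-reflect.

    import scala.reflect.runtime.universe._

    // Construct an AST from interpolated source; $-splices insert subtrees.
    val tree = q"def add(a: Int, b: Int): Int = a + b"

    // Quasiquotes also work as patterns, pulling the pieces back out.
    val q"def $name(..$params): $tpt = $body" = tree
    println(name)          // prints: add
    println(showRaw(body)) // prints the raw AST of `a + b`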
11 changes: 8 additions & 3 deletions project/SparkBuild.scala
@@ -167,6 +167,9 @@ object SparkBuild extends PomBuild {
/* Enable unidoc only for the root spark project */
enable(Unidoc.settings)(spark)

+/* Catalyst macro settings */
+enable(Catalyst.settings)(catalyst)

/* Spark SQL Core console settings */
enable(SQL.settings)(sql)

@@ -189,10 +192,13 @@ object Flume {
lazy val settings = sbtavro.SbtAvro.avroSettings
}

-object SQL {
-
+object Catalyst {
+lazy val settings = Seq(
+addCompilerPlugin("org.scalamacros" % "paradise" % "2.0.1" cross CrossVersion.full))
+}
+
+object SQL {
lazy val settings = Seq(
initialCommands in console :=
"""
|import org.apache.spark.sql.catalyst.analysis._
@@ -207,7 +213,6 @@ object SQL {
|import org.apache.spark.sql.test.TestSQLContext._
|import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin
)

}

object Hive {
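
Note on the sbt side: `cross CrossVersion.full` suffixes the plugin artifact with the exact Scala version, which compiler plugins require; this mirrors the paradise_${scala.version} artifact in the Maven configuration above.

    // Resolves org.scalamacros:paradise_2.10.4:2.0.1 for the Scala 2.10.4 build.
    addCompilerPlugin("org.scalamacros" % "paradise" % "2.0.1" cross CrossVersion.full)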
Binary file removed python/lib/py4j-0.8.1-src.zip
Binary file not shown.
Binary file added python/lib/py4j-0.8.2.1-src.zip
Binary file not shown.
2 changes: 1 addition & 1 deletion sbin/spark-config.sh
@@ -36,4 +36,4 @@ export SPARK_HOME=${SPARK_PREFIX}
export SPARK_CONF_DIR="$SPARK_HOME/conf"
# Add the PySpark classes to the PYTHONPATH:
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
-export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH
2 changes: 1 addition & 1 deletion sbin/spark-executor
@@ -20,7 +20,7 @@
FWDIR="$(cd `dirname $0`/..; pwd)"

export PYTHONPATH=$FWDIR/python:$PYTHONPATH
-export PYTHONPATH=$FWDIR/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
+export PYTHONPATH=$FWDIR/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH

echo "Running spark-executor with framework dir = $FWDIR"
exec $FWDIR/bin/spark-class org.apache.spark.executor.MesosExecutorBackend
9 changes: 9 additions & 0 deletions sql/catalyst/pom.xml
@@ -36,10 +36,19 @@
</properties>

<dependencies>
+<dependency>
+<groupId>org.scala-lang</groupId>
+<artifactId>scala-compiler</artifactId>
+</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
</dependency>
+<dependency>
+<groupId>org.scalamacros</groupId>
+<artifactId>quasiquotes_${scala.binary.version}</artifactId>
+<version>${scala.macros.version}</version>
+</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -159,7 +159,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
object UnresolvedHavingClauseAttributes extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case filter @ Filter(havingCondition, aggregate @ Aggregate(_, originalAggExprs, _))
-if !filter.resolved && aggregate.resolved && containsAggregate(havingCondition) => {
+if aggregate.resolved && containsAggregate(havingCondition) => {
val evaluatedCondition = Alias(havingCondition, "havingCondition")()
val aggExprsWithHaving = evaluatedCondition +: originalAggExprs

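
To make the guard change concrete, a hedged sketch (illustrative plan shapes, not taken from the patch) of the rewrite this rule performs when a HAVING condition contains an aggregate:

    // SQL:  SELECT key, COUNT(*) FROM src GROUP BY key HAVING COUNT(*) > 10
    //
    // As parsed, HAVING becomes a Filter above the Aggregate, but its
    // condition references an aggregate the Aggregate does not yet produce:
    //
    //   Filter(COUNT(*) > 10)
    //     Aggregate(groupBy = key, output = [key, COUNT(*)])
    //
    // The rule aliases the condition, lets the Aggregate evaluate it,
    // filters on the alias, and projects it away again:
    //
    //   Project([key, COUNT(*)])
    //     Filter(havingCondition)
    //       Aggregate(groupBy = key,
    //                 output = [Alias(COUNT(*) > 10, "havingCondition"), key, COUNT(*)])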
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.{errors, trees}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.trees.TreeNode

/**
@@ -36,7 +36,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
case class UnresolvedRelation(
databaseName: Option[String],
tableName: String,
-alias: Option[String] = None) extends BaseRelation {
+alias: Option[String] = None) extends LeafNode {
override def output = Nil
override lazy val resolved = false
}
@@ -202,7 +202,7 @@ package object dsl {
// Protobuf terminology
def required = a.withNullability(false)

-def at(ordinal: Int) = BoundReference(ordinal, a)
+def at(ordinal: Int) = BoundReference(ordinal, a.dataType, a.nullable)
}
}

@@ -18,71 +18,36 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.Logging
-import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.trees

/**
* A bound reference points to a specific slot in the input tuple, allowing the actual value
* to be retrieved more efficiently. However, since operations like column pruning can change
* the layout of intermediate tuples, BindReferences should be run after all such transformations.
*/
-case class BoundReference(ordinal: Int, baseReference: Attribute)
-extends Attribute with trees.LeafNode[Expression] {
+case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
+extends Expression with trees.LeafNode[Expression] {

type EvaluatedType = Any

-override def nullable = baseReference.nullable
-override def dataType = baseReference.dataType
-override def exprId = baseReference.exprId
-override def qualifiers = baseReference.qualifiers
-override def name = baseReference.name
-
-override def newInstance = BoundReference(ordinal, baseReference.newInstance)
-override def withNullability(newNullability: Boolean) =
-BoundReference(ordinal, baseReference.withNullability(newNullability))
-override def withQualifiers(newQualifiers: Seq[String]) =
-BoundReference(ordinal, baseReference.withQualifiers(newQualifiers))
override def references = Set.empty

-override def toString = s"$baseReference:$ordinal"
+override def toString = s"input[$ordinal]"

override def eval(input: Row): Any = input(ordinal)
}

-/**
-* Used to denote operators that do their own binding of attributes internally.
-*/
-trait NoBind { self: trees.TreeNode[_] => }
-
-class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] {
-import BindReferences._
-
-def apply(plan: TreeNode): TreeNode = {
-plan.transform {
-case n: NoBind => n.asInstanceOf[TreeNode]
-case leafNode if leafNode.children.isEmpty => leafNode
-case unaryNode if unaryNode.children.size == 1 => unaryNode.transformExpressions { case e =>
-bindReference(e, unaryNode.children.head.output)
-}
-}
-}
-}

object BindReferences extends Logging {
def bindReference[A <: Expression](expression: A, input: Seq[Attribute]): A = {
expression.transform { case a: AttributeReference =>
attachTree(a, "Binding attribute") {
val ordinal = input.indexWhere(_.exprId == a.exprId)
if (ordinal == -1) {
-// TODO: This fallback is required because some operators (such as ScriptTransform)
-// produce new attributes that can't be bound. Likely the right thing to do is remove
-// this rule and require all operators to explicitly bind to the input schema that
-// they specify.
-logger.debug(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
-a
+sys.error(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
} else {
-BoundReference(ordinal, a)
+BoundReference(ordinal, a.dataType, a.nullable)
}
}
}.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible.
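
A minimal usage sketch of the new shape (hypothetical values; imports as in the hunk above). After this change a BoundReference carries only the slot ordinal, data type, and nullability instead of a full Attribute, and eval is a plain index into the input row:

    import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericRow}
    import org.apache.spark.sql.catalyst.types.IntegerType

    val input = new GenericRow(Array[Any](42, "x"))      // a two-column row
    val ref = BoundReference(0, IntegerType, nullable = false)
    assert(ref.eval(input) == 42)                        // eval is just input(ordinal)
    println(ref)                                         // prints: input[0]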
@@ -17,12 +17,13 @@

package org.apache.spark.sql.catalyst.expressions


/**
-* Converts a [[Row]] to another Row given a sequence of expression that define each column of the
-* new row. If the schema of the input row is specified, then the given expression will be bound to
-* that schema.
+* A [[Projection]] that is calculated by calling the `eval` of each of the specified expressions.
+* @param expressions a sequence of expressions that determine the value of each column of the
+* output row.
*/
-class Projection(expressions: Seq[Expression]) extends (Row => Row) {
+class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
this(expressions.map(BindReferences.bindReference(_, inputSchema)))

@@ -40,25 +41,25 @@ class Projection(expressions: Seq[Expression]) extends (Row => Row) {
}

/**
-* Converts a [[Row]] to another Row given a sequence of expression that define each column of th
-* new row. If the schema of the input row is specified, then the given expression will be bound to
-* that schema.
-*
-* In contrast to a normal projection, a MutableProjection reuses the same underlying row object
-* each time an input row is added. This significantly reduces the cost of calculating the
-* projection, but means that it is not safe to hold on to a reference to a [[Row]] after `next()`
-* has been called on the [[Iterator]] that produced it. Instead, the user must call `Row.copy()`
-* and hold on to the returned [[Row]] before calling `next()`.
+* A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+* expressions.
+* @param expressions a sequence of expressions that determine the value of each column of the
+* output row.
*/
-case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
+case class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
this(expressions.map(BindReferences.bindReference(_, inputSchema)))

private[this] val exprArray = expressions.toArray
-private[this] val mutableRow = new GenericMutableRow(exprArray.size)
+private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.size)
def currentValue: Row = mutableRow

-def apply(input: Row): Row = {
+override def target(row: MutableRow): MutableProjection = {
+mutableRow = row
+this
+}
+
+override def apply(input: Row): Row = {
var i = 0
while (i < exprArray.length) {
mutableRow(i) = exprArray(i).eval(input)
@@ -76,6 +77,12 @@ class JoinedRow extends Row {
private[this] var row1: Row = _
private[this] var row2: Row = _

+def this(left: Row, right: Row) = {
+this()
+row1 = left
+row2 = right
+}

/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
def apply(r1: Row, r2: Row): Row = {
row1 = r1
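
A hedged sketch of the renamed classes in use (hypothetical expressions). Per the doc comment removed above, the mutable variant keeps reusing one underlying row, so a caller must copy() a result before feeding the next input; the new target method additionally lets the caller supply the output row:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types.IntegerType

    val exprs = Seq(BoundReference(0, IntegerType, nullable = false))
    val project = new InterpretedMutableProjection(exprs)

    val first = project(new GenericRow(Array[Any](1)))
    val saved = first.copy()                // snapshot before the next apply
    project(new GenericRow(Array[Any](2)))  // `first` is the reused row; `saved` stays stable

    val buffer = new GenericMutableRow(1)   // redirect output into a caller-owned row
    project.target(buffer)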
@@ -98,15 +98,6 @@ trait MutableRow extends Row {
def setByte(ordinal: Int, value: Byte)
def setFloat(ordinal: Int, value: Float)
def setString(ordinal: Int, value: String)

-/**
-* Experimental
-*
-* Returns a mutable string builder for the specified column. A given row should return the
-* result of any mutations made to the returned buffer next time getString is called for the same
-* column.
-*/
-def getStringBuilder(ordinal: Int): StringBuilder
}

/**
@@ -190,15 +181,42 @@ class GenericRow(protected[catalyst] val values: Array[Any]) extends Row {
values(i).asInstanceOf[String]
}

+// Custom hashCode function that matches the efficient code generated version.
+override def hashCode(): Int = {
+var result: Int = 37
+
+var i = 0
+while (i < values.length) {
+val update: Int =
+if (isNullAt(i)) {
+0
+} else {
+apply(i) match {
+case b: Boolean => if (b) 0 else 1
+case b: Byte => b.toInt
+case s: Short => s.toInt
+case i: Int => i
+case l: Long => (l ^ (l >>> 32)).toInt
+case f: Float => java.lang.Float.floatToIntBits(f)
+case d: Double =>
+val b = java.lang.Double.doubleToLongBits(d)
+(b ^ (b >>> 32)).toInt
+case other => other.hashCode()
+}
+}
+result = 37 * result + update
+i += 1
+}
+result
+}

def copy() = this
}

class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow {
/** No-arg constructor for serialization. */
def this() = this(0)

-def getStringBuilder(ordinal: Int): StringBuilder = ???

override def setBoolean(ordinal: Int,value: Boolean): Unit = { values(ordinal) = value }
override def setByte(ordinal: Int,value: Byte): Unit = { values(ordinal) = value }
override def setDouble(ordinal: Int,value: Double): Unit = { values(ordinal) = value }
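
A quick illustration (hypothetical values) of the hand-rolled hashCode above: each column folds to an Int update (a Long, for example, as (l ^ (l >>> 32)).toInt), which is mixed in with the usual factor of 37, so rows holding equal values hash equally:

    val l = 123456789012L
    val folded = (l ^ (l >>> 32)).toInt     // how a Long column enters the mix

    val r1 = new GenericRow(Array[Any](1, l, "spark"))
    val r2 = new GenericRow(Array[Any](1, l, "spark"))
    assert(r1.hashCode == r2.hashCode)      // value-based, matching the generated version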
(The remaining changed files in this commit are not shown.)
