Skip to content

Commit

Permalink
WIP: Code generation with scala reflection.
Browse files Browse the repository at this point in the history
  • Loading branch information
marmbrus committed Jul 9, 2014
1 parent b520b64 commit a2b5408
Show file tree
Hide file tree
Showing 35 changed files with 1,493 additions and 64 deletions.
3 changes: 3 additions & 0 deletions project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,9 @@ object SparkBuild extends Build {
// assumptions about the the expression ids being contiguous. Running tests in parallel breaks
// this non-deterministically. TODO: FIX THIS.
parallelExecution in Test := false,
addCompilerPlugin("org.scalamacros" % "paradise" % "2.0.0-M8" cross CrossVersion.full),
libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-compiler" % v ),
libraryDependencies += "org.scalamacros" %% "quasiquotes" % "2.0.0-M8",
libraryDependencies ++= Seq(
"com.typesafe" %% "scalalogging-slf4j" % "1.0.1"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
*/
object ImplicitGenerate extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Project(Seq(Alias(g: Generator, _)), child) =>
Generate(g, join = false, outer = false, None, child)
case Project(Seq(Alias(g: Generator, alias)), child) =>
Generate(g, join = false, outer = false, Some(alias), child)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ trait HiveTypeCoercion {
case Cast(e, BooleanType) if e.dataType != BooleanType => Not(EqualTo(e, Literal(0)))
// Turn true into 1, and false into 0 if casting boolean into other types.
case Cast(e, dataType) if e.dataType == BooleanType =>
Cast(If(e, Literal(1), Literal(0)), dataType)
If(e, Cast(Literal(1), dataType), Cast(Literal(0), dataType))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] {
plan.transform {
case n: NoBind => n.asInstanceOf[TreeNode]
case leafNode if leafNode.children.isEmpty => leafNode
case nb: NoBind => nb.asInstanceOf[TreeNode]
case unaryNode if unaryNode.children.size == 1 => unaryNode.transformExpressions { case e =>
bindReference(e, unaryNode.children.head.output)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst._

import org.apache.spark.sql.catalyst.types._

object CodeGeneration

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ package org.apache.spark.sql.catalyst.expressions
* new row. If the schema of the input row is specified, then the given expression will be bound to
* that schema.
*/
class Projection(expressions: Seq[Expression]) extends (Row => Row) {
class InterpretedProjection(expressions: Seq[Expression]) extends (Row => Row) {
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
this(expressions.map(BindReferences.bindReference(_, inputSchema)))

Expand All @@ -40,7 +40,7 @@ class Projection(expressions: Seq[Expression]) extends (Row => Row) {
}

/**
* Converts a [[Row]] to another Row given a sequence of expression that define each column of th
* Converts a [[Row]] to another Row given a sequence of expression that define each column of the
* new row. If the schema of the input row is specified, then the given expression will be bound to
* that schema.
*
Expand All @@ -50,14 +50,19 @@ class Projection(expressions: Seq[Expression]) extends (Row => Row) {
* has been called on the [[Iterator]] that produced it. Instead, the user must call `Row.copy()`
* and hold on to the returned [[Row]] before calling `next()`.
*/
case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
case class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
this(expressions.map(BindReferences.bindReference(_, inputSchema)))

private[this] val exprArray = expressions.toArray
private[this] val mutableRow = new GenericMutableRow(exprArray.size)
private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.size)
def currentValue: Row = mutableRow

def target(row: MutableRow): MutableProjection = {
mutableRow = row
this
}

def apply(input: Row): Row = {
var i = 0
while (i < exprArray.length) {
Expand All @@ -76,6 +81,12 @@ class JoinedRow extends Row {
private[this] var row1: Row = _
private[this] var row2: Row = _

def this(left: Row, right: Row) = {
this()
row1 = left
row2 = right
}

/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
def apply(r1: Row, r2: Row): Row = {
row1 = r1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,34 @@ class GenericRow(protected[catalyst] val values: Array[Any]) extends Row {
values(i).asInstanceOf[String]
}

override def hashCode(): Int = {
var result: Int = 37

var i = 0
while (i < values.length) {
val update: Int =
if (isNullAt(i)) {
0
} else {
apply(i) match {
case b: Boolean => if (b) 0 else 1
case b: Byte => b.toInt
case s: Short => s.toInt
case i: Int => i
case l: Long => (l ^ (l >>> 32)).toInt
case f: Float => java.lang.Float.floatToIntBits(f)
case d: Double =>
val b = java.lang.Double.doubleToLongBits(d)
(b ^ (b >>> 32)).toInt
case other => other.hashCode()
}
}
result = 37 * result + update
i += 1
}
result
}

def copy() = this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi

override def eval(input: Row): Any = {
children.size match {
case 0 => function.asInstanceOf[() => Any]()
case 1 => function.asInstanceOf[(Any) => Any](children(0).eval(input))
case 2 =>
function.asInstanceOf[(Any, Any) => Any](
Expand Down
Loading

0 comments on commit a2b5408

Please sign in to comment.