Skip to content

Commit

Permalink
Add nullabilization to Generate of SparkPlan.
Browse files Browse the repository at this point in the history
  • Loading branch information
ueshin committed Jul 1, 2014
1 parent a0fc9bc commit 799ce56
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ case class Generate(
val output = alias
.map(a => generator.output.map(_.withQualifiers(a :: Nil)))
.getOrElse(generator.output)
if (outer) {
if (join && outer) {
output.map {
case attr if !attr.nullable =>
AttributeReference(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.{Generator, JoinedRow, Literal, Projection}
import org.apache.spark.sql.catalyst.expressions._

/**
* :: DeveloperApi ::
Expand All @@ -39,8 +39,21 @@ case class Generate(
child: SparkPlan)
extends UnaryNode {

protected def generatorOutput: Seq[Attribute] = {
if (join && outer) {
generator.output.map {
case attr if !attr.nullable =>
AttributeReference(
attr.name, attr.dataType, nullable = true)(attr.exprId, attr.qualifiers)
case attr => attr
}
} else {
generator.output
}
}

override def output =
if (join) child.output ++ generator.output else generator.output
if (join) child.output ++ generatorOutput else generatorOutput

override def execute() = {
if (join) {
Expand Down

0 comments on commit 799ce56

Please sign in to comment.