[SPARK-17104][SQL] LogicalRelation.newInstance should follow the semantics of MultiInstanceRelation

## What changes were proposed in this pull request?

Currently, `LogicalRelation.newInstance()` simply creates another `LogicalRelation` object with the same parameters. However, the `newInstance()` method inherited from `MultiInstanceRelation` should return a copy of the object with unique expression ids. The current `LogicalRelation.newInstance()` can therefore cause failures when doing a self-join.
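The contract described above can be sketched without Spark: a minimal model in which `newInstance()` re-keys every output attribute with a fresh expression id, so the two legs of a self-join never share ids. All names here (`Attr`, `Relation`, `freshId`) are illustrative stand-ins, not Spark's actual API.

```scala
// Minimal, Spark-free sketch of the MultiInstanceRelation contract:
// newInstance() must return a copy whose output attributes carry fresh,
// unique expression ids. If it returned the same ids (as the buggy
// implementation effectively did), both sides of a self-join would expose
// identical ids and the analyzer could not tell the two legs apart.
object NewInstanceSketch {
  private var nextId = 0
  private def freshId(): Int = { nextId += 1; nextId }

  // Hypothetical stand-in for Catalyst's AttributeReference.
  final case class Attr(name: String, exprId: Int)

  // Hypothetical stand-in for a leaf relation.
  final case class Relation(output: Seq[Attr]) {
    // Correct behaviour: copy, re-keying every attribute with a fresh id.
    def newInstance(): Relation =
      Relation(output.map(a => a.copy(exprId = freshId())))
  }

  def main(args: Array[String]): Unit = {
    val table = Relation(Seq(Attr("key", freshId()), Attr("value", freshId())))
    val left  = table
    val right = table.newInstance() // what the analyzer does for a self-join
    val shared = left.output.map(_.exprId).toSet
      .intersect(right.output.map(_.exprId).toSet)
    // No expression id is shared between the two sides, so each column
    // reference resolves unambiguously.
    assert(shared.isEmpty, "self-join legs must not share expression ids")
  }
}
```

Running `main` completes without tripping the assertion; dropping the `copy(exprId = freshId())` step reproduces the ambiguity this patch fixes.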

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <[email protected]>

Closes #14682 from viirya/fix-localrelation.

(cherry picked from commit 31a0155)
Signed-off-by: Wenchen Fan <[email protected]>
viirya authored and cloud-fan committed Aug 20, 2016
1 parent f7458c7 commit 4c4c275
Showing 2 changed files with 16 additions and 2 deletions.
```diff
@@ -79,11 +79,18 @@ case class LogicalRelation(
   /** Used to lookup original attribute capitalization */
   val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o)))
 
-  def newInstance(): this.type =
+  /**
+   * Returns a new instance of this LogicalRelation. According to the semantics of
+   * MultiInstanceRelation, this method returns a copy of this object with
+   * unique expression ids. We respect the `expectedOutputAttributes` and create
+   * new instances of attributes in it.
+   */
+  override def newInstance(): this.type = {
     LogicalRelation(
       relation,
-      expectedOutputAttributes,
+      expectedOutputAttributes.map(_.map(_.newInstance())),
       metastoreTableIdentifier).asInstanceOf[this.type]
+  }
 
   override def refresh(): Unit = relation match {
     case fs: HadoopFsRelation => fs.refresh()
```
```diff
@@ -589,6 +589,13 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       }
     }
   }
+
+  test("self-join") {
+    val table = spark.table("normal_parquet")
+    val selfJoin = table.as("t1").join(table.as("t2"))
+    checkAnswer(selfJoin,
+      sql("SELECT * FROM normal_parquet x JOIN normal_parquet y"))
+  }
 }
 
 /**
```
