Skip to content

Commit

Permalink
rdar://74251620 Add support for partition evolution (apache#909)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored and dongjoon-hyun committed Mar 1, 2021
1 parent 2d45548 commit f78d2f6
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 1 deletion.
3 changes: 2 additions & 1 deletion docs/sql-ref-ansi-compliance.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ Below is a list of all the keywords in Spark SQL.
|EXTRACT|non-reserved|non-reserved|reserved|
|FALSE|reserved|non-reserved|reserved|
|FETCH|reserved|non-reserved|reserved|
|FIELD|non-reserved|non-reserved|non-reserved|
|FIELDS|non-reserved|non-reserved|non-reserved|
|FILTER|reserved|non-reserved|reserved|
|FILEFORMAT|non-reserved|non-reserved|non-reserved|
Expand Down Expand Up @@ -308,7 +309,7 @@ Below is a list of all the keywords in Spark SQL.
|LIST|non-reserved|non-reserved|non-reserved|
|LOAD|non-reserved|non-reserved|non-reserved|
|LOCAL|non-reserved|non-reserved|reserved|
|LOCALLY|non-reserved|non-reserved|reserved|
|LOCALLY|non-reserved|non-reserved|non-reserved|
|LOCATION|non-reserved|non-reserved|non-reserved|
|LOCK|non-reserved|non-reserved|non-reserved|
|LOCKS|non-reserved|non-reserved|non-reserved|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ statement
(partitionSpec)? SET locationSpec #setTableLocation
| ALTER TABLE multipartIdentifier RECOVER PARTITIONS #recoverPartitions
| ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering
| ALTER TABLE multipartIdentifier
ADD PARTITION FIELD transform (AS name=identifier)? #addPartitionField
| ALTER TABLE multipartIdentifier DROP PARTITION FIELD transform #dropPartitionField
| DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? multipartIdentifier #dropView
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
Expand Down Expand Up @@ -1106,6 +1109,7 @@ ansiNonReserved
| EXTENDED
| EXTERNAL
| EXTRACT
| FIELD
| FIELDS
| FILEFORMAT
| FIRST
Expand Down Expand Up @@ -1349,6 +1353,7 @@ nonReserved
| FALSE
| FETCH
| FILTER
| FIELD
| FIELDS
| FILEFORMAT
| FIRST
Expand Down Expand Up @@ -1603,6 +1608,7 @@ EXTERNAL: 'EXTERNAL';
EXTRACT: 'EXTRACT';
FALSE: 'FALSE';
FETCH: 'FETCH';
FIELD: 'FIELD';
FIELDS: 'FIELDS';
FILTER: 'FILTER';
FILEFORMAT: 'FILEFORMAT';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataType;

/**
Expand Down Expand Up @@ -279,6 +280,72 @@ public int hashCode() {
}
}

/**
 * Create a TableChange for adding a new partition field derived from a transform.
 *
 * @param transform the transform expression that produces the new partition field
 * @param name optional alias for the new field; may be null when no alias was given
 * @return the change to apply to the table
 */
static TableChange addPartitionField(Transform transform, String name) {
  AddPartitionField change = new AddPartitionField(transform, name);
  return change;
}

/**
 * A TableChange that adds a partition field defined by a transform expression,
 * optionally under an explicit field name.
 */
final class AddPartitionField implements TableChange {
  private final Transform transform;
  private final String name;

  private AddPartitionField(Transform transform, String name) {
    this.transform = transform;
    this.name = name;
  }

  /** The transform expression that defines the new partition field. */
  public Transform transform() {
    return transform;
  }

  /** The alias for the new partition field; null when none was provided. */
  public String name() {
    return name;
  }

  @Override
  public boolean equals(Object other) {
    if (other == this) {
      return true;
    }
    if (other == null || getClass() != other.getClass()) {
      return false;
    }
    AddPartitionField field = (AddPartitionField) other;
    return Objects.equals(transform, field.transform) && Objects.equals(name, field.name);
  }

  @Override
  public int hashCode() {
    return Objects.hash(transform, name);
  }
}

/**
 * Create a TableChange for dropping the partition field identified by a transform.
 *
 * @param transform the transform expression identifying the partition field to remove
 * @return the change to apply to the table
 */
static TableChange dropPartitionField(Transform transform) {
  DropPartitionField change = new DropPartitionField(transform);
  return change;
}

/**
 * A TableChange that removes the partition field matching a transform expression.
 */
final class DropPartitionField implements TableChange {
  private final Transform transform;

  private DropPartitionField(Transform transform) {
    this.transform = transform;
  }

  /** The transform expression identifying the partition field to drop. */
  public Transform transform() {
    return transform;
  }

  @Override
  public boolean equals(Object other) {
    if (other == this) {
      return true;
    }
    if (other == null || getClass() != other.getClass()) {
      return false;
    }
    DropPartitionField field = (DropPartitionField) other;
    return Objects.equals(transform, field.transform);
  }

  @Override
  public int hashCode() {
    return Objects.hash(transform);
  }
}

/**
* A TableChange to set a table property.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
val change = TableChange.setWriteDistributionAndOrdering(distributionMode, order.toArray)
createAlterTable(nameParts, catalog, tbl, Seq(change))

case AlterTableAddPartitionFieldStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), transform, name) =>
val change = TableChange.addPartitionField(transform, name.orNull)
createAlterTable(nameParts, catalog, tbl, Seq(change))

case AlterTableDropPartitionFieldStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), transform) =>
val change = TableChange.dropPartitionField(transform)
createAlterTable(nameParts, catalog, tbl, Seq(change))

case AlterTableSetPropertiesStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), props) =>
val changes = props.map { case (key, value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3573,6 +3573,27 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
LogicalExpressions.sort(visitTransform(ctx.transform), direction, nullOrdering)
}

/**
 * Create an [[AlterTableAddPartitionFieldStatement]] command from an
 * ALTER TABLE ... ADD PARTITION FIELD parse-tree node.
 */
override def visitAddPartitionField(
    ctx: AddPartitionFieldContext): AlterTableAddPartitionFieldStatement = withOrigin(ctx) {
  val table = visitMultipartIdentifier(ctx.multipartIdentifier)
  val partitionTransform = visitTransform(ctx.transform)
  // The alias comes from the optional `AS name` clause; ctx.name is null when absent.
  val alias = Option(ctx.name).map(_.getText)
  AlterTableAddPartitionFieldStatement(table, partitionTransform, alias)
}

/**
 * Create an [[AlterTableDropPartitionFieldStatement]] command from an
 * ALTER TABLE ... DROP PARTITION FIELD parse-tree node.
 */
override def visitDropPartitionField(
    ctx: DropPartitionFieldContext): AlterTableDropPartitionFieldStatement = withOrigin(ctx) {
  val table = visitMultipartIdentifier(ctx.multipartIdentifier)
  val partitionTransform = visitTransform(ctx.transform)
  AlterTableDropPartitionFieldStatement(table, partitionTransform)
}

/**
* Create a [[DescribeColumn]] or [[DescribeRelation]] commands.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,3 +453,18 @@ case class AlterTableSetWriteDistributionAndOrderingStatement(
tableName: Seq[String],
distributionMode: String,
ordering: Seq[SortOrder]) extends ParsedStatement

/**
 * ALTER TABLE ... ADD PARTITION FIELD ... statement, as parsed from SQL.
 *
 * @param tableName multi-part identifier of the target table
 * @param transform transform expression defining the new partition field
 * @param name optional alias for the new partition field (the `AS name` clause)
 */
case class AlterTableAddPartitionFieldStatement(
    tableName: Seq[String],
    transform: Transform,
    name: Option[String]) extends ParsedStatement

/**
 * ALTER TABLE ... DROP PARTITION FIELD ... statement, as parsed from SQL.
 *
 * @param tableName multi-part identifier of the target table
 * @param transform transform expression identifying the partition field to drop
 */
case class AlterTableDropPartitionFieldStatement(
    tableName: Seq[String],
    transform: Transform) extends ParsedStatement
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,26 @@ class ResolveSessionCatalog(
createAlterTable(nameParts, catalog, tbl, Seq(change))
}

case AlterTableAddPartitionFieldStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), transform, name) =>
loadTable(catalog, tbl.asIdentifier).collect {
case _: V1Table =>
throw new AnalysisException("Cannot add partition fields to v1 tables")
}.getOrElse {
val change = TableChange.addPartitionField(transform, name.orNull)
createAlterTable(nameParts, catalog, tbl, Seq(change))
}

case AlterTableDropPartitionFieldStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), transform) =>
loadTable(catalog, tbl.asIdentifier).collect {
case _: V1Table =>
throw new AnalysisException("Cannot drop partition fields from v1 tables")
}.getOrElse {
val change = TableChange.dropPartitionField(transform)
createAlterTable(nameParts, catalog, tbl, Seq(change))
}

case AlterTableSetPropertiesStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), props) =>
loadTable(catalog, tbl.asIdentifier).collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2219,6 +2219,69 @@ class PlanResolutionSuite extends AnalysisTest {
assert(e.message.contains("Cannot set write distribution and ordering in v1 tables"))
}

test("alter table: add partition field to v2 tables") {
  Seq("v2Table", "testcat.tab").foreach { t =>
    // No `AS` alias in the SQL, so the change is built with a null name.
    val expectedChange =
      TableChange.addPartitionField(bucket(8, Array(FieldReference("s"))), null)

    parseAndResolve(s"ALTER TABLE $t ADD PARTITION FIELD bucket(8, s)") match {
      case AlterTable(_, _, _: DataSourceV2Relation, tableChanges) =>
        assert(tableChanges.size == 1, "expected only one change")
        assert(tableChanges.head == expectedChange, "change must match")
      case _ =>
        fail("expected AlterTable")
    }
  }
}

test("alter table: add partition field with an alias to v2 tables") {
  Seq("v2Table", "testcat.tab").foreach { t =>
    // The `AS new_part_col` clause should surface as the change's name.
    val expectedChange =
      TableChange.addPartitionField(bucket(8, Array(FieldReference("s"))), "new_part_col")

    parseAndResolve(s"ALTER TABLE $t ADD PARTITION FIELD bucket(8, s) AS new_part_col") match {
      case AlterTable(_, _, _: DataSourceV2Relation, tableChanges) =>
        assert(tableChanges.size == 1, "expected only one change")
        assert(tableChanges.head == expectedChange, "change must match")
      case _ =>
        fail("expected AlterTable")
    }
  }
}

test("alter table: drop partition field from v2 tables") {
  Seq("v2Table", "testcat.tab").foreach { t =>
    val expectedChange =
      TableChange.dropPartitionField(bucket(8, Array(FieldReference("s"))))

    parseAndResolve(s"ALTER TABLE $t DROP PARTITION FIELD bucket(8, s)") match {
      case AlterTable(_, _, _: DataSourceV2Relation, tableChanges) =>
        assert(tableChanges.size == 1, "expected only one change")
        assert(tableChanges.head == expectedChange, "change must match")
      case _ =>
        fail("expected AlterTable")
    }
  }
}

test("alter table: cannot add/drop partition fields for v1 tables") {
  // Both operations must be rejected during resolution for session-catalog v1 tables.
  val addError = intercept[AnalysisException] {
    parseAndResolve("ALTER TABLE v1Table ADD PARTITION FIELD bucket(8, s)")
  }
  assert(addError.message.contains("Cannot add partition fields to v1 tables"))

  val dropError = intercept[AnalysisException] {
    parseAndResolve("ALTER TABLE v1Table DROP PARTITION FIELD bucket(8, s)")
  }
  assert(dropError.message.contains("Cannot drop partition fields from v1 tables"))
}

// TODO: add tests for more commands.
}

Expand Down

0 comments on commit f78d2f6

Please sign in to comment.