Skip to content

Commit

Permalink
Assign materialized Row ID and Row commit version column names
Browse files Browse the repository at this point in the history
## Description
This change is part of implementing row tracking as specified in #1610 and 7272b04.
It covers assigning a column name for the materialized Row ID and Row commit version columns by setting them in the table metadata when creating or cloning a table.

- Add test suite `rowtracking.MaterializedColumnSuite` to cover assigning materialized column names in various table creation and clone scenarios.

Closes #1896

GitOrigin-RevId: 3b963ed7e08524f24d60744160490e41a0aab3e8
  • Loading branch information
johanl-db authored and allisonport-db committed Jul 20, 2023
1 parent dbb2210 commit 355804c
Show file tree
Hide file tree
Showing 11 changed files with 487 additions and 23 deletions.
12 changes: 12 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
],
"sqlState" : "0B000"
},
"DELTA_ADDING_COLUMN_WITH_INTERNAL_NAME_FAILED" : {
"message" : [
"Failed to add column <colName> because the name is reserved."
],
"sqlState" : "42000"
},
"DELTA_ADDING_DELETION_VECTORS_DISALLOWED" : {
"message" : [
"The current operation attempted to add a deletion vector to a table that does not permit the creation of new deletion vectors. Please file a bug report."
Expand Down Expand Up @@ -1112,6 +1118,12 @@
],
"sqlState" : "42K03"
},
"DELTA_MATERIALIZED_ROW_TRACKING_COLUMN_NAME_MISSING" : {
"message" : [
"Materialized <rowTrackingColumn> column name missing for <tableName>."
],
"sqlState" : "22000"
},
"DELTA_MAX_ARRAY_SIZE_EXCEEDED" : {
"message" : [
"Please use a limit less than Int.MaxValue - 8."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,14 @@ class DeltaAnalysis(session: SparkSession)
if (src.provider.exists(DeltaSourceUtils.isDeltaDataSourceName)) {
val deltaLogSrc = DeltaTableV2(session, new Path(src.location))

// maxColumnId field cannot be set externally. If column-mapping is
// used on the source delta table, then maxColumnId would be set for the sourceTable
// and needs to be removed from the targetTable's configuration
// maxColumnId will be set in the targetTable's configuration internally after
// Column mapping and row tracking fields cannot be set externally. If the features are
// used on the source delta table, then the corresponding fields would be set for the
// sourceTable and needs to be removed from the targetTable's configuration. The fields
// will then be set in the targetTable's configuration internally after.
val config =
deltaLogSrc.snapshot.metadata.configuration.-("delta.columnMapping.maxColumnId")
.-(MaterializedRowId.MATERIALIZED_COLUMN_NAME_PROP)
.-(MaterializedRowCommitVersion.MATERIALIZED_COLUMN_NAME_PROP)

new CatalogTable(
identifier = targetTableIdentifier,
Expand Down
21 changes: 21 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2846,6 +2846,27 @@ trait DeltaErrorsBase
new DeltaIllegalStateException(errorClass = "DELTA_ROW_ID_ASSIGNMENT_WITHOUT_STATS")
}

def addingColumnWithInternalNameFailed(colName: String): Throwable = {
new DeltaRuntimeException(
errorClass = "DELTA_ADDING_COLUMN_WITH_INTERNAL_NAME_FAILED",
messageParameters = Array(colName)
)
}

def materializedRowIdMetadataMissing(tableName: String): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_MATERIALIZED_ROW_TRACKING_COLUMN_NAME_MISSING",
messageParameters = Array("Row ID", tableName)
)
}

def materializedRowCommitVersionMetadataMissing(tableName: String): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_MATERIALIZED_ROW_TRACKING_COLUMN_NAME_MISSING",
messageParameters = Array("Row Commit Version", tableName)
)
}

def domainMetadataDuplicate(domainName: String): Throwable = {
new DeltaIllegalArgumentException(
errorClass = "DELTA_DUPLICATE_DOMAIN_METADATA_INTERNAL_ERROR",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import java.util.UUID

import org.apache.spark.sql.delta.actions.{Metadata, Protocol}

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.Attribute

/**
* Represents a materialized row tracking column. Concrete implementations are [[MaterializedRowId]]
* and [[MaterializedRowCommitVersion]].
*/
abstract class MaterializedRowTrackingColumn {
/**
* Table metadata configuration property name storing the name of this materialized row tracking
* column.
*/
val MATERIALIZED_COLUMN_NAME_PROP: String

/** Prefix to use for the name of this materialized row tracking column */
val MATERIALIZED_COLUMN_NAME_PREFIX: String

/**
* Returns the exception to throw when the materialized column name is not set in the table
* metadata. The table name is passed as argument.
*/
def missingMetadataException: String => Throwable

/**
* Generate a random name for a materialized row tracking column. The generated name contains a
* unique UUID, we assume it shall not conflict with existing column.
*/
private def generateMaterializedColumnName: String =
MATERIALIZED_COLUMN_NAME_PREFIX + UUID.randomUUID().toString

/**
* Update this materialized row tracking column name in the metadata.
* - If row tracking is not allowed or not supported, this operation is a noop.
* - If row tracking is supported on the table and no name is assigned to the old metadata, we
* assign a name. If a name was already assigned, we copy over this name.
* Throws in case the assignment of a new name fails due to a conflict.
*/
private[delta] def updateMaterializedColumnName(
protocol: Protocol,
oldMetadata: Metadata,
newMetadata: Metadata): Metadata = {
if (!RowTracking.isSupported(protocol)) {
// During a CLONE we might not enable row tracking, but still receive the materialized column
// name from the source. In this case, we need to remove the column name to not have the same
// column name in two different tables.
return newMetadata.copy(
configuration = newMetadata.configuration - MATERIALIZED_COLUMN_NAME_PROP)
}

// Take the materialized column name from the old metadata, as this is the materialized column
// name of the current table. We overwrite the materialized column name of the new metadata as
// it could contain a materialized column name from another table, e.g. the source table during
// a CLONE.
val materializedColumnName = oldMetadata.configuration
.getOrElse(MATERIALIZED_COLUMN_NAME_PROP, generateMaterializedColumnName)
newMetadata.copy(configuration = newMetadata.configuration +
(MATERIALIZED_COLUMN_NAME_PROP -> materializedColumnName))
}

/**
* Throws an exception if row tracking is allowed and the materialized column name conflicts with
* another column name.
*/
private[delta] def throwIfMaterializedColumnNameConflictsWithSchema(metadata: Metadata): Unit = {
val logicalColumnNames = metadata.schema.fields.map(_.name)
val physicalColumnNames = metadata.schema.fields
.map(field => DeltaColumnMapping.getPhysicalName(field))

metadata.configuration.get(MATERIALIZED_COLUMN_NAME_PROP).foreach { columnName =>
if (logicalColumnNames.contains(columnName) || physicalColumnNames.contains(columnName)) {
throw DeltaErrors.addingColumnWithInternalNameFailed(columnName)
}
}
}

/** Extract the materialized column name from the [[Metadata]] of a [[DeltaLog]]. */
def getMaterializedColumnName(protocol: Protocol, metadata: Metadata): Option[String] = {
if (RowTracking.isEnabled(protocol, metadata)) {
metadata.configuration.get(MATERIALIZED_COLUMN_NAME_PROP)
} else {
None
}
}

/** Convenience method that throws if the materialized column name cannot be extracted. */
def getMaterializedColumnNameOrThrow(
protocol: Protocol, metadata: Metadata, tableId: String): String = {
getMaterializedColumnName(protocol, metadata).getOrElse {
throw missingMetadataException(tableId)
}
}

/**
* If Row tracking is enabled, return an Expression referencing this Row tracking column Attribute
* in 'dataFrame' if one is available. Otherwise returns None.
*/
private[delta] def getAttribute(
snapshot: Snapshot, dataFrame: DataFrame): Option[Attribute] = {
if (!RowTracking.isEnabled(snapshot.protocol, snapshot.metadata)) {
return None
}

val materializedColumnName = getMaterializedColumnNameOrThrow(
snapshot.protocol, snapshot.metadata, snapshot.deltaLog.tableId)

val analyzedPlan = dataFrame.queryExecution.analyzed
analyzedPlan.outputSet.view.find(attr => materializedColumnName == attr.name)
}
}

object MaterializedRowId extends MaterializedRowTrackingColumn {
/**
* Table metadata configuration property name storing the name of the column in which the
* Row IDs are materialized.
*/
val MATERIALIZED_COLUMN_NAME_PROP = "delta.rowTracking.materializedRowIdColumnName"

/** Prefix to use for the name of the materialized Row ID column */
val MATERIALIZED_COLUMN_NAME_PREFIX = "_row-id-col-"

def missingMetadataException: String => Throwable = DeltaErrors.materializedRowIdMetadataMissing
}

object MaterializedRowCommitVersion extends MaterializedRowTrackingColumn {
/**
* Table metadata configuration property name storing the name of the column in which the
* Row commit versions are materialized.
*/
val MATERIALIZED_COLUMN_NAME_PROP = "delta.rowTracking.materializedRowCommitVersionColumnName"

/** Prefix to use for the name of the materialized Row commit version column */
val MATERIALIZED_COLUMN_NAME_PREFIX = "_row-commit-version-col-"

def missingMetadataException: String => Throwable =
DeltaErrors.materializedRowCommitVersionMetadataMissing
}
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite
setNewProtocolWithFeaturesEnabledByMetadata(newMetadataTmp)
}

newMetadataTmp = MaterializedRowId.updateMaterializedColumnName(
protocol, oldMetadata = snapshot.metadata, newMetadataTmp)
newMetadataTmp = MaterializedRowCommitVersion.updateMaterializedColumnName(
protocol, oldMetadata = snapshot.metadata, newMetadataTmp)

RowId.verifyMetadata(
snapshot.protocol, protocol, snapshot.metadata, newMetadataTmp, isCreatingNewTable)
Expand Down Expand Up @@ -652,6 +656,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
if (spark.conf.get(DeltaSQLConf.DELTA_TABLE_PROPERTY_CONSTRAINTS_CHECK_ENABLED)) {
Protocol.assertTablePropertyConstraintsSatisfied(spark, metadata, snapshot)
}
MaterializedRowId.throwIfMaterializedColumnNameConflictsWithSchema(metadata)
MaterializedRowCommitVersion.throwIfMaterializedColumnNameConflictsWithSchema(metadata)
}

private def setNewProtocolWithFeaturesEnabledByMetadata(metadata: Metadata): Unit = {
Expand Down
16 changes: 0 additions & 16 deletions spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,4 @@ object RowId {
*/
private[delta] def extractHighWatermark(snapshot: Snapshot): Option[Long] =
RowTrackingMetadataDomain.fromSnapshot(snapshot).map(_.rowIdHighWaterMark)

/**
* Checks whether CONVERT TO DELTA collects statistics if row tracking is supported. If it does
* not collect statistics, we cannot assign fresh row IDs, hence we throw an error to either rerun
* the command without enabling the row tracking table feature, or to enable the necessary
* flags to collect statistics.
*/
private[delta] def checkStatsCollectedIfRowTrackingSupported(
protocol: Protocol,
convertToDeltaShouldCollectStats: Boolean,
statsCollectionEnabled: Boolean): Unit = {
if (!isSupported(protocol)) return
if (!convertToDeltaShouldCollectStats || !statsCollectionEnabled) {
throw DeltaErrors.convertToDeltaRowTrackingEnabledWithoutStatsCollection
}
}
}
44 changes: 44 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/RowTracking.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,48 @@ object RowTracking {
}
isEnabled
}

/**
* Checks whether CONVERT TO DELTA collects statistics if row tracking is supported. If it does
* not collect statistics, we cannot assign fresh row IDs, hence we throw an error to either rerun
* the command without enabling the row tracking table feature, or to enable the necessary
* flags to collect statistics.
*/
private[delta] def checkStatsCollectedIfRowTrackingSupported(
protocol: Protocol,
convertToDeltaShouldCollectStats: Boolean,
statsCollectionEnabled: Boolean): Unit = {
if (!isSupported(protocol)) return
if (!convertToDeltaShouldCollectStats || !statsCollectionEnabled) {
throw DeltaErrors.convertToDeltaRowTrackingEnabledWithoutStatsCollection
}
}

/**
* Returns the sourceMetadata with the row tracking property coming from the targetMetadata.
*/
private[delta] def takeRowTrackingPropertyFromTarget(
targetMetadata: Metadata,
sourceMetadata: Metadata): Metadata = {
var newConfig = sourceMetadata.configuration - DeltaConfigs.ROW_TRACKING_ENABLED.key
targetMetadata.configuration.get(DeltaConfigs.ROW_TRACKING_ENABLED.key).foreach { v =>
newConfig += DeltaConfigs.ROW_TRACKING_ENABLED.key -> v
}
sourceMetadata.copy(configuration = newConfig)
}

/**
* Removes the row tracking property from the metadata.
*/
private[delta] def removeRowTrackingProperty(metadata: Metadata): Metadata = {
metadata.copy(configuration = metadata.configuration - DeltaConfigs.ROW_TRACKING_ENABLED.key)
}

/**
* Removes the row tracking table feature from the protocol.
*/
private[delta] def removeRowTrackingTableFeature(protocol: Protocol): Protocol = {
val writerFeaturesWithoutRowTracking = protocol.writerFeatures.map(_ - RowTrackingFeature.name)
protocol.copy(writerFeatures = writerFeaturesWithoutRowTracking)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,25 @@ abstract class CloneTableBase(
private def prepareSourceMetadata(
targetSnapshot: SnapshotDescriptor,
opName: String): Metadata = {
var clonedMetadata =
sourceTable.metadata.copy(
id = UUID.randomUUID().toString,
name = targetSnapshot.metadata.name,
description = targetSnapshot.metadata.description)
if (opName == CloneTableCommand.OP_NAME) {
// If it's a new table, we remove the row tracking table property to create a 1:1 CLONE of
// the source, just without row tracking. If it's an existing table, we take whatever
// setting is currently on the target, as the setting should be independent between
// target and source.
if (targetSnapshot.version == -1) {
clonedMetadata = RowTracking.removeRowTrackingProperty(clonedMetadata)
} else {
clonedMetadata = RowTracking.takeRowTrackingPropertyFromTarget(
targetMetadata = targetSnapshot.metadata,
sourceMetadata = clonedMetadata)
}
}
clonedMetadata
}

/**
Expand Down Expand Up @@ -344,12 +359,18 @@ abstract class CloneTableBase(
conf.getConf(DeltaSQLConf.RESTORE_TABLE_PROTOCOL_DOWNGRADE_ALLOWED) ||
// It's not a real downgrade if the table doesn't exist before the CLONE.
txn.snapshot.version == -1
val sourceProtocolWithoutRowTracking = RowTracking.removeRowTrackingTableFeature(sourceProtocol)

if (protocolDowngradeAllowed) {
minReaderVersion = minReaderVersion.max(sourceProtocol.minReaderVersion)
minWriterVersion = minWriterVersion.max(sourceProtocol.minWriterVersion)
val minProtocol = Protocol(minReaderVersion, minWriterVersion).withFeatures(enabledFeatures)
sourceProtocol.merge(minProtocol)
// Row tracking settings should be independent between target and source.
if (opName == CloneTableCommand.OP_NAME) {
sourceProtocolWithoutRowTracking.merge(minProtocol)
} else {
sourceProtocol.merge(minProtocol)
}
} else {
// Take the maximum of all protocol versions being merged to ensure that table features
// from table property overrides are correctly added to the table feature list or are only
Expand All @@ -359,7 +380,12 @@ abstract class CloneTableBase(
minWriterVersion = Seq(
targetProtocol.minWriterVersion, sourceProtocol.minWriterVersion, minWriterVersion).max
val minProtocol = Protocol(minReaderVersion, minWriterVersion).withFeatures(enabledFeatures)
targetProtocol.merge(sourceProtocol, minProtocol)
// Row tracking settings should be independent between target and source.
if (opName == CloneTableCommand.OP_NAME) {
targetProtocol.merge(sourceProtocolWithoutRowTracking, minProtocol)
} else {
targetProtocol.merge(sourceProtocol, minProtocol)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ abstract class ConvertToDeltaCommandBase(
// TODO: we have not decided on how to implement CONVERT TO DELTA under column mapping modes
// for some convert targets so we block this feature for them here
checkColumnMapping(txn.metadata, targetTable)
RowId.checkStatsCollectedIfRowTrackingSupported(
RowTracking.checkStatsCollectedIfRowTrackingSupported(
txn.protocol,
collectStats,
statsEnabled)
Expand Down
Loading

0 comments on commit 355804c

Please sign in to comment.