Introduce row ID write table feature and table property
This change adds knobs to enable Row IDs when creating a new Delta table, as defined in the Row ID specification (#1610); a minimal usage sketch follows the list.
 - Write table feature `rowIds`: Requires writers to support Row IDs; used to enable Row IDs on newly created tables.
 - Table property `delta.enableRowIds`: Indicates whether all rows have an assigned Row ID.
 - SQL conf `rowIds.allowForDevOnly`: Restricts the use of Row IDs to testing for now.
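A minimal usage sketch in Scala (hedged: it assumes the SQL conf carries the usual `spark.databricks.delta.` prefix and the feature property the `delta.feature.` prefix; `tbl` is a placeholder table name):

```scala
// Row IDs are dev-only for now: this conf can only be set to true in test JVMs.
spark.conf.set("spark.databricks.delta.rowIds.allowForDevOnly", "true")

// Creating a new table with the rowIds writer feature also sets the table property
// that marks all rows as having an assigned Row ID.
spark.range(10).write.format("delta")
  .option("delta.feature.rowIds", "supported")
  .saveAsTable("tbl")
```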

Adds test suite `RowIdSuite`:
 - Test that enabling Row IDs on a new table succeeds.
 - Test that enabling Row IDs on an existing table fails.

Closes #1702

GitOrigin-RevId: 961ff72f1ae7abf1f08d53052062ce20669d4aad
johanl-db authored and allisonport-db committed Apr 24, 2023
1 parent c47445e commit 8272ee9
Showing 7 changed files with 238 additions and 2 deletions.
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -591,6 +591,17 @@ trait DeltaConfigsBase extends DeltaLogging {
_ == Serializable,
"must be Serializable"
)

/**
* Indicates whether row IDs are enabled. When this flag is turned on, all rows are guaranteed to
* have row IDs assigned to them.
*/
val ROW_IDS_ENABLED = buildConfig[Boolean](
key = "enableRowIds",
defaultValue = false.toString,
fromString = _.toBoolean,
validationFunction = _ => true,
helpMessage = "needs to be a boolean.")
}

object DeltaConfigs extends DeltaConfigsBase
core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -549,6 +549,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite
setNewProtocolWithFeaturesEnabledByMetadata(newMetadataTmp)
}


newMetadataTmp = RowId.verifyAndUpdateMetadata(
spark, protocol, snapshot.metadata, newMetadataTmp, isCreatingNewTable)

DeletionVectorUtils.assertDeletionVectorsNotEnabled(spark, newMetadataTmp, protocol)
logInfo(s"Updated metadata from ${newMetadata.getOrElse("-")} to $newMetadataTmp")
newMetadata = Some(newMetadataTmp)
90 changes: 90 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/RowId.scala
@@ -0,0 +1,90 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.propertyKey
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import org.apache.spark.sql.SparkSession

/**
* Collection of helpers to handle Row IDs.
*/
object RowId {

/**
* Returns whether Row IDs can be written to and read from Delta tables. This acts as a feature
* flag during development: every Row ID code path should be hidden behind this flag and behave
* as if Row IDs didn't exist when this returns false, to avoid leaking an incomplete
* implementation.
*/
def rowIdsAllowed(spark: SparkSession): Boolean = {
spark.conf.get(DeltaSQLConf.ROW_IDS_ALLOWED)
}

/**
* Returns whether the protocol version supports the Row ID table feature. Whenever Row IDs are
* supported, fresh Row IDs must be assigned to all newly committed files, even when Row IDs are
* disabled in the current table version.
*/
def rowIdsSupported(protocol: Protocol): Boolean = {
protocol.isFeatureSupported(RowIdFeature)
}

/**
* Returns whether Row IDs are enabled on this table version. Checks that Row IDs are supported,
* which is a prerequisite for enabling Row IDs, and throws an error if not.
*/
def rowIdsEnabled(protocol: Protocol, metadata: Metadata): Boolean = {
val isEnabled = DeltaConfigs.ROW_IDS_ENABLED.fromMetaData(metadata)
if (isEnabled && !rowIdsSupported(protocol)) {
throw new IllegalStateException(s"Table property '${DeltaConfigs.ROW_IDS_ENABLED.key}' is " +
s"set on the table but this table version doesn't support table feature " +
s"'${propertyKey(RowIdFeature)}'.")
}
isEnabled
}

/**
* Marks row IDs as readable if the row ID writer feature is supported on a new table and
* verifies that row IDs are only set as readable when a new table is created.
*/
private[delta] def verifyAndUpdateMetadata(
spark: SparkSession,
protocol: Protocol,
oldMetadata: Metadata,
newMetadata: Metadata,
isCreatingNewTable: Boolean): Metadata = {
if (!rowIdsAllowed(spark)) return newMetadata
val latestMetadata = if (isCreatingNewTable && rowIdsSupported(protocol)) {
val newConfig = newMetadata.configuration + (DeltaConfigs.ROW_IDS_ENABLED.key -> "true")
newMetadata.copy(configuration = newConfig)
} else {
newMetadata
}

val rowIdsEnabledBefore = rowIdsEnabled(protocol, oldMetadata)
val rowIdsEnabledAfter = rowIdsEnabled(protocol, latestMetadata)

if (rowIdsEnabledAfter && !rowIdsEnabledBefore && !isCreatingNewTable) {
throw new UnsupportedOperationException(
"Cannot enable Row IDs on an existing table.")
}
latestMetadata
}
}
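For orientation, a hedged sketch of how callers can query these helpers against a table's latest snapshot (a `spark` session and table `tbl` are assumed; it mirrors the checks in the new `RowIdSuite` below):

```scala
import org.apache.spark.sql.delta.{DeltaLog, RowId}

import org.apache.spark.sql.catalyst.TableIdentifier

val snapshot = DeltaLog.forTable(spark, TableIdentifier("tbl")).update()
if (RowId.rowIdsAllowed(spark)) {
  // rowIdsSupported only inspects the protocol; rowIdsEnabled additionally reads the
  // table property and throws if the property is set while the feature is missing.
  val supported = RowId.rowIdsSupported(snapshot.protocol)
  val enabled = RowId.rowIdsEnabled(snapshot.protocol, snapshot.metadata)
  println(s"rowIds supported=$supported, enabled=$enabled")
}
```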
core/src/main/scala/org/apache/spark/sql/delta/actions/TableFeature.scala
@@ -224,7 +224,9 @@ object TableFeature {
TestWriterMetadataNoAutoUpdateFeature,
TestReaderWriterFeature,
TestReaderWriterMetadataAutoUpdateFeature,
TestReaderWriterMetadataNoAutoUpdateFeature)
TestReaderWriterMetadataNoAutoUpdateFeature,
// Row IDs are still under development and only available in testing.
RowIdFeature)
}
val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
require(features.size == featureMap.size, "Lowercase feature names must not duplicate.")
@@ -336,6 +338,15 @@ object DeletionVectorsTableFeature
}
}

object RowIdFeature extends WriterFeature(name = "rowIds")
with FeatureAutomaticallyEnabledByMetadata {
override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = DeltaConfigs.ROW_IDS_ENABLED.fromMetaData(metadata)
}
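Because the feature is metadata-driven, setting the enablement property when creating a table should pull the `rowIds` feature into the protocol automatically. A hedged sketch (placeholder table name `tbl`; the dev-only SQL conf must be on, since Row IDs are test-only for now):

```scala
// Sketch: the 'delta.enableRowIds' property requires the rowIds writer feature, and the
// metadata-driven auto-update adds that feature to the new table's protocol.
sql(
  """CREATE TABLE tbl (id BIGINT) USING delta
    |TBLPROPERTIES ('delta.enableRowIds' = 'true')""".stripMargin)
```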


/**
* Features below are for testing only, and are being registered to the system only in the testing
core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -16,14 +16,14 @@

package org.apache.spark.sql.delta.sources

// scalastyle:off import.ordering.noEmptyLine
import java.util.Locale
import java.util.concurrent.TimeUnit

import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
* [[SQLConf]] entries for Delta features.
@@ -1123,6 +1123,19 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val ROW_IDS_ALLOWED =
buildConf("rowIds.allowForDevOnly")
.internal()
.doc(
"""Controls whether Row Ids can be written to Delta tables and read from Delta tables.
|This flag should always be false for now, except in tests. Row Ids are an in-development
|feature and this flag ensures that we never try to read row ids using a partial
|implementation.
""".stripMargin)
.booleanConf
.checkValue(v => !v || Utils.isTesting, "Row Ids are only allowed in testing.")
.createWithDefault(false)

val DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO =
buildConf("optimize.maxDeletedRowsRatio")
.internal()
core/src/test/scala/org/apache/spark/sql/delta/rowid/RowIdSuite.scala
@@ -0,0 +1,60 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.rowid

import org.apache.spark.sql.delta.{DeltaLog, RowId}
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.SharedSparkSession

class RowIdSuite extends QueryTest
with SharedSparkSession
with RowIdTestUtils {
test("Creating a new table with row ID table feature sets row IDs as readable") {
withRowIdsEnabled(enabled = false) {
withTable("tbl") {
spark.range(10).write.format("delta")
.option(rowIdFeatureName, "supported").saveAsTable("tbl")

val log = DeltaLog.forTable(spark, TableIdentifier("tbl"))
assert(RowId.rowIdsEnabled(log.update().protocol, log.update().metadata))
}
}
}

test("Enabling row IDs on existing table does not set row IDs as readable") {
withRowIdsEnabled(enabled = false) {
withTable("tbl") {
spark.range(10).write.format("delta")
.saveAsTable("tbl")

sql(
s"""
|ALTER TABLE tbl
|SET TBLPROPERTIES (
|'$rowIdFeatureName' = 'supported',
|'delta.minWriterVersion' = $TABLE_FEATURES_MIN_WRITER_VERSION)""".stripMargin)

val log = DeltaLog.forTable(spark, TableIdentifier("tbl"))
assert(RowId.rowIdsSupported(log.update().protocol))
assert(!RowId.rowIdsEnabled(log.update().protocol, log.update().metadata))
}
}
}
}
core/src/test/scala/org/apache/spark/sql/delta/rowid/RowIdTestUtils.scala
@@ -0,0 +1,47 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.rowid

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.delta.RowIdFeature
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.{defaultPropertyKey, propertyKey}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

trait RowIdTestUtils extends QueryTest
with SharedSparkSession
with DeltaSQLCommandTest {

val rowIdFeatureName: String = propertyKey(RowIdFeature)
val defaultRowIdFeatureProperty: String = defaultPropertyKey(RowIdFeature)

override protected def sparkConf: SparkConf =
super.sparkConf.set(DeltaSQLConf.ROW_IDS_ALLOWED.key, "true")

def withRowIdsEnabled(enabled: Boolean)(f: => Unit): Unit = {
// Even when we don't want Row Ids on created tables, we want to enable code paths that
// interact with them, which is controlled by this config.
assert(spark.conf.get(DeltaSQLConf.ROW_IDS_ALLOWED.key) == "true")
val configPairs = if (enabled) Seq(defaultRowIdFeatureProperty -> "enabled") else Seq.empty
withSQLConf(configPairs: _*)(f)
}
}
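As a usage note, a hedged sketch of a test built on this helper, where every table created inside the block gets the `rowIds` feature by default (assumes a suite mixing in `RowIdTestUtils` with the same imports as `RowIdSuite`; names are placeholders):

```scala
test("tables created while row IDs are enabled by default get readable row IDs") {
  withRowIdsEnabled(enabled = true) {
    withTable("tbl") {
      spark.range(3).write.format("delta").saveAsTable("tbl")

      // The default feature property adds the rowIds feature to the new table, and
      // RowId.verifyAndUpdateMetadata should then flip 'delta.enableRowIds' to true.
      val log = DeltaLog.forTable(spark, TableIdentifier("tbl"))
      assert(RowId.rowIdsEnabled(log.update().protocol, log.update().metadata))
    }
  }
}
```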
