diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json
index 3749045beb3..1f7f6faf4ad 100644
--- a/spark/src/main/resources/error/delta-error-classes.json
+++ b/spark/src/main/resources/error/delta-error-classes.json
@@ -2480,7 +2480,7 @@
"Unable to update table redirection state: Invalid state transition attempted.",
"The Delta table '
' cannot change from '' to ''."
],
- "sqlState" : "KD007"
+ "sqlState" : "22023"
},
"DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT" : {
"message" : [
@@ -2488,6 +2488,12 @@
],
"sqlState" : "KD007"
},
+ "DELTA_TABLE_INVALID_SET_UNSET_REDIRECT" : {
+ "message" : [
+ "Unable to SET or UNSET redirect property on : current property '' mismatches with new property ''."
+ ],
+ "sqlState" : "22023"
+ },
"DELTA_TABLE_LOCATION_MISMATCH" : {
"message" : [
"The location of the existing table is . It doesn't match the specified location ."
@@ -2512,6 +2518,12 @@
],
"sqlState" : "0AKDD"
},
+ "DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC" : {
+ "message" : [
+ "The Delta log contains unrecognized table redirect spec ''."
+ ],
+ "sqlState" : "42704"
+ },
"DELTA_TARGET_TABLE_FINAL_SCHEMA_EMPTY" : {
"message" : [
"Target table final schema is empty."
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 57bdec6c070..1c59932b755 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.delta.hooks.AutoCompactType
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.redirect.NoRedirectRule
+import org.apache.spark.sql.delta.redirect.RedirectSpec
import org.apache.spark.sql.delta.redirect.RedirectState
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -357,18 +358,34 @@ trait DeltaErrorsBase
)
}
+ def unrecognizedRedirectSpec(spec: RedirectSpec): Throwable = {
+ new DeltaIllegalStateException(
+ errorClass = "DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC",
+ messageParameters = Array(spec.toString)
+ )
+ }
+
+ def invalidSetUnSetRedirectCommand(
+ table: String,
+ newProperty: String,
+ existingProperty: String): Throwable = {
+ new DeltaIllegalStateException(
+ errorClass = "DELTA_TABLE_INVALID_SET_UNSET_REDIRECT",
+ messageParameters = Array(table, existingProperty, newProperty)
+ )
+ }
+
def invalidRedirectStateTransition(
table: String,
oldState: RedirectState,
- newState: RedirectState): Unit = {
+ newState: RedirectState): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION",
- messageParameters = Array(
- table, table, oldState.name, newState.name)
+ messageParameters = Array(table, oldState.name, newState.name)
)
}
- def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Unit = {
+ def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT",
messageParameters = Array(table, table, currentState.name)
@@ -376,7 +393,7 @@ trait DeltaErrorsBase
}
def invalidCommitIntermediateRedirectState(state: RedirectState): Throwable = {
- throw new DeltaIllegalStateException (
+ new DeltaIllegalStateException(
errorClass = "DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE",
messageParameters = Array(state.name)
)
@@ -385,7 +402,7 @@ trait DeltaErrorsBase
def noRedirectRulesViolated(
op: DeltaOperations.Operation,
noRedirectRules: Set[NoRedirectRule]): Throwable = {
- throw new DeltaIllegalStateException (
+ new DeltaIllegalStateException(
errorClass = "DELTA_NO_REDIRECT_RULES_VIOLATED",
messageParameters =
Array(op.name, noRedirectRules.map("\"" + _ + "\"").mkString("[", ",\n", "]"))
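
The Unit-to-Throwable signature changes above are a behavioral fix, not just cleanup:
the old builders constructed the exception without throwing it, so a bare call was a
silent no-op. A sketch of the call-site pattern before and after, assuming ident,
oldState, and newState are in scope:

    // Before: compiled, but raised nothing (the exception was built and discarded).
    DeltaErrors.invalidRedirectStateTransition(ident, oldState, newState)

    // After: the builder returns the Throwable and call sites throw explicitly,
    // matching the other DeltaErrors factory methods.
    throw DeltaErrors.invalidRedirectStateTransition(ident, oldState, newState)
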
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index 1260c348f87..dbeafc20ffc 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.redirect.RedirectFeature
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.sources._
import org.apache.spark.sql.delta.storage.LogStoreProvider
@@ -80,7 +81,7 @@ class DeltaLog private(
val options: Map[String, String],
val allOptions: Map[String, String],
val clock: Clock,
- initialCatalogTable: Option[CatalogTable]
+ val initialCatalogTable: Option[CatalogTable]
) extends Checkpoints
with MetadataCleanup
with LogStoreProvider
@@ -160,6 +161,7 @@ class DeltaLog private(
/** The unique identifier for this table. */
def tableId: String = unsafeVolatileMetadata.id // safe because table id never changes
+ def getInitialCatalogTable: Option[CatalogTable] = initialCatalogTable
/**
* Combines the tableId with the path of the table to ensure uniqueness. Normally `tableId`
* should be globally unique, but nothing stops users from copying a Delta table directly to
@@ -891,6 +893,29 @@ object DeltaLog extends DeltaLogging {
(deltaLog, Some(table))
}
+ /**
+ * Helper method that transforms a given Delta log path into a consistent, fully qualified format.
+ */
+ def formalizeDeltaPath(
+ spark: SparkSession,
+ options: Map[String, String],
+ rootPath: Path): Path = {
+ val fileSystemOptions: Map[String, String] =
+ if (spark.sessionState.conf.getConf(
+ DeltaSQLConf.LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS)) {
+ options.filterKeys { k =>
+ DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith)
+ }.toMap
+ } else {
+ Map.empty
+ }
+ // scalastyle:off deltahadoopconfiguration
+ val hadoopConf = spark.sessionState.newHadoopConfWithOptions(fileSystemOptions)
+ // scalastyle:on deltahadoopconfiguration
+ val fs = rootPath.getFileSystem(hadoopConf)
+ fs.makeQualified(rootPath)
+ }
+
/**
* Helper function to be used with the forTableWithSnapshot calls. Thunk is a
* partially applied DeltaLog.forTable call, which we can then wrap around with a
@@ -929,14 +954,14 @@ object DeltaLog extends DeltaLogging {
// scalastyle:on deltahadoopconfiguration
val fs = rawPath.getFileSystem(hadoopConf)
val path = fs.makeQualified(rawPath)
- def createDeltaLog(): DeltaLog = recordDeltaOperation(
+ def createDeltaLog(tablePath: Path = path): DeltaLog = recordDeltaOperation(
null,
"delta.log.create",
- Map(TAG_TAHOE_PATH -> path.getParent.toString)) {
+ Map(TAG_TAHOE_PATH -> tablePath.getParent.toString)) {
AnalysisHelper.allowInvokingTransformsInAnalyzer {
new DeltaLog(
- logPath = path,
- dataPath = path.getParent,
+ logPath = tablePath,
+ dataPath = tablePath.getParent,
options = fileSystemOptions,
allOptions = options,
clock = clock,
@@ -966,15 +991,51 @@ object DeltaLog extends DeltaLogging {
}
}
- val deltaLog = getDeltaLogFromCache
- if (Option(deltaLog.sparkContext.get).map(_.isStopped).getOrElse(true)) {
- // Invalid the cached `DeltaLog` and create a new one because the `SparkContext` of the cached
- // `DeltaLog` has been stopped.
- getOrCreateCache(spark.sessionState.conf).invalidate(cacheKey)
- getDeltaLogFromCache
- } else {
- deltaLog
+ def initializeDeltaLog(): DeltaLog = {
+ val deltaLog = getDeltaLogFromCache
+ if (Option(deltaLog.sparkContext.get).forall(_.isStopped)) {
+ // Invalidate the cached `DeltaLog` and create a new one because the `SparkContext` of the
+ // cached `DeltaLog` has been stopped.
+ getOrCreateCache(spark.sessionState.conf).invalidate(path -> fileSystemOptions)
+ getDeltaLogFromCache
+ } else {
+ deltaLog
+ }
}
+
+ val deltaLog = initializeDeltaLog()
+ // The deltaLog object may be cached while another session updates the table redirect
+ // property. To avoid this potential race condition, a validation inside the
+ // deltaLog.update method ensures deltaLog points to the correct place after the
+ // snapshot is updated.
+ val redirectConfigOpt = RedirectFeature.needDeltaLogRedirect(
+ spark,
+ deltaLog,
+ initialCatalogTable
+ )
+ redirectConfigOpt.map { redirectConfig =>
+ val (redirectLoc, catalogTableOpt) = RedirectFeature
+ .getRedirectLocationAndTable(spark, deltaLog, redirectConfig)
+ val formalizedPath = formalizeDeltaPath(spark, options, redirectLoc)
+ // Build the cache key with a redirect prefix to prevent interference between
+ // redirection and normal access.
+ val redirectKey = new Path(RedirectFeature.DELTALOG_PREFIX, redirectLoc)
+ val deltaLogCacheKey = DeltaLogCacheKey(
+ redirectKey,
+ fileSystemOptions)
+ getOrCreateCache(spark.sessionState.conf).get(
+ deltaLogCacheKey,
+ () => {
+ val redirectedDeltaLog = new DeltaLog(
+ logPath = formalizedPath,
+ dataPath = formalizedPath.getParent,
+ options = fileSystemOptions,
+ allOptions = options,
+ clock = clock,
+ initialCatalogTable = catalogTableOpt
+ )
+ redirectedDeltaLog
+ }
+ )
+ }.getOrElse(deltaLog)
}
/** Invalidate the cached DeltaLog object for the given `dataPath`. */
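
formalizeDeltaPath centralizes the path-qualification logic that forTable already
performs inline; a hedged usage sketch with an illustrative local path:

    // Qualifies a raw log path against its FileSystem (e.g. file:/...), honoring
    // per-DataFrame Hadoop options when LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS
    // is enabled.
    val qualified: Path = DeltaLog.formalizeDeltaPath(
      spark, options = Map.empty, rootPath = new Path("/tmp/tables/t1/_delta_log"))
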
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 48062bf9e2a..4643b0a856d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1276,6 +1276,13 @@ trait OptimisticTransactionImpl extends DeltaTransaction
op: DeltaOperations.Operation,
redirectConfig: TableRedirectConfiguration
): Unit = {
+ // If this transaction commits to the redirect destination location, then there is no
+ // need to validate the subsequent no-redirect rules.
+ val configuration = deltaLog.newDeltaHadoopConf()
+ val dataPath = snapshot.deltaLog.dataPath.toUri.getPath
+ val catalog = spark.sessionState.catalog
+ val isRedirectDest = redirectConfig.spec.isRedirectDest(catalog, configuration, dataPath)
+ if (isRedirectDest) return
// Find all rules that match with the current application name.
// If appName is not present, its no-redirect-rule are included.
// If appName is present, includes its no-redirect-rule only when appName
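
A PathBasedRedirectSpec (see TableRedirect.scala below) answers isRedirectDest by plain
path comparison, so the early return above only skips rule validation for commits that
already target the destination. A sketch with illustrative paths, assuming catalog and
hadoopConf are in scope (the path-based spec ignores both):

    val spec = new PathBasedRedirectSpec("/tables/src", "/tables/dst")
    spec.isRedirectDest(catalog, hadoopConf, "/tables/dst") // true: rules skipped
    spec.isRedirectDest(catalog, hadoopConf, "/tables/src") // false: rules enforced
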
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
index 6d827928b69..0106135dbc8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.delta.commands._
import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.redirect.RedirectFeature
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils, DeltaSQLConf}
import org.apache.spark.sql.delta.stats.StatisticsCollection
@@ -651,6 +652,17 @@ class DeltaCatalog extends DelegatingCatalogExtension
case s: SetProperty if s.property() == "location" => classOf[SetLocation]
case c => c.getClass
}
+ // Determines whether this DDL SETs or UNSETs the table redirect property. If it does, the
+ // table redirect feature should be disabled to ensure the DDL is applied to the source or
+ // destination table properly.
+ val isUpdateTableRedirectDDL = grouped.map {
+ case (t, s: Seq[RemoveProperty]) if t == classOf[RemoveProperty] =>
+ s.map { prop => prop.property() }.exists(RedirectFeature.isRedirectProperty)
+ case (t, s: Seq[SetProperty]) if t == classOf[SetProperty] =>
+ RedirectFeature.hasRedirectConfig(s.map(prop => prop.property() -> prop.value()).toMap)
+ case (_, _) => false
+ }.toSeq.exists(a => a)
+ RedirectFeature.withUpdateTableRedirectDDL(isUpdateTableRedirectDDL) {
val table = loadTable(ident) match {
case deltaTable: DeltaTableV2 => deltaTable
case _ if changes.exists(_.isInstanceOf[ClusterBy]) =>
@@ -884,6 +896,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
loadTable(ident)
}
+ }
// We want our catalog to handle Delta, therefore for other data sources that want to be
// created, we just have this wrapper StagedTable to only drop the table if the commit fails.
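
The grouping above only inspects property keys, so both the SET and UNSET forms of the
DDL flip the flag. A hedged sketch of the two detection helpers (redirectConfigJson
stands in for a serialized TableRedirectConfiguration):

    RedirectFeature.isRedirectProperty(DeltaConfigs.REDIRECT_READER_WRITER.key) // true
    RedirectFeature.isRedirectProperty("delta.appendOnly")                      // false
    RedirectFeature.hasRedirectConfig(
      Map(DeltaConfigs.REDIRECT_WRITER_ONLY.key -> redirectConfigJson))         // true
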
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
index a4bb58700e2..cafa3ec2d99 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.sources.DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
@@ -37,7 +38,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedTable}
import org.apache.spark.sql.catalyst.analysis.UnresolvedTableImplicits._
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableCatalog, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -58,27 +59,41 @@ import org.apache.spark.util.{Clock, SystemClock}
* @param path The path to the table
* @param tableIdentifier The table identifier for this table
*/
-case class DeltaTableV2(
- spark: SparkSession,
- path: Path,
- catalogTable: Option[CatalogTable] = None,
- tableIdentifier: Option[String] = None,
- timeTravelOpt: Option[DeltaTimeTravelSpec] = None,
- options: Map[String, String] = Map.empty)
+class DeltaTableV2 private[delta](
+ val spark: SparkSession,
+ val path: Path,
+ val catalogTable: Option[CatalogTable],
+ val tableIdentifier: Option[String],
+ val timeTravelOpt: Option[DeltaTimeTravelSpec],
+ val options: Map[String, String])
extends Table
with SupportsWrite
with V2TableWithV1Fallback
with DeltaLogging {
- private lazy val (rootPath, partitionFilters, timeTravelByPath) = {
+ case class PathInfo(
+ rootPath: Path,
+ private[delta] var partitionFilters: Seq[(String, String)],
+ private[delta] var timeTravelByPath: Option[DeltaTimeTravelSpec]
+ )
+
+ private lazy val pathInfo: PathInfo = {
if (catalogTable.isDefined) {
// Fast path for reducing path munging overhead
- (new Path(catalogTable.get.location), Nil, None)
+ PathInfo(new Path(catalogTable.get.location), Seq.empty, None)
} else {
- DeltaDataSource.parsePathIdentifier(spark, path.toString, options)
+ val (rootPath, filters, timeTravel) =
+ DeltaDataSource.parsePathIdentifier(spark, path.toString, options)
+ PathInfo(rootPath, filters, timeTravel)
}
}
+ private def rootPath = pathInfo.rootPath
+
+ private def partitionFilters = pathInfo.partitionFilters
+
+ private def timeTravelByPath = pathInfo.timeTravelByPath
+
def hasPartitionFilters: Boolean = partitionFilters.nonEmpty
@@ -363,9 +378,86 @@ case class DeltaTableV2(
None
}
}
+
+ def copy(
+ spark: SparkSession = this.spark,
+ path: Path = this.path,
+ catalogTable: Option[CatalogTable] = this.catalogTable,
+ tableIdentifier: Option[String] = this.tableIdentifier,
+ timeTravelOpt: Option[DeltaTimeTravelSpec] = this.timeTravelOpt,
+ options: Map[String, String] = this.options
+ ): DeltaTableV2 = {
+ val deltaTableV2 =
+ new DeltaTableV2(spark, path, catalogTable, tableIdentifier, timeTravelOpt, options)
+ deltaTableV2.pathInfo.timeTravelByPath = timeTravelByPath
+ deltaTableV2.pathInfo.partitionFilters = partitionFilters
+ deltaTableV2
+ }
+
+ override def toString: String =
+ s"DeltaTableV2($spark,$path,$catalogTable,$tableIdentifier,$timeTravelOpt,$options)"
}
object DeltaTableV2 {
+ def unapply(deltaTable: DeltaTableV2): Option[(
+ SparkSession,
+ Path,
+ Option[CatalogTable],
+ Option[String],
+ Option[DeltaTimeTravelSpec],
+ Map[String, String])
+ ] = {
+ Some((
+ deltaTable.spark,
+ deltaTable.path,
+ deltaTable.catalogTable,
+ deltaTable.tableIdentifier,
+ deltaTable.timeTravelOpt,
+ deltaTable.options)
+ )
+ }
+
+ def apply(
+ spark: SparkSession,
+ path: Path,
+ catalogTable: Option[CatalogTable] = None,
+ tableIdentifier: Option[String] = None,
+ options: Map[String, String] = Map.empty[String, String],
+ timeTravelOpt: Option[DeltaTimeTravelSpec] = None
+ ): DeltaTableV2 = {
+ val deltaTable = new DeltaTableV2(
+ spark,
+ path,
+ catalogTable = catalogTable,
+ tableIdentifier = tableIdentifier,
+ timeTravelOpt = timeTravelOpt,
+ options = options
+ )
+ if (spark == null || spark.sessionState == null ||
+ !spark.sessionState.conf.getConf(ENABLE_TABLE_REDIRECT_FEATURE)) {
+ return deltaTable
+ }
+ // The following code ensures the path and catalogTable of the redirected table object
+ // are passed through.
+ // Note: the DeltaTableV2 can only be created using this method.
+ AnalysisHelper.allowInvokingTransformsInAnalyzer {
+ val deltaLog = deltaTable.deltaLog
+ val rootDeltaLogPath = DeltaLog.logPathFor(deltaTable.rootPath.toString)
+ val finalDeltaLogPath = DeltaLog.formalizeDeltaPath(spark, options, rootDeltaLogPath)
+ val catalogTableOpt = if (finalDeltaLogPath == deltaLog.logPath) {
+ // If there is no redirection, use existing catalogTable.
+ catalogTable
+ } else {
+ // If there is redirection, use the catalogTable of deltaLog.
+ deltaLog.getInitialCatalogTable
+ }
+ val tableIdentifier = catalogTableOpt.map(_.identifier.identifier)
+ val newPath = new Path(deltaLog.dataPath.toUri.getPath)
+ deltaTable.copy(
+ path = newPath, catalogTable = catalogTableOpt, tableIdentifier = tableIdentifier
+ )
+ }
+ }
+
/** Resolves a path into a DeltaTableV2, leveraging standard v2 table resolution. */
def apply(spark: SparkSession, tablePath: Path, options: Map[String, String], cmd: String)
: DeltaTableV2 = {
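
Because DeltaTableV2 is no longer a case class, the companion's hand-written
apply/unapply and the explicit copy method preserve the old case-class surface, so
existing pattern matches and copy call sites still compile. An illustrative sketch
(assuming the redirect feature conf is off, so apply returns the table unchanged, and
someTimeTravelSpec is in scope):

    val table = DeltaTableV2(spark, new Path("/tables/t1"))
    table match { // unapply preserves the six-field extractor shape
      case DeltaTableV2(_, p, _, _, _, _) => assert(p == table.path)
    }
    val timeTraveled = table.copy(timeTravelOpt = someTimeTravelSpec)
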
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
index f684714c39f..f51d13a4a13 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingComm
import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints}
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
import org.apache.spark.sql.delta.logging.DeltaLogKeys
+import org.apache.spark.sql.delta.redirect.RedirectFeature
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.schema.SchemaUtils.transformSchema
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -166,6 +167,8 @@ case class AlterTableSetPropertiesDeltaCommand(
CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand(
metadata.configuration, filteredConfs)
+ // If table redirect feature is updated, validates its property.
+ RedirectFeature.validateTableRedirect(txn.snapshot, table.catalogTable, configuration)
val newMetadata = metadata.copy(
description = configuration.getOrElse(TableCatalog.PROP_COMMENT, metadata.description),
configuration = metadata.configuration ++ filteredConfs)
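
validateTableRedirect (added in RedirectFeature below) is what rejects mixing the two
redirect properties. A sketch of the failure this call guards against, assuming the
snapshot already carries the writer-only redirect property and redirectConfigJson is a
serialized configuration:

    // Throws DELTA_TABLE_INVALID_SET_UNSET_REDIRECT: REDIRECT_WRITER_ONLY is already
    // set, so SETting REDIRECT_READER_WRITER on the same table is rejected.
    RedirectFeature.validateTableRedirect(txn.snapshot, table.catalogTable,
      Map(DeltaConfigs.REDIRECT_READER_WRITER.key -> redirectConfigJson))
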
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala b/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala
index c9f64c656d9..3f889955396 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala
@@ -16,8 +16,12 @@
package org.apache.spark.sql.delta.redirect
+import java.util.UUID
+
import scala.reflect.ClassTag
+import scala.util.DynamicVariable
+// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.{
DeltaConfig,
DeltaConfigs,
@@ -28,14 +32,22 @@ import org.apache.spark.sql.delta.{
RedirectWriterOnlyFeature,
Snapshot
}
+import org.apache.spark.sql.delta.DeltaLog.logPathFor
import org.apache.spark.sql.delta.actions.Metadata
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
+import org.apache.spark.sql.delta.sources.DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE
import org.apache.spark.sql.delta.util.JsonUtils
import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
/**
* The table redirection feature includes specific states that manage the behavior of Delta clients
@@ -106,7 +118,12 @@ case object DropRedirectInProgress extends RedirectState {
* This is the abstract class of the redirect specification, which stores the information
* of accessing the redirect destination table.
*/
-abstract class RedirectSpec()
+abstract class RedirectSpec() {
+ /** Determine whether `dataPath` is the redirect destination location. */
+ def isRedirectDest(catalog: SessionCatalog, config: Configuration, dataPath: String): Boolean
+ /** Determine whether `dataPath` is the redirect source location. */
+ def isRedirectSource(dataPath: String): Boolean
+}
/**
* The default redirect spec that is used for OSS delta.
@@ -115,12 +132,24 @@ abstract class RedirectSpec()
* {
* ......
* "spec": {
- * "tablePath": "s3:///tables/"
+ * "redirectSrc": "s3:///tables/"
+ * "redirectDest": "s3:///tables/"
* }
* }
- * @param tablePath this is the path where stores the redirect destination table's location.
+ *
+ * @param sourcePath the location of the redirect source table.
+ * @param destPath the location of the redirect destination table.
*/
-class PathBasedRedirectSpec(val tablePath: String) extends RedirectSpec
+class PathBasedRedirectSpec(
+ val sourcePath: String,
+ val destPath: String
+) extends RedirectSpec {
+ def isRedirectDest(catalog: SessionCatalog, config: Configuration, dataPath: String): Boolean = {
+ destPath == dataPath
+ }
+
+ def isRedirectSource(dataPath: String): Boolean = sourcePath == dataPath
+}
object PathBasedRedirectSpec {
/**
@@ -221,6 +250,35 @@ case class TableRedirectConfiguration(
val isInProgressState: Boolean = {
redirectState == EnableRedirectInProgress || redirectState == DropRedirectInProgress
}
+
+ /** Determines whether the current application matches any no-redirect rule. */
+ private def isNoRedirectApp(spark: SparkSession): Boolean = {
+ noRedirectRules.exists { rule =>
+ // If rule.appName is empty, then it applied to "spark.app.name"
+ // Todo(LC-6953): The operation name should also be taken into consideration.
+ rule.appName.forall(_.equalsIgnoreCase(spark.conf.get("spark.app.name")))
+ }
+ }
+
+ /** Determines whether the current session needs to access the redirect dest location. */
+ def needRedirect(spark: SparkSession, logPath: Path): Boolean = {
+ !isNoRedirectApp(spark) &&
+ redirectState == RedirectReady &&
+ spec.isRedirectSource(logPath.toUri.getPath)
+ }
+
+ /**
+ * Get the redirect destination location from the `deltaLog` object.
+ */
+ def getRedirectLocation(deltaLog: DeltaLog, spark: SparkSession): Path = {
+ spec match {
+ case spec: PathBasedRedirectSpec =>
+ val location = new Path(spec.destPath)
+ val fs = location.getFileSystem(deltaLog.newDeltaHadoopConf())
+ fs.makeQualified(logPathFor(location))
+ case other => throw DeltaErrors.unrecognizedRedirectSpec(other)
+ }
+ }
}
/**
@@ -238,9 +296,7 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) {
*/
def getRedirectConfiguration(deltaLogMetadata: Metadata): Option[TableRedirectConfiguration] = {
config.fromMetaData(deltaLogMetadata).map { propertyValue =>
- val mapper = new ObjectMapper()
- mapper.registerModule(DefaultScalaModule)
- mapper.readValue(propertyValue, classOf[TableRedirectConfiguration])
+ RedirectFeature.parseRedirectConfiguration(propertyValue)
}
}
@@ -289,23 +345,12 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) {
}
// There should be an existing table redirect configuration.
if (currentConfigOpt.isEmpty) {
- DeltaErrors.invalidRedirectStateTransition(tableIdent, NoRedirect, state)
+ throw DeltaErrors.invalidRedirectStateTransition(tableIdent, NoRedirect, state)
}
val currentConfig = currentConfigOpt.get
val redirectState = currentConfig.redirectState
- state match {
- case RedirectReady =>
- if (redirectState != EnableRedirectInProgress && redirectState != RedirectReady) {
- DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state)
- }
- case DropRedirectInProgress =>
- if (redirectState != RedirectReady) {
- DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state)
- }
- case _ =>
- DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state)
- }
+ RedirectFeature.validateStateTransition(tableIdent, redirectState, state)
val properties = generateRedirectMetadata(currentConfig.`type`, state, spec, noRedirectRules)
val newConfigs = txn.metadata.configuration ++ properties
val newMetadata = txn.metadata.copy(configuration = newConfigs)
@@ -333,7 +378,7 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) {
val txn = deltaLog.startTransaction(catalogTableOpt)
val snapshot = txn.snapshot
getRedirectConfiguration(snapshot.metadata).foreach { currentConfig =>
- DeltaErrors.invalidRedirectStateTransition(
+ throw DeltaErrors.invalidRedirectStateTransition(
catalogTableOpt.map(_.identifier.quotedString).getOrElse {
s"delta.`${deltaLog.dataPath.toString}`"
},
@@ -406,6 +451,52 @@ object RedirectFeature {
RedirectWriterOnly.isFeatureSupported(snapshot)
}
+ private def getRedirectConfigurationFromDeltaLog(
+ spark: SparkSession,
+ deltaLog: DeltaLog,
+ initialCatalogTable: Option[CatalogTable]
+ ): Option[TableRedirectConfiguration] = {
+ val snapshot = deltaLog.update(
+ catalogTableOpt = initialCatalogTable
+ )
+ getRedirectConfiguration(snapshot.getProperties.toMap)
+ }
+
+ /**
+ * This is the main method that redirects `deltaLog` to the destination location.
+ */
+ def getRedirectLocationAndTable(
+ spark: SparkSession,
+ deltaLog: DeltaLog,
+ redirectConfig: TableRedirectConfiguration
+ ): (Path, Option[CatalogTable]) = {
+ // Try to get the catalogTable object for the redirect destination table.
+ val catalogTableOpt = redirectConfig.spec match {
+ case pathRedirect: PathBasedRedirectSpec =>
+ withUpdateTableRedirectDDL(updateTableRedirectDDL = true) {
+ import spark.sessionState.analyzer.CatalogAndIdentifier
+ val CatalogAndIdentifier(catalog, ident) = Seq("delta", pathRedirect.destPath)
+ catalog.asTableCatalog.loadTable(ident).asInstanceOf[DeltaTableV2].catalogTable
+ }
+ }
+ // Get the redirect destination location.
+ val redirectLocation = redirectConfig.getRedirectLocation(deltaLog, spark)
+ (redirectLocation, catalogTableOpt)
+ }
+
+ def parseRedirectConfiguration(configString: String): TableRedirectConfiguration = {
+ val mapper = new ObjectMapper()
+ mapper.registerModule(DefaultScalaModule)
+ mapper.readValue(configString, classOf[TableRedirectConfiguration])
+ }
+
+ def getRedirectConfiguration(
+ properties: Map[String, String]): Option[TableRedirectConfiguration] = {
+ properties.get(DeltaConfigs.REDIRECT_READER_WRITER.key)
+ .orElse(properties.get(DeltaConfigs.REDIRECT_WRITER_ONLY.key))
+ .map(parseRedirectConfiguration)
+ }
+
/**
* Determine whether the operation `op` updates the existing redirect-reader-writer or
* redirect-writer-only table property of a table with `snapshot`.
@@ -431,4 +522,136 @@ object RedirectFeature {
RedirectReaderWriter.getRedirectConfiguration(snapshot.metadata)
}
}
+
+ /** Determines whether `configs` contains a redirect configuration. */
+ def hasRedirectConfig(configs: Map[String, String]): Boolean =
+ getRedirectConfiguration(configs).isDefined
+
+ /** Determines whether the property `name` is a redirect property. */
+ def isRedirectProperty(name: String): Boolean = {
+ name == DeltaConfigs.REDIRECT_READER_WRITER.key || name == DeltaConfigs.REDIRECT_WRITER_ONLY.key
+ }
+
+ // Helper method to validate state transitions
+ def validateStateTransition(
+ identifier: String,
+ currentState: RedirectState,
+ newState: RedirectState
+ ): Unit = {
+ (currentState, newState) match {
+ case (state, RedirectReady) =>
+ if (state == DropRedirectInProgress) {
+ throw DeltaErrors.invalidRedirectStateTransition(identifier, state, newState)
+ }
+ case (state, DropRedirectInProgress) =>
+ if (state != RedirectReady) {
+ throw DeltaErrors.invalidRedirectStateTransition(identifier, state, newState)
+ }
+ case (state, _) =>
+ throw DeltaErrors.invalidRedirectStateTransition(identifier, state, newState)
+ }
+ }
+
+ /** Returns the redirect configuration if the current `deltaLog` needs to be redirected. */
+ def needDeltaLogRedirect(
+ spark: SparkSession,
+ deltaLog: DeltaLog,
+ initialCatalogTable: Option[CatalogTable]
+ ): Option[TableRedirectConfiguration] = {
+ // Redirect can be skipped if any of the following conditions holds:
+ // - the redirect feature is not enabled,
+ // - the current command is a DDL that updates the table redirect property, or
+ // - deltaLog doesn't point to a valid table.
+ val canSkipTableRedirect = !spark.conf.get(ENABLE_TABLE_REDIRECT_FEATURE) ||
+ isUpdateTableRedirectDDL.value ||
+ !deltaLog.tableExists
+ if (canSkipTableRedirect) return None
+
+ val redirectConfigOpt = getRedirectConfigurationFromDeltaLog(
+ spark,
+ deltaLog,
+ initialCatalogTable
+ )
+ val needRedirectToDest = redirectConfigOpt.exists { redirectConfig =>
+ // If the current deltaLog already points to the destination, there is no need
+ // to redirect it.
+ redirectConfig.needRedirect(spark, deltaLog.dataPath)
+ }
+ if (needRedirectToDest) redirectConfigOpt else None
+ }
+
+ def validateTableRedirect(
+ snapshot: Snapshot,
+ catalogTable: Option[CatalogTable],
+ configs: Map[String, String]
+ ): Unit = {
+ val identifier = catalogTable
+ .map(_.identifier.quotedString)
+ .getOrElse(s"delta.`${snapshot.deltaLog.logPath.toString}`")
+ if (configs.contains(DeltaConfigs.REDIRECT_READER_WRITER.key)) {
+ if (RedirectWriterOnly.isFeatureSet(snapshot.metadata)) {
+ throw DeltaErrors.invalidSetUnSetRedirectCommand(
+ identifier,
+ DeltaConfigs.REDIRECT_READER_WRITER.key,
+ DeltaConfigs.REDIRECT_WRITER_ONLY.key
+ )
+ }
+ } else if (configs.contains(DeltaConfigs.REDIRECT_WRITER_ONLY.key)) {
+ if (RedirectReaderWriter.isFeatureSet(snapshot.metadata)) {
+ throw DeltaErrors.invalidSetUnSetRedirectCommand(
+ identifier,
+ DeltaConfigs.REDIRECT_WRITER_ONLY.key,
+ DeltaConfigs.REDIRECT_READER_WRITER.key
+ )
+ }
+ } else {
+ return
+ }
+ val currentRedirectConfigOpt = getRedirectConfiguration(snapshot)
+ val newRedirectConfigOpt = getRedirectConfiguration(configs)
+ newRedirectConfigOpt.foreach { newRedirectConfig =>
+ val newState = newRedirectConfig.redirectState
+ // Validate state transitions based on current and new states
+ currentRedirectConfigOpt match {
+ case Some(currentRedirectConfig) =>
+ validateStateTransition(identifier, currentRedirectConfig.redirectState, newState)
+ case None if newState == DropRedirectInProgress =>
+ throw DeltaErrors.invalidRedirectStateTransition(
+ identifier, newState, DropRedirectInProgress
+ )
+ case _ => // No action required for valid transitions
+ }
+ }
+ }
+
+ val DELTALOG_PREFIX = "redirect-delta-log://"
+ /**
+ * The thread-local variable indicating whether the current command is a
+ * DDL that updates the table redirect property.
+ */
+ @SuppressWarnings(
+ Array(
+ "BadMethodCall-DynamicVariable",
+ """
+ Reason: The redirect feature implementation requires a thread-local variable to control
+ enable/disable states during SET and UNSET operations. This approach is necessary because:
+ - Parameter Passing Limitation: The call stack cannot propagate this state via method
+ parameters, as the feature is triggered through an external open-source API interface
+ that does not expose this configurability.
+ - Concurrency Constraints: A global variable (without thread-local isolation) would allow
+ unintended cross-thread interference, risking undefined behavior in concurrent
+ transactions. We cannot use a lock because it would introduce a large critical section
+ and create performance issues.
+ By using thread-local storage, the feature ensures transaction-specific state isolation
+ while maintaining compatibility with the third-party API's design."""
+ )
+ )
+ private val isUpdateTableRedirectDDL = new DynamicVariable[Boolean](false)
+
+ /**
+ * Execute `thunk` while `isUpdateTableRedirectDDL` is set to `updateTableRedirectDDL`.
+ */
+ def withUpdateTableRedirectDDL[T](updateTableRedirectDDL: Boolean)(thunk: => T): T = {
+ isUpdateTableRedirectDDL.withValue(updateTableRedirectDDL) { thunk }
+ }
}
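
The transitions accepted by validateStateTransition form a small state machine;
everything else raises DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION. A compact summary
plus one illustrative rejected call:

    // Allowed: <any state except DropRedirectInProgress> -> RedirectReady
    //          RedirectReady                             -> DropRedirectInProgress
    // Everything else throws, e.g. re-enabling mid-drop:
    RedirectFeature.validateStateTransition("t1", DropRedirectInProgress, RedirectReady)
    // => DeltaIllegalStateException(DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION)
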
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index d7cf7aec852..06a09d8dbba 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -2327,6 +2327,13 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)
+ val ENABLE_TABLE_REDIRECT_FEATURE =
+ buildConf("enableTableRedirectFeature")
+ .doc("True if enabling the table redirect feature.")
+ .internal()
+ .booleanConf
+ .createWithDefault(false)
+
val DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS =
buildConf("optimizeWrite.maxShufflePartitions")
.internal()
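
DeltaSQLConf.buildConf registers keys under Delta's SQL conf namespace (typically
spark.databricks.delta.* in OSS Delta), so the suite below opts in per session roughly
like this:

    // Session-level opt-in; the feature defaults to off.
    spark.conf.set(DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE.key, "true")
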
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala
index f82f132b4c3..f263d0852ab 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala
@@ -168,7 +168,7 @@ trait CloneTableSuiteBase extends QueryTest
} else {
None
}
- val deltaTable = DeltaTableV2(spark, sourceLog.dataPath, None, None, timeTravelSpec)
+ val deltaTable = DeltaTableV2(spark, sourceLog.dataPath, timeTravelOpt = timeTravelSpec)
val sourceData = Dataset.ofRows(
spark,
LogicalRelation(sourceLog.createRelation(
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala
index 6ededce0533..3c6b8de6cb7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta
import java.io.File
+// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
import org.apache.spark.sql.delta.redirect.{
DropRedirectInProgress,
@@ -26,12 +27,16 @@ import org.apache.spark.sql.delta.redirect.{
PathBasedRedirectSpec,
RedirectReaderWriter,
RedirectReady,
+ RedirectSpec,
RedirectState,
RedirectWriterOnly,
- TableRedirect
+ TableRedirect,
+ TableRedirectConfiguration
}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
+import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.commons.text.StringEscapeUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{QueryTest, SaveMode, SparkSession}
@@ -49,6 +54,7 @@ class TableRedirectSuite extends QueryTest
private def validateState(
deltaLog: DeltaLog,
redirectState: RedirectState,
+ sourceTablePath: File,
destTablePath: File,
feature: TableRedirect
): Unit = {
@@ -65,10 +71,13 @@ class TableRedirectSuite extends QueryTest
}
assert(redirectConfig.redirectState == redirectState)
assert(redirectConfig.`type` == PathBasedRedirectSpec.REDIRECT_TYPE)
- val expectedSpecValue = s"""{"tablePath":"${destTablePath.getCanonicalPath}"}"""
+ val srcPath = sourceTablePath.getCanonicalPath
+ val dstPath = destTablePath.getCanonicalPath
+ val expectedSpecValue = s"""{"sourcePath":"$srcPath","destPath":"$dstPath"}"""
assert(redirectConfig.specValue == expectedSpecValue)
val redirectSpec = redirectConfig.spec.asInstanceOf[PathBasedRedirectSpec]
- assert(redirectSpec.tablePath == destTablePath.getCanonicalPath)
+ assert(redirectSpec.sourcePath == srcPath)
+ assert(redirectSpec.destPath == dstPath)
}
private def validateRemovedState(deltaLog: DeltaLog, feature: TableRedirect): Unit = {
@@ -84,15 +93,19 @@ class TableRedirectSuite extends QueryTest
}
}
- def redirectTest(label: String)(f: (DeltaLog, File, File, CatalogTable) => Unit): Unit = {
+ def redirectTest(
+ label: String, enableRedirect: Boolean
+ )(f: (DeltaLog, File, File, CatalogTable) => Unit): Unit = {
test(s"basic table redirect: $label") {
withTempDir { sourceTablePath =>
withTempDir { destTablePath =>
- withTable("t1") {
- sql(s"CREATE external TABLE t1(c0 long)USING delta LOCATION '$sourceTablePath';")
- val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- val deltaLog = DeltaLog.forTable(spark, new Path(sourceTablePath.getCanonicalPath))
- f(deltaLog, sourceTablePath, destTablePath, catalogTable)
+ withSQLConf(DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE.key -> enableRedirect.toString) {
+ withTable("t1", "t2") {
+ sql(s"CREATE external TABLE t1(c0 long) USING delta LOCATION '$sourceTablePath';")
+ val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ val deltaLog = DeltaLog.forTable(spark, new Path(sourceTablePath.getCanonicalPath))
+ f(deltaLog, sourceTablePath, destTablePath, catalogTable)
+ }
}
}
}
@@ -103,30 +116,37 @@ class TableRedirectSuite extends QueryTest
val featureName = feature.config.key
Seq(true, false).foreach { hasCatalogTable =>
redirectTest(s"basic redirect: $featureName - " +
- s"hasCatalogTable: $hasCatalogTable") { case (deltaLog, _, dest, catalogTable) =>
+ s"hasCatalogTable: $hasCatalogTable", enableRedirect = false) {
+ case (deltaLog, source, dest, catalogTable) =>
val snapshot = deltaLog.update()
assert(!feature.isFeatureSet(snapshot.metadata))
- val redirectSpec = new PathBasedRedirectSpec(dest.getCanonicalPath)
+ val redirectSpec = new PathBasedRedirectSpec(
+ source.getCanonicalPath,
+ dest.getCanonicalPath
+ )
val catalogTableOpt = if (hasCatalogTable) Some(catalogTable) else None
val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE
// Step-1: Initiate table redirection and set to EnableRedirectInProgress state.
feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec)
- validateState(deltaLog, EnableRedirectInProgress, dest, feature)
+ validateState(deltaLog, EnableRedirectInProgress, source, dest, feature)
// Step-2: Complete table redirection and set to RedirectReady state.
feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec)
- validateState(deltaLog, RedirectReady, dest, feature)
+ validateState(deltaLog, RedirectReady, source, dest, feature)
// Step-3: Start dropping table redirection and set to DropRedirectInProgress state.
feature.update(deltaLog, catalogTableOpt, DropRedirectInProgress, redirectSpec)
- validateState(deltaLog, DropRedirectInProgress, dest, feature)
+ validateState(deltaLog, DropRedirectInProgress, source, dest, feature)
// Step-4: Finish dropping table redirection and remove the property completely.
feature.remove(deltaLog, catalogTableOpt)
validateRemovedState(deltaLog, feature)
// Step-5: Initiate table redirection and set to EnableRedirectInProgress state one
// more time.
withTempDir { destTablePath2 =>
- val redirectSpec = new PathBasedRedirectSpec(destTablePath2.getCanonicalPath)
+ val redirectSpec = new PathBasedRedirectSpec(
+ source.getCanonicalPath,
+ destTablePath2.getCanonicalPath
+ )
feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec)
- validateState(deltaLog, EnableRedirectInProgress, destTablePath2, feature)
+ validateState(deltaLog, EnableRedirectInProgress, source, destTablePath2, feature)
// Step-6: Finish dropping table redirection and remove the property completely.
feature.remove(deltaLog, catalogTableOpt)
validateRemovedState(deltaLog, feature)
@@ -134,16 +154,19 @@ class TableRedirectSuite extends QueryTest
}
redirectTest(s"Redirect $featureName: empty no redirect rules - " +
- s"hasCatalogTable: $hasCatalogTable") {
+ s"hasCatalogTable: $hasCatalogTable", enableRedirect = false) {
case (deltaLog, source, dest, catalogTable) =>
val snapshot = deltaLog.update()
assert(!feature.isFeatureSet(snapshot.metadata))
- val redirectSpec = new PathBasedRedirectSpec(dest.getCanonicalPath)
+ val redirectSpec = new PathBasedRedirectSpec(
+ source.getCanonicalPath,
+ dest.getCanonicalPath
+ )
val catalogTableOpt = if (hasCatalogTable) Some(catalogTable) else None
val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE
// 0. Initialize table redirection by setting table to EnableRedirectInProgress state.
feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec)
- validateState(deltaLog, EnableRedirectInProgress, dest, feature)
+ validateState(deltaLog, EnableRedirectInProgress, source, dest, feature)
// 1. INSERT should hit DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE because table is in
// EnableRedirectInProgress, which doesn't allow any DML and DDL.
@@ -164,7 +187,7 @@ class TableRedirectSuite extends QueryTest
// 4. INSERT should hit DELTA_NO_REDIRECT_RULES_VIOLATED since the
// no-redirect-rules is empty.
- validateState(deltaLog, RedirectReady, dest, feature)
+ validateState(deltaLog, RedirectReady, source, dest, feature)
val exception3 = intercept[DeltaIllegalStateException] {
sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)")
}
@@ -182,7 +205,7 @@ class TableRedirectSuite extends QueryTest
// 7. INSERT should hit DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE because table is in
// DropRedirectInProgress, which doesn't allow any DML and DDL.
- validateState(deltaLog, DropRedirectInProgress, dest, feature)
+ validateState(deltaLog, DropRedirectInProgress, source, dest, feature)
val exception5 = intercept[DeltaIllegalStateException] {
sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)")
}
@@ -197,16 +220,19 @@ class TableRedirectSuite extends QueryTest
}
redirectTest(s"Redirect $featureName: no redirect rules - " +
- s"hasCatalogTable: $hasCatalogTable") {
+ s"hasCatalogTable: $hasCatalogTable", enableRedirect = false) {
case (deltaLog, source, dest, catalogTable) =>
val snapshot = deltaLog.update()
assert(!feature.isFeatureSet(snapshot.metadata))
- val redirectSpec = new PathBasedRedirectSpec(dest.getCanonicalPath)
+ val redirectSpec = new PathBasedRedirectSpec(
+ source.getCanonicalPath,
+ dest.getCanonicalPath
+ )
val catalogTableOpt = if (hasCatalogTable) Some(catalogTable) else None
val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE
sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)")
feature.add(deltaLog, catalogTableOpt, redirectType, redirectSpec)
- validateState(deltaLog, EnableRedirectInProgress, dest, feature)
+ validateState(deltaLog, EnableRedirectInProgress, source, dest, feature)
// 1. Move table redirect to RedirectReady state with no redirect rules that
// allows WRITE, DELETE, UPDATE.
var noRedirectRules = Set(
@@ -220,7 +246,7 @@ class TableRedirectSuite extends QueryTest
)
)
feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec, noRedirectRules)
- validateState(deltaLog, RedirectReady, dest, feature)
+ validateState(deltaLog, RedirectReady, source, dest, feature)
sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)")
sql(s"update delta.`$source` set c0 = 100")
sql(s"delete from delta.`$source` where c0 = 1")
@@ -233,7 +259,7 @@ class TableRedirectSuite extends QueryTest
)
)
feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec, noRedirectRules)
- validateState(deltaLog, RedirectReady, dest, feature)
+ validateState(deltaLog, RedirectReady, source, dest, feature)
// 2.1. WRITE should be aborted because no-redirect-rules only allow UPDATE.
val exception1 = intercept[DeltaIllegalStateException] {
sql(s"insert into delta.`$source` values(1),(2),(3),(4),(5),(6)")
@@ -264,7 +290,7 @@ class TableRedirectSuite extends QueryTest
)
)
feature.update(deltaLog, catalogTableOpt, RedirectReady, redirectSpec, noRedirectRules)
- validateState(deltaLog, RedirectReady, dest, feature)
+ validateState(deltaLog, RedirectReady, source, dest, feature)
// 3.1. The WRITE of appName "dummy" would be aborted because no-redirect-rules
// only allow WRITE on application "etl".
@@ -298,5 +324,76 @@ class TableRedirectSuite extends QueryTest
}
}
}
+
+ def alterRedirect(
+ table: String,
+ redirectType: String,
+ redirectState: RedirectState,
+ spec: RedirectSpec,
+ noRedirectRules: Set[NoRedirectRule]
+ ): Unit = {
+ val enableConfig = TableRedirectConfiguration(
+ redirectType,
+ redirectState.name,
+ JsonUtils.toJson(spec),
+ noRedirectRules
+ )
+ val enableConfigJson = StringEscapeUtils.escapeJson(JsonUtils.toJson(enableConfig))
+ sql(s"alter table $table set TBLPROPERTIES('$featureName' = '$enableConfigJson')")
+ }
+
+ redirectTest(s"Redirect $featureName: modify table property", enableRedirect = true) {
+ case (deltaLog, source, dest, catalogTable) =>
+ val redirectSpec = new PathBasedRedirectSpec(
+ source.getCanonicalPath,
+ dest.getCanonicalPath
+ )
+ val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE
+ val destPath = dest.toString
+ val srcPath = source.toString
+ sql(s"CREATE external TABLE t2(c0 long) USING delta LOCATION '$dest';")
+ sql(s"insert into t2 values(1),(2),(3),(4),(5)")
+ val destTable = s"delta.`$destPath`"
+ val srcTable = s"delta.`$srcPath`"
+ // Initialize the redirection by moving table into EnableRedirectInProgress state.
+ alterRedirect(srcTable, redirectType, EnableRedirectInProgress, redirectSpec, Set.empty)
+ alterRedirect(destTable, redirectType, EnableRedirectInProgress, redirectSpec, Set.empty)
+ // After the Delta log is cloned, move both the redirect destination table and the
+ // redirect source table to the RedirectReady state.
+ alterRedirect(srcTable, redirectType, RedirectReady, redirectSpec, Set.empty)
+ alterRedirect(destTable, redirectType, RedirectReady, redirectSpec, Set.empty)
+ sql(s"insert into $srcTable values(1), (2), (3)")
+ sql(s"insert into $destTable values(1), (2), (3)")
+ sql(s"insert into t1 values(1), (2), (3)")
+ sql(s"insert into t2 values(1), (2), (3)")
+
+ var result = sql("select * from t1").collect()
+ assert(result.length == 17)
+ result = sql("select * from t2").collect()
+ assert(result.length == 17)
+ result = sql(s"select * from $srcTable ").collect()
+ assert(result.length == 17)
+ result = sql(s"select * from $destTable ").collect()
+ assert(result.length == 17)
+ val root = new Path(catalogTable.location)
+ val fs = root.getFileSystem(deltaLog.newDeltaHadoopConf)
+ var files = fs.listStatus(new Path(srcPath + "/_delta_log"))
+ .filter(_.getPath.toString.endsWith(".json"))
+ assert(files.length == 3)
+ files = fs.listStatus(new Path(destPath + "/_delta_log"))
+ .filter(_.getPath.toString.endsWith(".json"))
+ assert(files.length == 8)
+ // Drop redirection by moving both redirect destination table and redirect source table to
+ // DropRedirectInProgress.
+ alterRedirect(destTable, redirectType, DropRedirectInProgress, redirectSpec, Set.empty)
+ alterRedirect(srcTable, redirectType, DropRedirectInProgress, redirectSpec, Set.empty)
+ // Remove table redirect feature from redirect source table and verify table content.
+ sql(s"alter table $srcTable unset TBLPROPERTIES('$featureName')")
+ result = sql("select * from t1").collect()
+ assert(result.length == 0)
+ sql("insert into t1 values(1), (2), (3), (4)")
+ result = sql("select * from t1").collect()
+ assert(result.length == 4)
+ }
}
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
index f5001f69182..0c988d5996b 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
@@ -171,7 +171,7 @@ object DeltaTestImplicits {
def apply(spark: SparkSession, tableDir: File, clock: Clock): DeltaTableV2 = {
val tablePath = new Path(tableDir.getAbsolutePath)
- new DeltaTableV2(spark, tablePath) {
+ new DeltaTableV2(spark, tablePath, catalogTable = None, None, None, Map.empty) {
override lazy val deltaLog: DeltaLog = DeltaLog.forTable(spark, tablePath, clock)
}
}