Commit c88a46f: table redirect core

kamcheungting-db committed Feb 26, 2025
1 parent 6e9498c · commit c88a46f
Showing 12 changed files with 608 additions and 80 deletions.
14 changes: 13 additions & 1 deletion spark/src/main/resources/error/delta-error-classes.json
@@ -2480,14 +2480,20 @@
"Unable to update table redirection state: Invalid state transition attempted.",
"The Delta table '<table>' cannot change from '<oldState>' to '<newState>'."
],
"sqlState" : "KD007"
"sqlState" : "22023"
},
"DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT" : {
"message" : [
"Unable to remove table redirection for <table> due to its invalid state: <currentState>."
],
"sqlState" : "KD007"
},
"DELTA_TABLE_INVALID_SET_UNSET_REDIRECT" : {
"message" : [
"Unable to SET or UNSET redirect property on <table>: current property '<currentProperty>' mismatches with new property '<newProperty>'."
],
"sqlState" : "22023"
},
"DELTA_TABLE_LOCATION_MISMATCH" : {
"message" : [
"The location of the existing table <tableName> is <existingTableLocation>. It doesn't match the specified location <tableLocation>."
@@ -2512,6 +2518,12 @@
],
"sqlState" : "0AKDD"
},
"DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC" : {
"message" : [
"The Delta log contains unrecognized table redirect spec '<spec>'."
],
"sqlState" : "42704"
},
"DELTA_TARGET_TABLE_FINAL_SCHEMA_EMPTY" : {
"message" : [
"Target table final schema is empty."
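Aside: the error classes above are plain message templates; Delta's error framework substitutes the `<placeholder>` tokens with the `messageParameters` supplied by the `DeltaErrors` helpers in the next file. Below is a minimal, self-contained sketch of that substitution with hypothetical names — it is an illustration of the template mechanism, not Delta's actual throwable helper.

object ErrorClassRenderDemo {
  // Template for DELTA_TABLE_INVALID_SET_UNSET_REDIRECT, copied from the JSON above.
  private val template =
    "Unable to SET or UNSET redirect property on <table>: current property " +
      "'<currentProperty>' mismatches with new property '<newProperty>'."

  // Replace each <name> token with its supplied value.
  def render(params: Map[String, String]): String =
    params.foldLeft(template) { case (msg, (k, v)) => msg.replace(s"<$k>", v) }

  def main(args: Array[String]): Unit = {
    println(render(Map(
      "table" -> "main.db.events",
      "currentProperty" -> "PathBasedRedirect(src)",
      "newProperty" -> "PathBasedRedirect(dst)")))
  }
}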
29 changes: 23 additions & 6 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.delta.hooks.AutoCompactType
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.redirect.NoRedirectRule
+import org.apache.spark.sql.delta.redirect.RedirectSpec
import org.apache.spark.sql.delta.redirect.RedirectState
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -357,26 +358,42 @@ trait DeltaErrorsBase
)
}

+  def unrecognizedRedirectSpec(spec: RedirectSpec): Throwable = {
+    new DeltaIllegalStateException(
+      errorClass = "DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC",
+      messageParameters = Array(spec.toString)
+    )
+  }
+
+  def invalidSetUnSetRedirectCommand(
+      table: String,
+      newProperty: String,
+      existingProperty: String): Throwable = {
+    new DeltaIllegalStateException(
+      errorClass = "DELTA_TABLE_INVALID_SET_UNSET_REDIRECT",
+      messageParameters = Array(table, existingProperty, newProperty)
+    )
+  }
+
def invalidRedirectStateTransition(
table: String,
oldState: RedirectState,
-      newState: RedirectState): Unit = {
+      newState: RedirectState): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION",
-      messageParameters = Array(
-        table, table, oldState.name, newState.name)
+      messageParameters = Array(table, oldState.name, newState.name)
)
}

-  def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Unit = {
+  def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT",
messageParameters = Array(table, table, currentState.name)
)
}

def invalidCommitIntermediateRedirectState(state: RedirectState): Throwable = {
-    throw new DeltaIllegalStateException (
+    new DeltaIllegalStateException (
errorClass = "DELTA_COMMIT_INTERMEDIATE_REDIRECT_STATE",
messageParameters = Array(state.name)
)
@@ -385,7 +402,7 @@ trait DeltaErrorsBase
def noRedirectRulesViolated(
op: DeltaOperations.Operation,
noRedirectRules: Set[NoRedirectRule]): Throwable = {
-    throw new DeltaIllegalStateException (
+    new DeltaIllegalStateException (
errorClass = "DELTA_NO_REDIRECT_RULES_VIOLATED",
messageParameters =
Array(op.name, noRedirectRules.map("\"" + _ + "\"").mkString("[", ",\n", "]"))
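Aside: the signature changes above matter. `DeltaErrors` helpers are meant to construct an exception that the caller throws, but the old `Unit` return types meant the constructed exception was silently discarded, and `throw DeltaErrors.invalidRedirectStateTransition(...)` would not even compile; the stray `throw` inside helpers that already declare `Throwable` is the mirror-image inconsistency that this commit also removes. A minimal sketch of the before/after behavior, with stand-in names:

object ThrowableReturnDemo {
  // Before: returns Unit, so the exception is built and then dropped;
  // `throw brokenHelper(...)` would not even type-check.
  def brokenHelper(msg: String): Unit = {
    new IllegalStateException(msg)
  }

  // After: returns Throwable, letting the caller write `throw fixedHelper(...)`.
  def fixedHelper(msg: String): Throwable =
    new IllegalStateException(msg)

  def main(args: Array[String]): Unit = {
    brokenHelper("nothing escapes here") // effectively a no-op
    try throw fixedHelper("caller throws explicitly")
    catch { case e: IllegalStateException => println(s"caught: ${e.getMessage}") }
  }
}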
80 changes: 68 additions & 12 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -891,6 +891,27 @@ object DeltaLog extends DeltaLogging {
(deltaLog, Some(table))
}

+  /**
+   * Helper method for transforming a given delta log path to the consistent formal path format.
+   */
+  def formalizeDeltaPath(
+      spark: SparkSession,
+      options: Map[String, String],
+      rootPath: Path): Path = {
+    val fileSystemOptions: Map[String, String] =
+      if (spark.sessionState.conf.getConf(
+          DeltaSQLConf.LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS)) {
+        options.filterKeys { k =>
+          DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith)
+        }.toMap
+      } else {
+        Map.empty
+      }
+    val hadoopConf = spark.sessionState.newHadoopConfWithOptions(fileSystemOptions)
+    val fs = rootPath.getFileSystem(hadoopConf)
+    fs.makeQualified(rootPath)
+  }
+
/**
* Helper function to be used with the forTableWithSnapshot calls. Thunk is a
* partially applied DeltaLog.forTable call, which we can then wrap around with a
@@ -929,14 +950,14 @@
// scalastyle:on deltahadoopconfiguration
val fs = rawPath.getFileSystem(hadoopConf)
val path = fs.makeQualified(rawPath)
-    def createDeltaLog(): DeltaLog = recordDeltaOperation(
+    def createDeltaLog(tablePath: Path = path): DeltaLog = recordDeltaOperation(
null,
"delta.log.create",
-      Map(TAG_TAHOE_PATH -> path.getParent.toString)) {
+      Map(TAG_TAHOE_PATH -> tablePath.getParent.toString)) {
AnalysisHelper.allowInvokingTransformsInAnalyzer {
new DeltaLog(
-          logPath = path,
-          dataPath = path.getParent,
+          logPath = tablePath,
+          dataPath = tablePath.getParent,
options = fileSystemOptions,
allOptions = options,
clock = clock,
@@ -966,15 +987,50 @@
}
}

-    val deltaLog = getDeltaLogFromCache
-    if (Option(deltaLog.sparkContext.get).map(_.isStopped).getOrElse(true)) {
-      // Invalid the cached `DeltaLog` and create a new one because the `SparkContext` of the cached
-      // `DeltaLog` has been stopped.
-      getOrCreateCache(spark.sessionState.conf).invalidate(cacheKey)
-      getDeltaLogFromCache
-    } else {
-      deltaLog
+    def initializeDeltaLog(): DeltaLog = {
+      val deltaLog = getDeltaLogFromCache
+      if (Option(deltaLog.sparkContext.get).forall(_.isStopped)) {
+        // Invalidate the cached `DeltaLog` and create a new one because the `SparkContext` of
+        // the cached `DeltaLog` has been stopped.
+        getOrCreateCache(spark.sessionState.conf).invalidate(path -> fileSystemOptions)
+        getDeltaLogFromCache
+      } else {
+        deltaLog
+      }
+    }

+    val deltaLog = initializeDeltaLog()
+    // The deltaLog object may be cached while another session updates the table redirect
+    // property. To avoid this potential race condition, a validation is added inside the
+    // deltaLog.update method to ensure deltaLog points to the correct place after the
+    // snapshot is updated.
+    val redirectConfigOpt = RedirectFeature.needDeltaLogRedirect(
+      spark,
+      deltaLog,
+      initialCatalogTable
+    )
+    redirectConfigOpt.map { redirectConfig =>
+      val (redirectLoc, catalogTableOpt) = RedirectFeature
+        .getRedirectLocationAndTable(spark, deltaLog, redirectConfig)
+      val formalizedPath = formalizeDeltaPath(spark, options, redirectLoc)
+      // Build the cache key with a redirect prefix to prevent interference between
+      // redirection and normal access.
+      val redirectKey = new Path(RedirectFeature.DELTALOG_PREFIX, redirectLoc)
+      val deltaLogCacheKey = DeltaLogCacheKey(
+        redirectKey,
+        fileSystemOptions)
+      getOrCreateCache(spark.sessionState.conf).get(
+        deltaLogCacheKey,
+        () => {
+          new DeltaLog(
+            logPath = formalizedPath,
+            dataPath = formalizedPath.getParent,
+            options = fileSystemOptions,
+            allOptions = options,
+            clock = clock,
+            initialCatalogTable = catalogTableOpt
+          )
+        }
+      )
+    }.getOrElse(deltaLog)
}

/** Invalidate the cached DeltaLog object for the given `dataPath`. */
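Aside: the interesting part of the new lookup path is the cache key. A redirected table resolves to a different `_delta_log` location than the one the user asked for, so the code above namespaces redirected entries under `RedirectFeature.DELTALOG_PREFIX` instead of reusing the plain path key. A simplified sketch of that namespacing idea — the cache and key types here are stand-ins, not Delta's actual `DeltaLogCacheKey`:

import java.util.concurrent.ConcurrentHashMap

object RedirectCacheDemo {
  private val DELTALOG_PREFIX = "redirect-delta-log:"
  private val cache = new ConcurrentHashMap[(String, Map[String, String]), String]()

  // Redirected entries get a prefixed key so they never collide with
  // entries created by direct (non-redirected) access to the same location.
  def getOrCreate(path: String, opts: Map[String, String], redirected: Boolean): String = {
    val key = (if (redirected) DELTALOG_PREFIX + path else path, opts)
    cache.computeIfAbsent(key, _ => s"DeltaLog(${key._1})")
  }

  def main(args: Array[String]): Unit = {
    val direct = getOrCreate("s3://bucket/tbl/_delta_log", Map.empty, redirected = false)
    val viaRedirect = getOrCreate("s3://bucket/tbl/_delta_log", Map.empty, redirected = true)
    println(direct == viaRedirect) // false: the two access modes are cached separately
  }
}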
spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1276,6 +1276,13 @@ trait OptimisticTransactionImpl extends DeltaTransaction
op: DeltaOperations.Operation,
redirectConfig: TableRedirectConfiguration
): Unit = {
+    // If this transaction commits to the redirect destination location, then there is no
+    // need to validate the subsequent no-redirect rules.
+    val configuration = deltaLog.newDeltaHadoopConf()
+    val dataPath = snapshot.deltaLog.dataPath.toUri.getPath
+    val catalog = spark.sessionState.catalog
+    val isRedirectDest = redirectConfig.spec.isRedirectDest(catalog, configuration, dataPath)
+    if (isRedirectDest) return
// Find all rules that match with the current application name.
    // If appName is not present, its no-redirect rules are included.
    // If appName is present, includes its no-redirect rules only when appName
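Aside: the guard above short-circuits rule validation for commits that already target the redirect destination; for everything else, rule selection follows the comment that the diff truncates: a rule with no appName applies to every application, while a rule with an appName applies only to that application. A simplified sketch of that selection — the `NoRedirectRule` fields here are assumed for illustration, not the real class's shape:

final case class NoRedirectRule(appName: Option[String], allowedOperations: Set[String])

object NoRedirectRuleDemo {
  // Keep rules that apply to every app (no appName) or that name the current app.
  def matchingRules(rules: Set[NoRedirectRule], currentApp: String): Set[NoRedirectRule] =
    rules.filter(r => r.appName.forall(_.equalsIgnoreCase(currentApp)))

  def main(args: Array[String]): Unit = {
    val rules = Set(
      NoRedirectRule(None, Set("WRITE")),
      NoRedirectRule(Some("etl-job"), Set("OPTIMIZE")),
      NoRedirectRule(Some("other-app"), Set("DELETE")))
    println(matchingRules(rules, "etl-job").size) // 2: the global rule and etl-job's rule
  }
}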
spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
@@ -651,6 +651,17 @@ class DeltaCatalog extends DelegatingCatalogExtension
case s: SetProperty if s.property() == "location" => classOf[SetLocation]
case c => c.getClass
}
+    // Determine whether this DDL SETs or UNSETs the table redirect property. If it does, the
+    // table redirect feature should be disabled to ensure the DDL can be applied onto the
+    // source or destination table properly.
+    val isUpdateTableRedirectDDL = grouped.map {
+      case (t, s: Seq[RemoveProperty]) if t == classOf[RemoveProperty] =>
+        s.map { prop => prop.property() }.exists(RedirectFeature.isRedirectProperty)
+      case (t, s: Seq[SetProperty]) if t == classOf[SetProperty] =>
+        RedirectFeature.hasRedirectConfig(s.map(prop => prop.property() -> prop.value()).toMap)
+      case (_, _) => false
+    }.toSeq.exists(a => a)
+    RedirectFeature.withUpdateTableRedirectDDL(isUpdateTableRedirectDDL) {
val table = loadTable(ident) match {
case deltaTable: DeltaTableV2 => deltaTable
case _ if changes.exists(_.isInstanceOf[ClusterBy]) =>
@@ -884,6 +895,7 @@ class DeltaCatalog extends DelegatingCatalogExtension

loadTable(ident)
}
+  }

// We want our catalog to handle Delta, therefore for other data sources that want to be
// created, we just have this wrapper StagedTable to only drop the table if the commit fails.
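Aside: the detection added above walks the ALTER TABLE changes, grouped by change class, and flags the DDL when any SET carries a redirect configuration or any UNSET names a redirect property. A condensed sketch of the same check — the `delta.redirect` key prefix below is a hypothetical stand-in for whatever `RedirectFeature.isRedirectProperty` actually matches:

object RedirectDdlDetectDemo {
  sealed trait Change
  final case class SetProperty(key: String, value: String) extends Change
  final case class RemoveProperty(key: String) extends Change

  // Stand-in predicate; Delta's real check is RedirectFeature.isRedirectProperty.
  private def isRedirectProperty(key: String): Boolean =
    key.startsWith("delta.redirect")

  // The real code additionally groups changes by their class before testing them.
  def isUpdateTableRedirectDDL(changes: Seq[Change]): Boolean =
    changes.exists {
      case SetProperty(k, _) => isRedirectProperty(k)
      case RemoveProperty(k) => isRedirectProperty(k)
    }

  def main(args: Array[String]): Unit = {
    println(isUpdateTableRedirectDDL(Seq(SetProperty("delta.redirectReaderWriter", "{}")))) // true
    println(isUpdateTableRedirectDDL(Seq(RemoveProperty("comment"))))                       // false
  }
}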
spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedTable}
import org.apache.spark.sql.catalyst.analysis.UnresolvedTableImplicits._
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableCatalog, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -58,27 +58,41 @@ import org.apache.spark.util.{Clock, SystemClock}
* @param path The path to the table
* @param tableIdentifier The table identifier for this table
*/
-case class DeltaTableV2(
-    spark: SparkSession,
-    path: Path,
-    catalogTable: Option[CatalogTable] = None,
-    tableIdentifier: Option[String] = None,
-    timeTravelOpt: Option[DeltaTimeTravelSpec] = None,
-    options: Map[String, String] = Map.empty)
+class DeltaTableV2 private[delta](
+    val spark: SparkSession,
+    val path: Path,
+    val catalogTable: Option[CatalogTable],
+    val tableIdentifier: Option[String],
+    val timeTravelOpt: Option[DeltaTimeTravelSpec],
+    val options: Map[String, String])
extends Table
with SupportsWrite
with V2TableWithV1Fallback
with DeltaLogging {

-  private lazy val (rootPath, partitionFilters, timeTravelByPath) = {
+  case class PathInfo(
+      rootPath: Path,
+      private[delta] var partitionFilters: Seq[(String, String)],
+      private[delta] var timeTravelByPath: Option[DeltaTimeTravelSpec]
+  )
+
+  private lazy val pathInfo: PathInfo = {
if (catalogTable.isDefined) {
// Fast path for reducing path munging overhead
-      (new Path(catalogTable.get.location), Nil, None)
+      PathInfo(new Path(catalogTable.get.location), Seq.empty, None)
} else {
-      DeltaDataSource.parsePathIdentifier(spark, path.toString, options)
+      val (rootPath, filters, timeTravel) =
+        DeltaDataSource.parsePathIdentifier(spark, path.toString, options)
+      PathInfo(rootPath, filters, timeTravel)
}
}

+  private def rootPath = pathInfo.rootPath
+
+  private def partitionFilters = pathInfo.partitionFilters
+
+  private def timeTravelByPath = pathInfo.timeTravelByPath
+
def hasPartitionFilters: Boolean = partitionFilters.nonEmpty

@@ -363,9 +377,86 @@ case class DeltaTableV2(
None
}
}

+  def copy(
+      spark: SparkSession = this.spark,
+      path: Path = this.path,
+      catalogTable: Option[CatalogTable] = this.catalogTable,
+      tableIdentifier: Option[String] = this.tableIdentifier,
+      timeTravelOpt: Option[DeltaTimeTravelSpec] = this.timeTravelOpt,
+      options: Map[String, String] = this.options
+  ): DeltaTableV2 = {
+    val deltaTableV2 =
+      new DeltaTableV2(spark, path, catalogTable, tableIdentifier, timeTravelOpt, options)
+    deltaTableV2.pathInfo.timeTravelByPath = timeTravelByPath
+    deltaTableV2.pathInfo.partitionFilters = partitionFilters
+    deltaTableV2
+  }
+
+  override def toString: String =
+    s"DeltaTableV2($spark,$path,$catalogTable,$tableIdentifier,$timeTravelOpt,$options)"
}

object DeltaTableV2 {
+  def unapply(deltaTable: DeltaTableV2): Option[(
+      SparkSession,
+      Path,
+      Option[CatalogTable],
+      Option[String],
+      Option[DeltaTimeTravelSpec],
+      Map[String, String])
+  ] = {
+    Some((
+      deltaTable.spark,
+      deltaTable.path,
+      deltaTable.catalogTable,
+      deltaTable.tableIdentifier,
+      deltaTable.timeTravelOpt,
+      deltaTable.options)
+    )
+  }

+  def apply(
+      spark: SparkSession,
+      path: Path,
+      catalogTable: Option[CatalogTable] = None,
+      tableIdentifier: Option[String] = None,
+      options: Map[String, String] = Map.empty[String, String],
+      timeTravelOpt: Option[DeltaTimeTravelSpec] = None
+  ): DeltaTableV2 = {
+    val deltaTable = new DeltaTableV2(
+      spark,
+      path,
+      catalogTable = catalogTable,
+      tableIdentifier = tableIdentifier,
+      timeTravelOpt = timeTravelOpt,
+      options = options
+    )
+    if (spark == null || spark.sessionState == null ||
+        !spark.sessionState.conf.getConf(ENABLE_TABLE_REDIRECT_FEATURE)) {
+      return deltaTable
+    }
+    // The following code ensures that the path and catalogTable of the redirected table
+    // object are passed through. Note: a DeltaTableV2 can only be created using this method.
+    AnalysisHelper.allowInvokingTransformsInAnalyzer {
+      val deltaLog = deltaTable.deltaLog
+      val rootDeltaLogPath = DeltaLog.logPathFor(deltaTable.rootPath.toString)
+      val finalDeltaLogPath = DeltaLog.formalizeDeltaPath(spark, options, rootDeltaLogPath)
+      val catalogTableOpt = if (finalDeltaLogPath == deltaLog.logPath) {
+        // If there is no redirection, use the existing catalogTable.
+        catalogTable
+      } else {
+        // If there is redirection, use the catalogTable of deltaLog.
+        deltaLog.unsafeVolatileCatalogTable
+      }
+      val tableIdentifier = catalogTableOpt.map(_.identifier.identifier)
+      val newPath = new Path(deltaLog.dataPath.toUri.getPath)
+      deltaTable.copy(
+        path = newPath, catalogTable = catalogTableOpt, tableIdentifier = tableIdentifier
+      )
+    }
+  }

/** Resolves a path into a DeltaTableV2, leveraging standard v2 table resolution. */
def apply(spark: SparkSession, tablePath: Path, options: Map[String, String], cmd: String)
: DeltaTableV2 = {
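Aside: converting `DeltaTableV2` from a case class to a plain class is what lets the companion `apply` intercept every construction and swap in the redirected path and catalogTable, while the hand-written `copy` and `unapply` keep existing call sites and pattern matches compiling unchanged. A minimal sketch of the pattern, with a toy class standing in for `DeltaTableV2` and a trivial normalization standing in for the redirect lookup:

class Table private (val name: String, val location: String) {
  // Hand-written copy preserves the case-class-style call sites.
  def copy(name: String = this.name, location: String = this.location): Table =
    Table(name, location)
  override def toString: String = s"Table($name,$location)"
}

object Table {
  // Every construction funnels through apply, so cross-cutting logic
  // (Delta's redirect resolution; here, a trivial normalization) always runs.
  def apply(name: String, location: String): Table =
    new Table(name, location.stripSuffix("/"))

  // Hand-written extractor keeps `case Table(n, loc)` patterns working.
  def unapply(t: Table): Option[(String, String)] = Some((t.name, t.location))
}

object TableDemo {
  def main(args: Array[String]): Unit =
    Table("events", "s3://bucket/events/") match {
      case Table(n, loc) => println(s"$n -> $loc") // events -> s3://bucket/events
    }
}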