address PR comments
Davis-Zhang-Onehouse committed Jan 9, 2025
1 parent 92bb3d2 commit 4293a94
Showing 6 changed files with 156 additions and 148 deletions.
@@ -278,7 +278,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

/**
* Columns that relation has to read from the storage to properly execute on its semantic: for ex,
* for Merge-on-Read tables key fields as well and pre-combine field comprise mandatory set of columns,
* for Merge-on-Read tables key fields as well and precombine field comprise mandatory set of columns,
* meaning that regardless of whether this columns are being requested by the query they will be fetched
* regardless so that relation is able to combine records properly (if necessary)
*
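A minimal sketch of the mandatory-column rule described in the comment above; the record key `id`, precombine field `ts`, and requested columns are assumptions for illustration, not values from this commit:

// Hypothetical sketch: for a Merge-on-Read relation the record key and precombine columns
// are always appended to the fetched columns, even when the query does not request them.
val recordKeyFields: Seq[String] = Seq("id")          // assumed table config
val preCombineField: Option[String] = Some("ts")      // assumed table config
val requestedColumns: Seq[String] = Seq("name", "price")
val fetchedColumns: Seq[String] =
  (requestedColumns ++ recordKeyFields ++ preCombineField.toSeq).distinct
// fetchedColumns == Seq("name", "price", "id", "ts")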
@@ -31,10 +31,9 @@ import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.util.JFunction.scalaFunction1Noop

import org.apache.avro.Schema
import org.apache.spark.sql._
import org.apache.spark.sql.HoodieCatalystExpressionUtils.{attributeEquals, MatchCast}
import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, attributeEquals}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, EqualTo, Expression, Literal, NamedExpression, PredicateHelper}
@@ -45,15 +44,13 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, getPartitionPathFieldWriteConfig}
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, stripCasting, toStructType,
userGuideString, CoercedAttributeReference, validateTargetTableAttrExistsInAssignments}
import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference, encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments}
import org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
import org.apache.spark.sql.types.{BooleanType, StructField, StructType}

import java.util.Base64

import scala.collection.JavaConverters._

/**
@@ -68,7 +65,7 @@ import scala.collection.JavaConverters._
*
* <ol>
* <li>Incoming batch ([[sourceTable]]) is reshaped such that it bears correspondingly:
* a) (required) "primary-key" column as well as b) (optional) "pre-combine" column; this is
* a) (required) "primary-key" column as well as b) (optional) "precombine" column; this is
* required since MIT statements does not restrict [[sourceTable]]s schema to be aligned w/ the
* [[targetTable]]s one, while Hudi's upserting flow expects such columns to be present</li>
*
@@ -129,7 +126,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
*
* To be able to leverage Hudi's engine to merge an incoming dataset against the existing table
* we will have to make sure that both [[source]] and [[target]] tables have the *same*
* "primary-key" and "pre-combine" columns. Since actual MIT condition might be leveraging an arbitrary
* "primary-key" and "precombine" columns. Since actual MIT condition might be leveraging an arbitrary
* expression involving [[source]] column(s), we will have to add "phony" column matching the
* primary-key one of the target table.
*/
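A hedged illustration of the "phony" key column described above; the Spark session, column names, and the condition `t.id = s.s_id` are assumptions for the sketch, not taken from this commit:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[1]").appName("mit-sketch").getOrCreate()
// Source batch whose schema does not carry the target's record-key column name.
val source = spark.range(3).selectExpr("id AS s_id", "'A1' AS sname", "1000L AS sts")
// Project an extra column named after the target's record key (assumed to be "id") so the
// upsert flow sees the same key column on both sides of the merge.
val reshapedSource = source.withColumn("id", col("s_id"))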
@@ -139,19 +136,19 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
if (primaryKeyFields.isPresent) {
//pkless tables can have more complex conditions
if (!conditions.forall(p => p.isInstanceOf[EqualTo])) {
throw new AnalysisException(s"Currently only equality predicates are supported in MERGE INTO statement on primary key table" +
throw new AnalysisException(s"Currently only equality predicates are supported in MERGE INTO statement on record key table" +
s"(provided ${mergeInto.mergeCondition.sql}")
}
}
val resolver = sparkSession.sessionState.analyzer.resolver
val partitionPathFields = hoodieCatalogTable.tableConfig.getPartitionFields
//ensure all primary key fields are part of the merge condition
//ensure all record key fields are part of the merge condition
//allow partition path to be part of the merge condition but not required
val targetAttr2ConditionExpressions = doCasting(conditions, primaryKeyFields.isPresent)
val expressionSet = scala.collection.mutable.Set[(Attribute, Expression)](targetAttr2ConditionExpressions:_*)
var partitionAndKeyFields: Seq[(String,String)] = Seq.empty
if (primaryKeyFields.isPresent) {
partitionAndKeyFields = partitionAndKeyFields ++ primaryKeyFields.get().map(pk => ("primaryKey", pk)).toSeq
partitionAndKeyFields = partitionAndKeyFields ++ primaryKeyFields.get().map(pk => ("recordKey", pk)).toSeq
}
if (partitionPathFields.isPresent) {
partitionAndKeyFields = partitionAndKeyFields ++ partitionPathFields.get().map(pp => ("partitionPath", pp)).toSeq
@@ -169,7 +166,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
// ON t.id = s.id + 1
// WHEN MATCHED THEN UPDATE *
//
// Which (in the current design) could result in a primary key of the record being modified,
// Which (in the current design) could result in a record key of the record being modified,
// which is not allowed.
if (!resolvesToSourceAttribute(expr)) {
throw new AnalysisException("Only simple conditions of the form `t.id = s.id` are allowed on the " +
@@ -178,9 +175,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
expressionSet.remove((attr, expr))
(attr, expr)
}
if (resolving.isEmpty && rk._1.equals("primaryKey")
if (resolving.isEmpty && rk._1.equals("recordKey")
&& sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), "false") == "true") {
throw new AnalysisException(s"Hudi tables with primary key are required to match on all primary key colums. Column: '${rk._2}' not found")
throw new AnalysisException(s"Hudi tables with record key are required to match on all record key columns. Column: '${rk._2}' not found")
}
resolving
}).filter(_.nonEmpty).map(_.get)
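A minimal sketch, not the actual implementation, of the "simple condition" rule enforced above: the source side of the match condition must boil down to a plain source attribute (possibly behind casts or aliases), so the record key can never be derived from an arbitrary expression such as `s.id + 1`:

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression}

// Returns true only for expressions that resolve to a bare attribute.
def resolvesToPlainAttribute(expr: Expression): Boolean = expr match {
  case _: Attribute => true                               // e.g. s.id
  case a: Alias     => resolvesToPlainAttribute(a.child)
  case c: Cast      => resolvesToPlainAttribute(c.child)  // e.g. CAST(s.id AS LONG)
  case _            => false                              // e.g. s.id + 1 is rejected
}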
@@ -248,10 +245,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
.find { attr => resolver(attr.name, preCombineField) }
.get

// To find corresponding "pre-combine" attribute w/in the [[sourceTable]] we do
// To find corresponding "precombine" attribute w/in the [[sourceTable]] we do
// - Check if we can resolve the attribute w/in the source table as is; if unsuccessful, then
// - Check if in any of the update actions, right-hand side of the assignment actually resolves
// to it, in which case we will determine left-hand side expression as the value of "pre-combine"
// to it, in which case we will determine left-hand side expression as the value of "precombine"
// attribute w/in the [[sourceTable]]
val sourceExpr = {
mergeInto.sourceTable.output.find(attr => resolver(attr.name, preCombineField)) match {
Expand All @@ -261,7 +258,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
case Assignment(attr: AttributeReference, expr)
if resolver(attr.name, preCombineField) && resolvesToSourceAttribute(expr) => expr
} getOrElse {
throw new AnalysisException(s"Failed to resolve pre-combine field `${preCombineField}` w/in the source-table output")
throw new AnalysisException(s"Failed to resolve precombine field `${preCombineField}` w/in the source-table output")
}

}
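A hedged sketch of the fallback resolution described above, using case-insensitive name matching in place of the session resolver; the helper and its signature are assumptions, not the code changed by this commit:

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.Assignment

// Resolve the precombine expression directly from the source output if possible, otherwise
// from the right-hand side of an UPDATE assignment targeting the precombine attribute.
def resolvePreCombineExpr(preCombineField: String,
                          sourceOutput: Seq[Attribute],
                          updateAssignments: Seq[Assignment]): Option[Expression] = {
  sourceOutput.find(_.name.equalsIgnoreCase(preCombineField))
    .orElse(updateAssignments.collectFirst {
      case Assignment(attr: AttributeReference, value)
        if attr.name.equalsIgnoreCase(preCombineField) => value
    })
}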
@@ -297,7 +294,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
*
* <ol>
* <li>Contains "primary-key" column (as defined by target table's config)</li>
* <li>Contains "pre-combine" column (as defined by target table's config, if any)</li>
* <li>Contains "precombine" column (as defined by target table's config, if any)</li>
* </ol>
*
* In cases when [[sourceTable]] doesn't contain aforementioned columns, following heuristic
@@ -310,12 +307,12 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
* leveraging matching side of such conditional expression (containing [[sourceTable]] attribute)
* interpreting it as a primary-key column in the [[sourceTable]]</li>
*
* <li>Expression for the "pre-combine" column (optional) is extracted from the matching update
* <li>Expression for the "precombine" column (optional) is extracted from the matching update
* clause ({@code WHEN MATCHED ... THEN UPDATE ...}) as right-hand side of the expression referencing
* pre-combine attribute of the target column</li>
* precombine attribute of the target column</li>
* <ul>
*
* For example, w/ the following statement (primary-key column is [[id]], while pre-combine column is [[ts]])
* For example, w/ the following statement (primary-key column is [[id]], while precombine column is [[ts]])
* <pre>
* MERGE INTO target
* USING (SELECT 1 AS sid, 'A1' AS sname, 1000 AS sts) source
@@ -356,7 +353,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie

// This is to handle the situation where condition is something like "s0.s_id = t0.id" so In the source table
// we add an additional column that is an alias of "s0.s_id" named "id"
// NOTE: Primary key attribute (required) as well as Pre-combine one (optional) defined
// NOTE: Record key attribute (required) as well as precombine one (optional) defined
// in the [[targetTable]] schema has to be present in the incoming [[sourceTable]] dataset.
// In cases when [[sourceTable]] doesn't bear such attributes (which, for ex, could happen
// in case of it having different schema), we will be adding additional columns (while setting
@@ -482,13 +479,12 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
}

private def isPartialUpdateActionForMOR(parameters: Map[String, String]) = {
val isPartialUpdateAction = (targetTableType == MOR_TABLE_TYPE_OPT_VAL
(targetTableType == MOR_TABLE_TYPE_OPT_VAL
&& UPSERT_OPERATION_OPT_VAL == getOperationType(parameters)
&& parameters.getOrElse(
ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean
&& updatingActions.nonEmpty)
isPartialUpdateAction
}

private def getOperationType(parameters: Map[String, String]) = {
@@ -695,11 +691,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie

/**
* Output of the expected (left) join of the a) [[sourceTable]] dataset (potentially amended w/ primary-key,
* pre-combine columns) with b) existing [[targetTable]]
* precombine columns) with b) existing [[targetTable]]
*/
private def joinedExpectedOutput: Seq[Attribute] = {
// NOTE: We're relying on [[sourceDataset]] here instead of [[mergeInto.sourceTable]],
// as it could be amended to add missing primary-key and/or pre-combine columns.
// as it could be amended to add missing primary-key and/or precombine columns.
// Please check [[sourceDataset]] scala-doc for more details
(projectedJoinedDataset.queryExecution.analyzed.output ++ mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
}
@@ -795,10 +791,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
defaultOpts = Map.empty, overridingOpts = overridingOpts)
}


def validate(props: Map[String, String]): Unit = {
checkUpdatingActions(updatingActions, props)
checkInsertingActions(insertingActions)
checkInsertingActions(insertingActions, props)
checkDeletingActions(deletingActions)
}

@@ -808,34 +803,40 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
}
}

private def checkInsertingActions(insertActions: Seq[InsertAction]): Unit = {
private def checkInsertingActions(insertActions: Seq[InsertAction], props: Map[String, String]): Unit = {
insertActions.foreach(insert =>
assert(insert.assignments.length <= targetTableSchema.length,
s"The number of insert assignments[${insert.assignments.length}] must be less than or equal to the " +
s"targetTable field size[${targetTableSchema.length}]"))
// Precombine field and primary key field must be present in the assignment clause of all insert actions.
// Precombine field and record key field must be present in the assignment clause of all insert actions for event time ordering mode.
// Check has no effect if we don't have such fields in target table or we don't have insert actions
if (props.getOrElse(RECORD_MERGE_MODE.key(), RECORD_MERGE_MODE.defaultValue()).equals(RecordMergeMode.EVENT_TIME_ORDERING)) {
insertActions.foreach(action =>
hoodieCatalogTable.preCombineKey.foreach(
field => {
validateTargetTableAttrExistsInAssignments(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
Seq(field),
"precombine field",
action.assignments)
}))
}
insertActions.foreach(action =>
hoodieCatalogTable.preCombineKey.foreach(
field => {
validateTargetTableAttrExistsInAssignments(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
Seq(field),
"pre-combine field",
action.assignments)
validateTargetTableAttrExistsInAssignments(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
"primaryKey field",
"record key field",
action.assignments)
}))
}
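// Hedged example of the insert-action checks above, assuming record key `id`, precombine
// field `ts`, and EVENT_TIME_ORDERING merge mode (all names are assumptions):
//   WHEN NOT MATCHED THEN INSERT (id, name, ts) VALUES (s.id, s.name, s.ts)  -- passes both checks
//   WHEN NOT MATCHED THEN INSERT (name) VALUES (s.name)                      -- fails: neither `id` nor `ts` is assigned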

private def checkUpdatingActions(updateActions: Seq[UpdateAction], props: Map[String, String]): Unit = {
if (hoodieCatalogTable.preCombineKey.isEmpty && updateActions.nonEmpty) {
logWarning(s"Updates without pre-combine can have nondeterministic behavior")
logWarning(s"Updates without precombine can have nondeterministic behavior")
}
updateActions.foreach(update =>
assert(update.assignments.length <= targetTableSchema.length,
Expand All @@ -851,15 +852,15 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
assert(targetAttrs.isEmpty,
s"Target table's field(${targetAttrs.map(_.name).mkString(",")}) cannot be the right-value of the update clause for MOR table.")
})
// Only when the partial update is enabled that primary key assignment is not mandatory in update actions for MOR tables.
// Only when the partial update is enabled that record key assignment is not mandatory in update actions for MOR tables.
if (!isPartialUpdateActionForMOR(props)) {
// For MOR table, update assignment clause must have primary key field being set explicitly even if it does not
// change. The check has no effect if there is no updateActions or we don't have primaryKey
// For MOR table, update assignment clause must have record key field being set explicitly even if it does not
// change. The check has no effect if there is no updateActions or we don't have record key
updateActions.foreach(action => validateTargetTableAttrExistsInAssignments(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
"primaryKey field",
"record key field",
action.assignments))
}
}
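// Hedged example of the update-action check above for a MOR table without partial updates
// enabled, assuming record key `id` (an assumption, not from this diff):
//   WHEN MATCHED THEN UPDATE SET id = s.id, price = s.price  -- passes: record key assigned explicitly
//   WHEN MATCHED THEN UPDATE SET price = s.price             -- fails the record-key check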
@@ -727,7 +727,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
//
// 2) set source column name to be different with target column
//
val errorMessage = "Failed to resolve pre-combine field `v` w/in the source-table output"
val errorMessage = "Failed to resolve precombine field `v` w/in the source-table output"

checkException(
s"""
Expand Down Expand Up @@ -815,9 +815,9 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
)

//
// 2.a) set source column name to be different with target column (should fail unable to match pre-combine field)
// 2.a) set source column name to be different with target column (should fail unable to match precombine field)
//
val failedToResolveErrorMessage = "Failed to resolve pre-combine field `v` w/in the source-table output"
val failedToResolveErrorMessage = "Failed to resolve precombine field `v` w/in the source-table output"

checkException(
s"""merge into $tableName1 t0