[HUDI-8824] MIT error out on bad assignment clauses
Davis-Zhang-Onehouse committed Jan 9, 2025
1 parent 6c73075 commit 92bb3d2
Showing 3 changed files with 276 additions and 26 deletions.
@@ -35,6 +35,7 @@ import org.apache.hudi.util.JFunction.scalaFunction1Noop
import org.apache.avro.Schema
import org.apache.spark.sql._
import org.apache.spark.sql.HoodieCatalystExpressionUtils.{attributeEquals, MatchCast}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, EqualTo, Expression, Literal, NamedExpression, PredicateHelper}
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
@@ -44,7 +45,8 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, getPartitionPathFieldWriteConfig}
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, stripCasting, toStructType, userGuideString, CoercedAttributeReference}
import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, stripCasting, toStructType,
userGuideString, CoercedAttributeReference, validateTargetTableAttrExistsInAssignments}
import org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
@@ -272,11 +274,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
override def run(sparkSession: SparkSession): Seq[Row] = {
this.sparkSession = sparkSession
// TODO move to analysis phase
validate

val projectedJoinedDF: DataFrame = projectedJoinedDataset
// Create the write parameters
val props = buildMergeIntoConfig(hoodieCatalogTable)
validate(props)

val projectedJoinedDF: DataFrame = projectedJoinedDataset
// Do the upsert
executeUpsert(projectedJoinedDF, props)
// Refresh the table in the catalog
@@ -392,12 +394,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
* expressions to the ExpressionPayload#getInsertValue.
*/
private def executeUpsert(sourceDF: DataFrame, parameters: Map[String, String]): Unit = {
val operation = if (StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, "")) && updatingActions.isEmpty) {
INSERT_OPERATION_OPT_VAL
} else {
UPSERT_OPERATION_OPT_VAL
}

val operation: String = getOperationType(parameters)
// Append the table schema to the parameters. In the case of merge into, the schema of projectedJoinedDF
// may be different from the target table, because there may be transformation logic in the update or
// insert actions.
@@ -410,12 +407,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
// Only enable writing partial updates to data blocks for upserts to MOR tables,
// when ENABLE_MERGE_INTO_PARTIAL_UPDATES is set to true,
// and not all fields are updated
val writePartialUpdates = if (targetTableType == MOR_TABLE_TYPE_OPT_VAL
&& operation == UPSERT_OPERATION_OPT_VAL
&& parameters.getOrElse(
ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean
&& updatingActions.nonEmpty) {
val writePartialUpdates = if (isPartialUpdateActionForMOR(parameters)) {
val updatedFieldSet = getUpdatedFields(updatingActions.map(a => a.assignments))
// Only enable partial updates if not all fields are updated
if (!areAllFieldsUpdated(updatedFieldSet)) {
@@ -489,6 +481,25 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
}
}

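// Whether this MERGE is eligible for partial-update handling: an upsert into a MOR table
// with ENABLE_MERGE_INTO_PARTIAL_UPDATES enabled and at least one update action.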
private def isPartialUpdateActionForMOR(parameters: Map[String, String]) = {
val isPartialUpdateAction = (targetTableType == MOR_TABLE_TYPE_OPT_VAL
&& UPSERT_OPERATION_OPT_VAL == getOperationType(parameters)
&& parameters.getOrElse(
ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean
&& updatingActions.nonEmpty)
isPartialUpdateAction
}

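// MERGE INTO degrades to a plain insert only when the table has no precombine field and
// there are no update actions; otherwise it must run as an upsert.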
private def getOperationType(parameters: Map[String, String]) = {
val operation = if (StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, "")) && updatingActions.isEmpty) {
INSERT_OPERATION_OPT_VAL
} else {
UPSERT_OPERATION_OPT_VAL
}
operation
}

private def getTableSchema: Schema = {
val (structName, nameSpace) = AvroConversionUtils
.getAvroRecordNameAndNamespace(hoodieCatalogTable.tableName)
@@ -785,8 +796,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
}


def validate(): Unit = {
checkUpdatingActions(updatingActions)
def validate(props: Map[String, String]): Unit = {
checkUpdatingActions(updatingActions, props)
checkInsertingActions(insertingActions)
checkDeletingActions(deletingActions)
}
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
assert(insert.assignments.length <= targetTableSchema.length,
s"The number of insert assignments[${insert.assignments.length}] must be less than or equal to the " +
s"targetTable field size[${targetTableSchema.length}]"))

// Precombine and primary key fields must be present in the assignment clause of all insert actions.
// The check has no effect if the target table has no such fields or there are no insert actions.
insertActions.foreach(action =>
hoodieCatalogTable.preCombineKey.foreach(
field => {
validateTargetTableAttrExistsInAssignments(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
Seq(field),
"pre-combine field",
action.assignments)
validateTargetTableAttrExistsInAssignments(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
"primaryKey field",
action.assignments)
}))
}

private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = {
private def checkUpdatingActions(updateActions: Seq[UpdateAction], props: Map[String, String]): Unit = {
if (hoodieCatalogTable.preCombineKey.isEmpty && updateActions.nonEmpty) {
logWarning(s"Updates without precombine can have nondeterministic behavior")
logWarning(s"Updates without pre-combine can have nondeterministic behavior")
}
updateActions.foreach(update =>
assert(update.assignments.length <= targetTableSchema.length,
s"The number of update assignments[${update.assignments.length}] must be less than or equalequal to the " +
s"The number of update assignments[${update.assignments.length}] must be less than or equal to the " +
s"targetTable field size[${targetTableSchema.length}]"))

// For MOR table, the target table field cannot be the right-value in the update action.
if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
// For MOR table, the target table field cannot be the right-value in the update action.
updateActions.foreach(update => {
val targetAttrs = update.assignments.flatMap(a => a.value.collect {
case attr: AttributeReference if mergeInto.targetTable.outputSet.contains(attr) => attr
})
assert(targetAttrs.isEmpty,
s"Target table's field(${targetAttrs.map(_.name).mkString(",")}) cannot be the right-value of the update clause for MOR table.")
})
// Primary key assignment in update actions is mandatory for MOR tables unless partial update is enabled.
if (!isPartialUpdateActionForMOR(props)) {
// For MOR tables, the update assignment clause must set the primary key field explicitly even if it
// does not change. The check has no effect if there are no update actions or no primary key.
updateActions.foreach(action => validateTargetTableAttrExistsInAssignments(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
"primaryKey field",
action.assignments))
}
}
}
}
@@ -855,6 +894,44 @@ object MergeIntoHoodieTableCommand {

def encodeAsBase64String(any: Any): String =
Base64.getEncoder.encodeToString(Serializer.toBytes(any))

/**
* Generic method to validate target table attributes in the assignments clause of the merge into
* statement.
*
* @param resolver The resolver to use
* @param targetTable The target table of the merge
* @param fields The fields from the target table which should have an assignment clause
* @param fieldType String describing the type of field (for error messages)
* @param assignments The assignments clause of the merge into
*
* @throws AnalysisException if a field cannot be resolved in the target table or has no matching assignment.
*/
def validateTargetTableAttrExistsInAssignments(resolver: Resolver,
targetTable: LogicalPlan,
fields: Seq[String],
fieldType: String,
assignments: Seq[Assignment]): Unit = {
// To find the corresponding [[fieldType]] attribute within [[assignments]] we:
// - Check that the target table itself has the attribute
// - Check that some assignment's left-hand side attribute resolves to the field.
//   For example, in
//     WHEN MATCHED THEN UPDATE SET targetTable.attribute = <expr>
//   the left-hand side of the assignment resolves to the target field we are
//   validating here.
fields.foreach { field =>
targetTable.output
.find(attr => resolver(attr.name, field))
.getOrElse(throw new AnalysisException(s"Failed to resolve $fieldType `$field` in target table"))

if (!assignments.exists {
case Assignment(attr: AttributeReference, _) if resolver(attr.name, field) => true
case _ => false
}) {
throw new AnalysisException(s"No matching assignment found for target table $fieldType `$field`")
}
}
}
}

object PartialAssignmentMode extends Enumeration {
@@ -128,6 +128,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
* In Spark 3.0.x, UPDATE and DELETE can appear at most once in MATCHED clauses in a MERGE INTO statement.
* Refer to: `org.apache.spark.sql.catalyst.parser.AstBuilder#visitMergeIntoTable`
*
* The test also provides test coverage for "Test merge into allowed patterns of assignment clauses".
*/
test("Test MergeInto with more than once update actions") {

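For illustration, a minimal sketch of what the new validation rejects; the table, column, and field names below are hypothetical and not part of this commit. Assume `target` is a MOR table keyed on `id` with precombine field `ts` and merge-into partial updates disabled. The checks run in validate(props) at the start of run, before the upsert executes.

// Hypothetical names: `target` is a MOR table keyed on `id` with precombine field `ts`.
spark.sql(
  """
    |merge into target t
    |using source s
    |on t.id = s.id
    |when matched then update set t.price = s.price
    |""".stripMargin)
// Fails with: "No matching assignment found for target table primaryKey field `id`"
// because the update clause never assigns the record key. An insert action that
// omitted the precombine field would fail the same way with:
// "No matching assignment found for target table pre-combine field `ts`"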