Skip to content

Commit

Permalink
Fix partition key validation and support for custom payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
rmahindra123 authored and rmahindra123 committed Nov 8, 2023
1 parent e731755 commit 898d03c
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,26 @@ object SparkKeyGenUtils {
* @return partition columns
*/
def getPartitionColumns(props: TypedProperties): String = {
val keyGeneratorClass = getKeyGeneratorClassName(props)
getPartitionColumns(keyGeneratorClass, props)
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
getPartitionColumns(keyGenerator, props)
}

/**
* @param keyGen key generator class name
* @return partition columns
*/
def getPartitionColumns(keyGenClass: String, typedProperties: TypedProperties): String = {
def getPartitionColumns(keyGenClass: KeyGenerator, typedProperties: TypedProperties): String = {
// For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path filed format
// is: "field_name: field_type", we extract the field_name from the partition path field.
if (keyGenClass.equals(classOf[CustomKeyGenerator].getCanonicalName) || keyGenClass.equals(classOf[CustomAvroKeyGenerator].getCanonicalName)) {
if (keyGenClass.isInstanceOf[CustomKeyGenerator] || keyGenClass.isInstanceOf[CustomAvroKeyGenerator]) {
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
.split(",").map(pathField => {
pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
.headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${keyGenClass}")}).mkString(",")
} else if (keyGenClass.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName)
|| keyGenClass.equals(classOf[NonpartitionedAvroKeyGenerator].getCanonicalName)
|| keyGenClass.equals(classOf[GlobalDeleteKeyGenerator].getCanonicalName)
|| keyGenClass.equals(classOf[GlobalAvroDeleteKeyGenerator].getCanonicalName)) {
} else if (keyGenClass.isInstanceOf[NonpartitionedKeyGenerator]
|| keyGenClass.isInstanceOf[NonpartitionedAvroKeyGenerator]
|| keyGenClass.isInstanceOf[GlobalDeleteKeyGenerator]
|| keyGenClass.isInstanceOf[GlobalAvroDeleteKeyGenerator]) {
StringUtils.EMPTY_STRING
} else {
checkArgument(typedProperties.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()), "Partition path needs to be set")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,14 +263,14 @@ class HoodieSparkSqlWriterInternal {
}
}

val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieConfig.getProps))
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
(false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
} else {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
val partitionColumns = SparkKeyGenUtils.getPartitionColumns(getKeyGeneratorClassName(new TypedProperties(hoodieConfig.getProps)),
toProperties(parameters))
val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
val timelineTimeZone = HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
val tableMetaClient = if (tableExists) {
HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ object HoodieWriterUtils {
}

val datasourcePartitionFields = params.getOrElse(PARTITIONPATH_FIELD.key(), null)
val currentPartitionFields = if (datasourcePartitionFields == null) null else SparkKeyGenUtils.getPartitionColumns(TypedProperties.fromMap(params))
val tableConfigPartitionFields = tableConfig.getString(HoodieTableConfig.PARTITION_FIELDS)
if (null != datasourcePartitionFields && null != tableConfigPartitionFields
&& datasourcePartitionFields != tableConfigPartitionFields) {
diffConfigs.append(s"PartitionPath:\t$datasourcePartitionFields\t$tableConfigPartitionFields\n")
&& currentPartitionFields != tableConfigPartitionFields) {
diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
}
}

Expand Down

0 comments on commit 898d03c

Please sign in to comment.