[HUDI-7052] Fix partition key validation for custom key generators. (#10014)


---------

Co-authored-by: rmahindra123 <[email protected]>
rmahindra123 authored Nov 23, 2023
1 parent 8d6d043 commit 72ff9a7
Showing 9 changed files with 112 additions and 46 deletions.
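For context: partition-key validation previously compared the raw value of hoodie.datasource.write.partitionpath.field against the table config, which produces a false mismatch for CustomKeyGenerator / CustomAvroKeyGenerator because their partition path fields carry a "field_name:field_type" suffix. A minimal sketch of the kind of write configuration this affects (the property values here are illustrative, not taken from this commit):

    import org.apache.hudi.common.config.TypedProperties

    // Illustrative CustomKeyGenerator configuration; only the
    // "field_name:field_type" format comes from the change below.
    val props = new TypedProperties()
    props.setProperty("hoodie.datasource.write.keygenerator.class",
      "org.apache.hudi.keygen.CustomKeyGenerator")
    props.setProperty("hoodie.datasource.write.partitionpath.field", "segment:simple,ts:timestamp")
    // After this fix, validation compares the extracted field names ("segment,ts")
    // with the table's partition fields instead of the raw, typed string.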
File: AutoRecordGenWrapperAvroKeyGenerator.java
@@ -43,24 +43,24 @@
* PartitionId refers to Spark's partition id.
* RowId refers to the row index within the Spark partition.
*/
public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator {
public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator implements AutoRecordKeyGeneratorWrapper {

private final BaseKeyGenerator keyGenerator;
private final int partitionId;
private final String instantTime;
private Integer partitionId;
private String instantTime;
private int rowId;

public AutoRecordGenWrapperAvroKeyGenerator(TypedProperties config, BaseKeyGenerator keyGenerator) {
super(config);
this.keyGenerator = keyGenerator;
this.rowId = 0;
this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
partitionId = null;
instantTime = null;
}

@Override
public String getRecordKey(GenericRecord record) {
return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
return generateSequenceId(rowId++);
}

@Override
@@ -80,4 +80,19 @@ public List<String> getPartitionPathFields() {
public boolean isConsistentLogicalTimestampEnabled() {
return keyGenerator.isConsistentLogicalTimestampEnabled();
}

@Override
public BaseKeyGenerator getPartitionKeyGenerator() {
return keyGenerator;
}

private String generateSequenceId(long recordIndex) {
if (partitionId == null) {
this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
}
if (instantTime == null) {
this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
}
return HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
}
}
File: AutoRecordKeyGeneratorWrapper.java (new file)
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.keygen;

/**
* Interface for {@link KeyGenerator} implementations that
* generate a unique record key internally.
*/
public interface AutoRecordKeyGeneratorWrapper {

/**
* @return the underlying key generator used for the partition path.
*/
BaseKeyGenerator getPartitionKeyGenerator();
}
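A minimal sketch of how a caller can unwrap the new interface to reach the key generator that actually computes partition paths (this mirrors the pattern match added to SparkKeyGenUtils below; the method name here is illustrative):

    import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper, KeyGenerator}

    // When auto record key generation wraps the configured key generator,
    // partition-path logic should look at the wrapped generator.
    def resolvePartitionKeyGen(keyGen: KeyGenerator): KeyGenerator = keyGen match {
      case wrapper: AutoRecordKeyGeneratorWrapper => wrapper.getPartitionKeyGenerator
      case other => other
    }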
File: AutoRecordGenWrapperKeyGenerator.java
@@ -47,62 +47,76 @@
* PartitionId refers to Spark's partition id.
* RowId refers to the row index within the Spark partition.
*/
public class AutoRecordGenWrapperKeyGenerator extends BuiltinKeyGenerator {
public class AutoRecordGenWrapperKeyGenerator extends BuiltinKeyGenerator implements AutoRecordKeyGeneratorWrapper {

private final BuiltinKeyGenerator builtinKeyGenerator;
private final int partitionId;
private final String instantTime;
private final BuiltinKeyGenerator keyGenerator;
private Integer partitionId;
private String instantTime;
private int rowId;

public AutoRecordGenWrapperKeyGenerator(TypedProperties config, BuiltinKeyGenerator builtinKeyGenerator) {
public AutoRecordGenWrapperKeyGenerator(TypedProperties config, BuiltinKeyGenerator keyGenerator) {
super(config);
this.builtinKeyGenerator = builtinKeyGenerator;
this.keyGenerator = keyGenerator;
this.rowId = 0;
this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
partitionId = null;
instantTime = null;
}

@Override
public String getRecordKey(GenericRecord record) {
return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
return generateSequenceId(rowId++);
}

@Override
public String getPartitionPath(GenericRecord record) {
return builtinKeyGenerator.getPartitionPath(record);
return keyGenerator.getPartitionPath(record);
}

@Override
public String getRecordKey(Row row) {
return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
return generateSequenceId(rowId++);
}

@Override
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
return UTF8String.fromString(HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++));
return UTF8String.fromString(generateSequenceId(rowId++));
}

@Override
public String getPartitionPath(Row row) {
return builtinKeyGenerator.getPartitionPath(row);
return keyGenerator.getPartitionPath(row);
}

@Override
public UTF8String getPartitionPath(InternalRow internalRow, StructType schema) {
return builtinKeyGenerator.getPartitionPath(internalRow, schema);
return keyGenerator.getPartitionPath(internalRow, schema);
}

@Override
public List<String> getRecordKeyFieldNames() {
return builtinKeyGenerator.getRecordKeyFieldNames();
return keyGenerator.getRecordKeyFieldNames();
}

public List<String> getPartitionPathFields() {
return builtinKeyGenerator.getPartitionPathFields();
return keyGenerator.getPartitionPathFields();
}

public boolean isConsistentLogicalTimestampEnabled() {
return builtinKeyGenerator.isConsistentLogicalTimestampEnabled();
return keyGenerator.isConsistentLogicalTimestampEnabled();
}

@Override
public BuiltinKeyGenerator getPartitionKeyGenerator() {
return keyGenerator;
}

private String generateSequenceId(long recordIndex) {
if (partitionId == null) {
this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
}
if (instantTime == null) {
this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
}
return HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
}
}
File: SparkKeyGenUtils.scala
@@ -21,11 +21,8 @@ import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper, AutoRecordGenWrapperKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName

import scala.collection.JavaConverters._

object SparkKeyGenUtils {

@@ -34,26 +31,34 @@ object SparkKeyGenUtils {
* @return partition columns
*/
def getPartitionColumns(props: TypedProperties): String = {
val keyGeneratorClass = getKeyGeneratorClassName(props)
getPartitionColumns(keyGeneratorClass, props)
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
getPartitionColumns(keyGenerator, props)
}

/**
* @param keyGen key generator class name
* @return partition columns
*/
def getPartitionColumns(keyGenClass: String, typedProperties: TypedProperties): String = {
def getPartitionColumns(keyGenClass: KeyGenerator, typedProperties: TypedProperties): String = {
// For {@link AutoRecordGenWrapperKeyGenerator} or {@link AutoRecordGenWrapperAvroKeyGenerator},
// get the base key generator for the partition paths
var baseKeyGen = keyGenClass match {
case autoRecordKeyGenerator: AutoRecordKeyGeneratorWrapper =>
autoRecordKeyGenerator.getPartitionKeyGenerator
case _ => keyGenClass
}

// For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path field format
// is: "field_name: field_type", we extract the field_name from the partition path field.
if (keyGenClass.equals(classOf[CustomKeyGenerator].getCanonicalName) || keyGenClass.equals(classOf[CustomAvroKeyGenerator].getCanonicalName)) {
if (baseKeyGen.isInstanceOf[CustomKeyGenerator] || baseKeyGen.isInstanceOf[CustomAvroKeyGenerator]) {
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
.split(",").map(pathField => {
pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
.headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${keyGenClass}")}).mkString(",")
} else if (keyGenClass.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName)
|| keyGenClass.equals(classOf[NonpartitionedAvroKeyGenerator].getCanonicalName)
|| keyGenClass.equals(classOf[GlobalDeleteKeyGenerator].getCanonicalName)
|| keyGenClass.equals(classOf[GlobalAvroDeleteKeyGenerator].getCanonicalName)) {
.headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${baseKeyGen}")}).mkString(",")
} else if (baseKeyGen.isInstanceOf[NonpartitionedKeyGenerator]
|| baseKeyGen.isInstanceOf[NonpartitionedAvroKeyGenerator]
|| baseKeyGen.isInstanceOf[GlobalDeleteKeyGenerator]
|| baseKeyGen.isInstanceOf[GlobalAvroDeleteKeyGenerator]) {
StringUtils.EMPTY_STRING
} else {
checkArgument(typedProperties.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()), "Partition path needs to be set")
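As a usage sketch of the new instance-based signature (property values are illustrative; the package of SparkKeyGenUtils is assumed for this sketch):

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
    import org.apache.hudi.util.SparkKeyGenUtils // package assumed for this sketch

    val props = new TypedProperties()
    props.setProperty("hoodie.datasource.write.keygenerator.class",
      "org.apache.hudi.keygen.CustomKeyGenerator")
    props.setProperty("hoodie.datasource.write.recordkey.field", "uuid")
    props.setProperty("hoodie.datasource.write.partitionpath.field", "segment:simple,region:simple")

    // A key generator instance (not a class name) is now passed in; the type
    // suffixes are stripped, so this is expected to return "segment,region".
    val keyGen = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
    val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGen, props)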
File: HoodieSparkSqlWriter.scala
@@ -268,14 +268,14 @@ class HoodieSparkSqlWriterInternal {
}
}

val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieConfig.getProps))
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
(false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
} else {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
val partitionColumns = SparkKeyGenUtils.getPartitionColumns(getKeyGeneratorClassName(new TypedProperties(hoodieConfig.getProps)),
toProperties(parameters))
val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
val timelineTimeZone = HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
val tableMetaClient = if (tableExists) {
HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
File: HoodieWriterUtils.scala
@@ -203,10 +203,11 @@ object HoodieWriterUtils {
}

val datasourcePartitionFields = params.getOrElse(PARTITIONPATH_FIELD.key(), null)
val currentPartitionFields = if (datasourcePartitionFields == null) null else SparkKeyGenUtils.getPartitionColumns(TypedProperties.fromMap(params))
val tableConfigPartitionFields = tableConfig.getString(HoodieTableConfig.PARTITION_FIELDS)
if (null != datasourcePartitionFields && null != tableConfigPartitionFields
&& datasourcePartitionFields != tableConfigPartitionFields) {
diffConfigs.append(s"PartitionPath:\t$datasourcePartitionFields\t$tableConfigPartitionFields\n")
&& currentPartitionFields != tableConfigPartitionFields) {
diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
}
}

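To make the intent of this change concrete, a before/after sketch of the comparison (values illustrative, assuming a table written with CustomKeyGenerator):

    // Write option with type suffixes vs. the table config's plain field names.
    val datasourcePartitionFields = "segment:simple,region:simple"
    val tableConfigPartitionFields = "segment,region" // hoodie.table.partition.fields

    // Before: the raw option was compared, reporting a spurious config conflict.
    val mismatchBefore = datasourcePartitionFields != tableConfigPartitionFields // true

    // After: the fields resolved via SparkKeyGenUtils.getPartitionColumns are compared.
    val currentPartitionFields = "segment,region"
    val mismatchAfter = currentPartitionFields != tableConfigPartitionFields // false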
File: TestHoodieSparkSqlWriter.scala
@@ -470,7 +470,7 @@ class TestHoodieSparkSqlWriter {
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)

// try write to Hudi
assertThrows[IllegalArgumentException] {
assertThrows[IOException] {
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableOpts - DataSourceWriteOptions.PARTITIONPATH_FIELD.key, df)
}
}
File: TestCOWDataSource.scala
@@ -1001,8 +1001,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
writer.save(basePath)
fail("should fail when invalid PartitionKeyType is provided!")
} catch {
case e: Exception =>
assertTrue(e.getCause.getMessage.contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"))
case e: Exception => assertTrue(e.getCause.getMessage.contains("Unable to instantiate class org.apache.hudi.keygen.CustomKeyGenerator"))
}
}

File: TestHoodieDeltaStreamer.java
@@ -379,16 +379,16 @@ public void testKafkaConnectCheckpointProvider() throws IOException {

@Test
public void testPropsWithInvalidKeyGenerator() {
Exception e = assertThrows(IllegalArgumentException.class, () -> {
Exception e = assertThrows(IOException.class, () -> {
String tableBasePath = basePath + "/test_table_invalid_key_gen";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc);
deltaStreamer.sync();
}, "Should error out when setting the key generator class property to an invalid value");
// expected
LOG.debug("Expected error during getting the key generator", e);
assertTrue(e.getMessage().contains("No KeyGeneratorType found for class name"));
LOG.warn("Expected error during getting the key generator", e);
assertTrue(e.getMessage().contains("Could not load key generator class invalid"));
}

private static Stream<Arguments> provideInferKeyGenArgs() {
