From 84990aebe80fcda8813283189956ce565227b32e Mon Sep 17 00:00:00 2001
From: Rajesh Mahindra <76502047+rmahindra123@users.noreply.github.com>
Date: Mon, 20 Nov 2023 11:17:45 -0800
Subject: [PATCH] Fix schema refresh for KafkaAvroSchemaDeserializer (#10118)

Co-authored-by: rmahindra123
---
 .../utilities/sources/AvroKafkaSource.java    | 29 ++++++++++++++-----
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index e9353bb26660..2bf92280faf5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
@@ -78,18 +79,25 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
 
     try {
       props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
-      if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
-        if (schemaProvider == null) {
-          throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
-        }
-        props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), schemaProvider.getSourceSchema().toString());
-      }
     } catch (ClassNotFoundException e) {
       String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
       LOG.error(error);
       throw new HoodieReadFromSourceException(error, e);
     }
-    this.offsetGen = new KafkaOffsetGen(props);
+
+    if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
+      configureSchemaDeserializer();
+    }
+    offsetGen = new KafkaOffsetGen(props);
+  }
+
+  @Override
+  protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
+    if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
+      configureSchemaDeserializer();
+      offsetGen = new KafkaOffsetGen(props);
+    }
+    return super.fetchNewData(lastCheckpointStr, sourceLimit);
   }
 
   @Override
@@ -121,4 +129,11 @@ protected JavaRDD<GenericRecord> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<
       return kafkaRDD.map(consumerRecord -> (GenericRecord) consumerRecord.value());
     }
   }
+
+  private void configureSchemaDeserializer() {
+    if (schemaProvider == null) {
+      throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
+    }
+    props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), schemaProvider.getSourceSchema().toString());
+  }
 }