Skip to content

Commit

Permalink
avoid double convert
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanHolstien committed Oct 3, 2022
1 parent 132c22a commit 4015769
Showing 1 changed file with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public static PlatformEvent avroToPegasusPE(@Nonnull GenericRecord record) throw
public static GenericRecord pegasusToAvroMAE(@Nonnull MetadataAuditEvent event) throws IOException {
GenericRecord original =
DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_MAE_AVRO_SCHEMA);
return renameSchemaNamespace(original, ORIGINAL_MAE_AVRO_SCHEMA, RENAMED_MAE_AVRO_SCHEMA);
return renameSchemaNamespace(original, RENAMED_MAE_AVRO_SCHEMA);
}

/**
Expand All @@ -185,7 +185,7 @@ public static GenericRecord pegasusToAvroMAE(@Nonnull MetadataAuditEvent event)
public static GenericRecord pegasusToAvroMCL(@Nonnull MetadataChangeLog event) throws IOException {
GenericRecord original =
DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_MCL_AVRO_SCHEMA);
return renameSchemaNamespace(original, ORIGINAL_MCL_AVRO_SCHEMA, RENAMED_MCL_AVRO_SCHEMA);
return renameSchemaNamespace(original, RENAMED_MCL_AVRO_SCHEMA);
}

/**
Expand All @@ -199,7 +199,7 @@ public static GenericRecord pegasusToAvroMCL(@Nonnull MetadataChangeLog event) t
public static GenericRecord pegasusToAvroMCP(@Nonnull MetadataChangeProposal event) throws IOException {
GenericRecord original =
DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_MCP_AVRO_SCHEMA);
return renameSchemaNamespace(original, ORIGINAL_MCP_AVRO_SCHEMA, RENAMED_MCP_AVRO_SCHEMA);
return renameSchemaNamespace(original, RENAMED_MCP_AVRO_SCHEMA);
}

/**
Expand All @@ -213,7 +213,7 @@ public static GenericRecord pegasusToAvroMCP(@Nonnull MetadataChangeProposal eve
public static GenericRecord pegasusToAvroMCE(@Nonnull MetadataChangeEvent event) throws IOException {
GenericRecord original =
DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_MCE_AVRO_SCHEMA);
return renameSchemaNamespace(original, ORIGINAL_MCE_AVRO_SCHEMA, RENAMED_MCE_AVRO_SCHEMA);
return renameSchemaNamespace(original, RENAMED_MCE_AVRO_SCHEMA);
}

/**
Expand Down Expand Up @@ -246,7 +246,7 @@ public static GenericRecord pegasusToAvroFailedMCE(@Nonnull FailedMetadataChange
GenericRecord original =
DataTranslator.dataMapToGenericRecord(failedMetadataChangeEvent.data(), failedMetadataChangeEvent.schema(),
ORIGINAL_FAILED_MCE_AVRO_SCHEMA);
return renameSchemaNamespace(original, ORIGINAL_FAILED_MCE_AVRO_SCHEMA, RENAMED_FAILED_MCE_AVRO_SCHEMA);
return renameSchemaNamespace(original, RENAMED_FAILED_MCE_AVRO_SCHEMA);
}

/**
Expand All @@ -262,7 +262,7 @@ public static GenericRecord pegasusToAvroFailedMCP(
GenericRecord original =
DataTranslator.dataMapToGenericRecord(failedMetadataChangeProposal.data(), failedMetadataChangeProposal.schema(),
ORIGINAL_FMCL_AVRO_SCHEMA);
return renameSchemaNamespace(original, ORIGINAL_FMCL_AVRO_SCHEMA, RENAMED_FMCP_AVRO_SCHEMA);
return renameSchemaNamespace(original, RENAMED_FMCP_AVRO_SCHEMA);
}

/**
Expand All @@ -276,13 +276,16 @@ public static GenericRecord pegasusToAvroFailedMCP(
public static GenericRecord pegasusToAvroPE(@Nonnull PlatformEvent event) throws IOException {
GenericRecord original =
DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_PE_AVRO_SCHEMA);
return renameSchemaNamespace(original, ORIGINAL_PE_AVRO_SCHEMA, RENAMED_PE_AVRO_SCHEMA);
return renameSchemaNamespace(original, RENAMED_PE_AVRO_SCHEMA);
}

/**
* Converts original MXE into a renamed namespace
* Does a double convert that should not be necessary since we're already converting prior to calling this method
* in most spots
*/
@Nonnull
@Deprecated
private static GenericRecord renameSchemaNamespace(@Nonnull GenericRecord original, @Nonnull Schema originalSchema,
@Nonnull Schema newSchema) throws IOException {

Expand All @@ -293,6 +296,16 @@ private static GenericRecord renameSchemaNamespace(@Nonnull GenericRecord origin
return changeSchema(record, newSchema, newSchema);
}

/**
* Converts original MXE into a renamed namespace
*/
@Nonnull
private static GenericRecord renameSchemaNamespace(@Nonnull GenericRecord original, @Nonnull Schema newSchema)
throws IOException {

return changeSchema(original, newSchema, newSchema);
}

/**
* Changes the schema of a {@link GenericRecord} to a compatible schema
*
Expand Down

0 comments on commit 4015769

Please sign in to comment.