Skip to content

Commit

Permalink
fix(change-events): guard against NPE's (datahub-project#7264)
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-radhakrishnan authored and Eric Yomi committed Feb 8, 2023
1 parent d8a0d68 commit a74dacd
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ private Map<String, Object> buildParameters(@Nonnull final DataProcessInstanceRu
parameters.put(RUN_RESULT_KEY, runEvent.getResult().getType().toString());
}

DataProcessInstanceRelationships relationships = getRelationships(entityUrnString);
final DataProcessInstanceRelationships relationships = getRelationships(entityUrnString);
if (relationships == null) {
return parameters;
}

if (relationships.hasParentInstance()) {
parameters.put(PARENT_INSTANCE_URN_KEY, relationships.getParentInstance().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;

import static com.linkedin.metadata.timeline.eventgenerator.ChangeEventGeneratorUtils.*;


@Slf4j
public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerator<SchemaMetadata> {
private static final String SCHEMA_METADATA_ASPECT_NAME = "schemaMetadata";
private static final String BACKWARDS_INCOMPATIBLE_DESC = "A backwards incompatible change due to";
private static final String FORWARDS_COMPATIBLE_DESC = "A forwards compatible change due to ";
private static final String BACK_AND_FORWARD_COMPATIBLE_DESC = "A forwards & backwards compatible change due to ";
private static final String FIELD_DESCRIPTION_ADDED_FORMAT =
"The description '%s' for the field '%s' has been added.";
Expand All @@ -42,11 +44,11 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
private static final String FIELD_DESCRIPTION_MODIFIED_FORMAT =
"The description for the field '%s' has been changed from '%s' to '%s'.";

private static ChangeEvent getDescriptionChange(SchemaField baseField, SchemaField targetField,
private static ChangeEvent getDescriptionChange(@Nullable SchemaField baseField, @Nullable SchemaField targetField,
String datasetFieldUrn, AuditStamp auditStamp) {
String baseDesciption = (baseField != null) ? baseField.getDescription() : null;
String baseDescription = (baseField != null) ? baseField.getDescription() : null;
String targetDescription = (targetField != null) ? targetField.getDescription() : null;
if (baseDesciption == null && targetDescription != null) {
if (baseDescription == null && targetDescription != null) {
// Description got added.
return ChangeEvent.builder()
.operation(ChangeOperation.ADD)
Expand All @@ -57,25 +59,25 @@ private static ChangeEvent getDescriptionChange(SchemaField baseField, SchemaFie
.auditStamp(auditStamp)
.build();
}
if (baseDesciption != null && targetDescription == null) {
if (baseDescription != null && targetDescription == null) {
// Description removed.
return ChangeEvent.builder()
.operation(ChangeOperation.REMOVE)
.semVerChange(SemanticChangeType.MINOR)
.category(ChangeCategory.DOCUMENTATION)
.entityUrn(datasetFieldUrn)
.description(String.format(FIELD_DESCRIPTION_REMOVED_FORMAT, baseDesciption, baseField.getFieldPath()))
.description(String.format(FIELD_DESCRIPTION_REMOVED_FORMAT, baseDescription, baseField.getFieldPath()))
.auditStamp(auditStamp)
.build();
}
if (baseDesciption != null && !baseDesciption.equals(targetDescription)) {
if (baseDescription != null && !baseDescription.equals(targetDescription)) {
// Description Change
return ChangeEvent.builder()
.operation(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.PATCH)
.category(ChangeCategory.DOCUMENTATION)
.entityUrn(datasetFieldUrn)
.description(String.format(FIELD_DESCRIPTION_MODIFIED_FORMAT, baseField.getFieldPath(), baseDesciption,
.description(String.format(FIELD_DESCRIPTION_MODIFIED_FORMAT, baseField.getFieldPath(), baseDescription,
targetDescription))
.auditStamp(auditStamp)
.build();
Expand All @@ -84,7 +86,7 @@ private static ChangeEvent getDescriptionChange(SchemaField baseField, SchemaFie
}

private static List<ChangeEvent> getGlobalTagChangeEvents(SchemaField baseField, SchemaField targetField,
String parentUrn,
String parentUrnStr,
String datasetFieldUrn,
AuditStamp auditStamp) {

Expand All @@ -96,17 +98,25 @@ private static List<ChangeEvent> getGlobalTagChangeEvents(SchemaField baseField,
if (baseField != null || targetField != null) {
String fieldPath = targetField != null ? targetField.getFieldPath() : baseField.getFieldPath();
// 2. Convert EntityTagChangeEvent into a SchemaFieldTagChangeEvent.
final Urn parentUrn;
try {
parentUrn = UrnUtils.getUrn(parentUrnStr);
} catch (Exception e) {
log.error("Failed to parse parentUrnStr: {}", parentUrnStr, e);
return Collections.emptyList();
}

return convertEntityTagChangeEvents(
fieldPath,
UrnUtils.getUrn(parentUrn),
parentUrn,
entityTagChangeEvents);
}

return Collections.emptyList();
}

private static List<ChangeEvent> getGlossaryTermsChangeEvents(SchemaField baseField, SchemaField targetField,
String parentUrn,
String parentUrnStr,
String datasetFieldUrn,
AuditStamp auditStamp) {

Expand All @@ -118,9 +128,17 @@ private static List<ChangeEvent> getGlossaryTermsChangeEvents(SchemaField baseFi
if (targetField != null || baseField != null) {
String fieldPath = targetField != null ? targetField.getFieldPath() : baseField.getFieldPath();
// 2. Convert EntityGlossaryTermChangeEvent into a SchemaFieldGlossaryTermChangeEvent.
final Urn parentUrn;
try {
parentUrn = UrnUtils.getUrn(parentUrnStr);
} catch (Exception e) {
log.error("Failed to parse parentUrnStr: {}", parentUrnStr, e);
return Collections.emptyList();
}

return convertEntityGlossaryTermChangeEvents(
fieldPath,
UrnUtils.getUrn(parentUrn),
parentUrn,
entityGlossaryTermsChangeEvents);
}

Expand Down Expand Up @@ -233,7 +251,7 @@ BACKWARDS_INCOMPATIBLE_DESC, getFieldPathV1(curTargetField), curBaseField.getNat
renamedFields.add(renamedField);
}
} else {
// The targetField got added or a rename occurred. Forward & backwards compatible change + minor version bump.
// The targetField got added or a renaming occurred. Forward & backwards compatible change + minor version bump.
SchemaField renamedField = findRenamedField(curTargetField,
baseFields.subList(baseFieldIdx, baseFields.size()), renamedFields);
if (renamedField == null) {
Expand Down Expand Up @@ -271,7 +289,7 @@ BACKWARDS_INCOMPATIBLE_DESC, getFieldPathV1(curTargetField), curBaseField.getNat
changeEvents.addAll(primaryKeyChangeEvents);

// Handle foreign key constraint change events.
List<ChangeEvent> foreignKeyChangeEvents = getForeignKeyChangeEvents(baseSchema, targetSchema);
List<ChangeEvent> foreignKeyChangeEvents = getForeignKeyChangeEvents();
changeEvents.addAll(foreignKeyChangeEvents);

return changeEvents;
Expand Down Expand Up @@ -371,7 +389,6 @@ private static ChangeEvent generateRenameEvent(Urn datasetUrn, SchemaField curBa
.build();
}

@SuppressWarnings("ConstantConditions")
private static SchemaMetadata getSchemaMetadataFromAspect(EntityAspect entityAspect) {
if (entityAspect != null && entityAspect.getMetadata() != null) {
return RecordUtils.toRecordTemplate(SchemaMetadata.class, entityAspect.getMetadata());
Expand All @@ -380,7 +397,7 @@ private static SchemaMetadata getSchemaMetadataFromAspect(EntityAspect entityAsp
}

@SuppressWarnings("UnnecessaryLocalVariable")
private static List<ChangeEvent> getForeignKeyChangeEvents(SchemaMetadata baseSchema, SchemaMetadata targetSchema) {
private static List<ChangeEvent> getForeignKeyChangeEvents() {
List<ChangeEvent> foreignKeyChangeEvents = new ArrayList<>();
// TODO: Implement the diffing logic.
return foreignKeyChangeEvents;
Expand Down Expand Up @@ -435,10 +452,11 @@ public ChangeTransaction getSemanticDiff(EntityAspect previousValue, EntityAspec
SchemaMetadata baseSchema = getSchemaMetadataFromAspect(previousValue);
SchemaMetadata targetSchema = getSchemaMetadataFromAspect(currentValue);
assert (targetSchema != null);
List<ChangeEvent> changeEvents = new ArrayList<>();
List<ChangeEvent> changeEvents;
try {
changeEvents.addAll(
computeDiffs(baseSchema, targetSchema, DatasetUrn.createFromString(currentValue.getUrn()), changeCategory, null));
changeEvents = new ArrayList<>(
computeDiffs(baseSchema, targetSchema, DatasetUrn.createFromString(currentValue.getUrn()), changeCategory,
null));
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Malformed DatasetUrn " + currentValue.getUrn());
}
Expand Down

0 comments on commit a74dacd

Please sign in to comment.