Skip to content

Commit

Permalink
fix(api/timeline): fix corner cases missed, add tests (#11288)
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Sep 5, 2024
1 parent 199c203 commit 0788347
Show file tree
Hide file tree
Showing 2 changed files with 263 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@
import com.linkedin.schema.SchemaMetadata;
import jakarta.json.JsonPatch;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -208,160 +203,168 @@ private static List<ChangeEvent> getFieldPropertyChangeEvents(
return propChangeEvents;
}

// TODO: This could use some cleanup, lots of repeated logic and tenuous conditionals
/**
 * Indexes the given schema fields by their field path.
 *
 * <p>If several fields share the same path, the one that appears last in {@code fieldArray}
 * replaces the earlier entries.
 *
 * @param fieldArray the fields to index
 * @return a new mutable map from field path to field
 */
private static Map<String, SchemaField> getSchemaFieldMap(SchemaFieldArray fieldArray) {
  Map<String, SchemaField> fieldsByPath = new HashMap<>();
  for (SchemaField field : fieldArray) {
    fieldsByPath.put(field.getFieldPath(), field);
  }
  return fieldsByPath;
}

/**
 * Handles a base field whose path is also present in the target schema.
 *
 * <p>If {@code targetFieldMap} has no entry for {@code baseFieldPath} the method returns without
 * touching any of the collections. Otherwise the path is recorded in both processed sets, a
 * MODIFY / MAJOR event is appended when the native data types differ and the category is
 * {@link ChangeCategory#TECHNICAL_SCHEMA}, and the events returned by {@code
 * getFieldPropertyChangeEvents} are appended exactly once.
 *
 * @param baseFieldPath field path to look up in both maps
 * @param datasetUrn urn of the dataset the fields belong to
 * @param changeCategory category of changes being computed
 * @param auditStamp audit stamp attached to generated events
 * @param baseFieldMap base schema fields keyed by field path
 * @param targetFieldMap target schema fields keyed by field path
 * @param processedBaseFields mutated: receives {@code baseFieldPath} when a match exists
 * @param processedTargetFields mutated: receives {@code baseFieldPath} when a match exists
 * @param changeEvents mutated: receives the generated events
 */
private static void processFieldPathDataTypeChange(
    String baseFieldPath,
    Urn datasetUrn,
    ChangeCategory changeCategory,
    AuditStamp auditStamp,
    Map<String, SchemaField> baseFieldMap,
    Map<String, SchemaField> targetFieldMap,
    Set<String> processedBaseFields,
    Set<String> processedTargetFields,
    List<ChangeEvent> changeEvents) {
  SchemaField curBaseField = baseFieldMap.get(baseFieldPath);
  if (!targetFieldMap.containsKey(baseFieldPath)) {
    // No field with this path in the target; leave it for the caller's later passes.
    return;
  }
  processedBaseFields.add(baseFieldPath);
  processedTargetFields.add(baseFieldPath);
  SchemaField curTargetField = targetFieldMap.get(baseFieldPath);
  if (!curBaseField.getNativeDataType().equals(curTargetField.getNativeDataType())) {
    // Non-backward compatible change + Major version bump
    if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
      changeEvents.add(
          DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
              .category(ChangeCategory.TECHNICAL_SCHEMA)
              .modifier(getSchemaFieldUrn(datasetUrn, curBaseField).toString())
              .entityUrn(datasetUrn.toString())
              .operation(ChangeOperation.MODIFY)
              .semVerChange(SemanticChangeType.MAJOR)
              .description(
                  String.format(
                      "%s native datatype of the field '%s' changed from '%s' to '%s'.",
                      BACKWARDS_INCOMPATIBLE_DESC,
                      getFieldPathV1(curTargetField),
                      curBaseField.getNativeDataType(),
                      curTargetField.getNativeDataType()))
              .fieldPath(curBaseField.getFieldPath())
              .fieldUrn(getSchemaFieldUrn(datasetUrn, curBaseField))
              .nullable(curBaseField.isNullable())
              .auditStamp(auditStamp)
              .build());
    }
  }
  // Property changes are collected once, regardless of whether the data type changed.
  // Collecting them inside the data-type branch as well would emit every event twice.
  List<ChangeEvent> propChangeEvents =
      getFieldPropertyChangeEvents(
          curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
  changeEvents.addAll(propChangeEvents);
}

/**
 * Handles a base field that has not been processed yet: it is treated either as a rename or as
 * a removal.
 *
 * <p>The target fields whose paths are not in {@code processedTargetFields} are offered to
 * {@code findRenamedField} as candidates. The base path is always recorded as processed. When
 * no candidate is returned, {@code processRemoval} is invoked. When one is returned and the
 * category is {@link ChangeCategory#TECHNICAL_SCHEMA}, the candidate's path is recorded as
 * processed and a rename event plus the field property change events are appended. For any
 * other category nothing further happens.
 *
 * <p>{@code renamedFields} is only forwarded to {@code findRenamedField}; this method does not
 * add to it.
 *
 * @param baseFieldPath path of the base field to resolve
 * @param datasetUrn urn of the dataset the fields belong to
 * @param changeCategory category of changes being computed
 * @param auditStamp audit stamp attached to generated events
 * @param baseFieldMap base schema fields keyed by field path
 * @param targetFieldMap target schema fields keyed by field path
 * @param processedBaseFields mutated: receives {@code baseFieldPath}
 * @param processedTargetFields mutated: receives the renamed target path, if any
 * @param changeEvents mutated: receives the generated events
 * @param renamedFields forwarded to {@code findRenamedField}
 */
private static void processFieldPathRename(
    String baseFieldPath,
    Urn datasetUrn,
    ChangeCategory changeCategory,
    AuditStamp auditStamp,
    Map<String, SchemaField> baseFieldMap,
    Map<String, SchemaField> targetFieldMap,
    Set<String> processedBaseFields,
    Set<String> processedTargetFields,
    List<ChangeEvent> changeEvents,
    Set<SchemaField> renamedFields) {
  // Rename candidates: every target field that no earlier step has claimed.
  List<SchemaField> unclaimedTargets = new ArrayList<>();
  for (Map.Entry<String, SchemaField> entry : targetFieldMap.entrySet()) {
    if (!processedTargetFields.contains(entry.getKey())) {
      unclaimedTargets.add(entry.getValue());
    }
  }

  SchemaField curBaseField = baseFieldMap.get(baseFieldPath);
  SchemaField renamedField = findRenamedField(curBaseField, unclaimedTargets, renamedFields);
  processedBaseFields.add(baseFieldPath);

  if (renamedField == null) {
    processRemoval(changeCategory, changeEvents, datasetUrn, curBaseField, auditStamp);
    return;
  }
  if (!ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
    return;
  }

  processedTargetFields.add(renamedField.getFieldPath());
  changeEvents.add(generateRenameEvent(datasetUrn, curBaseField, renamedField, auditStamp));
  changeEvents.addAll(
      getFieldPropertyChangeEvents(
          curBaseField, renamedField, datasetUrn, changeCategory, auditStamp));
}

/**
 * Returns the field paths of {@code fieldMap} that are not contained in {@code
 * processedFields}.
 *
 * @param fieldMap fields keyed by field path; not modified
 * @param processedFields paths to exclude; not modified
 * @return a new mutable set holding the remaining paths
 */
private static Set<String> getNonProcessedFields(
    Map<String, SchemaField> fieldMap, Set<String> processedFields) {
  // Work on a copy so the key set of the map itself is left untouched.
  Set<String> remainingPaths = new HashSet<>(fieldMap.keySet());
  remainingPaths.removeIf(processedFields::contains);
  return remainingPaths;
}

private static List<ChangeEvent> computeDiffs(
SchemaMetadata baseSchema,
SchemaMetadata targetSchema,
Urn datasetUrn,
ChangeCategory changeCategory,
AuditStamp auditStamp) {
// Sort the fields by their field path.
if (baseSchema != null) {
sortFieldsByPath(baseSchema);
}
if (targetSchema != null) {
sortFieldsByPath(targetSchema);
}

// Performs ordinal based diff, primarily based on fixed field ordinals and their types.
SchemaFieldArray baseFields =
(baseSchema != null ? baseSchema.getFields() : new SchemaFieldArray());
SchemaFieldArray targetFields =
targetSchema != null ? targetSchema.getFields() : new SchemaFieldArray();
int baseFieldIdx = 0;
int targetFieldIdx = 0;

Map<String, SchemaField> baseFieldMap = getSchemaFieldMap(baseFields);
Map<String, SchemaField> targetFieldMap = getSchemaFieldMap(targetFields);

Set<String> processedBaseFields = new HashSet<>();
Set<String> processedTargetFields = new HashSet<>();

List<ChangeEvent> changeEvents = new ArrayList<>();
Set<SchemaField> renamedFields = new HashSet<>();
while (baseFieldIdx < baseFields.size() && targetFieldIdx < targetFields.size()) {
SchemaField curBaseField = baseFields.get(baseFieldIdx);
SchemaField curTargetField = targetFields.get(targetFieldIdx);
// TODO: Re-evaluate ordinal processing?
int comparison = curBaseField.getFieldPath().compareTo(curTargetField.getFieldPath());
if (renamedFields.contains(curBaseField)) {
baseFieldIdx++;
} else if (renamedFields.contains(curTargetField)) {
targetFieldIdx++;
} else if (comparison == 0) {
// This is the same field. Check for change events from property changes.
if (!curBaseField.getNativeDataType().equals(curTargetField.getNativeDataType())) {
// Non-backward compatible change + Major version bump
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
changeEvents.add(
DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
.modifier(getSchemaFieldUrn(datasetUrn, curBaseField).toString())
.entityUrn(datasetUrn.toString())
.operation(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.MAJOR)
.description(
String.format(
"%s native datatype of the field '%s' changed from '%s' to '%s'.",
BACKWARDS_INCOMPATIBLE_DESC,
getFieldPathV1(curTargetField),
curBaseField.getNativeDataType(),
curTargetField.getNativeDataType()))
.fieldPath(curBaseField.getFieldPath())
.fieldUrn(getSchemaFieldUrn(datasetUrn, curBaseField))
.nullable(curBaseField.isNullable())
.auditStamp(auditStamp)
.build());
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
++targetFieldIdx;
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
++targetFieldIdx;
} else if (comparison < 0) {
// The base field was removed or renamed: a non-backward-compatible change (major version bump).
// Check for a rename first; if a rename coincides with other modifications we assume drop/add.
// This assumes that two different fields in the same schema would not share the same
// description, terms, or tags as well as the same type.
SchemaField renamedField =
findRenamedField(
curBaseField,
targetFields.subList(targetFieldIdx, targetFields.size()),
renamedFields);
if (renamedField == null) {
processRemoval(changeCategory, changeEvents, datasetUrn, curBaseField, auditStamp);
++baseFieldIdx;
} else {
changeEvents.add(generateRenameEvent(datasetUrn, curBaseField, renamedField, auditStamp));
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
renamedFields.add(renamedField);
}
} else {
// The targetField got added or a renaming occurred. Forward & backwards compatible change +
// minor version bump.
SchemaField renamedField =
findRenamedField(
curTargetField, baseFields.subList(baseFieldIdx, baseFields.size()), renamedFields);
if (renamedField == null) {
processAdd(changeCategory, changeEvents, datasetUrn, curTargetField, auditStamp);
++targetFieldIdx;
} else {
changeEvents.add(
generateRenameEvent(datasetUrn, renamedField, curTargetField, auditStamp));
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
changeEvents.addAll(propChangeEvents);
++targetFieldIdx;
renamedFields.add(renamedField);
}
}
}
while (baseFieldIdx < baseFields.size()) {
// Handle removed fields. Non-backward compatible change + major version bump
SchemaField baseField = baseFields.get(baseFieldIdx);
if (!renamedFields.contains(baseField)) {
processRemoval(changeCategory, changeEvents, datasetUrn, baseField, auditStamp);
}
++baseFieldIdx;

for (String baseFieldPath : baseFieldMap.keySet()) {
processFieldPathDataTypeChange(
baseFieldPath,
datasetUrn,
changeCategory,
auditStamp,
baseFieldMap,
targetFieldMap,
processedBaseFields,
processedTargetFields,
changeEvents);
}
while (targetFieldIdx < targetFields.size()) {
// Newly added fields. Forwards & backwards compatible change + minor version bump.
SchemaField targetField = targetFields.get(targetFieldIdx);
if (!renamedFields.contains(targetField)) {
processAdd(changeCategory, changeEvents, datasetUrn, targetField, auditStamp);
}
targetFieldIdx++;
Set<String> nonProcessedBaseFields = getNonProcessedFields(baseFieldMap, processedBaseFields);
for (String baseFieldPath : nonProcessedBaseFields) {
processFieldPathRename(
baseFieldPath,
datasetUrn,
changeCategory,
auditStamp,
baseFieldMap,
targetFieldMap,
processedBaseFields,
processedTargetFields,
changeEvents,
renamedFields);
}

// Handle primary key constraint change events.
List<ChangeEvent> primaryKeyChangeEvents =
getPrimaryKeyChangeEvents(baseSchema, targetSchema, datasetUrn, auditStamp);
changeEvents.addAll(primaryKeyChangeEvents);
Set<String> nonProcessedTargetFields =
getNonProcessedFields(targetFieldMap, processedTargetFields);

// Handle foreign key constraint change events.
List<ChangeEvent> foreignKeyChangeEvents = getForeignKeyChangeEvents();
changeEvents.addAll(foreignKeyChangeEvents);
nonProcessedTargetFields.forEach(
fieldPath -> {
SchemaField curTargetField = targetFieldMap.get(fieldPath);
processAdd(changeCategory, changeEvents, datasetUrn, curTargetField, auditStamp);
});

return changeEvents;
}

/**
 * Replaces the fields of {@code schemaMetadata} with a copy ordered by field path.
 *
 * @param schemaMetadata the schema whose field array is re-set in sorted order
 * @throws IllegalArgumentException if {@code schemaMetadata} is {@code null}
 */
private static void sortFieldsByPath(SchemaMetadata schemaMetadata) {
  if (schemaMetadata == null) {
    throw new IllegalArgumentException("SchemaMetadata should not be null");
  }
  // Sort a detached copy and install it, rather than sorting the existing array in place.
  List<SchemaField> fieldsInPathOrder =
      schemaMetadata.getFields().stream()
          .sorted(Comparator.comparing(SchemaField::getFieldPath))
          .collect(Collectors.toList());
  schemaMetadata.setFields(new SchemaFieldArray(fieldsInPathOrder));
}

private static SchemaField findRenamedField(
SchemaField curField, List<SchemaField> targetFields, Set<SchemaField> renamedFields) {
return targetFields.stream()
Expand Down Expand Up @@ -391,7 +394,14 @@ private static boolean parentFieldsMatch(SchemaField curField, SchemaField schem
}

private static boolean descriptionsMatch(SchemaField curField, SchemaField schemaField) {
return StringUtils.isNotBlank(curField.getDescription())
if (StringUtils.isBlank(curField.getDescription())
&& StringUtils.isBlank(schemaField.getDescription())) {
return true;
}
return !(StringUtils.isBlank(curField.getDescription())
&& StringUtils.isNotBlank(schemaField.getDescription()))
&& !(StringUtils.isNotBlank(curField.getDescription())
&& StringUtils.isBlank(schemaField.getDescription()))
&& curField.getDescription().equals(schemaField.getDescription());
}

Expand Down
Loading

0 comments on commit 0788347

Please sign in to comment.