Skip to content

Commit

Permalink
refactor(change-events): refactor AspectDiffers to EntityChangeEventG…
Browse files Browse the repository at this point in the history
…enerators (#6320)

* refactor(change-events): refactor AspectDiffers to EntityChangeEventGenerators

* feat(change-event): add change events for AssertionRunEvent

* feat(change-event): add change events for DataProcessInstanceRunEvent
  • Loading branch information
aditya-radhakrishnan authored Nov 1, 2022
1 parent 4b31204 commit 873b100
Show file tree
Hide file tree
Showing 33 changed files with 934 additions and 678 deletions.
2 changes: 1 addition & 1 deletion docs/dev-guides/timeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Note the `target` and `elementId` fields in the examples above to familiarize yo
Each `ChangeTransaction` is assigned a computed semantic version based on the `ChangeEvents` that occurred within it,
starting at `0.0.0` and updating based on whether the most significant change in the transaction is a `MAJOR`, `MINOR`, or
`PATCH` change. The logic for what changes constitute a Major, Minor or Patch change are encoded in the category specific `Differ` implementation.
For example, the [SchemaMetadataDiffer](../../metadata-io/src/main/java/com/linkedin/metadata/timeline/differ/SchemaMetadataDiffer.java) has baked-in logic for determining what level of semantic change an event is based on backwards and forwards incompatibility. Read on to learn about the different categories of changes, and how semantic changes are interpreted in each.
For example, the [SchemaMetadataDiffer](../../metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java) has baked-in logic for determining what level of semantic change an event is based on backwards and forwards incompatibility. Read on to learn about the different categories of changes, and how semantic changes are interpreted in each.

# Categories
ChangeTransactions contain a `category` that represents a kind of change that happened. The `Timeline API` allows the caller to specify which categories of changes they are interested in. Categories allow us to abstract away the low-level technical change that happened in the metadata (e.g. the `schemaMetadata` aspect changed) to a high-level semantic change that happened in the metadata (e.g. the `Technical Schema` of the dataset changed). Read on to learn about the different categories that are supported today.
Expand Down
13 changes: 13 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public class Constants {
public static final String DATA_PROCESS_INSTANCE_ENTITY_NAME = "dataProcessInstance";
public static final String DATA_PROCESS_INSTANCE_PROPERTIES_ASPECT_NAME = "dataProcessInstanceProperties";
public static final String DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME = "dataProcessInstanceRunEvent";
public static final String DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME = "dataProcessInstanceRelationships";

// Posts
public static final String POST_INFO_ASPECT_NAME = "postInfo";
Expand All @@ -267,6 +268,18 @@ public class Constants {
public static final String CLIENT_ID_URN = "urn:li:telemetry:clientId";
public static final String CLIENT_ID_ASPECT = "telemetryClientId";

// Metadata Change Event Parameter Names

// Runs
public static final String RUN_RESULT_KEY = "runResult";
public static final String RUN_ID_KEY = "runId";
public static final String ASSERTEE_URN_KEY = "asserteeUrn";
public static final String ASSERTION_RESULT_KEY = "assertionResult";
public static final String ATTEMPT_KEY = "attempt";
public static final String PARENT_INSTANCE_URN_KEY = "parentInstanceUrn";
public static final String DATA_FLOW_URN_KEY = "dataFlowUrn";
public static final String DATA_JOB_URN_KEY = "dataJobUrn";

private Constants() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeTransaction;
import com.linkedin.metadata.timeline.data.SemanticChangeType;
import com.linkedin.metadata.timeline.differ.AspectDiffer;
import com.linkedin.metadata.timeline.differ.AspectDifferFactory;
import com.linkedin.metadata.timeline.differ.DatasetPropertiesDiffer;
import com.linkedin.metadata.timeline.differ.EditableDatasetPropertiesDiffer;
import com.linkedin.metadata.timeline.differ.EditableSchemaMetadataDiffer;
import com.linkedin.metadata.timeline.differ.GlobalTagsDiffer;
import com.linkedin.metadata.timeline.differ.GlossaryTermsDiffer;
import com.linkedin.metadata.timeline.differ.InstitutionalMemoryDiffer;
import com.linkedin.metadata.timeline.differ.OwnershipDiffer;
import com.linkedin.metadata.timeline.differ.SchemaMetadataDiffer;
import org.apache.commons.collections.CollectionUtils;

import javax.annotation.Nonnull;
import com.linkedin.metadata.timeline.eventgenerator.DatasetPropertiesChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.EditableDatasetPropertiesChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.EditableSchemaMetadataChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.EntityChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.EntityChangeEventGeneratorFactory;
import com.linkedin.metadata.timeline.eventgenerator.GlobalTagsChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.GlossaryTermsChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.InstitutionalMemoryChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.OwnershipChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.SchemaMetadataChangeEventGenerator;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -42,17 +39,11 @@
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.collections.CollectionUtils;

import static com.linkedin.common.urn.VersionedUrnUtils.constructVersionStamp;
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static com.linkedin.metadata.Constants.DATASET_PROPERTIES_ASPECT_NAME;
import static com.linkedin.metadata.Constants.EDITABLE_DATASET_PROPERTIES_ASPECT_NAME;
import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME;
import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.GLOSSARY_TERMS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.INSTITUTIONAL_MEMORY_ASPECT_NAME;
import static com.linkedin.metadata.Constants.OWNERSHIP_ASPECT_NAME;
import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME;
import static com.linkedin.common.urn.VersionedUrnUtils.*;
import static com.linkedin.metadata.Constants.*;

public class TimelineServiceImpl implements TimelineService {

Expand All @@ -62,7 +53,7 @@ public class TimelineServiceImpl implements TimelineService {
private static final String BUILD_VALUE_COMPUTED = "computed";

private final AspectDao _aspectDao;
private final AspectDifferFactory _diffFactory;
private final EntityChangeEventGeneratorFactory _entityChangeEventGeneratorFactory;
private final EntityRegistry _entityRegistry;
private final HashMap<String, HashMap<ChangeCategory, Set<String>>> entityTypeElementAspectRegistry = new HashMap<>();

Expand All @@ -74,54 +65,60 @@ public TimelineServiceImpl(@Nonnull AspectDao aspectDao, @Nonnull EntityRegistry
// TODO: Load up from yaml file
// Dataset registry
HashMap<ChangeCategory, Set<String>> datasetElementAspectRegistry = new HashMap<>();
_diffFactory = new AspectDifferFactory();
_entityChangeEventGeneratorFactory = new EntityChangeEventGeneratorFactory();
String entityType = DATASET_ENTITY_NAME;
for (ChangeCategory elementName : ChangeCategory.values()) {
Set<String> aspects = new HashSet<>();
switch (elementName) {
case TAG: {
aspects.add(SCHEMA_METADATA_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, SCHEMA_METADATA_ASPECT_NAME, new SchemaMetadataDiffer());
_entityChangeEventGeneratorFactory.addGenerator(entityType, elementName, SCHEMA_METADATA_ASPECT_NAME,
new SchemaMetadataChangeEventGenerator());
aspects.add(EDITABLE_SCHEMA_METADATA_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, EDITABLE_SCHEMA_METADATA_ASPECT_NAME,
new EditableSchemaMetadataDiffer());
_entityChangeEventGeneratorFactory.addGenerator(entityType, elementName, EDITABLE_SCHEMA_METADATA_ASPECT_NAME,
new EditableSchemaMetadataChangeEventGenerator());
aspects.add(GLOBAL_TAGS_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, GLOBAL_TAGS_ASPECT_NAME, new GlobalTagsDiffer());
_entityChangeEventGeneratorFactory.addGenerator(entityType, elementName, GLOBAL_TAGS_ASPECT_NAME,
new GlobalTagsChangeEventGenerator());
}
break;
case OWNER: {
aspects.add(OWNERSHIP_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, OWNERSHIP_ASPECT_NAME, new OwnershipDiffer());
_entityChangeEventGeneratorFactory.addGenerator(entityType, elementName, OWNERSHIP_ASPECT_NAME,
new OwnershipChangeEventGenerator());
}
break;
case DOCUMENTATION: {
aspects.add(INSTITUTIONAL_MEMORY_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, INSTITUTIONAL_MEMORY_ASPECT_NAME,
new InstitutionalMemoryDiffer());
_entityChangeEventGeneratorFactory.addGenerator(entityType, elementName, INSTITUTIONAL_MEMORY_ASPECT_NAME,
new InstitutionalMemoryChangeEventGenerator());
aspects.add(EDITABLE_DATASET_PROPERTIES_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, EDITABLE_DATASET_PROPERTIES_ASPECT_NAME,
new EditableDatasetPropertiesDiffer());
_entityChangeEventGeneratorFactory.addGenerator(entityType, elementName, EDITABLE_DATASET_PROPERTIES_ASPECT_NAME,
new EditableDatasetPropertiesChangeEventGenerator());
aspects.add(DATASET_PROPERTIES_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, DATASET_PROPERTIES_ASPECT_NAME,
new DatasetPropertiesDiffer());
_entityChangeEventGeneratorFactory.addGenerator(entityType, elementName, DATASET_PROPERTIES_ASPECT_NAME,
new DatasetPropertiesChangeEventGenerator());
aspects.add(EDITABLE_SCHEMA_METADATA_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, EDITABLE_SCHEMA_METADATA_ASPECT_NAME,
new EditableSchemaMetadataDiffer());
_entityChangeEventGeneratorFactory.addGenerator(entityType, elementName, EDITABLE_SCHEMA_METADATA_ASPECT_NAME,
new EditableSchemaMetadataChangeEventGenerator());
aspects.add(SCHEMA_METADATA_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, SCHEMA_METADATA_ASPECT_NAME, new SchemaMetadataDiffer());
_entityChangeEventGeneratorFactory.addGenerator(entityType, elementName, SCHEMA_METADATA_ASPECT_NAME,
new SchemaMetadataChangeEventGenerator());
}
break;
case GLOSSARY_TERM: {
aspects.add(GLOSSARY_TERMS_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, GLOSSARY_TERMS_ASPECT_NAME, new GlossaryTermsDiffer());
_entityChangeEventGeneratorFactory.addGenerator(entityType, elementName, GLOSSARY_TERMS_ASPECT_NAME,
new GlossaryTermsChangeEventGenerator());
aspects.add(EDITABLE_SCHEMA_METADATA_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, EDITABLE_SCHEMA_METADATA_ASPECT_NAME,
new EditableSchemaMetadataDiffer());
_entityChangeEventGeneratorFactory.addGenerator(entityType, elementName, EDITABLE_SCHEMA_METADATA_ASPECT_NAME,
new EditableSchemaMetadataChangeEventGenerator());
}
break;
case TECHNICAL_SCHEMA: {
aspects.add(SCHEMA_METADATA_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, SCHEMA_METADATA_ASPECT_NAME, new SchemaMetadataDiffer());
_entityChangeEventGeneratorFactory.addGenerator(entityType, elementName, SCHEMA_METADATA_ASPECT_NAME,
new SchemaMetadataChangeEventGenerator());
}
break;
default:
Expand Down Expand Up @@ -337,11 +334,13 @@ private List<ChangeTransaction> computeDiff(@Nonnull EntityAspect previousValue,
List<ChangeTransaction> semanticChangeTransactions = new ArrayList<>();
JsonPatch rawDiff = getRawDiff(previousValue, currentValue);
for (ChangeCategory element : elementNames) {
AspectDiffer differ = _diffFactory.getDiffer(entityType, element, aspectName);
if (differ != null) {
EntityChangeEventGenerator entityChangeEventGenerator =
_entityChangeEventGeneratorFactory.getGenerator(entityType, element, aspectName);
if (entityChangeEventGenerator != null) {
try {
ChangeTransaction changeTransaction = differ.getSemanticDiff(previousValue, currentValue, element,
rawDiff, rawDiffsRequested);
ChangeTransaction changeTransaction =
entityChangeEventGenerator.getSemanticDiff(previousValue, currentValue, element, rawDiff,
rawDiffsRequested);
if (CollectionUtils.isNotEmpty(changeTransaction.getChangeEvents())) {
semanticChangeTransactions.add(changeTransaction);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ public enum ChangeCategory {
// Update the deprecation for an entity
DEPRECATION,
// Entity Lifecycle events (create, soft delete, hard delete)
LIFECYCLE;
LIFECYCLE,
// Run event
RUN;

public static final Map<List<String>, ChangeCategory> COMPOUND_CATEGORIES;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,12 @@ public enum ChangeOperation {
* Entity is reinstated after being soft-deleted.
*/
REINSTATE,
/**
* Run has STARTED
*/
STARTED,
/**
* Run is completed
*/
COMPLETED
}

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.metadata.timeline.differ;
package com.linkedin.metadata.timeline.eventgenerator;

import com.linkedin.data.template.RecordTemplate;
import com.linkedin.mxe.SystemMetadata;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.linkedin.metadata.timeline.eventgenerator;

import com.google.common.collect.ImmutableSortedMap;
import com.linkedin.assertion.AssertionResult;
import com.linkedin.assertion.AssertionRunEvent;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeOperation;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;

import static com.linkedin.metadata.Constants.*;


public class AssertionRunEventChangeEventGenerator extends EntityChangeEventGenerator<AssertionRunEvent> {
@Override
public List<ChangeEvent> getChangeEvents(
@Nonnull Urn urn,
@Nonnull String entity,
@Nonnull String aspect,
@Nonnull Aspect<AssertionRunEvent> from,
@Nonnull Aspect<AssertionRunEvent> to,
@Nonnull AuditStamp auditStamp) {
return computeDiffs(from.getValue(), to.getValue(), urn.toString(), auditStamp);
}

private List<ChangeEvent> computeDiffs(
@Nonnull final AssertionRunEvent previousAspect,
@Nonnull final AssertionRunEvent newAspect,
@Nonnull final String entityUrn,
@Nonnull final AuditStamp auditStamp) {

boolean isPreviousCompleted = isCompleted(previousAspect);
boolean isNewCompleted = isCompleted(newAspect);

if (isNewCompleted && !isPreviousCompleted) {
return Collections.singletonList(ChangeEvent.builder()
.category(ChangeCategory.RUN)
.operation(ChangeOperation.COMPLETED)
.auditStamp(auditStamp)
.entityUrn(entityUrn)
.parameters(buildParameters(newAspect))
.build());
}

return Collections.emptyList();
}

private boolean isCompleted(final AssertionRunEvent assertionRunEvent) {
return assertionRunEvent != null && assertionRunEvent.getStatus()
.toString()
.equals(ASSERTION_RUN_EVENT_STATUS_COMPLETE);
}

@Nonnull
private Map<String, Object> buildParameters(@Nonnull final AssertionRunEvent assertionRunEvent) {
final Map<String, Object> parameters = new HashMap<>();
parameters.put(RUN_RESULT_KEY, assertionRunEvent.getStatus().toString());
parameters.put(RUN_ID_KEY, assertionRunEvent.getRunId());
parameters.put(ASSERTEE_URN_KEY, assertionRunEvent.getAsserteeUrn().toString());

if (assertionRunEvent.hasResult()) {
final AssertionResult assertionResult = assertionRunEvent.getResult();
parameters.put(ASSERTION_RESULT_KEY, assertionResult.getType().toString());
}

return ImmutableSortedMap.copyOf(parameters);
}
}
Loading

0 comments on commit 873b100

Please sign in to comment.