Skip to content

Commit

Permalink
feat(change-event): add change events for DataProcessInstanceRunEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-radhakrishnan committed Oct 31, 2022
1 parent a7de2f5 commit f662410
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 25 deletions.
5 changes: 5 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public class Constants {
public static final String DATA_PROCESS_INSTANCE_ENTITY_NAME = "dataProcessInstance";
public static final String DATA_PROCESS_INSTANCE_PROPERTIES_ASPECT_NAME = "dataProcessInstanceProperties";
public static final String DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME = "dataProcessInstanceRunEvent";
public static final String DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME = "dataProcessInstanceRelationships";

// Posts
public static final String POST_INFO_ASPECT_NAME = "postInfo";
Expand All @@ -273,6 +274,10 @@ public class Constants {
public static final String RUN_RESULT_KEY = "runResult";
public static final String RUN_ID_KEY = "runId";
public static final String ASSERTEE_URN_KEY = "asserteeUrn";
public static final String ATTEMPT_KEY = "attempt";
public static final String PARENT_INSTANCE_URN_KEY = "parentInstanceUrn";
public static final String DATA_FLOW_URN_KEY = "dataFlowUrn";
public static final String DATA_JOB_URN_KEY = "dataJobUrn";

private Constants() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public enum ChangeOperation {
* Entity is reinstated after being soft-deleted.
*/
REINSTATE,
/**
* Run has STARTED
*/
STARTED,
/**
* Run is completed
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.linkedin.metadata.timeline.eventgenerator;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.dataprocess.DataProcessInstanceRelationships;
import com.linkedin.dataprocess.DataProcessInstanceRunEvent;
import com.linkedin.dataprocess.DataProcessRunStatus;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeOperation;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static com.linkedin.metadata.Constants.*;


public class DataProcessInstanceRunEventChangeEventGenerator
extends EntityChangeEventGenerator<DataProcessInstanceRunEvent> {
public DataProcessInstanceRunEventChangeEventGenerator(@Nonnull final EntityService entityService) {
super(entityService);
}

@Override
public List<ChangeEvent> getChangeEvents(@Nonnull Urn urn, @Nonnull String entity, @Nonnull String aspect,
@Nonnull Aspect<DataProcessInstanceRunEvent> from, @Nonnull Aspect<DataProcessInstanceRunEvent> to,
@Nonnull AuditStamp auditStamp) {
return computeDiffs(from.getValue(), to.getValue(), urn.toString(), auditStamp);
}

private List<ChangeEvent> computeDiffs(DataProcessInstanceRunEvent baseAspect,
DataProcessInstanceRunEvent targetAspect, String entityUrn, AuditStamp auditStamp) {
final DataProcessRunStatus baseStatus = getStatus(baseAspect);
final DataProcessRunStatus targetStatus = getStatus(targetAspect);

if (targetStatus != null && !targetStatus.equals(baseStatus)) {
String operationType = targetStatus.toString().equals("COMPLETE") ? "COMPLETED" : "STARTED";

return Collections.singletonList(ChangeEvent.builder()
.category(ChangeCategory.RUN)
.operation(ChangeOperation.valueOf(operationType))
.auditStamp(auditStamp)
.entityUrn(entityUrn)
.parameters(buildParamsMap(targetAspect, entityUrn))
.build());
}

return Collections.emptyList();
}

@Nullable
private DataProcessRunStatus getStatus(DataProcessInstanceRunEvent dataProcessInstanceRunEvent) {
return dataProcessInstanceRunEvent != null ? dataProcessInstanceRunEvent.getStatus() : null;
}

@Nonnull
private Map<String, Object> buildParamsMap(DataProcessInstanceRunEvent targetAspect, String entityUrnString) {
final Map<String, Object> paramsMap = new HashMap<>();
if (targetAspect.hasAttempt()) {
paramsMap.put(ATTEMPT_KEY, targetAspect.getAttempt());
}
if (targetAspect.hasResult()) {
paramsMap.put(RUN_RESULT_KEY, targetAspect.getResult().getType().toString());
}

Urn entityUrn;
try {
entityUrn = Urn.createFromString(entityUrnString);
} catch (URISyntaxException e) {
return paramsMap;
}

final DataProcessInstanceRelationships dataProcessInstanceRelationships =
(DataProcessInstanceRelationships) _entityService.getLatestAspect(entityUrn,
DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME);
if (dataProcessInstanceRelationships == null) {
return paramsMap;
}

if (dataProcessInstanceRelationships.hasParentInstance()) {
paramsMap.put(PARENT_INSTANCE_URN_KEY, dataProcessInstanceRelationships.getParentInstance().toString());
}

if (dataProcessInstanceRelationships.hasParentTemplate()) {
Urn parentTemplateUrn = dataProcessInstanceRelationships.getParentTemplate();
if (parentTemplateUrn.getEntityType().equals(DATA_FLOW_ENTITY_NAME)) {
paramsMap.put(DATA_FLOW_URN_KEY, parentTemplateUrn.toString());
} else if (parentTemplateUrn.getEntityType().equals(DATA_JOB_ENTITY_NAME)) {
paramsMap.put(DATA_JOB_URN_KEY, parentTemplateUrn.toString());
}
}
return paramsMap;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.linkedin.metadata.timeline.eventgenerator;

import com.datahub.authentication.Authentication;
import com.github.fge.jsonpatch.JsonPatch;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.entity.EntityAspect;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeTransaction;
Expand All @@ -13,6 +16,14 @@


public abstract class EntityChangeEventGenerator<T extends RecordTemplate> {
protected EntityService _entityService;

public EntityChangeEventGenerator() {}

public EntityChangeEventGenerator(@Nonnull final EntityService entityService) {
_entityService = entityService;
}

@Deprecated
public ChangeTransaction getSemanticDiff(EntityAspect previousValue, EntityAspect currentValue,
ChangeCategory element, JsonPatch rawDiff, boolean rawDiffsRequested) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook {
Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME, Constants.SCHEMA_METADATA_ASPECT_NAME,
Constants.DEPRECATION_ASPECT_NAME, Constants.DATASET_PROPERTIES_ASPECT_NAME,
Constants.EDITABLE_DATASET_PROPERTIES_ASPECT_NAME, Constants.ASSERTION_RUN_EVENT_ASPECT_NAME,
Constants.DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME,

// Entity Lifecycle Event
Constants.DATASET_KEY_ASPECT_NAME, Constants.DASHBOARD_KEY_ASPECT_NAME, Constants.CHART_KEY_ASPECT_NAME,
Constants.CONTAINER_KEY_ASPECT_NAME, Constants.DATA_FLOW_KEY_ASPECT_NAME, Constants.DATA_JOB_KEY_ASPECT_NAME,
Constants.GLOSSARY_TERM_KEY_ASPECT_NAME, Constants.DOMAIN_KEY_ASPECT_NAME, Constants.TAG_KEY_ASPECT_NAME,
Constants.STATUS_ASPECT_NAME
);
Constants.STATUS_ASPECT_NAME);
/**
* The list of change types that are supported for generating semantic change events.
*/
Expand Down Expand Up @@ -159,7 +159,7 @@ private void emitPlatformEvent(@Nonnull final PlatformEvent event, @Nonnull fina
_systemAuthentication);
}

private PlatformEvent buildPlatformEvent(final ChangeEvent rawChangeEvent) {
PlatformEvent buildPlatformEvent(final ChangeEvent rawChangeEvent) {
// 1. Convert raw Change Event to a serialized change event.
RecordTemplate changeEvent = convertRawEventToChangeEvent(rawChangeEvent);
// 2. Build platform event
Expand Down
Loading

0 comments on commit f662410

Please sign in to comment.