Skip to content

Commit

Permalink
feat(change-event): add change events for DataProcessInstanceRunEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-radhakrishnan committed Nov 1, 2022
1 parent 14d1d73 commit 05a1c74
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 24 deletions.
5 changes: 5 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public class Constants {
public static final String DATA_PROCESS_INSTANCE_ENTITY_NAME = "dataProcessInstance";
public static final String DATA_PROCESS_INSTANCE_PROPERTIES_ASPECT_NAME = "dataProcessInstanceProperties";
public static final String DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME = "dataProcessInstanceRunEvent";
public static final String DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME = "dataProcessInstanceRelationships";

// Posts
public static final String POST_INFO_ASPECT_NAME = "postInfo";
Expand All @@ -274,6 +275,10 @@ public class Constants {
public static final String RUN_ID_KEY = "runId";
public static final String ASSERTEE_URN_KEY = "asserteeUrn";
public static final String ASSERTION_RESULT_KEY = "assertionResult";
public static final String ATTEMPT_KEY = "attempt";
public static final String PARENT_INSTANCE_URN_KEY = "parentInstanceUrn";
public static final String DATA_FLOW_URN_KEY = "dataFlowUrn";
public static final String DATA_JOB_URN_KEY = "dataJobUrn";

private Constants() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public enum ChangeOperation {
* Entity is reinstated after being soft-deleted.
*/
REINSTATE,
/**
* Run has STARTED
*/
STARTED,
/**
* Run is completed
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.linkedin.metadata.timeline.eventgenerator;

import com.datahub.authentication.Authentication;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.dataprocess.DataProcessInstanceRelationships;
import com.linkedin.dataprocess.DataProcessInstanceRunEvent;
import com.linkedin.dataprocess.DataProcessRunStatus;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeOperation;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static com.linkedin.metadata.Constants.*;


public class DataProcessInstanceRunEventChangeEventGenerator
extends EntityChangeEventGenerator<DataProcessInstanceRunEvent> {
private static final String COMPLETED_STATUS = "COMPLETED";
private static final String STARTED_STATUS = "STARTED";

public DataProcessInstanceRunEventChangeEventGenerator(@Nonnull final EntityClient entityClient, @Nonnull final
Authentication authentication) {
super(entityClient, authentication);
}

@Override
public List<ChangeEvent> getChangeEvents(
@Nonnull Urn urn,
@Nonnull String entity,
@Nonnull String aspect,
@Nonnull Aspect<DataProcessInstanceRunEvent> from,
@Nonnull Aspect<DataProcessInstanceRunEvent> to,
@Nonnull AuditStamp auditStamp) {
return computeDiffs(from.getValue(), to.getValue(), urn.toString(), auditStamp);
}

private List<ChangeEvent> computeDiffs(
final DataProcessInstanceRunEvent previousAspect,
final DataProcessInstanceRunEvent newAspect,
@Nonnull final String entityUrn,
@Nonnull final AuditStamp auditStamp) {
final DataProcessRunStatus previousStatus = getStatus(previousAspect);
final DataProcessRunStatus newStatus = getStatus(newAspect);

if (newStatus != null && !newStatus.equals(previousStatus)) {
String operationType = newStatus.equals(DataProcessRunStatus.COMPLETE) ? COMPLETED_STATUS : STARTED_STATUS;

return Collections.singletonList(ChangeEvent.builder()
.category(ChangeCategory.RUN)
.operation(ChangeOperation.valueOf(operationType))
.auditStamp(auditStamp)
.entityUrn(entityUrn)
.parameters(buildParameters(newAspect, entityUrn))
.build());
}

return Collections.emptyList();
}

@Nullable
private DataProcessRunStatus getStatus(DataProcessInstanceRunEvent dataProcessInstanceRunEvent) {
return dataProcessInstanceRunEvent != null ? dataProcessInstanceRunEvent.getStatus() : null;
}

@Nonnull
private Map<String, Object> buildParameters(@Nonnull final DataProcessInstanceRunEvent runEvent,
@Nonnull final String entityUrnString) {
final Map<String, Object> parameters = new HashMap<>();
if (runEvent.hasAttempt()) {
parameters.put(ATTEMPT_KEY, runEvent.getAttempt());
}
if (runEvent.hasResult()) {
parameters.put(RUN_RESULT_KEY, runEvent.getResult().getType().toString());
}

DataProcessInstanceRelationships relationships = getRelationships(entityUrnString);

if (relationships.hasParentInstance()) {
parameters.put(PARENT_INSTANCE_URN_KEY, relationships.getParentInstance().toString());
}

if (relationships.hasParentTemplate()) {
Urn parentTemplateUrn = relationships.getParentTemplate();
if (parentTemplateUrn.getEntityType().equals(DATA_FLOW_ENTITY_NAME)) {
parameters.put(DATA_FLOW_URN_KEY, parentTemplateUrn.toString());
} else if (parentTemplateUrn.getEntityType().equals(DATA_JOB_ENTITY_NAME)) {
parameters.put(DATA_JOB_URN_KEY, parentTemplateUrn.toString());
}
}
return parameters;
}

@Nullable
private DataProcessInstanceRelationships getRelationships(@Nonnull final String entityUrnString) {
Urn entityUrn;
EntityResponse entityResponse;
try {
entityUrn = Urn.createFromString(entityUrnString);
entityResponse = _entityClient.getV2(DATA_PROCESS_INSTANCE_ENTITY_NAME, entityUrn,
Collections.singleton(DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME), _authentication);
} catch (Exception e) {
return null;
}

if (entityResponse == null) {
return null;
}

final EnvelopedAspectMap aspectMap = entityResponse.getAspects();
// If invite token aspect is not present, create a new one. Otherwise, return existing one.
if (!aspectMap.containsKey(DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME)) {
return null;
}

return new DataProcessInstanceRelationships(
aspectMap.get(DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME).getValue().data());
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
package com.linkedin.metadata.timeline.eventgenerator;

import com.datahub.authentication.Authentication;
import com.github.fge.jsonpatch.JsonPatch;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.entity.EntityAspect;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeTransaction;
import java.util.List;
import javax.annotation.Nonnull;


/**
* An abstract class to generate {@link ChangeEvent}s for a given entity aspect.
*/
public abstract class EntityChangeEventGenerator<T extends RecordTemplate> {
// TODO: Add a check for supported aspects
protected EntityClient _entityClient;
protected Authentication _authentication;

public EntityChangeEventGenerator() {
}

public EntityChangeEventGenerator(@Nonnull final EntityClient entityClient,
@Nonnull final Authentication authentication) {
_entityClient = entityClient;
_authentication = authentication;
}

@Deprecated
public ChangeTransaction getSemanticDiff(EntityAspect previousValue, EntityAspect currentValue,
ChangeCategory element, JsonPatch rawDiff, boolean rawDiffsRequested) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook {
Constants.DATASET_PROPERTIES_ASPECT_NAME,
Constants.EDITABLE_DATASET_PROPERTIES_ASPECT_NAME,
Constants.ASSERTION_RUN_EVENT_ASPECT_NAME,
Constants.DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME,

// Entity Lifecycle Event
Constants.DATASET_KEY_ASPECT_NAME,
Expand Down
Loading

0 comments on commit 05a1c74

Please sign in to comment.