Skip to content

Commit

Permalink
feat(graph) Add createdOn, createdActor, updatedOn, updatedActor to g…
Browse files Browse the repository at this point in the history
…raph edges (datahub-project#6615)
  • Loading branch information
chriscollins3456 authored and cccs-Dustin committed Feb 1, 2023
1 parent 3d0cd0c commit 9fb2145
Show file tree
Hide file tree
Showing 10 changed files with 374 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,19 @@ public class RelationshipAnnotation {
private static final String ENTITY_TYPES_FIELD = "entityTypes";
private static final String IS_UPSTREAM_FIELD = "isUpstream";
private static final String IS_LINEAGE_FIELD = "isLineage";
private static final String CREATED_ON = "createdOn";
private static final String CREATED_ACTOR = "createdActor";
private static final String UPDATED_ON = "updatedOn";
private static final String UPDATED_ACTOR = "updatedActor";

String name;
List<String> validDestinationTypes;
boolean isUpstream;
boolean isLineage;
String createdOn;
String createdActor;
String updatedOn;
String updatedActor;

@Nonnull
public static RelationshipAnnotation fromPegasusAnnotationObject(
Expand Down Expand Up @@ -70,7 +78,19 @@ public static RelationshipAnnotation fromPegasusAnnotationObject(

final Optional<Boolean> isUpstream = AnnotationUtils.getField(map, IS_UPSTREAM_FIELD, Boolean.class);
final Optional<Boolean> isLineage = AnnotationUtils.getField(map, IS_LINEAGE_FIELD, Boolean.class);
final Optional<String> createdOn = AnnotationUtils.getField(map, CREATED_ON, String.class);
final Optional<String> createdActor = AnnotationUtils.getField(map, CREATED_ACTOR, String.class);
final Optional<String> updatedOn = AnnotationUtils.getField(map, UPDATED_ON, String.class);
final Optional<String> updatedActor = AnnotationUtils.getField(map, UPDATED_ACTOR, String.class);

return new RelationshipAnnotation(name.get(), entityTypes, isUpstream.orElse(true), isLineage.orElse(false));
}
return new RelationshipAnnotation(
name.get(),
entityTypes,
isUpstream.orElse(true),
isLineage.orElse(false),
createdOn.orElse(null),
createdActor.orElse(null),
updatedOn.orElse(null),
updatedActor.orElse(null)
); }
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private RelationshipFieldSpec buildSpec(String relationshipType, List<String> de
boolean isUpstream, boolean isLineage) {
RelationshipFieldSpec spec = mock(RelationshipFieldSpec.class);
when(spec.getRelationshipAnnotation()).thenReturn(
new RelationshipAnnotation(relationshipType, destinationEntityTypes, isUpstream, isLineage));
new RelationshipAnnotation(relationshipType, destinationEntityTypes, isUpstream, isLineage, null, null, null, null));
return spec;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,8 @@ public class Edge {
private Urn source;
private Urn destination;
private String relationshipType;
private Long createdOn;
private Urn createdActor;
private Long updatedOn;
private Urn updatedActor;
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ private String toDocument(@Nonnull final Edge edge) {
searchDocument.set("source", sourceObject);
searchDocument.set("destination", destinationObject);
searchDocument.put("relationshipType", edge.getRelationshipType());
if (edge.getCreatedOn() != null) {
searchDocument.put("createdOn", edge.getCreatedOn());
}
if (edge.getCreatedActor() != null) {
searchDocument.put("createdActor", edge.getCreatedActor().toString());
}
if (edge.getUpdatedOn() != null) {
searchDocument.put("updatedOn", edge.getUpdatedOn());
}
if (edge.getUpdatedActor() != null) {
searchDocument.put("updatedActor", edge.getUpdatedActor().toString());
}

return searchDocument.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,17 @@ protected GraphService getPopulatedGraphService() throws Exception {
GraphService service = getGraphService();

List<Edge> edges = Arrays.asList(
new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf),
new Edge(datasetFourUrn, datasetTwoUrn, downstreamOf),
new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf, null, null, null, null),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, null, null, null, null),
new Edge(datasetFourUrn, datasetTwoUrn, downstreamOf, null, null, null, null),

new Edge(datasetOneUrn, userOneUrn, hasOwner),
new Edge(datasetTwoUrn, userOneUrn, hasOwner),
new Edge(datasetThreeUrn, userTwoUrn, hasOwner),
new Edge(datasetFourUrn, userTwoUrn, hasOwner),
new Edge(datasetOneUrn, userOneUrn, hasOwner, null, null, null, null),
new Edge(datasetTwoUrn, userOneUrn, hasOwner, null, null, null, null),
new Edge(datasetThreeUrn, userTwoUrn, hasOwner, null, null, null, null),
new Edge(datasetFourUrn, userTwoUrn, hasOwner, null, null, null, null),

new Edge(userOneUrn, userTwoUrn, knowsUser),
new Edge(userTwoUrn, userOneUrn, knowsUser)
new Edge(userOneUrn, userTwoUrn, knowsUser, null, null, null, null),
new Edge(userTwoUrn, userOneUrn, knowsUser, null, null, null, null)
);

edges.forEach(service::addEdge);
Expand All @@ -225,25 +225,25 @@ protected GraphService getLineagePopulatedGraphService() throws Exception {
GraphService service = getGraphService();

List<Edge> edges = Arrays.asList(
new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf),
new Edge(datasetFourUrn, datasetTwoUrn, downstreamOf),

new Edge(datasetOneUrn, userOneUrn, hasOwner),
new Edge(datasetTwoUrn, userOneUrn, hasOwner),
new Edge(datasetThreeUrn, userTwoUrn, hasOwner),
new Edge(datasetFourUrn, userTwoUrn, hasOwner),

new Edge(userOneUrn, userTwoUrn, knowsUser),
new Edge(userTwoUrn, userOneUrn, knowsUser),

new Edge(dataJobOneUrn, datasetOneUrn, consumes),
new Edge(dataJobOneUrn, datasetTwoUrn, consumes),
new Edge(dataJobOneUrn, datasetThreeUrn, produces),
new Edge(dataJobOneUrn, datasetFourUrn, produces),
new Edge(dataJobTwoUrn, datasetOneUrn, consumes),
new Edge(dataJobTwoUrn, datasetTwoUrn, consumes),
new Edge(dataJobTwoUrn, dataJobOneUrn, downstreamOf)
new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf, null, null, null, null),
new Edge(datasetThreeUrn, datasetTwoUrn, downstreamOf, null, null, null, null),
new Edge(datasetFourUrn, datasetTwoUrn, downstreamOf, null, null, null, null),

new Edge(datasetOneUrn, userOneUrn, hasOwner, null, null, null, null),
new Edge(datasetTwoUrn, userOneUrn, hasOwner, null, null, null, null),
new Edge(datasetThreeUrn, userTwoUrn, hasOwner, null, null, null, null),
new Edge(datasetFourUrn, userTwoUrn, hasOwner, null, null, null, null),

new Edge(userOneUrn, userTwoUrn, knowsUser, null, null, null, null),
new Edge(userTwoUrn, userOneUrn, knowsUser, null, null, null, null),

new Edge(dataJobOneUrn, datasetOneUrn, consumes, null, null, null, null),
new Edge(dataJobOneUrn, datasetTwoUrn, consumes, null, null, null, null),
new Edge(dataJobOneUrn, datasetThreeUrn, produces, null, null, null, null),
new Edge(dataJobOneUrn, datasetFourUrn, produces, null, null, null, null),
new Edge(dataJobTwoUrn, datasetOneUrn, consumes, null, null, null, null),
new Edge(dataJobTwoUrn, datasetTwoUrn, consumes, null, null, null, null),
new Edge(dataJobTwoUrn, dataJobOneUrn, downstreamOf, null, null, null, null)
);

edges.forEach(service::addEdge);
Expand Down Expand Up @@ -295,24 +295,24 @@ public Object[][] getAddEdgeTests() {
Arrays.asList()
},
new Object[]{
Arrays.asList(new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf)),
Arrays.asList(new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf, null, null, null, null)),
Arrays.asList(downstreamOfDatasetTwoRelatedEntity),
Arrays.asList(downstreamOfDatasetOneRelatedEntity)
},
new Object[]{
Arrays.asList(
new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf),
new Edge(datasetTwoUrn, datasetThreeUrn, downstreamOf)
new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf, null, null, null, null),
new Edge(datasetTwoUrn, datasetThreeUrn, downstreamOf, null, null, null, null)
),
Arrays.asList(downstreamOfDatasetTwoRelatedEntity, downstreamOfDatasetThreeRelatedEntity),
Arrays.asList(downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity)
},
new Object[]{
Arrays.asList(
new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf),
new Edge(datasetOneUrn, userOneUrn, hasOwner),
new Edge(datasetTwoUrn, userTwoUrn, hasOwner),
new Edge(userOneUrn, userTwoUrn, knowsUser)
new Edge(datasetOneUrn, datasetTwoUrn, downstreamOf, null, null, null, null),
new Edge(datasetOneUrn, userOneUrn, hasOwner, null, null, null, null),
new Edge(datasetTwoUrn, userTwoUrn, hasOwner, null, null, null, null),
new Edge(userOneUrn, userTwoUrn, knowsUser, null, null, null, null)
),
Arrays.asList(
downstreamOfDatasetTwoRelatedEntity,
Expand All @@ -328,9 +328,9 @@ public Object[][] getAddEdgeTests() {
},
new Object[]{
Arrays.asList(
new Edge(userOneUrn, userOneUrn, knowsUser),
new Edge(userOneUrn, userOneUrn, knowsUser),
new Edge(userOneUrn, userOneUrn, knowsUser)
new Edge(userOneUrn, userOneUrn, knowsUser, null, null, null, null),
new Edge(userOneUrn, userOneUrn, knowsUser, null, null, null, null),
new Edge(userOneUrn, userOneUrn, knowsUser, null, null, null, null)
),
Arrays.asList(knowsUserOneRelatedEntity),
Arrays.asList(knowsUserOneRelatedEntity)
Expand Down Expand Up @@ -922,12 +922,12 @@ public void testFindRelatedEntitiesNullSourceType() throws Exception {
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service);
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service);

service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf));
service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf, null, null, null, null));
syncAfterWrite();
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service);
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, downstreamOfDatasetOneRelatedEntity);

service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf));
service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf, null, null, null, null));
syncAfterWrite();
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service, nullRelatedEntity);
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, nullRelatedEntity, downstreamOfDatasetOneRelatedEntity);
Expand All @@ -944,12 +944,12 @@ public void testFindRelatedEntitiesNullDestinationType() throws Exception {
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service);
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service);

service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf));
service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf, null, null, null, null));
syncAfterWrite();
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service);
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, downstreamOfDatasetOneRelatedEntity);

service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf));
service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf, null, null, null, null));
syncAfterWrite();
doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service, nullRelatedEntity);
doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, nullRelatedEntity, downstreamOfDatasetOneRelatedEntity);
Expand Down Expand Up @@ -1424,7 +1424,7 @@ private List<Edge> getFullyConnectedGraph(int nodes, List<String> relationshipTy
int destinationType = destinationNode % 3;
Urn destination = createFromString("urn:li:type" + destinationType + ":(urn:li:node" + destinationNode + ")");

edges.add(new Edge(source, destination, relationship));
edges.add(new Edge(source, destination, relationship, null, null, null, null));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void testRemoveEdgesFromNodeNoRelationshipTypes() {
public void testRemoveEdge() throws Exception {
DatasetUrn datasetUrn = new DatasetUrn(new DataPlatformUrn("snowflake"), "test", FabricType.TEST);
TagUrn tagUrn = new TagUrn("newTag");
Edge edge = new Edge(datasetUrn, tagUrn, TAG_RELATIONSHIP);
Edge edge = new Edge(datasetUrn, tagUrn, TAG_RELATIONSHIP, null, null, null, null);
getGraphService().addEdge(edge);
syncAfterWrite();
RelatedEntitiesResult result = getGraphService().findRelatedEntities(Collections.singletonList(datasetType),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package com.linkedin.metadata.kafka.hook;

import com.datahub.util.RecordUtils;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.schema.PathSpec;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.graph.Edge;
import com.linkedin.metadata.models.RelationshipFieldSpec;
import com.linkedin.mxe.MetadataChangeLog;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@Slf4j
public class GraphIndexUtils {

private GraphIndexUtils() { }

@Nullable
private static List<Urn> getActorList(@Nullable final String path, @Nonnull final RecordTemplate aspect) {
if (path == null) {
return null;
}
final PathSpec actorPathSpec = new PathSpec(path.split("/"));
final Optional<Object> value = RecordUtils.getFieldValue(aspect, actorPathSpec);
return (List<Urn>) value.orElse(null);
}

@Nullable
private static List<Long> getTimestampList(@Nullable final String path, @Nonnull final RecordTemplate aspect) {
if (path == null) {
return null;
}
final PathSpec timestampPathSpec = new PathSpec(path.split("/"));
final Optional<Object> value = RecordUtils.getFieldValue(aspect, timestampPathSpec);
return (List<Long>) value.orElse(null);
}

@Nullable
private static boolean isValueListValid(@Nullable final List<?> entryList, final int valueListSize) {
if (entryList == null) {
log.warn("Unable to get entry as entryList is null");
return false;
}
if (valueListSize != entryList.size()) {
log.warn("Unable to get entry for graph edge as values list and entry list have differing sizes");
return false;
}
return true;
}

@Nullable
private static Long getTimestamp(@Nullable final List<Long> timestampList, final int index, final int valueListSize) {
if (isValueListValid(timestampList, valueListSize)) {
return timestampList.get(index);
}
return null;
}

@Nullable
private static Urn getActor(@Nullable final List<Urn> actorList, final int index, final int valueListSize) {
if (isValueListValid(actorList, valueListSize)) {
return actorList.get(index);
}
return null;
}

/**
* Used to create new edges for the graph db, adding all the metadata associated with each edge based on the aspect.
* Returns a list of Edges to be consumed by the graph service.
*/
@Nonnull
public static List<Edge> extractGraphEdges(
@Nonnull final Map.Entry<RelationshipFieldSpec, List<Object>> extractedFieldsEntry,
@Nonnull final RecordTemplate aspect,
@Nonnull final Urn urn,
@Nonnull final MetadataChangeLog event
) {
final List<Edge> edgesToAdd = new ArrayList<>();
final String createdOnPath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getCreatedOn();
final String createdActorPath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getCreatedActor();
final String updatedOnPath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getUpdatedOn();
final String updatedActorPath = extractedFieldsEntry.getKey().getRelationshipAnnotation().getUpdatedActor();

final List<Long> createdOnList = getTimestampList(createdOnPath, aspect);
final List<Urn> createdActorList = getActorList(createdActorPath, aspect);
final List<Long> updatedOnList = getTimestampList(updatedOnPath, aspect);
final List<Urn> updatedActorList = getActorList(updatedActorPath, aspect);

int index = 0;
for (Object fieldValue : extractedFieldsEntry.getValue()) {
Long createdOn = getTimestamp(createdOnList, index, extractedFieldsEntry.getValue().size());
Urn createdActor = getActor(createdActorList, index, extractedFieldsEntry.getValue().size());
final Long updatedOn = getTimestamp(updatedOnList, index, extractedFieldsEntry.getValue().size());
final Urn updatedActor = getActor(updatedActorList, index, extractedFieldsEntry.getValue().size());

if (createdOn == null && event.hasSystemMetadata()) {
createdOn = event.getSystemMetadata().getLastObserved();
}
if (createdActor == null && event.hasCreated()) {
createdActor = event.getCreated().getActor();
}

try {
edgesToAdd.add(
new Edge(
urn,
Urn.createFromString(fieldValue.toString()),
extractedFieldsEntry.getKey().getRelationshipName(),
createdOn,
createdActor,
updatedOn,
updatedActor
)
);
} catch (URISyntaxException e) {
log.error("Invalid destination urn: {}", fieldValue.toString(), e);
}
index++;
}
return edgesToAdd;
}
}
Loading

0 comments on commit 9fb2145

Please sign in to comment.