Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Neo4j multihop support #6104

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
72b2af1
Multihop getLineage for neo4j
djordje-mijatovic Sep 30, 2022
e174615
Multihop getLineage for neo4j - Test implementation and handling diff…
djordje-mijatovic Oct 3, 2022
bb7cdae
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Oct 3, 2022
d9ed6ea
Multihop getLineage for neo4j - Update code style
djordje-mijatovic Oct 4, 2022
4b6ddd7
Merge remote-tracking branch 'origin/neo4j-multihopp-support' into ne…
djordje-mijatovic Oct 4, 2022
a5a355c
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Oct 4, 2022
9ae63ba
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Oct 5, 2022
302e37c
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Oct 6, 2022
eeadf72
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Oct 7, 2022
3e0d808
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Oct 7, 2022
b9b0d58
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Oct 7, 2022
3829d7a
Small refactoring and use LineageRegistry
djordje-mijatovic Oct 10, 2022
3e05d7d
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Oct 10, 2022
26ac833
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Oct 17, 2022
e5e7afb
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Oct 25, 2022
4d5c6ae
Override MultiHop test for dGraph
djordje-mijatovic Oct 25, 2022
bfb2f03
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Oct 27, 2022
c52a255
Small fixed
djordje-mijatovic Oct 27, 2022
b52b2ae
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Nov 1, 2022
4103cbb
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Nov 7, 2022
9593c41
Merge branch 'master' into neo4j-multihopp-support
djordje-mijatovic Nov 11, 2022
76b4457
Merge branch 'master' into neo4j-multihopp-support
jjoyce0510 Nov 15, 2022
be2c812
Merge branch 'master' into neo4j-multihopp-support
jjoyce0510 Nov 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,33 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.linkedin.common.urn.Urn;

import com.linkedin.metadata.graph.Edge;
import com.linkedin.metadata.graph.EntityLineageResult;
import com.linkedin.metadata.graph.GraphFilters;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.graph.LineageRelationship;
import com.linkedin.metadata.graph.LineageRelationshipArray;
import com.linkedin.metadata.graph.RelatedEntitiesResult;
import com.linkedin.metadata.graph.RelatedEntity;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import com.linkedin.metadata.utils.metrics.MetricUtils;

import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -33,11 +42,14 @@
import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;

import org.neo4j.driver.Driver;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.InternalRelationship;

@Slf4j
public class Neo4jGraphService implements GraphService {
Expand Down Expand Up @@ -94,6 +106,85 @@ public void addEdge(@Nonnull final Edge edge) {
executeStatements(statements);
}

@Nonnull
@Override
public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction,
GraphFilters graphFilters, int offset, int count, int maxHops) {
log.debug(String.format("Neo4j getLineage maxHops = %d", maxHops));

final String statement = generateLineageStatement(entityUrn, direction, graphFilters, maxHops);

List<Record> neo4jResult = statement != null ? runQuery(buildStatement(statement, new HashMap<>())).list() : new ArrayList<>();

// It is possible to have more than 1 path from node A to node B in the graph and previous query returns all the paths.
// We convert the List into Map with only the shortest paths. "item.get(i).size()" is the path size between two nodes in relation.
// The key for mapping is the destination node as the source node is always the same, and it is defined by parameter.
neo4jResult = neo4jResult.stream()
djordje-mijatovic marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toMap(item -> item.values().get(2).asNode().get("urn").asString(),
djordje-mijatovic marked this conversation as resolved.
Show resolved Hide resolved
Function.identity(),
(item1, item2) -> item1.get(1).size() < item2.get(1).size() ? item1 : item2))
djordje-mijatovic marked this conversation as resolved.
Show resolved Hide resolved
.values()
.stream()
.collect(Collectors.toList());

LineageRelationshipArray relations = new LineageRelationshipArray();
neo4jResult.stream()
.skip(offset).limit(count)
.forEach(item -> {
String urn = item.values().get(2).asNode().get("urn").asString();
String relationType = ((InternalRelationship) item.get(1).asList().get(0)).type();
int numHops = item.get(1).size();
try {
relations.add(new LineageRelationship()
.setEntity(Urn.createFromString(urn))
.setType(relationType)
.setDegree(numHops));
} catch (URISyntaxException ignored) {
log.warn(String.format("Can't convert urn = %s, Error = %s", urn, ignored.getMessage()));
}
});

EntityLineageResult result = new EntityLineageResult().setStart(offset)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor formatting:

new EntityLineageResult()
    .setStart(offset) 

.setCount(relations.size())
.setRelationships(relations)
.setTotal(neo4jResult.size());

log.debug(String.format("Neo4j getLineage results = %s", result));
return result;
}

private String generateLineageStatement(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, GraphFilters graphFilters, int maxHops) {
final String multiHopTemplateDirect = "MATCH (a {urn: '%s'})-[r:%s*1..%d]->(b) WHERE b:%s RETURN a,r,b";
final String multiHopTemplateIndirect = "MATCH (a {urn: '%s'})<-[r:%s*1..%d]-(b) WHERE b:%s RETURN a,r,b";

List<LineageRegistry.EdgeInfo> edgesToFetch =
getLineageRegistry().getLineageRelationships(entityUrn.getEntityType(), direction);

String upstreamRel = edgesToFetch.stream()
.filter(item -> item.getDirection() == RelationshipDirection.OUTGOING)
.map(item -> item.getType())
.collect(Collectors.joining("|"));
String dowStreamRel = edgesToFetch.stream()
.filter(item -> item.getDirection() == RelationshipDirection.INCOMING)
.map(item -> item.getType())
.collect(Collectors.joining("|"));

final String allowedEntityTypes = String.join(" OR b:", graphFilters.getAllowedEntityTypes());

final String statementDirect = String.format(multiHopTemplateDirect, entityUrn, upstreamRel, maxHops, allowedEntityTypes);
final String statementIndirect = String.format(multiHopTemplateIndirect, entityUrn, dowStreamRel, maxHops, allowedEntityTypes);

String statement = null;
if (upstreamRel.length() > 0 && dowStreamRel.length() > 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming nit: dowStreamRel -> downStreamRel

statement = statementDirect + " UNION " + statementIndirect;
} else if (upstreamRel.length() > 0) {
statement = statementDirect;
} else if (dowStreamRel.length() > 0) {
statement = statementIndirect;
}
return statement;
}

@Nonnull
public RelatedEntitiesResult findRelatedEntities(
@Nullable final List<String> sourceTypes,
Expand Down Expand Up @@ -420,4 +511,9 @@ private Statement getOrInsertNode(@Nonnull Urn urn) {

return buildStatement(statement, params);
}

@Override
public boolean supportsMultiHop() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1583,4 +1583,45 @@ public void run() {
assertEquals(throwables.size(), 0);
}

@Test
public void testPopulatedGraphServiceGetLineageMultihop() throws Exception {
GraphService service = getLineagePopulatedGraphService();

EntityLineageResult upstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.UPSTREAM, 0, 1000, 2);
assertEquals(upstreamLineage.getTotal().intValue(), 0);
assertEquals(upstreamLineage.getRelationships().size(), 0);

EntityLineageResult downstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2);

assertEquals(downstreamLineage.getTotal().intValue(), 5);
assertEquals(downstreamLineage.getRelationships().size(), 5);
Map<Urn, LineageRelationship> relationships = downstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity,
Function.identity()));
assertTrue(relationships.containsKey(datasetTwoUrn));
assertEquals(relationships.get(datasetTwoUrn).getDegree().intValue(), 1);
assertTrue(relationships.containsKey(datasetThreeUrn));
assertEquals(relationships.get(datasetThreeUrn).getDegree().intValue(), 2);
assertTrue(relationships.containsKey(datasetFourUrn));
assertEquals(relationships.get(datasetFourUrn).getDegree().intValue(), 2);
assertTrue(relationships.containsKey(dataJobOneUrn));
assertEquals(relationships.get(dataJobOneUrn).getDegree().intValue(), 1);
assertTrue(relationships.containsKey(dataJobTwoUrn));
assertEquals(relationships.get(dataJobTwoUrn).getDegree().intValue(), 1);

upstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 2);
assertEquals(upstreamLineage.getTotal().intValue(), 3);
assertEquals(upstreamLineage.getRelationships().size(), 3);
relationships = upstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity,
Function.identity()));
assertTrue(relationships.containsKey(datasetOneUrn));
assertEquals(relationships.get(datasetOneUrn).getDegree().intValue(), 2);
assertTrue(relationships.containsKey(datasetTwoUrn));
assertEquals(relationships.get(datasetTwoUrn).getDegree().intValue(), 1);
assertTrue(relationships.containsKey(dataJobOneUrn));
assertEquals(relationships.get(dataJobOneUrn).getDegree().intValue(), 1);

downstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2);
assertEquals(downstreamLineage.getTotal().intValue(), 0);
assertEquals(downstreamLineage.getRelationships().size(), 0);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice test.

Any cases for when there are no downstream or upstream edges for the source urn?

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -784,4 +784,9 @@ public void testGetDestinationUrnsFromResponseData() {
RELATED_ENTITY_COMPARATOR
);
}

@Override
public void testPopulatedGraphServiceGetLineageMultihop() {
// TODO: Remove this overridden method once the multihop for dGraph is implemented!
djordje-mijatovic marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.ElasticSearchTestUtils;
import com.linkedin.metadata.ElasticTestUtils;
import com.linkedin.metadata.graph.EntityLineageResult;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.GraphServiceTestBase;
import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.graph.LineageRelationship;
import com.linkedin.metadata.graph.RelatedEntitiesResult;
import com.linkedin.metadata.graph.RelatedEntity;
import com.linkedin.metadata.models.registry.LineageRegistry;
Expand All @@ -30,14 +27,10 @@
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.linkedin.metadata.DockerTestUtils.checkContainerEngine;
import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.INDEX_NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;


public class ElasticSearchGraphServiceTest extends GraphServiceTestBase {
Expand Down Expand Up @@ -208,45 +201,4 @@ public void testConcurrentRemoveNodes() {
// https://github.com/datahub-project/datahub/issues/3118
throw new SkipException("ElasticSearchGraphService produces duplicates");
}

@Test
public void testPopulatedGraphServiceGetLineageMultihop() throws Exception {
jjoyce0510 marked this conversation as resolved.
Show resolved Hide resolved
GraphService service = getLineagePopulatedGraphService();

EntityLineageResult upstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.UPSTREAM, 0, 1000, 2);
assertEquals(upstreamLineage.getTotal().intValue(), 0);
assertEquals(upstreamLineage.getRelationships().size(), 0);

EntityLineageResult downstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2);
assertEquals(downstreamLineage.getTotal().intValue(), 5);
assertEquals(downstreamLineage.getRelationships().size(), 5);
Map<Urn, LineageRelationship> relationships = downstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity,
Function.identity()));
assertTrue(relationships.containsKey(datasetTwoUrn));
assertEquals(relationships.get(datasetTwoUrn).getDegree().intValue(), 1);
assertTrue(relationships.containsKey(datasetThreeUrn));
assertEquals(relationships.get(datasetThreeUrn).getDegree().intValue(), 2);
assertTrue(relationships.containsKey(datasetFourUrn));
assertEquals(relationships.get(datasetFourUrn).getDegree().intValue(), 2);
assertTrue(relationships.containsKey(dataJobOneUrn));
assertEquals(relationships.get(dataJobOneUrn).getDegree().intValue(), 1);
assertTrue(relationships.containsKey(dataJobTwoUrn));
assertEquals(relationships.get(dataJobTwoUrn).getDegree().intValue(), 1);

upstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 2);
assertEquals(upstreamLineage.getTotal().intValue(), 3);
assertEquals(upstreamLineage.getRelationships().size(), 3);
relationships = upstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity,
Function.identity()));
assertTrue(relationships.containsKey(datasetOneUrn));
assertEquals(relationships.get(datasetOneUrn).getDegree().intValue(), 2);
assertTrue(relationships.containsKey(datasetTwoUrn));
assertEquals(relationships.get(datasetTwoUrn).getDegree().intValue(), 1);
assertTrue(relationships.containsKey(dataJobOneUrn));
assertEquals(relationships.get(dataJobOneUrn).getDegree().intValue(), 1);

downstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2);
assertEquals(downstreamLineage.getTotal().intValue(), 0);
assertEquals(downstreamLineage.getRelationships().size(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.GraphServiceTestBase;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.graph.RelatedEntitiesResult;
import com.linkedin.metadata.graph.RelatedEntity;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.models.registry.SnapshotEntityRegistry;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import org.neo4j.driver.Driver;
Expand Down