diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java index 6a834352b92943..9b87c59d1e44cf 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java @@ -6,11 +6,17 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.linkedin.common.urn.Urn; + import com.linkedin.metadata.graph.Edge; +import com.linkedin.metadata.graph.EntityLineageResult; +import com.linkedin.metadata.graph.GraphFilters; import com.linkedin.metadata.graph.GraphService; -import com.linkedin.metadata.models.registry.LineageRegistry; +import com.linkedin.metadata.graph.LineageDirection; +import com.linkedin.metadata.graph.LineageRelationship; +import com.linkedin.metadata.graph.LineageRelationshipArray; import com.linkedin.metadata.graph.RelatedEntitiesResult; import com.linkedin.metadata.graph.RelatedEntity; +import com.linkedin.metadata.models.registry.LineageRegistry; import com.linkedin.metadata.query.filter.Condition; import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; import com.linkedin.metadata.query.filter.CriterionArray; @@ -18,12 +24,15 @@ import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.query.filter.RelationshipFilter; import com.linkedin.metadata.utils.metrics.MetricUtils; + +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.StringJoiner; +import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -33,11 +42,14 @@ import org.apache.commons.lang.time.StopWatch; import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; + import org.neo4j.driver.Driver; +import org.neo4j.driver.Record; import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.SessionConfig; import org.neo4j.driver.exceptions.Neo4jException; +import org.neo4j.driver.internal.InternalRelationship; @Slf4j public class Neo4jGraphService implements GraphService { @@ -94,6 +106,85 @@ public void addEdge(@Nonnull final Edge edge) { executeStatements(statements); } + @Nonnull + @Override + public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, + GraphFilters graphFilters, int offset, int count, int maxHops) { + log.debug(String.format("Neo4j getLineage maxHops = %d", maxHops)); + + final String statement = generateLineageStatement(entityUrn, direction, graphFilters, maxHops); + + List neo4jResult = statement != null ? runQuery(buildStatement(statement, new HashMap<>())).list() : new ArrayList<>(); + + // It is possible to have more than 1 path from node A to node B in the graph and previous query returns all the paths. + // We convert the List into Map with only the shortest paths. "item.get(i).size()" is the path size between two nodes in relation. + // The key for mapping is the destination node as the source node is always the same, and it is defined by parameter. + neo4jResult = neo4jResult.stream() + .collect(Collectors.toMap(item -> item.values().get(2).asNode().get("urn").asString(), + Function.identity(), + (item1, item2) -> item1.get(1).size() < item2.get(1).size() ? item1 : item2)) + .values() + .stream() + .collect(Collectors.toList()); + + LineageRelationshipArray relations = new LineageRelationshipArray(); + neo4jResult.stream() + .skip(offset).limit(count) + .forEach(item -> { + String urn = item.values().get(2).asNode().get("urn").asString(); + String relationType = ((InternalRelationship) item.get(1).asList().get(0)).type(); + int numHops = item.get(1).size(); + try { + relations.add(new LineageRelationship() + .setEntity(Urn.createFromString(urn)) + .setType(relationType) + .setDegree(numHops)); + } catch (URISyntaxException ignored) { + log.warn(String.format("Can't convert urn = %s, Error = %s", urn, ignored.getMessage())); + } + }); + + EntityLineageResult result = new EntityLineageResult().setStart(offset) + .setCount(relations.size()) + .setRelationships(relations) + .setTotal(neo4jResult.size()); + + log.debug(String.format("Neo4j getLineage results = %s", result)); + return result; + } + + private String generateLineageStatement(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, GraphFilters graphFilters, int maxHops) { + final String multiHopTemplateDirect = "MATCH (a {urn: '%s'})-[r:%s*1..%d]->(b) WHERE b:%s RETURN a,r,b"; + final String multiHopTemplateIndirect = "MATCH (a {urn: '%s'})<-[r:%s*1..%d]-(b) WHERE b:%s RETURN a,r,b"; + + List edgesToFetch = + getLineageRegistry().getLineageRelationships(entityUrn.getEntityType(), direction); + + String upstreamRel = edgesToFetch.stream() + .filter(item -> item.getDirection() == RelationshipDirection.OUTGOING) + .map(item -> item.getType()) + .collect(Collectors.joining("|")); + String dowStreamRel = edgesToFetch.stream() + .filter(item -> item.getDirection() == RelationshipDirection.INCOMING) + .map(item -> item.getType()) + .collect(Collectors.joining("|")); + + final String allowedEntityTypes = String.join(" OR b:", graphFilters.getAllowedEntityTypes()); + + final String statementDirect = String.format(multiHopTemplateDirect, entityUrn, upstreamRel, maxHops, allowedEntityTypes); + final String statementIndirect = String.format(multiHopTemplateIndirect, entityUrn, dowStreamRel, maxHops, allowedEntityTypes); + + String statement = null; + if (upstreamRel.length() > 0 && dowStreamRel.length() > 0) { + statement = statementDirect + " UNION " + statementIndirect; + } else if (upstreamRel.length() > 0) { + statement = statementDirect; + } else if (dowStreamRel.length() > 0) { + statement = statementIndirect; + } + return statement; + } + @Nonnull public RelatedEntitiesResult findRelatedEntities( @Nullable final List sourceTypes, @@ -420,4 +511,9 @@ private Statement getOrInsertNode(@Nonnull Urn urn) { return buildStatement(statement, params); } + + @Override + public boolean supportsMultiHop() { + return true; + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java index 5647b828c37cba..ca80bf567672d8 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java @@ -1583,4 +1583,45 @@ public void run() { assertEquals(throwables.size(), 0); } + @Test + public void testPopulatedGraphServiceGetLineageMultihop() throws Exception { + GraphService service = getLineagePopulatedGraphService(); + + EntityLineageResult upstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.UPSTREAM, 0, 1000, 2); + assertEquals(upstreamLineage.getTotal().intValue(), 0); + assertEquals(upstreamLineage.getRelationships().size(), 0); + + EntityLineageResult downstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2); + + assertEquals(downstreamLineage.getTotal().intValue(), 5); + assertEquals(downstreamLineage.getRelationships().size(), 5); + Map relationships = downstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity, + Function.identity())); + assertTrue(relationships.containsKey(datasetTwoUrn)); + assertEquals(relationships.get(datasetTwoUrn).getDegree().intValue(), 1); + assertTrue(relationships.containsKey(datasetThreeUrn)); + assertEquals(relationships.get(datasetThreeUrn).getDegree().intValue(), 2); + assertTrue(relationships.containsKey(datasetFourUrn)); + assertEquals(relationships.get(datasetFourUrn).getDegree().intValue(), 2); + assertTrue(relationships.containsKey(dataJobOneUrn)); + assertEquals(relationships.get(dataJobOneUrn).getDegree().intValue(), 1); + assertTrue(relationships.containsKey(dataJobTwoUrn)); + assertEquals(relationships.get(dataJobTwoUrn).getDegree().intValue(), 1); + + upstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 2); + assertEquals(upstreamLineage.getTotal().intValue(), 3); + assertEquals(upstreamLineage.getRelationships().size(), 3); + relationships = upstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity, + Function.identity())); + assertTrue(relationships.containsKey(datasetOneUrn)); + assertEquals(relationships.get(datasetOneUrn).getDegree().intValue(), 2); + assertTrue(relationships.containsKey(datasetTwoUrn)); + assertEquals(relationships.get(datasetTwoUrn).getDegree().intValue(), 1); + assertTrue(relationships.containsKey(dataJobOneUrn)); + assertEquals(relationships.get(dataJobOneUrn).getDegree().intValue(), 1); + + downstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2); + assertEquals(downstreamLineage.getTotal().intValue(), 0); + assertEquals(downstreamLineage.getRelationships().size(), 0); + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/dgraph/DgraphGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/dgraph/DgraphGraphServiceTest.java index a851b62527707f..abf9bf532ddd87 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/dgraph/DgraphGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/dgraph/DgraphGraphServiceTest.java @@ -784,4 +784,9 @@ public void testGetDestinationUrnsFromResponseData() { RELATED_ENTITY_COMPARATOR ); } + + @Override + public void testPopulatedGraphServiceGetLineageMultihop() { + // TODO: Remove this overridden method once the multihop for dGraph is implemented! + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java index 104f7c06ec8863..888b606e4f0a5d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java @@ -3,11 +3,8 @@ import com.linkedin.common.urn.Urn; import com.linkedin.metadata.ElasticSearchTestUtils; import com.linkedin.metadata.ElasticTestUtils; -import com.linkedin.metadata.graph.EntityLineageResult; import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.graph.GraphServiceTestBase; -import com.linkedin.metadata.graph.LineageDirection; -import com.linkedin.metadata.graph.LineageRelationship; import com.linkedin.metadata.graph.RelatedEntitiesResult; import com.linkedin.metadata.graph.RelatedEntity; import com.linkedin.metadata.models.registry.LineageRegistry; @@ -30,14 +27,10 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; import static com.linkedin.metadata.DockerTestUtils.checkContainerEngine; import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.INDEX_NAME; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; public class ElasticSearchGraphServiceTest extends GraphServiceTestBase { @@ -208,45 +201,4 @@ public void testConcurrentRemoveNodes() { // https://github.com/datahub-project/datahub/issues/3118 throw new SkipException("ElasticSearchGraphService produces duplicates"); } - - @Test - public void testPopulatedGraphServiceGetLineageMultihop() throws Exception { - GraphService service = getLineagePopulatedGraphService(); - - EntityLineageResult upstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.UPSTREAM, 0, 1000, 2); - assertEquals(upstreamLineage.getTotal().intValue(), 0); - assertEquals(upstreamLineage.getRelationships().size(), 0); - - EntityLineageResult downstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2); - assertEquals(downstreamLineage.getTotal().intValue(), 5); - assertEquals(downstreamLineage.getRelationships().size(), 5); - Map relationships = downstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity, - Function.identity())); - assertTrue(relationships.containsKey(datasetTwoUrn)); - assertEquals(relationships.get(datasetTwoUrn).getDegree().intValue(), 1); - assertTrue(relationships.containsKey(datasetThreeUrn)); - assertEquals(relationships.get(datasetThreeUrn).getDegree().intValue(), 2); - assertTrue(relationships.containsKey(datasetFourUrn)); - assertEquals(relationships.get(datasetFourUrn).getDegree().intValue(), 2); - assertTrue(relationships.containsKey(dataJobOneUrn)); - assertEquals(relationships.get(dataJobOneUrn).getDegree().intValue(), 1); - assertTrue(relationships.containsKey(dataJobTwoUrn)); - assertEquals(relationships.get(dataJobTwoUrn).getDegree().intValue(), 1); - - upstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 2); - assertEquals(upstreamLineage.getTotal().intValue(), 3); - assertEquals(upstreamLineage.getRelationships().size(), 3); - relationships = upstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity, - Function.identity())); - assertTrue(relationships.containsKey(datasetOneUrn)); - assertEquals(relationships.get(datasetOneUrn).getDegree().intValue(), 2); - assertTrue(relationships.containsKey(datasetTwoUrn)); - assertEquals(relationships.get(datasetTwoUrn).getDegree().intValue(), 1); - assertTrue(relationships.containsKey(dataJobOneUrn)); - assertEquals(relationships.get(dataJobOneUrn).getDegree().intValue(), 1); - - downstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2); - assertEquals(downstreamLineage.getTotal().intValue(), 0); - assertEquals(downstreamLineage.getRelationships().size(), 0); - } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java index 9f9ddef8a3267f..e2c3808b1e3c3d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java @@ -2,9 +2,9 @@ import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.graph.GraphServiceTestBase; -import com.linkedin.metadata.models.registry.LineageRegistry; import com.linkedin.metadata.graph.RelatedEntitiesResult; import com.linkedin.metadata.graph.RelatedEntity; +import com.linkedin.metadata.models.registry.LineageRegistry; import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; import com.linkedin.metadata.query.filter.RelationshipFilter; import org.neo4j.driver.Driver;