Skip to content

Commit

Permalink
Merge branch 'master' into david-leifker/elasticsearch-improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Nov 15, 2022
2 parents 81aad15 + a274406 commit f3e54cf
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 50 deletions.
2 changes: 1 addition & 1 deletion docker/datahub-mae-consumer/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ if [[ ${GRAPH_SERVICE_IMPL:-} != elasticsearch ]] && [[ ${SKIP_NEO4J_CHECK:-fals
dockerize_args+=("-wait" "$NEO4J_HOST")
fi

JAVA_TOOL_OPTIONS="${JDK_JAVA_OPTIONS:-}${JAVA_OPTS:+ JAVA_OPTS}${JMX_OPTS:+ JMX_OPTS}"
JAVA_TOOL_OPTIONS="${JDK_JAVA_OPTIONS:-}${JAVA_OPTS:+ $JAVA_OPTS}${JMX_OPTS:+ $JMX_OPTS}"
if [[ ${ENABLE_OTEL:-false} == true ]]; then
JAVA_TOOL_OPTIONS="$JAVA_TOOL_OPTIONS -javaagent:opentelemetry-javaagent-all.jar"
fi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,33 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.linkedin.common.urn.Urn;

import com.linkedin.metadata.graph.Edge;
import com.linkedin.metadata.graph.EntityLineageResult;
import com.linkedin.metadata.graph.GraphFilters;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.graph.LineageRelationship;
import com.linkedin.metadata.graph.LineageRelationshipArray;
import com.linkedin.metadata.graph.RelatedEntitiesResult;
import com.linkedin.metadata.graph.RelatedEntity;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import com.linkedin.metadata.utils.metrics.MetricUtils;

import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -33,11 +42,14 @@
import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;

import org.neo4j.driver.Driver;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.InternalRelationship;

@Slf4j
public class Neo4jGraphService implements GraphService {
Expand Down Expand Up @@ -100,6 +112,85 @@ public void removeEdge(final Edge edge) {
throw new UnsupportedOperationException("Remove edge not supported by Neo4JGraphService at this time.");
}

@Nonnull
@Override
public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction,
GraphFilters graphFilters, int offset, int count, int maxHops) {
log.debug(String.format("Neo4j getLineage maxHops = %d", maxHops));

final String statement = generateLineageStatement(entityUrn, direction, graphFilters, maxHops);

List<Record> neo4jResult = statement != null ? runQuery(buildStatement(statement, new HashMap<>())).list() : new ArrayList<>();

// It is possible to have more than 1 path from node A to node B in the graph and previous query returns all the paths.
// We convert the List into Map with only the shortest paths. "item.get(i).size()" is the path size between two nodes in relation.
// The key for mapping is the destination node as the source node is always the same, and it is defined by parameter.
neo4jResult = neo4jResult.stream()
.collect(Collectors.toMap(item -> item.values().get(2).asNode().get("urn").asString(),
Function.identity(),
(item1, item2) -> item1.get(1).size() < item2.get(1).size() ? item1 : item2))
.values()
.stream()
.collect(Collectors.toList());

LineageRelationshipArray relations = new LineageRelationshipArray();
neo4jResult.stream()
.skip(offset).limit(count)
.forEach(item -> {
String urn = item.values().get(2).asNode().get("urn").asString();
String relationType = ((InternalRelationship) item.get(1).asList().get(0)).type();
int numHops = item.get(1).size();
try {
relations.add(new LineageRelationship()
.setEntity(Urn.createFromString(urn))
.setType(relationType)
.setDegree(numHops));
} catch (URISyntaxException ignored) {
log.warn(String.format("Can't convert urn = %s, Error = %s", urn, ignored.getMessage()));
}
});

EntityLineageResult result = new EntityLineageResult().setStart(offset)
.setCount(relations.size())
.setRelationships(relations)
.setTotal(neo4jResult.size());

log.debug(String.format("Neo4j getLineage results = %s", result));
return result;
}

private String generateLineageStatement(@Nonnull Urn entityUrn, @Nonnull LineageDirection direction, GraphFilters graphFilters, int maxHops) {
final String multiHopTemplateDirect = "MATCH (a {urn: '%s'})-[r:%s*1..%d]->(b) WHERE b:%s RETURN a,r,b";
final String multiHopTemplateIndirect = "MATCH (a {urn: '%s'})<-[r:%s*1..%d]-(b) WHERE b:%s RETURN a,r,b";

List<LineageRegistry.EdgeInfo> edgesToFetch =
getLineageRegistry().getLineageRelationships(entityUrn.getEntityType(), direction);

String upstreamRel = edgesToFetch.stream()
.filter(item -> item.getDirection() == RelationshipDirection.OUTGOING)
.map(item -> item.getType())
.collect(Collectors.joining("|"));
String dowStreamRel = edgesToFetch.stream()
.filter(item -> item.getDirection() == RelationshipDirection.INCOMING)
.map(item -> item.getType())
.collect(Collectors.joining("|"));

final String allowedEntityTypes = String.join(" OR b:", graphFilters.getAllowedEntityTypes());

final String statementDirect = String.format(multiHopTemplateDirect, entityUrn, upstreamRel, maxHops, allowedEntityTypes);
final String statementIndirect = String.format(multiHopTemplateIndirect, entityUrn, dowStreamRel, maxHops, allowedEntityTypes);

String statement = null;
if (upstreamRel.length() > 0 && dowStreamRel.length() > 0) {
statement = statementDirect + " UNION " + statementIndirect;
} else if (upstreamRel.length() > 0) {
statement = statementDirect;
} else if (dowStreamRel.length() > 0) {
statement = statementIndirect;
}
return statement;
}

@Nonnull
public RelatedEntitiesResult findRelatedEntities(
@Nullable final List<String> sourceTypes,
Expand Down Expand Up @@ -426,4 +517,9 @@ private Statement getOrInsertNode(@Nonnull Urn urn) {

return buildStatement(statement, params);
}

@Override
public boolean supportsMultiHop() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1585,4 +1585,45 @@ public void run() {
assertEquals(throwables.size(), 0);
}

@Test
public void testPopulatedGraphServiceGetLineageMultihop() throws Exception {
GraphService service = getLineagePopulatedGraphService();

EntityLineageResult upstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.UPSTREAM, 0, 1000, 2);
assertEquals(upstreamLineage.getTotal().intValue(), 0);
assertEquals(upstreamLineage.getRelationships().size(), 0);

EntityLineageResult downstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2);

assertEquals(downstreamLineage.getTotal().intValue(), 5);
assertEquals(downstreamLineage.getRelationships().size(), 5);
Map<Urn, LineageRelationship> relationships = downstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity,
Function.identity()));
assertTrue(relationships.containsKey(datasetTwoUrn));
assertEquals(relationships.get(datasetTwoUrn).getDegree().intValue(), 1);
assertTrue(relationships.containsKey(datasetThreeUrn));
assertEquals(relationships.get(datasetThreeUrn).getDegree().intValue(), 2);
assertTrue(relationships.containsKey(datasetFourUrn));
assertEquals(relationships.get(datasetFourUrn).getDegree().intValue(), 2);
assertTrue(relationships.containsKey(dataJobOneUrn));
assertEquals(relationships.get(dataJobOneUrn).getDegree().intValue(), 1);
assertTrue(relationships.containsKey(dataJobTwoUrn));
assertEquals(relationships.get(dataJobTwoUrn).getDegree().intValue(), 1);

upstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 2);
assertEquals(upstreamLineage.getTotal().intValue(), 3);
assertEquals(upstreamLineage.getRelationships().size(), 3);
relationships = upstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity,
Function.identity()));
assertTrue(relationships.containsKey(datasetOneUrn));
assertEquals(relationships.get(datasetOneUrn).getDegree().intValue(), 2);
assertTrue(relationships.containsKey(datasetTwoUrn));
assertEquals(relationships.get(datasetTwoUrn).getDegree().intValue(), 1);
assertTrue(relationships.containsKey(dataJobOneUrn));
assertEquals(relationships.get(dataJobOneUrn).getDegree().intValue(), 1);

downstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2);
assertEquals(downstreamLineage.getTotal().intValue(), 0);
assertEquals(downstreamLineage.getRelationships().size(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -784,4 +784,9 @@ public void testGetDestinationUrnsFromResponseData() {
RELATED_ENTITY_COMPARATOR
);
}

@Override
public void testPopulatedGraphServiceGetLineageMultihop() {
// TODO: Remove this overridden method once the multihop for dGraph is implemented!
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import com.linkedin.metadata.graph.EntityLineageResult;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.GraphServiceTestBase;
import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.graph.LineageRelationship;
import com.linkedin.metadata.graph.RelatedEntitiesResult;
import com.linkedin.metadata.graph.RelatedEntity;
import com.linkedin.metadata.models.registry.LineageRegistry;
Expand All @@ -36,14 +34,10 @@
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.INDEX_NAME;
import static com.linkedin.metadata.search.utils.QueryUtils.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

@Import(ElasticSearchTestConfiguration.class)
public class ElasticSearchGraphServiceTest extends GraphServiceTestBase {
Expand Down Expand Up @@ -233,45 +227,4 @@ public void testConcurrentRemoveNodes() {
// https://github.com/datahub-project/datahub/issues/3118
throw new SkipException("ElasticSearchGraphService produces duplicates");
}

@Test
public void testPopulatedGraphServiceGetLineageMultihop() throws Exception {
GraphService service = getLineagePopulatedGraphService();

EntityLineageResult upstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.UPSTREAM, 0, 1000, 2);
assertEquals(upstreamLineage.getTotal().intValue(), 0);
assertEquals(upstreamLineage.getRelationships().size(), 0);

EntityLineageResult downstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2);
assertEquals(downstreamLineage.getTotal().intValue(), 5);
assertEquals(downstreamLineage.getRelationships().size(), 5);
Map<Urn, LineageRelationship> relationships = downstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity,
Function.identity()));
assertTrue(relationships.containsKey(datasetTwoUrn));
assertEquals(relationships.get(datasetTwoUrn).getDegree().intValue(), 1);
assertTrue(relationships.containsKey(datasetThreeUrn));
assertEquals(relationships.get(datasetThreeUrn).getDegree().intValue(), 2);
assertTrue(relationships.containsKey(datasetFourUrn));
assertEquals(relationships.get(datasetFourUrn).getDegree().intValue(), 2);
assertTrue(relationships.containsKey(dataJobOneUrn));
assertEquals(relationships.get(dataJobOneUrn).getDegree().intValue(), 1);
assertTrue(relationships.containsKey(dataJobTwoUrn));
assertEquals(relationships.get(dataJobTwoUrn).getDegree().intValue(), 1);

upstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 1000, 2);
assertEquals(upstreamLineage.getTotal().intValue(), 3);
assertEquals(upstreamLineage.getRelationships().size(), 3);
relationships = upstreamLineage.getRelationships().stream().collect(Collectors.toMap(LineageRelationship::getEntity,
Function.identity()));
assertTrue(relationships.containsKey(datasetOneUrn));
assertEquals(relationships.get(datasetOneUrn).getDegree().intValue(), 2);
assertTrue(relationships.containsKey(datasetTwoUrn));
assertEquals(relationships.get(datasetTwoUrn).getDegree().intValue(), 1);
assertTrue(relationships.containsKey(dataJobOneUrn));
assertEquals(relationships.get(dataJobOneUrn).getDegree().intValue(), 1);

downstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.DOWNSTREAM, 0, 1000, 2);
assertEquals(downstreamLineage.getTotal().intValue(), 0);
assertEquals(downstreamLineage.getRelationships().size(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.GraphServiceTestBase;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.graph.RelatedEntitiesResult;
import com.linkedin.metadata.graph.RelatedEntity;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.models.registry.SnapshotEntityRegistry;
import com.linkedin.metadata.query.filter.RelationshipFilter;
import org.neo4j.driver.Driver;
Expand Down

0 comments on commit f3e54cf

Please sign in to comment.