diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/mappers/UrnSearchAcrossLineageResultsMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/mappers/UrnSearchAcrossLineageResultsMapper.java
index ca363deb90c4d..d0c5605976d63 100644
--- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/mappers/UrnSearchAcrossLineageResultsMapper.java
+++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/mappers/UrnSearchAcrossLineageResultsMapper.java
@@ -71,6 +71,7 @@ private SearchAcrossLineageResult mapResult(
         .setDegrees(new ArrayList<>(searchEntity.getDegrees()))
         .setExplored(Boolean.TRUE.equals(searchEntity.isExplored()))
         .setIgnoredAsHop(Boolean.TRUE.equals(searchEntity.isIgnoredAsHop()))
+        .setTruncatedChildren(Boolean.TRUE.equals(searchEntity.isTruncatedChildren()))
         .build();
   }
 }
diff --git a/datahub-graphql-core/src/main/resources/search.graphql b/datahub-graphql-core/src/main/resources/search.graphql
index 499ac3a0860d4..c7b5e61e9831c 100644
--- a/datahub-graphql-core/src/main/resources/search.graphql
+++ b/datahub-graphql-core/src/main/resources/search.graphql
@@ -747,6 +747,11 @@ type SearchAcrossLineageResult {
   """
   explored: Boolean!
 
+  """
+  Indicates this destination node has additional unexplored child relationships
+  """
+  truncatedChildren: Boolean!
+
   """
   Whether this relationship was ignored as a hop
   """
diff --git a/metadata-ingestion/examples/perf/lineage_perf_example.py b/metadata-ingestion/examples/perf/lineage_perf_example.py
new file mode 100644
index 0000000000000..3ee78bacb268a
--- /dev/null
+++ b/metadata-ingestion/examples/perf/lineage_perf_example.py
@@ -0,0 +1,402 @@
+from typing import Iterable, List
+
+from datahub.emitter.mce_builder import make_data_job_urn, make_dataset_urn
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.graph.client import get_default_graph
+from datahub.metadata.schema_classes import (
+    DataJobInputOutputClass,
+    DatasetLineageTypeClass,
+    DatasetPropertiesClass,
+    StatusClass,
+    UpstreamClass,
+    UpstreamLineageClass,
+)
+from datahub.utilities.urns.dataset_urn import DatasetUrn
+
+
+def lineage_mcp_generator(
+    urn: str, upstreams: List[str]
+) -> Iterable[MetadataChangeProposalWrapper]:
+    yield MetadataChangeProposalWrapper(
+        entityUrn=urn,
+        aspect=UpstreamLineageClass(
+            upstreams=[
+                UpstreamClass(
+                    dataset=upstream,
+                    type=DatasetLineageTypeClass.TRANSFORMED,
+                )
+                for upstream in upstreams
+            ]
+        ),
+    )
+    for upstream in upstreams:
+        yield MetadataChangeProposalWrapper(
+            entityUrn=upstream, aspect=StatusClass(removed=False)
+        )
+    for urn_itr in [urn, *upstreams]:
+        yield MetadataChangeProposalWrapper(
+            entityUrn=urn_itr,
+            aspect=DatasetPropertiesClass(name=DatasetUrn.from_string(urn_itr).name),
+        )
+
+
+def datajob_lineage_mcp_generator(
+    urn: str, upstreams: List[str], downstreams: List[str]
+) -> Iterable[MetadataChangeProposalWrapper]:
+    yield MetadataChangeProposalWrapper(
+        entityUrn=urn,
+        aspect=DataJobInputOutputClass(
+            inputDatasets=upstreams,
+            outputDatasets=downstreams,
+        ),
+    )
+    for upstream in upstreams:
+        yield MetadataChangeProposalWrapper(
+            entityUrn=upstream, aspect=StatusClass(removed=False)
+        )
+    for downstream in downstreams:
+        yield MetadataChangeProposalWrapper(
+            entityUrn=downstream, aspect=StatusClass(removed=False)
+        )
+
+
+def scenario_truncate_basic():
+    """searchAcrossLineage(root, depth=n, breadth=3, skip=None)
+    All 21 urns. 
+ """ + + path = "truncate.basic" + root_urn = make_dataset_urn("snowflake", f"{path}.root") + + yield from lineage_mcp_generator( + root_urn, + [make_dataset_urn("snowflake", f"{path}.u_{i}") for i in range(10)], + ) + + for i in range(10): + yield from lineage_mcp_generator( + make_dataset_urn("snowflake", f"{path}.d_{i}"), [root_urn] + ) + + +def scenario_truncate_intermediate(): + """searchAcrossLineage(root, depth=3, skip=None) + 1 root urn, all 3 direct upstreams and downstreams, and then 4 urns for each 'expanded' urn. + Total 1 + 3 + 4*3 = 16 urns. + """ + + path = "truncate.intermediate" + root_urn = make_dataset_urn("snowflake", f"{path}.root") + + yield from lineage_mcp_generator( + root_urn, [make_dataset_urn("snowflake", f"{path}.u_{i}") for i in range(10)] + ) + + for i in range(3): + yield from lineage_mcp_generator( + make_dataset_urn("snowflake", f"{path}.u_{i}"), + [make_dataset_urn("snowflake", f"{path}.u_{i}_u_{j}") for j in range(3)], + ) + + for i in range(3): + yield from lineage_mcp_generator( + make_dataset_urn("snowflake", f"{path}.d_{i}"), [root_urn] + ) + for j in range(3): + yield from lineage_mcp_generator( + make_dataset_urn("snowflake", f"{path}.d_{i}d_{j}"), + [make_dataset_urn("snowflake", f"{path}.d_{i}")], + ) + + +def scenario_truncate_complex(): + """searchAcrossLineage(root, depth=n, breadth=3, skip=None) + 1 root urn, + direct (lvl a) upstream, + its two (lvl b) upstreams, + each of their 3 (lvl c) upstreams, + each of their 4 (lvl d) upstreams, + then, for three of the lvl d nodes, 5 (lvl e) upstreams each, + then, for two of the lvl e nodes, 6 (lvl f) upstreams, and for the other lvl e node, 1 (lvl f) upstream. + Total 1 + 1 + 2 + (2 * 3) + (2 * 3 * 4) + (2 * 3 * 3 * 5) + (2 * 3 * 3 * 2 * 6) + (2 * 3 * 3 * 1 * 1) = 358 urns. 
+ """ + + path = "truncate.complex" + root_urn = make_dataset_urn("snowflake", f"{path}.root") + lvl_a = make_dataset_urn("snowflake", f"{path}.u_0") + lvl_b = {i: make_dataset_urn("snowflake", f"{path}.u_0_u_{i}") for i in range(2)} + lvl_c = { + (a, b): make_dataset_urn("snowflake", f"{path}.u_0_u_{a}_u_{b}") + for a in range(2) + for b in range(3) + } + lvl_d = { + (a, b, c): make_dataset_urn("snowflake", f"{path}.u_0_u_{a}_u_{b}_u_{c}") + for a in range(2) + for b in range(3) + for c in range(4) + } + lvl_e = { + (a, b, c, d): make_dataset_urn( + "snowflake", f"{path}.u_0_u_{a}_u_{b}_u_{c}_u_{d}" + ) + for a in range(2) + for b in range(3) + for c in range(4) + for d in range(5) + } + lvl_f = { + (a, b, c, d, e): make_dataset_urn( + "snowflake", f"{path}.u_0_u_{a}_u_{b}_u_{c}_u_{d}_u_{e}" + ) + for a in range(2) + for b in range(3) + for c in range(4) + for d in range(5) + for e in range(6 if d % 2 == 0 else 1) + } + + yield from lineage_mcp_generator(root_urn, [lvl_a]) + yield from lineage_mcp_generator(lvl_a, list(lvl_b.values())) + for a, urn in lvl_b.items(): + yield from lineage_mcp_generator(urn, [lvl_c[(a, b)] for b in range(3)]) + for (a, b), urn in lvl_c.items(): + yield from lineage_mcp_generator(urn, [lvl_d[(a, b, c)] for c in range(4)]) + for (a, b, c), urn in lvl_d.items(): + yield from lineage_mcp_generator(urn, [lvl_e[(a, b, c, d)] for d in range(5)]) + for (a, b, c, d), urn in lvl_e.items(): + yield from lineage_mcp_generator( + urn, [lvl_f[(a, b, c, d, e)] for e in range(6 if d % 2 == 0 else 1)] + ) + + +def scenario_skip_basic(): + """searchAcrossLineage(root, depth=1, breadth=10, skip=[{type: "dataJob"}, {type: "dataset", platform: "urn:li:dataPlatform:dbt"}]) + 1 root urn, both airflow nodes, both dbt nodes, and all 6 snowflake neighbors. + Total 1 + 2 + 2 + 6 = 11 urns. + """ + path = "skip.basic" + root_urn = make_dataset_urn("snowflake", f"{path}.root") + upstream_dbt_urn = make_dataset_urn("dbt", f"{path}.u_0") + upstream_airflow_urn = make_data_job_urn("airflow", f"{path}.flow", f"{path}.u_0") + + yield from lineage_mcp_generator( + root_urn, + [ + make_dataset_urn("snowflake", f"{path}.u_direct"), + upstream_dbt_urn, + ], + ) + yield from lineage_mcp_generator( + upstream_dbt_urn, + [make_dataset_urn("snowflake", f"{path}.u_through_dbt")], + ) + yield from datajob_lineage_mcp_generator( + upstream_airflow_urn, + [make_dataset_urn("snowflake", f"{path}.u_through_airflow")], + [root_urn], + ) + + downstream_dbt_urn = make_dataset_urn("dbt", f"{path}.d_0") + downstream_airflow_urn = make_data_job_urn("airflow", f"{path}.flow", f"{path}.d_0") + yield from lineage_mcp_generator( + make_dataset_urn("snowflake", f"{path}.d_direct"), + [root_urn], + ) + yield from lineage_mcp_generator( + downstream_dbt_urn, + [root_urn], + ) + yield from lineage_mcp_generator( + make_dataset_urn("snowflake", f"{path}.d_through_dbt"), + [downstream_dbt_urn], + ) + yield from datajob_lineage_mcp_generator( + downstream_airflow_urn, + [root_urn], + [make_dataset_urn("snowflake", f"{path}.d_through_airflow")], + ) + + +def scenario_skip_intermediate(): + """searchAcrossLineage(root, depth=1, breadth=10, skip=[{type: "dataJob"}, {type: "dataset", platform: "urn:li:dataPlatform:dbt"}]) + 1 root urn and all nodes aside from those upstream of `skip.intermediate.u_indirect_1`. + Total 11 urns. + searchAcrossLineage(root, depth=2, breadth=10, skip=[{type: "dataJob"}, {type: "dataset", platform: "urn:li:dataPlatform:dbt"}]) + All 14 urns. 
+ """ + path = "skip.intermediate" + root_urn = make_dataset_urn("snowflake", f"{path}.root") + upstream_dbt_urns = [make_dataset_urn("dbt", f"{path}.u_{i}") for i in range(6)] + upstream_airflow_urn = make_data_job_urn("airflow", f"{path}.flow", f"{path}.u_0") + + yield from lineage_mcp_generator( + root_urn, + [ + make_dataset_urn("snowflake", f"{path}.u_direct"), + upstream_dbt_urns[0], + ], + ) + yield from datajob_lineage_mcp_generator( + upstream_airflow_urn, [upstream_dbt_urns[1]], [upstream_dbt_urns[0]] + ) + yield from lineage_mcp_generator( + upstream_dbt_urns[1], + [ + upstream_dbt_urns[2], + ], + ) + yield from lineage_mcp_generator( + upstream_dbt_urns[2], + [ + upstream_dbt_urns[3], + upstream_dbt_urns[4], + ], + ) + yield from lineage_mcp_generator( + upstream_dbt_urns[3], + [make_dataset_urn("snowflake", f"{path}.u_indirect_0")], + ) + yield from lineage_mcp_generator( + upstream_dbt_urns[4], + [ + make_dataset_urn("snowflake", f"{path}.u_indirect_1"), + make_dataset_urn("snowflake", f"{path}.u_indirect_2"), + ], + ) + yield from lineage_mcp_generator( + make_dataset_urn("snowflake", f"{path}.u_indirect_1"), + [make_dataset_urn("snowflake", f"{path}.u_depth_2"), upstream_dbt_urns[5]], + ) + yield from lineage_mcp_generator( + upstream_dbt_urns[5], + [ + make_dataset_urn("snowflake", f"{path}.u_depth_2_indirect"), + ], + ) + + +def scenario_skip_complex(): + """searchAcrossLineage(root, depth=1, breadth=1, skip=[{type: "dataJob"}, {type: "dataset", platform: "urn:li:dataPlatform:dbt"}]) + The 11 urns from scenario_skip_intermediate, plus 2 snowflake urns and 1 dbt node from the single expanded upstream. + Total 14 urns. + """ + path = "skip.complex" + root_urn = make_dataset_urn("snowflake", f"{path}.root") + upstream_dbt_urns = [make_dataset_urn("dbt", f"{path}.u_{i}") for i in range(5)] + upstream_airflow_urn = make_data_job_urn("airflow", f"{path}.flow", f"{path}.u_0") + depth_one_snowflake_urns = { + "direct": make_dataset_urn("snowflake", f"{path}.u_direct"), + "indirect_0": make_dataset_urn("snowflake", f"{path}.u_indirect_0"), + "indirect_1": make_dataset_urn("snowflake", f"{path}.u_indirect_1"), + "indirect_2": make_dataset_urn("snowflake", f"{path}.u_indirect_2"), + } + + yield from lineage_mcp_generator( + root_urn, + [ + depth_one_snowflake_urns["direct"], + upstream_dbt_urns[0], + ], + ) + yield from datajob_lineage_mcp_generator( + upstream_airflow_urn, [upstream_dbt_urns[1]], [upstream_dbt_urns[0]] + ) + yield from lineage_mcp_generator( + upstream_dbt_urns[1], + [ + upstream_dbt_urns[2], + ], + ) + yield from lineage_mcp_generator( + upstream_dbt_urns[2], + [ + upstream_dbt_urns[3], + upstream_dbt_urns[4], + ], + ) + yield from lineage_mcp_generator( + upstream_dbt_urns[3], + [depth_one_snowflake_urns["indirect_0"]], + ) + yield from lineage_mcp_generator( + upstream_dbt_urns[4], + [ + depth_one_snowflake_urns["indirect_1"], + depth_one_snowflake_urns["indirect_2"], + ], + ) + + for name, urn in depth_one_snowflake_urns.items(): + dbt_urn = make_dataset_urn("dbt", f"{path}.u_{name}") + yield from lineage_mcp_generator( + urn, + [make_dataset_urn("snowflake", f"{path}.direct_u_{name}"), dbt_urn], + ) + yield from lineage_mcp_generator( + dbt_urn, + [make_dataset_urn("snowflake", f"{path}.indirect_u_{name}")], + ) + + +def scenario_perf(): + """searchAcrossLineage(root, depth=n, breadth=3, skip=None) + 1 root urn, + direct (lvl a) upstream, + its 100 (lvl b) upstreams, + each of their 30 (lvl c) upstreams, + each of their 40 (lvl d) upstreams, + then, 50 (lvl e) 
upstreams each, + then, half of lvl e nodes, 6 (lvl f) upstreams, and for the other lvl e node, 1 (lvl f) upstream. + Total 1 + 1 + 100 + (100 * 30) + (100 * 30 * 40) + (100 * 30 * 40 * 5) = 723,102 urns. + Disabled by default to avoid overloading + """ + + path = "lineage.perf" + root_urn = make_dataset_urn("snowflake", f"{path}.root") + lvl_a = make_dataset_urn("snowflake", f"{path}.u_0") + lvl_b = {i: make_dataset_urn("snowflake", f"{path}.u_0_u_{i}") for i in range(100)} + lvl_c = { + (a, b): make_dataset_urn("snowflake", f"{path}.u_0_u_{a}_u_{b}") + for a in range(100) + for b in range(30) + } + lvl_d = { + (a, b, c): make_dataset_urn("snowflake", f"{path}.u_0_u_{a}_u_{b}_u_{c}") + for a in range(100) + for b in range(30) + for c in range(40) + } + lvl_e = { + (a, b, c, d): make_dataset_urn( + "snowflake", f"{path}.u_0_u_{a}_u_{b}_u_{c}_u_{d}" + ) + for a in range(100) + for b in range(30) + for c in range(40) + for d in range(5) + } + + yield from lineage_mcp_generator(root_urn, [lvl_a]) + yield from lineage_mcp_generator(lvl_a, list(lvl_b.values())) + for a, urn in lvl_b.items(): + yield from lineage_mcp_generator(urn, [lvl_c[(a, b)] for b in range(30)]) + for (a, b), urn in lvl_c.items(): + yield from lineage_mcp_generator(urn, [lvl_d[(a, b, c)] for c in range(40)]) + for (a, b, c), urn in lvl_d.items(): + yield from lineage_mcp_generator(urn, [lvl_e[(a, b, c, d)] for d in range(5)]) + + +if __name__ == "__main__": + graph = get_default_graph() + for mcp in [ + *scenario_truncate_basic(), + *scenario_truncate_intermediate(), + *scenario_truncate_complex(), + *scenario_skip_basic(), + *scenario_skip_intermediate(), + *scenario_skip_complex(), + # *scenario_perf(), + ]: + graph.emit_mcp(mcp) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java index 00c19fd3835cf..30b688761d584 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java @@ -37,7 +37,6 @@ import io.opentelemetry.extension.annotations.WithSpan; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; @@ -65,9 +64,18 @@ import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.search.SearchHit; +import org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.aggregations.bucket.filter.FilterAggregationBuilder; +import org.opensearch.search.aggregations.bucket.filter.ParsedFilter; +import org.opensearch.search.aggregations.bucket.terms.ParsedStringTerms; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.opensearch.search.aggregations.metrics.ParsedTopHits; +import org.opensearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.rescore.QueryRescorerBuilder; +import org.opensearch.search.sort.SortOrder; /** A search DAO for Elasticsearch backend. 
*/ @Slf4j @@ -89,7 +97,13 @@ public class ESGraphQueryDAO { static final String UPDATED_ON = "updatedOn"; static final String UPDATED_ACTOR = "updatedActor"; static final String PROPERTIES = "properties"; + static final String SCORE_FIELD = "_score"; static final String UI = "UI"; + static final String FILTER_BY_SOURCE_RELATIONSHIP = "filter_by_source_relationship"; + static final String FILTER_BY_DESTINATION_RELATIONSHIP = "filter_by_destination_relationship"; + static final String GROUP_BY_SOURCE_AGG = "group_by_source"; + static final String GROUP_BY_DESTINATION_AGG = "group_by_destination"; + static final String TOP_DOCUMENTS_AGG = "top_documents"; @Nonnull public static void addFilterToQueryBuilder( @@ -118,15 +132,7 @@ private SearchResponse executeLineageSearchQuery( @Nonnull final QueryBuilder query, final int offset, final int count) { SearchRequest searchRequest = new SearchRequest(); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - - searchSourceBuilder.from(offset); - searchSourceBuilder.size(count); - - searchSourceBuilder.query(query); - if (graphQueryConfiguration.isBoostViaNodes()) { - addViaNodeBoostQuery(searchSourceBuilder); - } + SearchSourceBuilder searchSourceBuilder = sharedSourceBuilder(query, offset, count); searchRequest.source(searchSourceBuilder); @@ -141,24 +147,77 @@ private SearchResponse executeLineageSearchQuery( } } - private SearchResponse executeLineageSearchQuery( - @Nonnull final QueryBuilder query, - @Nullable Object[] sort, - @Nullable String pitId, - @Nonnull String keepAlive, - final int count) { - SearchRequest searchRequest = new SearchRequest(); - + private SearchSourceBuilder sharedSourceBuilder( + @Nonnull final QueryBuilder query, final int offset, final int count) { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - ESUtils.setSearchAfter(searchSourceBuilder, sort, pitId, keepAlive); + searchSourceBuilder.from(offset); searchSourceBuilder.size(count); + searchSourceBuilder.query(query); + if (graphQueryConfiguration.isBoostViaNodes()) { + addViaNodeBoostQuery(searchSourceBuilder); + } + return searchSourceBuilder; + } + private SearchResponse executeGroupByLineageSearchQuery( + @Nonnull final QueryBuilder query, + final int offset, + final int count, + final Set> validEdges) { + SearchRequest searchRequest = new SearchRequest(); + + SearchSourceBuilder searchSourceBuilder = sharedSourceBuilder(query, offset, 0); + + // We have to group by both Source AND Destination because edge types may go in different + // directions for lineage + // set up filters for each relationship type in the correct direction to limit buckets + BoolQueryBuilder sourceFilterQuery = QueryBuilders.boolQuery(); + sourceFilterQuery.minimumShouldMatch(1); + validEdges.stream() + .filter(pair -> RelationshipDirection.OUTGOING.equals(pair.getValue().getDirection())) + .forEach(pair -> sourceFilterQuery.should(getAggregationFilter(pair))); + + BoolQueryBuilder destFilterQuery = QueryBuilders.boolQuery(); + destFilterQuery.minimumShouldMatch(1); + validEdges.stream() + .filter(pair -> RelationshipDirection.INCOMING.equals(pair.getValue().getDirection())) + .forEach(pair -> destFilterQuery.should(getAggregationFilter(pair))); + + FilterAggregationBuilder sourceRelationshipTypeFilters = + AggregationBuilders.filter(FILTER_BY_SOURCE_RELATIONSHIP, sourceFilterQuery); + FilterAggregationBuilder destRelationshipTypeFilters = + AggregationBuilders.filter(FILTER_BY_DESTINATION_RELATIONSHIP, destFilterQuery); + TermsAggregationBuilder 
sourceAgg = + AggregationBuilders.terms(GROUP_BY_SOURCE_AGG) + .field(SOURCE + ".urn") + .size( + graphQueryConfiguration + .getBatchSize()); // Number of buckets can be up to batch size per query for + // each + + TermsAggregationBuilder destAgg = + AggregationBuilders.terms(GROUP_BY_DESTINATION_AGG) + .field(DESTINATION + ".urn") + .size(graphQueryConfiguration.getBatchSize()); + + TopHitsAggregationBuilder topHitsAgg = + AggregationBuilders.topHits(TOP_DOCUMENTS_AGG) + .size(count) + .sort(SCORE_FIELD, SortOrder.DESC); + sourceAgg.subAggregation(topHitsAgg); + destAgg.subAggregation(topHitsAgg); + + sourceRelationshipTypeFilters.subAggregation(sourceAgg); + destRelationshipTypeFilters.subAggregation(destAgg); + searchSourceBuilder.aggregation(sourceRelationshipTypeFilters); + searchSourceBuilder.aggregation(destRelationshipTypeFilters); searchRequest.source(searchSourceBuilder); searchRequest.indices(indexConvention.getIndexName(INDEX_NAME)); - try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "esQuery").time()) { + try (Timer.Context ignored = + MetricUtils.timer(this.getClass(), "esLineageGroupByQuery").time()) { MetricUtils.counter(this.getClass(), SEARCH_EXECUTIONS_METRIC).inc(); return client.search(searchRequest, RequestOptions.DEFAULT); } catch (Exception e) { @@ -167,6 +226,21 @@ private SearchResponse executeLineageSearchQuery( } } + private BoolQueryBuilder getAggregationFilter(Pair pair) { + BoolQueryBuilder subFilter = QueryBuilders.boolQuery(); + TermQueryBuilder relationshipTypeTerm = + QueryBuilders.termQuery(RELATIONSHIP_TYPE, pair.getValue().getType()); + subFilter.must(relationshipTypeTerm); + TermQueryBuilder sourceTypeTerm = + QueryBuilders.termQuery(SOURCE + ".entityType", pair.getKey()); + subFilter.must(sourceTypeTerm); + TermQueryBuilder destinationTypeTerm = + QueryBuilders.termQuery( + DESTINATION + ".entityType", pair.getValue().getOpposingEntityType()); + subFilter.must(destinationTypeTerm); + return subFilter; + } + public SearchResponse getSearchResponse( @Nullable final List sourceTypes, @Nonnull final Filter sourceEntityFilter, @@ -412,16 +486,7 @@ private Stream processOneHopLineage( intermediateStream = Stream.concat(intermediateStream, ignoreAsHopUrns); } } - // We limit after adding all the relationships at the previous level so each hop is fully - // returned, - // but we only explore a limited number of entities per hop, sort to make the truncation - // consistent - if (lineageFlags.getEntitiesExploredPerHopLimit() != null) { - intermediateStream = - intermediateStream - .sorted(Comparator.comparing(Urn::toString)) - .limit(lineageFlags.getEntitiesExploredPerHopLimit()); - } + if (remainingHops > 0) { // If there are hops remaining, we expect to explore everything getting passed back to the // loop, barring a timeout @@ -537,11 +602,6 @@ private List getLineageRelationships( Collectors.toMap( Function.identity(), entityType -> lineageRegistry.getLineageRelationships(entityType, direction))); - - QueryBuilder finalQuery = - getLineageQuery(urnsPerEntityType, edgesPerEntityType, graphFilters, lineageFlags); - SearchResponse response = - executeLineageSearchQuery(finalQuery, 0, graphQueryConfiguration.getMaxResult()); Set entityUrnSet = new HashSet<>(entityUrns); // Get all valid edges given the set of urns to hop from Set> validEdges = @@ -550,16 +610,37 @@ private List getLineageRelationships( entry -> entry.getValue().stream().map(edgeInfo -> Pair.of(entry.getKey(), edgeInfo))) .collect(Collectors.toSet()); - return 
extractRelationships( - entityUrnSet, - response, - validEdges, - visitedEntities, - viaEntities, - numHops, - remainingHops, - existingPaths, - exploreMultiplePaths); + + QueryBuilder finalQuery = + getLineageQuery(urnsPerEntityType, edgesPerEntityType, graphFilters, lineageFlags); + SearchResponse response; + if (lineageFlags != null && lineageFlags.getEntitiesExploredPerHopLimit() != null) { + response = + executeGroupByLineageSearchQuery( + finalQuery, 0, lineageFlags.getEntitiesExploredPerHopLimit(), validEdges); + return extractRelationshipsGroupByQuery( + entityUrnSet, + response, + validEdges, + visitedEntities, + viaEntities, + numHops, + remainingHops, + existingPaths, + exploreMultiplePaths); + } else { + response = executeLineageSearchQuery(finalQuery, 0, graphQueryConfiguration.getMaxResult()); + return extractRelationships( + entityUrnSet, + response, + validEdges, + visitedEntities, + viaEntities, + numHops, + remainingHops, + existingPaths, + exploreMultiplePaths); + } } @VisibleForTesting @@ -756,158 +837,20 @@ private static List extractRelationships( log.debug("numHits: {}, numHops {}, remainingHops {}", hits.length, numHops, remainingHops); int index = -1; for (SearchHit hit : hits) { - index++; - final Map document = hit.getSourceAsMap(); - final Urn sourceUrn = - UrnUtils.getUrn(((Map) document.get(SOURCE)).get("urn").toString()); - final Urn destinationUrn = - UrnUtils.getUrn( - ((Map) document.get(DESTINATION)).get("urn").toString()); - final String type = document.get(RELATIONSHIP_TYPE).toString(); - if (sourceUrn.equals(destinationUrn)) { - log.debug("Skipping a self-edge of type {} on {}", type, sourceUrn); - continue; - } - final Number createdOnNumber = (Number) document.getOrDefault(CREATED_ON, null); - final Long createdOn = createdOnNumber != null ? createdOnNumber.longValue() : null; - final Number updatedOnNumber = (Number) document.getOrDefault(UPDATED_ON, null); - final Long updatedOn = updatedOnNumber != null ? updatedOnNumber.longValue() : null; - final String createdActorString = (String) document.getOrDefault(CREATED_ACTOR, null); - final Urn createdActor = - createdActorString == null ? null : UrnUtils.getUrn(createdActorString); - final String updatedActorString = (String) document.getOrDefault(UPDATED_ACTOR, null); - final Urn updatedActor = - updatedActorString == null ? 
null : UrnUtils.getUrn(updatedActorString); - final Map properties; - if (document.containsKey(PROPERTIES) && document.get(PROPERTIES) instanceof Map) { - properties = (Map) document.get(PROPERTIES); - } else { - properties = Collections.emptyMap(); - } - boolean isManual = properties.containsKey(SOURCE) && properties.get(SOURCE).equals("UI"); - Urn viaEntity = null; - String viaContent = (String) document.getOrDefault(EDGE_FIELD_VIA, null); - if (viaContent != null) { - try { - viaEntity = Urn.createFromString(viaContent); - } catch (Exception e) { - log.warn( - "Failed to parse urn from via entity {}, will swallow exception and continue...", - viaContent); - } - } - log.debug("{}: viaEntity {}", index, viaEntity); - - // Potential outgoing edge - if (entityUrns.contains(sourceUrn)) { - log.debug("{}: entity urns contains source urn {}", index, sourceUrn); - // Skip if already visited or if we're exploring multiple paths - // Skip if edge is not a valid outgoing edge - if ((exploreMultiplePaths || !visitedEntities.contains(destinationUrn)) - && validEdges.contains( - Pair.of( - sourceUrn.getEntityType(), - new EdgeInfo( - type, - RelationshipDirection.OUTGOING, - destinationUrn.getEntityType().toLowerCase())))) { - - if (visitedEntities.contains(destinationUrn)) { - log.debug("Found a second path to the same urn {}", destinationUrn); - } - // Append the edge to a set of unique graph paths. - if (addEdgeToPaths(existingPaths, sourceUrn, viaEntity, destinationUrn)) { - final LineageRelationship relationship = - createLineageRelationship( - type, - destinationUrn, - numHops, - existingPaths.getOrDefault(destinationUrn, new UrnArrayArray()), - // Fetch the paths to the next level entity. - createdOn, - createdActor, - updatedOn, - updatedActor, - isManual); - log.debug("Adding relationship {} to urn {}", relationship, destinationUrn); - lineageRelationshipMap.put(relationship.getEntity(), relationship); - if ((viaEntity != null) && (!viaEntities.contains(viaEntity))) { - UrnArrayArray viaPaths = getViaPaths(existingPaths, destinationUrn, viaEntity); - LineageRelationship viaRelationship = - createLineageRelationship( - type, - viaEntity, - numHops, - viaPaths, - createdOn, - createdActor, - updatedOn, - updatedActor, - isManual); - viaEntities.add(viaEntity); - lineageRelationshipMap.put(viaRelationship.getEntity(), viaRelationship); - log.debug("Adding via entity {} with paths {}", viaEntity, viaPaths); - } - } - visitedEntities.add(destinationUrn); - } - } - - // Potential incoming edge - if (entityUrns.contains(destinationUrn)) { - // Skip if already visited or if we're exploring multiple paths - // Skip if edge is not a valid outgoing edge - log.debug("entity urns contains destination urn {}", destinationUrn); - if ((exploreMultiplePaths || !visitedEntities.contains(sourceUrn)) - && validEdges.contains( - Pair.of( - destinationUrn.getEntityType(), - new EdgeInfo( - type, - RelationshipDirection.INCOMING, - sourceUrn.getEntityType().toLowerCase())))) { - if (visitedEntities.contains(sourceUrn)) { - log.debug("Found a second path to the same urn {}", sourceUrn); - } - visitedEntities.add(sourceUrn); - // Append the edge to a set of unique graph paths. 
- if (addEdgeToPaths(existingPaths, destinationUrn, viaEntity, sourceUrn)) { - log.debug("Adding incoming edge: {}, {}, {}", destinationUrn, viaEntity, sourceUrn); - final LineageRelationship relationship = - createLineageRelationship( - type, - sourceUrn, - numHops, - existingPaths.getOrDefault(sourceUrn, new UrnArrayArray()), - // Fetch the paths to the next level entity. - createdOn, - createdActor, - updatedOn, - updatedActor, - isManual); - log.debug("Adding relationship {} to urn {}", relationship, sourceUrn); - lineageRelationshipMap.put(relationship.getEntity(), relationship); - if ((viaEntity != null) && (!viaEntities.contains(viaEntity))) { - UrnArrayArray viaPaths = getViaPaths(existingPaths, sourceUrn, viaEntity); - viaEntities.add(viaEntity); - LineageRelationship viaRelationship = - createLineageRelationship( - type, - viaEntity, - numHops, - viaPaths, - createdOn, - createdActor, - updatedOn, - updatedActor, - isManual); - lineageRelationshipMap.put(viaRelationship.getEntity(), viaRelationship); - log.debug("Adding via relationship {} to urn {}", viaRelationship, viaEntity); - } - } - } - } + processSearchHit( + hit, + entityUrns, + index, + exploreMultiplePaths, + visitedEntities, + validEdges, + existingPaths, + numHops, + false, + lineageRelationshipMap, + viaEntities); } + List result = new ArrayList<>(lineageRelationshipMap.values()); log.debug("Number of lineage relationships in list: {}", result.size()); return result; @@ -919,6 +862,261 @@ private static List extractRelationships( } } + private static void processSearchHit( + SearchHit hit, + Set entityUrns, + int index, + boolean exploreMultiplePaths, + Set visitedEntities, + Set> validEdges, + Map existingPaths, + int numHops, + boolean truncatedChildren, + Map lineageRelationshipMap, + Set viaEntities) { + index++; + // Extract fields + final Map document = hit.getSourceAsMap(); + final Urn sourceUrn = + UrnUtils.getUrn(((Map) document.get(SOURCE)).get("urn").toString()); + final Urn destinationUrn = + UrnUtils.getUrn(((Map) document.get(DESTINATION)).get("urn").toString()); + final String type = document.get(RELATIONSHIP_TYPE).toString(); + if (sourceUrn.equals(destinationUrn)) { + log.debug("Skipping a self-edge of type {} on {}", type, sourceUrn); + return; + } + final Number createdOnNumber = (Number) document.getOrDefault(CREATED_ON, null); + final Long createdOn = createdOnNumber != null ? createdOnNumber.longValue() : null; + final Number updatedOnNumber = (Number) document.getOrDefault(UPDATED_ON, null); + final Long updatedOn = updatedOnNumber != null ? updatedOnNumber.longValue() : null; + final String createdActorString = (String) document.getOrDefault(CREATED_ACTOR, null); + final Urn createdActor = + createdActorString == null ? null : UrnUtils.getUrn(createdActorString); + final String updatedActorString = (String) document.getOrDefault(UPDATED_ACTOR, null); + final Urn updatedActor = + updatedActorString == null ? 
null : UrnUtils.getUrn(updatedActorString); + final Map properties; + if (document.containsKey(PROPERTIES) && document.get(PROPERTIES) instanceof Map) { + properties = (Map) document.get(PROPERTIES); + } else { + properties = Collections.emptyMap(); + } + boolean isManual = properties.containsKey(SOURCE) && properties.get(SOURCE).equals("UI"); + Urn viaEntity = null; + String viaContent = (String) document.getOrDefault(EDGE_FIELD_VIA, null); + if (viaContent != null) { + try { + viaEntity = Urn.createFromString(viaContent); + } catch (Exception e) { + log.warn( + "Failed to parse urn from via entity {}, will swallow exception and continue...", + viaContent); + } + } + log.debug("{}: viaEntity {}", index, viaEntity); + + // Potential outgoing edge + if (entityUrns.contains(sourceUrn)) { + processOutgoingEdge( + entityUrns, + sourceUrn, + index, + exploreMultiplePaths, + visitedEntities, + destinationUrn, + validEdges, + type, + existingPaths, + viaEntity, + numHops, + createdOn, + createdActor, + updatedOn, + updatedActor, + isManual, + truncatedChildren, + lineageRelationshipMap, + viaEntities); + } + + // Potential incoming edge + if (entityUrns.contains(destinationUrn)) { + processIncomingEdge( + entityUrns, + sourceUrn, + exploreMultiplePaths, + visitedEntities, + destinationUrn, + validEdges, + type, + existingPaths, + viaEntity, + numHops, + createdOn, + createdActor, + updatedOn, + updatedActor, + isManual, + truncatedChildren, + lineageRelationshipMap, + viaEntities); + } + } + + private static void processOutgoingEdge( + Set entityUrns, + Urn sourceUrn, + int index, + boolean exploreMultiplePaths, + Set visitedEntities, + Urn destinationUrn, + Set> validEdges, + String type, + Map existingPaths, + Urn viaEntity, + int numHops, + Long createdOn, + Urn createdActor, + Long updatedOn, + Urn updatedActor, + boolean isManual, + boolean truncatedChildren, + Map lineageRelationshipMap, + Set viaEntities) { + if (entityUrns.contains(sourceUrn)) { + log.debug("{}: entity urns contains source urn {}", index, sourceUrn); + // Skip if already visited or if we're exploring multiple paths + // Skip if edge is not a valid outgoing edge + if ((exploreMultiplePaths || !visitedEntities.contains(destinationUrn)) + && validEdges.contains( + Pair.of( + sourceUrn.getEntityType(), + new EdgeInfo( + type, + RelationshipDirection.OUTGOING, + destinationUrn.getEntityType().toLowerCase())))) { + + if (visitedEntities.contains(destinationUrn)) { + log.debug("Found a second path to the same urn {}", destinationUrn); + } + // Append the edge to a set of unique graph paths. + if (addEdgeToPaths(existingPaths, sourceUrn, viaEntity, destinationUrn)) { + final LineageRelationship relationship = + createLineageRelationship( + type, + destinationUrn, + numHops, + existingPaths.getOrDefault(destinationUrn, new UrnArrayArray()), + // Fetch the paths to the next level entity. 
+ createdOn, + createdActor, + updatedOn, + updatedActor, + isManual, + truncatedChildren); + log.debug("Adding relationship {} to urn {}", relationship, destinationUrn); + lineageRelationshipMap.put(relationship.getEntity(), relationship); + if ((viaEntity != null) && (!viaEntities.contains(viaEntity))) { + UrnArrayArray viaPaths = getViaPaths(existingPaths, destinationUrn, viaEntity); + LineageRelationship viaRelationship = + createLineageRelationship( + type, + viaEntity, + numHops, + viaPaths, + createdOn, + createdActor, + updatedOn, + updatedActor, + isManual, + truncatedChildren); + viaEntities.add(viaEntity); + lineageRelationshipMap.put(viaRelationship.getEntity(), viaRelationship); + log.debug("Adding via entity {} with paths {}", viaEntity, viaPaths); + } + } + visitedEntities.add(destinationUrn); + } + } + } + + private static void processIncomingEdge( + Set entityUrns, + Urn sourceUrn, + boolean exploreMultiplePaths, + Set visitedEntities, + Urn destinationUrn, + Set> validEdges, + String type, + Map existingPaths, + Urn viaEntity, + int numHops, + Long createdOn, + Urn createdActor, + Long updatedOn, + Urn updatedActor, + boolean isManual, + boolean truncatedChildren, + Map lineageRelationshipMap, + Set viaEntities) { + if (entityUrns.contains(destinationUrn)) { + // Skip if already visited or if we're exploring multiple paths + // Skip if edge is not a valid outgoing edge + log.debug("entity urns contains destination urn {}", destinationUrn); + if ((exploreMultiplePaths || !visitedEntities.contains(sourceUrn)) + && validEdges.contains( + Pair.of( + destinationUrn.getEntityType(), + new EdgeInfo( + type, + RelationshipDirection.INCOMING, + sourceUrn.getEntityType().toLowerCase())))) { + if (visitedEntities.contains(sourceUrn)) { + log.debug("Found a second path to the same urn {}", sourceUrn); + } + visitedEntities.add(sourceUrn); + // Append the edge to a set of unique graph paths. + if (addEdgeToPaths(existingPaths, destinationUrn, viaEntity, sourceUrn)) { + log.debug("Adding incoming edge: {}, {}, {}", destinationUrn, viaEntity, sourceUrn); + final LineageRelationship relationship = + createLineageRelationship( + type, + sourceUrn, + numHops, + existingPaths.getOrDefault(sourceUrn, new UrnArrayArray()), + // Fetch the paths to the next level entity. 
+ createdOn, + createdActor, + updatedOn, + updatedActor, + isManual, + truncatedChildren); + log.debug("Adding relationship {} to urn {}", relationship, sourceUrn); + lineageRelationshipMap.put(relationship.getEntity(), relationship); + if ((viaEntity != null) && (!viaEntities.contains(viaEntity))) { + UrnArrayArray viaPaths = getViaPaths(existingPaths, sourceUrn, viaEntity); + viaEntities.add(viaEntity); + LineageRelationship viaRelationship = + createLineageRelationship( + type, + viaEntity, + numHops, + viaPaths, + createdOn, + createdActor, + updatedOn, + updatedActor, + isManual, + truncatedChildren); + lineageRelationshipMap.put(viaRelationship.getEntity(), viaRelationship); + log.debug("Adding via relationship {} to urn {}", viaRelationship, viaEntity); + } + } + } + } + } + private static UrnArrayArray getViaPaths( Map existingPaths, Urn destinationUrn, Urn viaEntity) { UrnArrayArray destinationPaths = @@ -946,7 +1144,8 @@ private static LineageRelationship createLineageRelationship( @Nullable final Urn createdActor, @Nullable final Long updatedOn, @Nullable final Urn updatedActor, - final boolean isManual) { + final boolean isManual, + final boolean truncatedChildren) { final LineageRelationship relationship = new LineageRelationship() .setType(type) @@ -967,9 +1166,88 @@ private static LineageRelationship createLineageRelationship( relationship.setUpdatedActor(updatedActor); } relationship.setIsManual(isManual); + relationship.setTruncatedChildren(truncatedChildren); return relationship; } + @WithSpan + private static List extractRelationshipsGroupByQuery( + @Nonnull Set entityUrns, + @Nonnull SearchResponse searchResponse, + Set> validEdges, + Set visitedEntities, + Set viaEntities, + int numHops, + int remainingHops, + Map existingPaths, + boolean exploreMultiplePaths) { + try { + Map lineageRelationshipMap = new HashMap<>(); + ParsedFilter sourceFilterAgg = + searchResponse.getAggregations().get(FILTER_BY_SOURCE_RELATIONSHIP); + ParsedStringTerms sourceTermsAgg = sourceFilterAgg.getAggregations().get(GROUP_BY_SOURCE_AGG); + SearchHit[] hits = new SearchHit[0]; + List sourceBuckets = + (List) sourceTermsAgg.getBuckets(); + int index = -1; + for (ParsedStringTerms.ParsedBucket bucket : sourceBuckets) { + ParsedTopHits topHits = bucket.getAggregations().get(TOP_DOCUMENTS_AGG); + SearchHit[] topHitsArray = topHits.getHits().getHits(); + boolean truncatedChildren = topHits.getHits().getTotalHits().value > topHitsArray.length; + for (SearchHit hit : topHitsArray) { + processSearchHit( + hit, + entityUrns, + index, + exploreMultiplePaths, + visitedEntities, + validEdges, + existingPaths, + numHops, + truncatedChildren, + lineageRelationshipMap, + viaEntities); + } + } + + ParsedFilter destFilterAgg = + searchResponse.getAggregations().get(FILTER_BY_DESTINATION_RELATIONSHIP); + ParsedStringTerms destTermsAgg = + destFilterAgg.getAggregations().get(GROUP_BY_DESTINATION_AGG); + List destBuckets = + (List) destTermsAgg.getBuckets(); + for (ParsedStringTerms.ParsedBucket bucket : destBuckets) { + ParsedTopHits topHits = bucket.getAggregations().get(TOP_DOCUMENTS_AGG); + SearchHit[] topHitsArray = topHits.getHits().getHits(); + boolean truncatedChildren = topHits.getHits().getTotalHits().value > topHitsArray.length; + for (SearchHit hit : topHitsArray) { + processSearchHit( + hit, + entityUrns, + index, + exploreMultiplePaths, + visitedEntities, + validEdges, + existingPaths, + numHops, + truncatedChildren, + lineageRelationshipMap, + viaEntities); + } + } + log.debug("numHits: {}, 
numHops {}, remainingHops {}", hits.length, numHops, remainingHops); + + List result = new ArrayList<>(lineageRelationshipMap.values()); + log.debug("Number of lineage relationships in list: {}", result.size()); + return result; + } catch (Exception e) { + // This exception handler merely exists to log the exception at an appropriate point and + // rethrow + log.error("Caught exception", e); + throw e; + } + } + private static BoolQueryBuilder getOutGoingEdgeQuery( @Nonnull List urns, @Nonnull List outgoingEdges, diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java index c06457768d725..95c8eb13beb93 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java @@ -745,6 +745,7 @@ private LineageSearchEntity buildLineageSearchEntity( entity.setDegrees(lineageRelationship.getDegrees()); } entity.setExplored(Boolean.TRUE.equals(lineageRelationship.isExplored())); + entity.setTruncatedChildren(Boolean.TRUE.equals(lineageRelationship.isTruncatedChildren())); entity.setIgnoredAsHop(Boolean.TRUE.equals(lineageRelationship.isIgnoredAsHop())); } return entity; diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java index c151c1f381ce9..1aebc48153bbe 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java @@ -2163,7 +2163,7 @@ public void testHighlyConnectedGraphWalk() throws Exception { } assertEquals(new HashSet<>(relatedEntities.getEntities()), expectedRelatedEntities); - Urn root = UrnUtils.getUrn(relatedEntities.getEntities().get(0).getUrn()); + Urn root = dataset1Urn; EntityLineageResult lineageResult = getGraphService(false) .getLineage( @@ -2180,13 +2180,18 @@ public void testHighlyConnectedGraphWalk() throws Exception { 1000, 100, new LineageFlags().setEntitiesExploredPerHopLimit(5)); - assertEquals(lineageResult.getRelationships().size(), 19); + // Unable to explore all paths because multi is disabled, but will be at least 5 since it will + // explore 5 edges + assertTrue( + lineageResult.getRelationships().size() >= 5 + && lineageResult.getRelationships().size() < 20, + "Size was: " + lineageResult.getRelationships().size()); LineageRelationshipArray relationships = lineageResult.getRelationships(); int maxDegree = relationships.stream() .flatMap(relationship -> relationship.getDegrees().stream()) .reduce(0, Math::max); - assertEquals(maxDegree, 1); + assertTrue(maxDegree > 1); EntityLineageResult lineageResultMulti = getGraphService(true) @@ -2205,13 +2210,16 @@ public void testHighlyConnectedGraphWalk() throws Exception { 100, new LineageFlags().setEntitiesExploredPerHopLimit(5)); - assertEquals(lineageResultMulti.getRelationships().size(), 20); + assertTrue( + lineageResultMulti.getRelationships().size() >= 5 + && lineageResultMulti.getRelationships().size() <= 20, + "Size was: " + lineageResultMulti.getRelationships().size()); relationships = lineageResultMulti.getRelationships(); maxDegree = relationships.stream() .flatMap(relationship -> relationship.getDegrees().stream()) .reduce(0, Math::max); - assertTrue(maxDegree > 4); + assertTrue(maxDegree >= 2); // Reset graph service getGraphService(); diff --git 
a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java index 23b4c82ca0566..d1ee1996e5b8a 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java @@ -430,7 +430,10 @@ public void testExplored() throws Exception { Assert.assertTrue(Boolean.TRUE.equals(result.getRelationships().get(0).isExplored())); EntityLineageResult result2 = getUpstreamLineage(dataset2Urn, null, null, 10, 0); - Assert.assertTrue(result2.getRelationships().get(0).isExplored() == null); + Assert.assertTrue(result2.getRelationships().isEmpty()); + + EntityLineageResult result3 = getUpstreamLineage(dataset2Urn, null, null, 10, 1); + Assert.assertTrue(result3.getRelationships().get(0).isExplored()); } /** diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/graph/LineageRelationship.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/graph/LineageRelationship.pdl index 552dd7323b551..7535c7e9292ec 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/graph/LineageRelationship.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/graph/LineageRelationship.pdl @@ -73,6 +73,11 @@ record LineageRelationship { */ explored: optional boolean + /** + * Indicates this destination node has additional unexplored child relationships + */ + truncatedChildren: optional boolean + /** * Whether this relationship was ignored as a hop while performing the graph walk */ diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/search/LineageSearchEntity.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/search/LineageSearchEntity.pdl index 3fd8a48c6bf5e..3f246b5014df0 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/search/LineageSearchEntity.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/search/LineageSearchEntity.pdl @@ -35,6 +35,11 @@ record LineageSearchEntity includes SearchEntity { */ explored: optional boolean + /** + * Indicates this destination node has additional unexplored child relationships + */ + truncatedChildren: optional boolean + /** * Whether this relationship was ignored as a hop while performing the graph walk */ diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json index 00b434d30356f..eb81fe3ff8db3 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json @@ -843,10 +843,10 @@ "PRE" : "Designates pre-production fabrics", "PROD" : "Designates production fabrics", "QA" : "Designates quality assurance fabrics", + "RVW" : "Designates review fabrics", "STG" : "Designates staging fabrics", "TEST" : "Designates testing fabrics", - "UAT" : "Designates user acceptance testing fabrics", - "RVW" : "Designates review fabrics" + "UAT" : "Designates user acceptance testing fabrics" } }, { "type" : "record", @@ -2489,7 +2489,13 @@ }, { "name" : "lastModified", "type" : "com.linkedin.common.AuditStamp", - "doc" : "Audit stamp containing who last modified the status and when." 
+ "doc" : "Audit stamp containing who last modified the status and when.", + "Searchable" : { + "/time" : { + "fieldName" : "statusLastModifiedAt", + "fieldType" : "COUNT" + } + } } ], "Aspect" : { "name" : "corpUserStatus" @@ -2861,8 +2867,9 @@ }, { "name" : "label", "type" : "string", - "doc" : "Label of the field. Provides a more human-readable name for the field than field path. Some sources will\nprovide this metadata but not all sources have the concept of a label. If just one string is associated with\na field in a source, that is most likely a description.", + "doc" : "Label of the field. Provides a more human-readable name for the field than field path. Some sources will\nprovide this metadata but not all sources have the concept of a label. If just one string is associated with\na field in a source, that is most likely a description.\n\nNote that this field is deprecated and is not surfaced in the UI.", "optional" : true, + "Deprecated" : true, "Searchable" : { "boostScore" : 0.2, "fieldName" : "fieldLabels", diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index ffbcdd1b2adb3..38d91856f1536 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -843,10 +843,10 @@ "PRE" : "Designates pre-production fabrics", "PROD" : "Designates production fabrics", "QA" : "Designates quality assurance fabrics", + "RVW" : "Designates review fabrics", "STG" : "Designates staging fabrics", "TEST" : "Designates testing fabrics", - "UAT" : "Designates user acceptance testing fabrics", - "RVW" : "Designates review fabrics" + "UAT" : "Designates user acceptance testing fabrics" } }, { "type" : "record", @@ -2801,7 +2801,13 @@ }, { "name" : "lastModified", "type" : "com.linkedin.common.AuditStamp", - "doc" : "Audit stamp containing who last modified the status and when." + "doc" : "Audit stamp containing who last modified the status and when.", + "Searchable" : { + "/time" : { + "fieldName" : "statusLastModifiedAt", + "fieldType" : "COUNT" + } + } } ], "Aspect" : { "name" : "corpUserStatus" @@ -3249,8 +3255,9 @@ }, { "name" : "label", "type" : "string", - "doc" : "Label of the field. Provides a more human-readable name for the field than field path. Some sources will\nprovide this metadata but not all sources have the concept of a label. If just one string is associated with\na field in a source, that is most likely a description.", + "doc" : "Label of the field. Provides a more human-readable name for the field than field path. Some sources will\nprovide this metadata but not all sources have the concept of a label. 
If just one string is associated with\na field in a source, that is most likely a description.\n\nNote that this field is deprecated and is not surfaced in the UI.", "optional" : true, + "Deprecated" : true, "Searchable" : { "boostScore" : 0.2, "fieldName" : "fieldLabels", @@ -5344,7 +5351,12 @@ "items" : "com.linkedin.common.Urn" }, "doc" : "A specific set of users to apply the policy to (disjunctive)", - "optional" : true + "optional" : true, + "Searchable" : { + "/*" : { + "fieldType" : "URN" + } + } }, { "name" : "groups", "type" : { @@ -5352,7 +5364,12 @@ "items" : "com.linkedin.common.Urn" }, "doc" : "A specific set of groups to apply the policy to (disjunctive)", - "optional" : true + "optional" : true, + "Searchable" : { + "/*" : { + "fieldType" : "URN" + } + } }, { "name" : "resourceOwners", "type" : "boolean", @@ -5370,12 +5387,18 @@ "name" : "allUsers", "type" : "boolean", "doc" : "Whether the filter should apply to all users.", - "default" : false + "default" : false, + "Searchable" : { + "fieldType" : "BOOLEAN" + } }, { "name" : "allGroups", "type" : "boolean", "doc" : "Whether the filter should apply to all groups.", - "default" : false + "default" : false, + "Searchable" : { + "fieldType" : "BOOLEAN" + } }, { "name" : "roles", "type" : { @@ -5389,6 +5412,11 @@ "entityTypes" : [ "dataHubRole" ], "name" : "IsAssociatedWithRole" } + }, + "Searchable" : { + "/*" : { + "fieldType" : "URN" + } } } ] }, @@ -6211,6 +6239,11 @@ "type" : "boolean", "doc" : "Marks an entity as having been explored for as a part of the graph walk", "optional" : true + }, { + "name" : "truncatedChildren", + "type" : "boolean", + "doc" : "Indicates this destination node has additional unexplored child relationships", + "optional" : true }, { "name" : "ignoredAsHop", "type" : "boolean", diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json index 0139072b2ae15..e1c8d3007d59d 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json @@ -585,10 +585,10 @@ "PRE" : "Designates pre-production fabrics", "PROD" : "Designates production fabrics", "QA" : "Designates quality assurance fabrics", + "RVW" : "Designates review fabrics", "STG" : "Designates staging fabrics", "TEST" : "Designates testing fabrics", - "UAT" : "Designates user acceptance testing fabrics", - "RVW" : "Designates review fabrics" + "UAT" : "Designates user acceptance testing fabrics" } }, { "type" : "record", @@ -2222,7 +2222,13 @@ }, { "name" : "lastModified", "type" : "com.linkedin.common.AuditStamp", - "doc" : "Audit stamp containing who last modified the status and when." + "doc" : "Audit stamp containing who last modified the status and when.", + "Searchable" : { + "/time" : { + "fieldName" : "statusLastModifiedAt", + "fieldType" : "COUNT" + } + } } ], "Aspect" : { "name" : "corpUserStatus" @@ -2594,8 +2600,9 @@ }, { "name" : "label", "type" : "string", - "doc" : "Label of the field. Provides a more human-readable name for the field than field path. Some sources will\nprovide this metadata but not all sources have the concept of a label. If just one string is associated with\na field in a source, that is most likely a description.", + "doc" : "Label of the field. Provides a more human-readable name for the field than field path. 
Some sources will\nprovide this metadata but not all sources have the concept of a label. If just one string is associated with\na field in a source, that is most likely a description.\n\nNote that this field is deprecated and is not surfaced in the UI.", "optional" : true, + "Deprecated" : true, "Searchable" : { "boostScore" : 0.2, "fieldName" : "fieldLabels", diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.lineage.relationships.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.lineage.relationships.snapshot.json index 3886faffadedb..ba29f43dae0a6 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.lineage.relationships.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.lineage.relationships.snapshot.json @@ -182,6 +182,11 @@ "type" : "boolean", "doc" : "Marks this relationship as explored during the graph walk", "optional" : true + }, { + "name" : "truncatedChildren", + "type" : "boolean", + "doc" : "Indicates this destination node has additional unexplored child relationships", + "optional" : true }, { "name" : "ignoredAsHop", "type" : "boolean", diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json index 1caeed2570317..8572ae2f07943 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json @@ -585,10 +585,10 @@ "PRE" : "Designates pre-production fabrics", "PROD" : "Designates production fabrics", "QA" : "Designates quality assurance fabrics", + "RVW" : "Designates review fabrics", "STG" : "Designates staging fabrics", "TEST" : "Designates testing fabrics", - "UAT" : "Designates user acceptance testing fabrics", - "RVW" : "Designates review fabrics" + "UAT" : "Designates user acceptance testing fabrics" } }, { "type" : "record", @@ -2216,7 +2216,13 @@ }, { "name" : "lastModified", "type" : "com.linkedin.common.AuditStamp", - "doc" : "Audit stamp containing who last modified the status and when." + "doc" : "Audit stamp containing who last modified the status and when.", + "Searchable" : { + "/time" : { + "fieldName" : "statusLastModifiedAt", + "fieldType" : "COUNT" + } + } } ], "Aspect" : { "name" : "corpUserStatus" @@ -2588,8 +2594,9 @@ }, { "name" : "label", "type" : "string", - "doc" : "Label of the field. Provides a more human-readable name for the field than field path. Some sources will\nprovide this metadata but not all sources have the concept of a label. If just one string is associated with\na field in a source, that is most likely a description.", + "doc" : "Label of the field. Provides a more human-readable name for the field than field path. Some sources will\nprovide this metadata but not all sources have the concept of a label. 
If just one string is associated with\na field in a source, that is most likely a description.\n\nNote that this field is deprecated and is not surfaced in the UI.", "optional" : true, + "Deprecated" : true, "Searchable" : { "boostScore" : 0.2, "fieldName" : "fieldLabels", diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json index 1592333988b4c..bb32d6a870d48 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json @@ -843,10 +843,10 @@ "PRE" : "Designates pre-production fabrics", "PROD" : "Designates production fabrics", "QA" : "Designates quality assurance fabrics", + "RVW" : "Designates review fabrics", "STG" : "Designates staging fabrics", "TEST" : "Designates testing fabrics", - "UAT" : "Designates user acceptance testing fabrics", - "RVW" : "Designates review fabrics" + "UAT" : "Designates user acceptance testing fabrics" } }, { "type" : "record", @@ -2795,7 +2795,13 @@ }, { "name" : "lastModified", "type" : "com.linkedin.common.AuditStamp", - "doc" : "Audit stamp containing who last modified the status and when." + "doc" : "Audit stamp containing who last modified the status and when.", + "Searchable" : { + "/time" : { + "fieldName" : "statusLastModifiedAt", + "fieldType" : "COUNT" + } + } } ], "Aspect" : { "name" : "corpUserStatus" @@ -3243,8 +3249,9 @@ }, { "name" : "label", "type" : "string", - "doc" : "Label of the field. Provides a more human-readable name for the field than field path. Some sources will\nprovide this metadata but not all sources have the concept of a label. If just one string is associated with\na field in a source, that is most likely a description.", + "doc" : "Label of the field. Provides a more human-readable name for the field than field path. Some sources will\nprovide this metadata but not all sources have the concept of a label. If just one string is associated with\na field in a source, that is most likely a description.\n\nNote that this field is deprecated and is not surfaced in the UI.", "optional" : true, + "Deprecated" : true, "Searchable" : { "boostScore" : 0.2, "fieldName" : "fieldLabels", @@ -5338,7 +5345,12 @@ "items" : "com.linkedin.common.Urn" }, "doc" : "A specific set of users to apply the policy to (disjunctive)", - "optional" : true + "optional" : true, + "Searchable" : { + "/*" : { + "fieldType" : "URN" + } + } }, { "name" : "groups", "type" : { @@ -5346,7 +5358,12 @@ "items" : "com.linkedin.common.Urn" }, "doc" : "A specific set of groups to apply the policy to (disjunctive)", - "optional" : true + "optional" : true, + "Searchable" : { + "/*" : { + "fieldType" : "URN" + } + } }, { "name" : "resourceOwners", "type" : "boolean", @@ -5364,12 +5381,18 @@ "name" : "allUsers", "type" : "boolean", "doc" : "Whether the filter should apply to all users.", - "default" : false + "default" : false, + "Searchable" : { + "fieldType" : "BOOLEAN" + } }, { "name" : "allGroups", "type" : "boolean", "doc" : "Whether the filter should apply to all groups.", - "default" : false + "default" : false, + "Searchable" : { + "fieldType" : "BOOLEAN" + } }, { "name" : "roles", "type" : { @@ -5383,6 +5406,11 @@ "entityTypes" : [ "dataHubRole" ], "name" : "IsAssociatedWithRole" } + }, + "Searchable" : { + "/*" : { + "fieldType" : "URN" + } } } ] },