Skip to content

Commit

Permalink
feat(schemaField): populate schemaFields with side effects
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Aug 26, 2024
1 parent 94e7706 commit dcb59e9
Show file tree
Hide file tree
Showing 141 changed files with 156,848 additions and 416 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/docker-unified.yml
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,13 @@ jobs:
docker pull '${{ env.DATAHUB_ELASTIC_SETUP_IMAGE }}:head'
docker tag '${{ env.DATAHUB_ELASTIC_SETUP_IMAGE }}:head' '${{ env.DATAHUB_ELASTIC_SETUP_IMAGE }}:${{ needs.setup.outputs.unique_tag }}'
fi
if [ '${{ needs.setup.outputs.integrations_service_change }}' == 'false' ]; then
echo 'datahub-integration-service head images'
docker pull '${{ env.DATAHUB_INTEGRATIONS_IMAGE }}:head'
docker tag '${{ env.DATAHUB_INTEGRATIONS_IMAGE }}:head' '${{ env.DATAHUB_INTEGRATIONS_IMAGE }}:${{ needs.setup.outputs.unique_tag }}'
fi
- name: CI Slim Head Images
run: |
if [ '${{ needs.setup.outputs.ingestion_change }}' == 'false' ]; then
echo 'datahub-ingestion head-slim images'
docker pull '${{ env.DATAHUB_INGESTION_IMAGE }}:head-slim'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.datahub.authentication.Actor;
import com.datahub.authentication.Authentication;
import com.datahub.plugins.auth.authorization.Authorizer;
import com.linkedin.metadata.config.DataHubAppConfiguration;
import io.datahubproject.metadata.context.OperationContext;

/** Provided as input to GraphQL resolvers; used to carry information about GQL request context. */
Expand Down Expand Up @@ -31,4 +32,6 @@ default String getActorUrn() {
* @return Returns the operational context
*/
OperationContext getOperationContext();

DataHubAppConfiguration getDataHubAppConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.StringArray;
import com.linkedin.datahub.graphql.QueryContext;
Expand All @@ -23,7 +22,6 @@
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.utils.ESUtils;
import com.linkedin.metadata.search.utils.QueryUtils;
import com.linkedin.metadata.service.ViewService;
import com.linkedin.view.DataHubViewInfo;
import graphql.schema.DataFetchingEnvironment;
Expand Down Expand Up @@ -222,27 +220,6 @@ private static String getFilterField(
return ESUtils.toKeywordField(originalField, skipKeywordSuffix, aspectRetriever);
}

public static Filter buildFilterWithUrns(@Nonnull Set<Urn> urns, @Nullable Filter inputFilters) {
Criterion urnMatchCriterion =
new Criterion()
.setField("urn")
.setValue("")
.setValues(
new StringArray(urns.stream().map(Object::toString).collect(Collectors.toList())));
if (inputFilters == null) {
return QueryUtils.newFilter(urnMatchCriterion);
}

// Add urn match criterion to each or clause
if (inputFilters.getOr() != null && !inputFilters.getOr().isEmpty()) {
for (ConjunctiveCriterion conjunctiveCriterion : inputFilters.getOr()) {
conjunctiveCriterion.getAnd().add(urnMatchCriterion);
}
return inputFilters;
}
return QueryUtils.newFilter(urnMatchCriterion);
}

public static Filter viewFilter(
OperationContext opContext, ViewService viewService, String viewUrn) {
if (viewUrn == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private CustomAssertionInfo createCustomAssertionInfo(

if (input.getFieldPath() != null) {
customAssertionInfo.setField(
SchemaFieldUtils.generateSchemaFieldUrn(entityUrn.toString(), input.getFieldPath()));
SchemaFieldUtils.generateSchemaFieldUrn(entityUrn, input.getFieldPath()));
}
return customAssertionInfo;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.linkedin.datahub.graphql.resolvers.dataproduct;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.buildFilterWithUrns;
import static com.linkedin.metadata.search.utils.QueryUtils.buildFilterWithUrns;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
Expand All @@ -11,6 +11,8 @@
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.DataProduct;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.ExtraProperty;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.SearchAcrossEntitiesInput;
import com.linkedin.datahub.graphql.generated.SearchResults;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
Expand All @@ -30,8 +32,12 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -44,6 +50,7 @@
public class ListDataProductAssetsResolver
implements DataFetcher<CompletableFuture<SearchResults>> {

private static final String OUTPUT_PORTS_FILTER_FIELD = "isOutputPort";
private static final int DEFAULT_START = 0;
private static final int DEFAULT_COUNT = 10;

Expand All @@ -63,6 +70,7 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)

// 1. Get urns of assets belonging to Data Product using an aspect query
List<Urn> assetUrns = new ArrayList<>();
Set<String> outputPorts = Collections.EMPTY_SET;
try {
final EntityResponse entityResponse =
_entityClient.getV2(
Expand All @@ -86,6 +94,11 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)
dataProductProperties.getAssets().stream()
.map(DataProductAssociation::getDestinationUrn)
.collect(Collectors.toList()));
outputPorts =
dataProductProperties.getAssets().stream()
.filter(DataProductAssociation::isOutputPort)
.map(dpa -> dpa.getDestinationUrn().toString())
.collect(Collectors.toSet());
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -117,6 +130,7 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)
final int start = input.getStart() != null ? input.getStart() : DEFAULT_START;
final int count = input.getCount() != null ? input.getCount() : DEFAULT_COUNT;

Set<String> finalOutputPorts = outputPorts;
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
// if no assets in data product properties, exit early before search and return empty
Expand All @@ -130,13 +144,21 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)
return results;
}

List<FacetFilterInput> filters = input.getFilters();
final List<Urn> urnsToFilterOn = getUrnsToFilterOn(assetUrns, finalOutputPorts, filters);
// need to remove output ports filter so we don't send to elastic
if (filters != null) {
filters.removeIf(f -> f.getField().equals(OUTPUT_PORTS_FILTER_FIELD));
}
// add urns from the aspect to our filters
final Filter baseFilter =
ResolverUtils.buildFilter(
input.getFilters(),
filters,
input.getOrFilters(),
context.getOperationContext().getAspectRetriever());
final Filter finalFilter = buildFilterWithUrns(new HashSet<>(assetUrns), baseFilter);
final Filter finalFilter =
buildFilterWithUrns(
context.getDataHubAppConfig(), new HashSet<>(urnsToFilterOn), baseFilter);

final SearchFlags searchFlags;
com.linkedin.datahub.graphql.generated.SearchFlags inputFlags = input.getSearchFlags();
Expand All @@ -155,18 +177,34 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)
start,
count);

return UrnSearchResultsMapper.map(
context,
_entityClient.searchAcrossEntities(
context
.getOperationContext()
.withSearchFlags(flags -> searchFlags != null ? searchFlags : flags),
finalEntityNames,
sanitizedQuery,
finalFilter,
start,
count,
null));
SearchResults results =
UrnSearchResultsMapper.map(
context,
_entityClient.searchAcrossEntities(
context
.getOperationContext()
.withSearchFlags(flags -> searchFlags != null ? searchFlags : flags),
finalEntityNames,
sanitizedQuery,
finalFilter,
start,
count,
null,
null));
results
.getSearchResults()
.forEach(
searchResult -> {
if (finalOutputPorts.contains(searchResult.getEntity().getUrn())) {
if (searchResult.getExtraProperties() == null) {
searchResult.setExtraProperties(new ArrayList<>());
}
searchResult
.getExtraProperties()
.add(new ExtraProperty("isOutputPort", "true"));
}
});
return results;
} catch (Exception e) {
log.error(
"Failed to execute search for data product assets: entity types {}, query {}, filters: {}, start: {}, count: {}",
Expand All @@ -186,4 +224,37 @@ public CompletableFuture<SearchResults> get(DataFetchingEnvironment environment)
this.getClass().getSimpleName(),
"get");
}

/**
* Check to see if our filters list has a hardcoded filter for output ports. If so, let this
* filter determine which urns we filter search results on. Otherwise, if no output port filter is
* found, return all asset urns as per usual.
*/
@Nonnull
private List<Urn> getUrnsToFilterOn(
@Nonnull final List<Urn> assetUrns,
@Nonnull final Set<String> outputPortUrns,
@Nullable final List<FacetFilterInput> filters) {
Optional<FacetFilterInput> isOutputPort =
filters != null
? filters.stream()
.filter(f -> f.getField().equals(OUTPUT_PORTS_FILTER_FIELD))
.findFirst()
: Optional.empty();

// optionally get entities that explicitly are or are not output ports
List<Urn> urnsToFilterOn = assetUrns;
if (isOutputPort.isPresent()) {
if (isOutputPort.get().getValue().equals("true")) {
urnsToFilterOn = outputPortUrns.stream().map(UrnUtils::getUrn).collect(Collectors.toList());
} else {
urnsToFilterOn =
assetUrns.stream()
.filter(u -> !outputPortUrns.contains(u.toString()))
.collect(Collectors.toList());
}
}

return urnsToFilterOn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
import com.datahub.authentication.Authentication;
import com.datahub.authentication.post.PostService;
import com.linkedin.common.Media;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.CreatePostInput;
import com.linkedin.datahub.graphql.generated.PostContentType;
import com.linkedin.datahub.graphql.generated.PostType;
import com.linkedin.datahub.graphql.generated.UpdateMediaInput;
import com.linkedin.datahub.graphql.generated.UpdatePostContentInput;
import com.linkedin.datahub.graphql.generated.*;
import com.linkedin.metadata.utils.SchemaFieldUtils;
import com.linkedin.post.PostContent;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
Expand Down Expand Up @@ -46,6 +44,18 @@ public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment)
final String description = content.getDescription();
final UpdateMediaInput updateMediaInput = content.getMedia();
final Authentication authentication = context.getAuthentication();
final String targetResource = input.getResourceUrn();
final String targetSubresource = input.getSubResource();

String targetUrn;
if (targetSubresource != null) {
targetUrn =
SchemaFieldUtils.generateSchemaFieldUrn(
UrnUtils.getUrn(targetResource), targetSubresource)
.toString();
} else {
targetUrn = targetResource;
}

Media media =
updateMediaInput == null
Expand All @@ -59,7 +69,7 @@ public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment)
() -> {
try {
return _postService.createPost(
context.getOperationContext(), type.toString(), postContent);
context.getOperationContext(), type.toString(), postContent, targetUrn);
} catch (Exception e) {
throw new RuntimeException("Failed to create a new post", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ private SchemaFieldEntity createSchemaFieldEntity(
@Nonnull final com.linkedin.schema.SchemaField input, @Nonnull Urn entityUrn) {
SchemaFieldEntity schemaFieldEntity = new SchemaFieldEntity();
schemaFieldEntity.setUrn(
SchemaFieldUtils.generateSchemaFieldUrn(entityUrn.toString(), input.getFieldPath())
.toString());
SchemaFieldUtils.generateSchemaFieldUrn(entityUrn, input.getFieldPath()).toString());
schemaFieldEntity.setType(EntityType.SCHEMA_FIELD);
return schemaFieldEntity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AggregationMetadata;
import com.linkedin.datahub.graphql.generated.EntityPath;
import com.linkedin.datahub.graphql.generated.ExtraProperty;
import com.linkedin.datahub.graphql.generated.FacetMetadata;
import com.linkedin.datahub.graphql.generated.MatchedField;
import com.linkedin.datahub.graphql.generated.SearchResult;
Expand Down Expand Up @@ -35,7 +37,24 @@ public static SearchResult mapResult(
return new SearchResult(
UrnToEntityMapper.map(context, searchEntity.getEntity()),
getInsightsFromFeatures(searchEntity.getFeatures()),
getMatchedFieldEntry(context, searchEntity.getMatchedFields()));
getMatchedFieldEntry(context, searchEntity.getMatchedFields()),
getExtraProperties(searchEntity.getExtraFields()));
}

private static List<ExtraProperty> getExtraProperties(@Nullable StringMap extraFields) {
if (extraFields == null) {
return List.of();
} else {
return extraFields.entrySet().stream()
.map(
entry -> {
ExtraProperty extraProperty = new ExtraProperty();
extraProperty.setName(entry.getKey());
extraProperty.setValue(entry.getValue());
return extraProperty;
})
.collect(Collectors.toList());
}
}

public static FacetMetadata mapFacet(
Expand Down
15 changes: 15 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -11274,6 +11274,21 @@ input CreatePostInput {
The content of the post
"""
content: UpdatePostContentInput!

"""
Optional target URN for the post
"""
resourceUrn: String

"""
An optional type of a sub resource to attach the Tag to
"""
subResourceType: SubResourceType

"""
Optional target subresource for the post
"""
subResource: String
}

"""
Expand Down
Loading

0 comments on commit dcb59e9

Please sign in to comment.