Merge branch 'master' into business-attributes
deepgarg-visa authored Mar 28, 2024
2 parents 8411393 + db668e8 commit 89d9e93
Showing 169 changed files with 22,810 additions and 1,651 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/airflow-plugin.yml
@@ -40,16 +40,16 @@ jobs:
extra_pip_requirements: "apache-airflow~=2.2.4"
extra_pip_extras: plugin-v1
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow~=2.4.0 pluggy==1.0.0 "pendulum<3.0" "Flask-Session<0.6.0"'
extra_pip_extras: plugin-v2
extra_pip_requirements: "apache-airflow==2.4.3"
extra_pip_extras: plugin-v2,test-airflow24
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow~=2.6.0 "pendulum<3.0" "Flask-Session<0.6.0"'
extra_pip_requirements: 'apache-airflow==2.6.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt'
extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow~=2.7.0 pydantic==2.4.2 "Flask-Session<0.6.0"'
extra_pip_requirements: 'apache-airflow==2.7.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt'
extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow>=2.8.0 pydantic>=2.4.2 "Flask-Session<0.6.0"'
extra_pip_requirements: 'apache-airflow==2.8.1 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt'
extra_pip_extras: plugin-v2
fail-fast: false
steps:
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test.yml
@@ -79,7 +79,7 @@ jobs:
./gradlew build \
-x :metadata-ingestion:build \
-x :metadata-ingestion:check \
-x docs-website:build \
-x :docs-website:build \
-x :metadata-integration:java:spark-lineage:test \
-x :metadata-io:test \
-x :metadata-ingestion-modules:airflow-plugin:build \
1 change: 1 addition & 0 deletions build.gradle
@@ -95,6 +95,7 @@ project.ext.spec = [
'restliDocgen' : 'com.linkedin.pegasus:restli-docgen:' + pegasusVersion,
'restliServer' : 'com.linkedin.pegasus:restli-server:' + pegasusVersion,
'restliSpringBridge': 'com.linkedin.pegasus:restli-spring-bridge:' + pegasusVersion,
'restliTestUtils' : 'com.linkedin.pegasus:restli-client-testutils:' + pegasusVersion,
]
]
]

@@ -4,6 +4,7 @@
import com.linkedin.common.EntityRelationship;
import com.linkedin.common.EntityRelationships;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.Assertion;
import com.linkedin.datahub.graphql.generated.Entity;
@@ -25,8 +26,10 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

/** GraphQL Resolver used for fetching the list of Assertions associated with an Entity. */
@Slf4j
public class EntityAssertionsResolver
implements DataFetcher<CompletableFuture<EntityAssertionsResult>> {

@@ -49,6 +52,8 @@ public CompletableFuture<EntityAssertionsResult> get(DataFetchingEnvironment env
final String entityUrn = ((Entity) environment.getSource()).getUrn();
final Integer start = environment.getArgumentOrDefault("start", 0);
final Integer count = environment.getArgumentOrDefault("count", 200);
final Boolean includeSoftDeleted =
environment.getArgumentOrDefault("includeSoftDeleted", false);

try {
// Step 1: Fetch set of assertions associated with the target entity from the Graph
@@ -84,6 +89,7 @@ public CompletableFuture<EntityAssertionsResult> get(DataFetchingEnvironment env
gmsResults.stream()
.filter(Objects::nonNull)
.map(r -> AssertionMapper.map(context, r))
.filter(assertion -> assertionExists(assertion, includeSoftDeleted, context))
.collect(Collectors.toList());

// Step 4: Package and return result
@@ -98,4 +104,17 @@ public CompletableFuture<EntityAssertionsResult> get(DataFetchingEnvironment env
}
});
}

private boolean assertionExists(
Assertion assertion, Boolean includeSoftDeleted, QueryContext context) {
try {
return _entityClient.exists(
UrnUtils.getUrn(assertion.getUrn()), includeSoftDeleted, context.getAuthentication());
} catch (RemoteInvocationException e) {
log.error(
String.format("Unable to check if assertion %s exists, ignoring it", assertion.getUrn()),
e);
return false;
}
}
}
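The new includeSoftDeleted argument is passed straight through to EntityClient.exists, which is what decides whether a soft-deleted assertion survives the filter step above. A behavior sketch, not a test shipped in this commit (the Authentication import path, the URN, and the Mockito setup are illustrative assumptions):

import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.entity.client.EntityClient;
import org.mockito.Mockito;

class SoftDeletedAssertionFilterSketch {
  // assertionExists() delegates to exists(urn, includeSoftDeleted, authentication), so the
  // stubbing below mirrors the contract the resolver relies on.
  static void sketch(com.datahub.authentication.Authentication auth) throws Exception {
    EntityClient entityClient = Mockito.mock(EntityClient.class);
    Urn softDeleted = UrnUtils.getUrn("urn:li:assertion:example-soft-deleted");

    Mockito.when(entityClient.exists(softDeleted, false, auth)).thenReturn(false);
    Mockito.when(entityClient.exists(softDeleted, true, auth)).thenReturn(true);

    // includeSoftDeleted defaults to false (getArgumentOrDefault above), so soft-deleted
    // assertions drop out of EntityAssertionsResult unless the caller opts in.
  }
}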

@@ -5,6 +5,7 @@
import com.datahub.authorization.AuthorizationConfiguration;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.SetMode;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.generated.Entity;
@@ -16,6 +17,7 @@
import com.linkedin.datahub.graphql.generated.Restricted;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.query.LineageFlags;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.services.RestrictedService;
@@ -77,8 +79,9 @@ public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment enviro
1,
separateSiblings != null ? input.getSeparateSiblings() : false,
new HashSet<>(),
startTimeMillis,
endTimeMillis);
new LineageFlags()
.setStartTimeMillis(startTimeMillis, SetMode.REMOVE_IF_NULL)
.setEndTimeMillis(endTimeMillis, SetMode.REMOVE_IF_NULL));

Set<Urn> restrictedUrns = new HashSet<>();
entityLineageResult
@@ -96,7 +99,7 @@ public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment enviro
} catch (Exception e) {
log.error("Failed to fetch lineage for {}", finalUrn);
throw new RuntimeException(
String.format("Failed to fetch lineage for {}", finalUrn), e);
String.format("Failed to fetch lineage for %s", finalUrn), e);
}
});
}
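The SetMode.REMOVE_IF_NULL calls above are what let the resolver forward the optional start/end timestamps without null checks. A minimal sketch of the semantics relied on here, assuming standard Pegasus RecordTemplate setters and the generated has* accessors (not shown in this diff):

import com.linkedin.data.template.SetMode;
import com.linkedin.metadata.query.LineageFlags;

class LineageFlagsSetModeSketch {
  public static void main(String[] args) {
    // With the default SetMode (DISALLOW_NULL) a null argument would throw;
    // REMOVE_IF_NULL leaves the field unset, so a missing GraphQL input simply
    // drops out of the flags instead of failing the lineage query.
    LineageFlags flags =
        new LineageFlags()
            .setStartTimeMillis(null, SetMode.REMOVE_IF_NULL)          // field stays absent
            .setEndTimeMillis(1711584000000L, SetMode.REMOVE_IF_NULL); // field set normally

    System.out.println(flags.hasStartTimeMillis()); // false
    System.out.println(flags.hasEndTimeMillis());   // true
  }
}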

@@ -12,9 +12,11 @@
import com.linkedin.datahub.graphql.generated.ScrollAcrossLineageInput;
import com.linkedin.datahub.graphql.generated.ScrollAcrossLineageResults;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.common.mappers.LineageFlagsInputMapper;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.UrnScrollAcrossLineageResultsMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.query.LineageFlags;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.r2.RemoteInvocationException;
import graphql.schema.DataFetcher;
@@ -73,10 +75,19 @@ public CompletableFuture<ScrollAcrossLineageResults> get(DataFetchingEnvironment
String keepAlive = input.getKeepAlive() != null ? input.getKeepAlive() : "5m";

@Nullable
final Long startTimeMillis =
input.getStartTimeMillis() == null ? null : input.getStartTimeMillis();
Long startTimeMillis = input.getStartTimeMillis() == null ? null : input.getStartTimeMillis();
@Nullable
final Long endTimeMillis = input.getEndTimeMillis() == null ? null : input.getEndTimeMillis();
Long endTimeMillis = input.getEndTimeMillis() == null ? null : input.getEndTimeMillis();

final LineageFlags lineageFlags = LineageFlagsInputMapper.map(context, input.getLineageFlags());
if (lineageFlags.getStartTimeMillis() == null && startTimeMillis != null) {
lineageFlags.setStartTimeMillis(startTimeMillis);
}

if (lineageFlags.getEndTimeMillis() == null && endTimeMillis != null) {
lineageFlags.setEndTimeMillis(endTimeMillis);
}
;

com.linkedin.metadata.graph.LineageDirection resolvedDirection =
com.linkedin.metadata.graph.LineageDirection.valueOf(lineageDirection.toString());
@@ -110,7 +121,8 @@ public CompletableFuture<ScrollAcrossLineageResults> get(DataFetchingEnvironment
_entityClient.scrollAcrossLineage(
context
.getOperationContext()
.withSearchFlags(flags -> searchFlags != null ? searchFlags : flags),
.withSearchFlags(flags -> searchFlags != null ? searchFlags : flags)
.withLineageFlags(flags -> lineageFlags != null ? lineageFlags : flags),
urn,
resolvedDirection,
entityNames,
@@ -120,9 +132,7 @@ public CompletableFuture<ScrollAcrossLineageResults> get(DataFetchingEnvironment
null,
scrollId,
keepAlive,
count,
startTimeMillis,
endTimeMillis));
count));
} catch (RemoteInvocationException e) {
log.error(
"Failed to execute scroll across relationships: source urn {}, direction {}, entity types {}, query {}, filters: {}, start: {}, count: {}",
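This scroll resolver and the search-across-lineage resolver below apply the same precedence rule: values carried on the new lineageFlags input win, and the legacy top-level startTimeMillis/endTimeMillis are only copied in when the flags leave them unset; the merged flags then travel on the OperationContext via withLineageFlags instead of as extra positional arguments. A hypothetical helper, not part of this commit, that captures just that rule:

import com.linkedin.metadata.query.LineageFlags;
import javax.annotation.Nullable;

final class LegacyTimeWindowSupport {
  // Copies the deprecated top-level time window onto LineageFlags only when the caller
  // did not already provide one via the lineageFlags input.
  static LineageFlags withLegacyTimeWindow(
      LineageFlags lineageFlags, @Nullable Long startTimeMillis, @Nullable Long endTimeMillis) {
    if (lineageFlags.getStartTimeMillis() == null && startTimeMillis != null) {
      lineageFlags.setStartTimeMillis(startTimeMillis);
    }
    if (lineageFlags.getEndTimeMillis() == null && endTimeMillis != null) {
      lineageFlags.setEndTimeMillis(endTimeMillis);
    }
    return lineageFlags;
  }
}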

@@ -14,12 +14,14 @@
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageInput;
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageResults;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.common.mappers.LineageFlagsInputMapper;
import com.linkedin.datahub.graphql.types.common.mappers.SearchFlagsInputMapper;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.UrnSearchAcrossLineageResultsMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.LineageFlags;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.LineageSearchResult;
@@ -106,10 +108,18 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
final Integer maxHops = getMaxHops(facetFilters);

@Nullable
final Long startTimeMillis =
input.getStartTimeMillis() == null ? null : input.getStartTimeMillis();
Long startTimeMillis = input.getStartTimeMillis() == null ? null : input.getStartTimeMillis();
@Nullable
final Long endTimeMillis = input.getEndTimeMillis() == null ? null : input.getEndTimeMillis();
Long endTimeMillis = input.getEndTimeMillis() == null ? null : input.getEndTimeMillis();

final LineageFlags lineageFlags = LineageFlagsInputMapper.map(context, input.getLineageFlags());
if (lineageFlags.getStartTimeMillis() == null && startTimeMillis != null) {
lineageFlags.setStartTimeMillis(startTimeMillis);
}

if (lineageFlags.getEndTimeMillis() == null && endTimeMillis != null) {
lineageFlags.setEndTimeMillis(endTimeMillis);
}

com.linkedin.metadata.graph.LineageDirection resolvedDirection =
com.linkedin.metadata.graph.LineageDirection.valueOf(lineageDirection.toString());
@@ -140,7 +150,10 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
}
LineageSearchResult salResults =
_entityClient.searchAcrossLineage(
context.getOperationContext().withSearchFlags(flags -> searchFlags),
context
.getOperationContext()
.withSearchFlags(flags -> searchFlags)
.withLineageFlags(flags -> lineageFlags),
urn,
resolvedDirection,
entityNames,
@@ -149,9 +162,7 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
filter,
null,
start,
count,
startTimeMillis,
endTimeMillis);
count);

return UrnSearchAcrossLineageResultsMapper.map(context, salResults);
} catch (RemoteInvocationException e) {

@@ -2,6 +2,7 @@

import com.linkedin.assertion.AssertionInfo;
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.datahub.graphql.QueryContext;
@@ -56,6 +57,18 @@ public static Assertion map(@Nullable QueryContext context, final EntityResponse
result.setPlatform(unknownPlatform);
}

final EnvelopedAspect envelopedStatus = aspects.get(Constants.STATUS_ASPECT_NAME);
if (envelopedStatus != null) {
result.setStatus(mapStatus(new Status(envelopedStatus.getValue().data())));
}

return result;
}

private static com.linkedin.datahub.graphql.generated.Status mapStatus(Status status) {
final com.linkedin.datahub.graphql.generated.Status result =
new com.linkedin.datahub.graphql.generated.Status();
result.setRemoved(status.isRemoved());
return result;
}
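The status handling added here rehydrates the STATUS aspect's underlying DataMap into the Pegasus Status record and copies it onto the generated GraphQL Status. A small illustration; the hand-built DataMap stands in for envelopedStatus.getValue().data():

import com.linkedin.common.Status;
import com.linkedin.data.DataMap;

class AssertionStatusSketch {
  public static void main(String[] args) {
    DataMap statusData = new DataMap();
    statusData.put("removed", true); // what a soft-deleted assertion's STATUS aspect carries

    Status status = new Status(statusData);
    System.out.println(status.isRemoved()); // true, surfaced as Assertion.status.removed in GraphQL
  }
}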

@@ -0,0 +1,71 @@
package com.linkedin.datahub.graphql.types.common.mappers;

import com.linkedin.common.UrnArray;
import com.linkedin.common.UrnArrayMap;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.EntityTypeToPlatforms;
import com.linkedin.datahub.graphql.generated.LineageFlags;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Maps GraphQL SearchFlags to Pegasus
*
* <p>To be replaced by auto-generated mappers implementations
*/
public class LineageFlagsInputMapper
implements ModelMapper<LineageFlags, com.linkedin.metadata.query.LineageFlags> {

public static final LineageFlagsInputMapper INSTANCE = new LineageFlagsInputMapper();

@Nonnull
public static com.linkedin.metadata.query.LineageFlags map(
QueryContext queryContext, @Nonnull final LineageFlags lineageFlags) {
return INSTANCE.apply(queryContext, lineageFlags);
}

@Override
public com.linkedin.metadata.query.LineageFlags apply(
QueryContext context, @Nullable final LineageFlags lineageFlags) {
com.linkedin.metadata.query.LineageFlags result =
new com.linkedin.metadata.query.LineageFlags();
if (lineageFlags == null) {
return result;
}
if (lineageFlags.getIgnoreAsHops() != null) {
result.setIgnoreAsHops(mapIgnoreAsHops(lineageFlags.getIgnoreAsHops()));
}
if (lineageFlags.getEndTimeMillis() != null) {
result.setEndTimeMillis(lineageFlags.getEndTimeMillis());
}
if (lineageFlags.getStartTimeMillis() != null) {
result.setStartTimeMillis(lineageFlags.getStartTimeMillis());
}
if (lineageFlags.getEntitiesExploredPerHopLimit() != null) {
result.setEntitiesExploredPerHopLimit(lineageFlags.getEntitiesExploredPerHopLimit());
}
return result;
}

private static UrnArrayMap mapIgnoreAsHops(List<EntityTypeToPlatforms> ignoreAsHops) {
UrnArrayMap result = new UrnArrayMap();
ignoreAsHops.forEach(
ignoreAsHop ->
result.put(
EntityTypeMapper.getName(ignoreAsHop.getEntityType()),
new UrnArray(
Optional.ofNullable(ignoreAsHop.getPlatforms())
.orElse(Collections.emptyList())
.stream()
.map(UrnUtils::getUrn)
.collect(Collectors.toList()))));
return result;
}
}
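A rough usage sketch for the new mapper: only the getters of the GraphQL-generated LineageFlags input appear in this diff, so the setters used below are assumed from the generated POJO, and the QueryContext is a placeholder:

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.LineageFlags;
import com.linkedin.datahub.graphql.types.common.mappers.LineageFlagsInputMapper;

class LineageFlagsMappingSketch {
  static com.linkedin.metadata.query.LineageFlags sketch(QueryContext context) {
    LineageFlags graphQlInput = new LineageFlags();
    graphQlInput.setEntitiesExploredPerHopLimit(100); // assumed setter on the generated input
    graphQlInput.setStartTimeMillis(1711584000000L);  // assumed setter on the generated input
    // ignoreAsHops would carry EntityTypeToPlatforms entries, e.g. DATASET plus a list of
    // dataPlatform URNs; mapIgnoreAsHops turns that into a UrnArrayMap keyed by entity name.

    return LineageFlagsInputMapper.map(context, graphQlInput);
  }
}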

@@ -42,6 +42,9 @@ public com.linkedin.datahub.graphql.generated.DataProcessRunEvent apply(
if (runEvent.hasResult()) {
result.setResult(DataProcessInstanceRunResultMapper.map(context, runEvent.getResult()));
}
if (runEvent.hasDurationMillis()) {
result.setDurationMillis(runEvent.getDurationMillis());
}

return result;
}
12 changes: 11 additions & 1 deletion datahub-graphql-core/src/main/resources/entity.graphql
@@ -1596,7 +1596,7 @@ type Dataset implements EntityWithRelationships & Entity & BrowsableEntity {
"""
Assertions associated with the Dataset
"""
assertions(start: Int, count: Int): EntityAssertionsResult
assertions(start: Int, count: Int, includeSoftDeleted: Boolean): EntityAssertionsResult

"""
Edges extending from this entity
@@ -6525,6 +6525,11 @@ type DataProcessRunEvent implements TimeSeriesAspect {
The timestamp associated with the run event in milliseconds
"""
timestampMillis: Long!

"""
The duration of the run in milliseconds
"""
durationMillis: Long
}

"""
@@ -7263,6 +7268,11 @@ type Assertion implements EntityWithRelationships & Entity {
"""
lineage(input: LineageInput!): EntityLineageResult

"""
Status metadata of the assertion
"""
status: Status

"""
Experimental API.
For fetching extra aspects that do not have custom UI code yet
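Taken together, the schema additions above make soft-deleted assertions and their removal status queryable (durationMillis becomes available the same way on DataProcessRunEvent results). An illustrative client-side query string; the field names inside EntityAssertionsResult are assumed, since that type is not shown in this excerpt:

class AssertionsQuerySketch {
  // Exercises the new includeSoftDeleted argument and the new Assertion.status field.
  static final String QUERY =
      "query datasetAssertions($urn: String!) {\n"
          + "  dataset(urn: $urn) {\n"
          + "    assertions(start: 0, count: 20, includeSoftDeleted: true) {\n"
          + "      assertions {\n"
          + "        urn\n"
          + "        status { removed }\n"
          + "      }\n"
          + "    }\n"
          + "  }\n"
          + "}\n";
}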