Skip to content

Commit

Permalink
Merge branch 'master' into sql_common_bug_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Aug 5, 2022
2 parents 29918f0 + 8ac4361 commit a5089b8
Show file tree
Hide file tree
Showing 192 changed files with 11,765 additions and 812 deletions.
47 changes: 47 additions & 0 deletions .github/workflows/check-quickstart.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
name: check quickstart
on:
push:
branches:
- master
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true


jobs:
test-quickstart:
strategy:
fail-fast: false
matrix:
os: [ubuntu-20.04]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/setup-python@v2
with:
python-version: "3.9.9"
- name: Install acryl-datahub
run: |
pip install --upgrade acryl-datahub
datahub version
python -c "import platform; print(platform.platform())"
- name: Run quickstart
run: |
datahub docker quickstart
- name: Ingest sample data
run: |
datahub docker ingest-sample-data
- name: See status
run: |
docker ps -a && datahub docker check
- name: store logs
if: failure()
run: |
docker logs datahub-gms >& quickstart-gms.log
- name: Upload logs
uses: actions/upload-artifact@v2
if: failure()
with:
name: docker-quickstart-logs-${{ matrix.os }}
path: "*.log"
4 changes: 2 additions & 2 deletions .github/workflows/close-stale-issues.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ jobs:
steps:
- uses: actions/stale@v5
with:
days-before-issue-stale: 90
days-before-issue-stale: 30
days-before-issue-close: 30
stale-issue-label: "stale"
stale-issue-message: "This issue is stale because it has been open for 90 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io"
stale-issue-message: "This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io"
close-issue-message: "This issue was closed because it has been inactive for 30 days since being marked as stale."
days-before-pr-stale: -1
days-before-pr-close: -1
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/publish-datahub-jars.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,7 @@ jobs:
echo signingKey=$SIGNING_KEY >> gradle.properties
./gradlew :metadata-integration:java:spark-lineage:printVersion
./gradlew :metadata-integration:java:spark-lineage:publishToMavenLocal
# Publish apache ranger plugin to maven
./gradlew :datahub-ranger-plugin:printVersion
./gradlew :datahub-ranger-plugin:publishMavenJavaPublicationToMavenLocal
#./gradlew :metadata-integration:java:datahub-client:closeAndReleaseRepository --info
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@
import com.linkedin.datahub.graphql.resolvers.mutate.AddTagsResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.AddTermResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.AddTermsResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.MutableTypeBatchResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddOwnersResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddTagsResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddTermsResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchRemoveOwnersResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchRemoveTagsResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchRemoveTermsResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchUpdateDeprecationResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.BatchSetDomainResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.MutableTypeResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.RemoveLinkResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.RemoveOwnerResolver;
Expand Down Expand Up @@ -673,6 +682,7 @@ private String getUrnField(DataFetchingEnvironment env) {
private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
builder.type("Mutation", typeWiring -> typeWiring
.dataFetcher("updateDataset", new MutableTypeResolver<>(datasetType))
.dataFetcher("updateDatasets", new MutableTypeBatchResolver<>(datasetType))
.dataFetcher("createTag", new CreateTagResolver(this.entityClient))
.dataFetcher("updateTag", new MutableTypeResolver<>(tagType))
.dataFetcher("setTagColor", new SetTagColorResolver(entityClient, entityService))
Expand All @@ -686,17 +696,23 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("updateCorpGroupProperties", new MutableTypeResolver<>(corpGroupType))
.dataFetcher("addTag", new AddTagResolver(entityService))
.dataFetcher("addTags", new AddTagsResolver(entityService))
.dataFetcher("batchAddTags", new BatchAddTagsResolver(entityService))
.dataFetcher("removeTag", new RemoveTagResolver(entityService))
.dataFetcher("batchRemoveTags", new BatchRemoveTagsResolver(entityService))
.dataFetcher("addTerm", new AddTermResolver(entityService))
.dataFetcher("batchAddTerms", new BatchAddTermsResolver(entityService))
.dataFetcher("addTerms", new AddTermsResolver(entityService))
.dataFetcher("removeTerm", new RemoveTermResolver(entityService))
.dataFetcher("batchRemoveTerms", new BatchRemoveTermsResolver(entityService))
.dataFetcher("createPolicy", new UpsertPolicyResolver(this.entityClient))
.dataFetcher("updatePolicy", new UpsertPolicyResolver(this.entityClient))
.dataFetcher("deletePolicy", new DeletePolicyResolver(this.entityClient))
.dataFetcher("updateDescription", new UpdateDescriptionResolver(entityService))
.dataFetcher("addOwner", new AddOwnerResolver(entityService))
.dataFetcher("addOwners", new AddOwnersResolver(entityService))
.dataFetcher("batchAddOwners", new BatchAddOwnersResolver(entityService))
.dataFetcher("removeOwner", new RemoveOwnerResolver(entityService))
.dataFetcher("batchRemoveOwners", new BatchRemoveOwnersResolver(entityService))
.dataFetcher("addLink", new AddLinkResolver(entityService))
.dataFetcher("removeLink", new RemoveLinkResolver(entityService))
.dataFetcher("addGroupMembers", new AddGroupMembersResolver(this.groupService))
Expand All @@ -708,7 +724,9 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("createDomain", new CreateDomainResolver(this.entityClient))
.dataFetcher("deleteDomain", new DeleteDomainResolver(entityClient))
.dataFetcher("setDomain", new SetDomainResolver(this.entityClient, this.entityService))
.dataFetcher("batchSetDomain", new BatchSetDomainResolver(this.entityService))
.dataFetcher("updateDeprecation", new UpdateDeprecationResolver(this.entityClient, this.entityService))
.dataFetcher("batchUpdateDeprecation", new BatchUpdateDeprecationResolver(entityService))
.dataFetcher("unsetDomain", new UnsetDomainResolver(this.entityClient, this.entityService))
.dataFetcher("createSecret", new CreateSecretResolver(this.entityClient, this.secretService))
.dataFetcher("deleteSecret", new DeleteSecretResolver(this.entityClient))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,26 @@
import com.linkedin.datahub.graphql.generated.IngestionConfig;
import com.linkedin.datahub.graphql.generated.IngestionSchedule;
import com.linkedin.datahub.graphql.generated.IngestionSource;
import com.linkedin.datahub.graphql.generated.StructuredReport;
import com.linkedin.datahub.graphql.types.common.mappers.StringMapMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.execution.ExecutionRequestInput;
import com.linkedin.execution.ExecutionRequestResult;
import com.linkedin.execution.ExecutionRequestSource;
import com.linkedin.execution.StructuredExecutionReport;
import com.linkedin.ingestion.DataHubIngestionSourceConfig;
import com.linkedin.ingestion.DataHubIngestionSourceInfo;
import com.linkedin.ingestion.DataHubIngestionSourceSchedule;
import com.linkedin.metadata.Constants;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class IngestionResolverUtils {

public static List<ExecutionRequest> mapExecutionRequests(final Collection<EntityResponse> requests) {
Expand Down Expand Up @@ -77,13 +81,28 @@ public static com.linkedin.datahub.graphql.generated.ExecutionRequestResult mapE
result.setStartTimeMs(execRequestResult.getStartTimeMs());
result.setDurationMs(execRequestResult.getDurationMs());
result.setReport(execRequestResult.getReport());
if (execRequestResult.hasStructuredReport()) {
result.setStructuredReport(mapStructuredReport(execRequestResult.getStructuredReport()));
}
return result;
}

public static StructuredReport mapStructuredReport(final StructuredExecutionReport structuredReport) {
StructuredReport structuredReportResult = new StructuredReport();
structuredReportResult.setType(structuredReport.getType());
structuredReportResult.setSerializedValue(structuredReport.getSerializedValue());
structuredReportResult.setContentType(structuredReport.getContentType());
return structuredReportResult;
}

public static List<IngestionSource> mapIngestionSources(final Collection<EntityResponse> entities) {
final List<IngestionSource> results = new ArrayList<>();
for (EntityResponse response : entities) {
results.add(mapIngestionSource(response));
try {
results.add(mapIngestionSource(response));
} catch (IllegalStateException e) {
log.error("Unable to map ingestion source, continuing to other sources.", e);
}
}
return results;
}
Expand All @@ -95,6 +114,10 @@ public static IngestionSource mapIngestionSource(final EntityResponse ingestionS
// There should ALWAYS be an info aspect.
final EnvelopedAspect envelopedInfo = aspects.get(Constants.INGESTION_INFO_ASPECT_NAME);

if (envelopedInfo == null) {
throw new IllegalStateException("No ingestion source info aspect exists for urn: " + entityUrn);
}

// Bind into a strongly typed object.
final DataHubIngestionSourceInfo ingestionSourceInfo = new DataHubIngestionSourceInfo(envelopedInfo.getValue().data());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)

Map<String, String> arguments = new HashMap<>();
arguments.put(RECIPE_ARG_NAME, input.getRecipe());
arguments.put(VERSION_ARG_NAME, _ingestionConfiguration.getDefaultCliVersion());
execInput.setArgs(new StringMap(arguments));

proposal.setEntityType(Constants.EXECUTION_REQUEST_ENTITY_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONException;
import org.json.JSONObject;

import java.net.URISyntaxException;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -31,6 +35,7 @@
/**
* Creates or updates an ingestion source. Requires the MANAGE_INGESTION privilege.
*/
@Slf4j
public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFuture<String>> {

private final EntityClient _entityClient;
Expand All @@ -51,6 +56,7 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
final UpdateIngestionSourceInput input = bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);

final MetadataChangeProposal proposal = new MetadataChangeProposal();
String ingestionSourceUrnString;

if (ingestionSourceUrn.isPresent()) {
// Update existing ingestion source
Expand All @@ -61,6 +67,7 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
ingestionSourceUrnString = ingestionSourceUrn.get();
} else {
// Create new ingestion source
// Since we are creating a new Ingestion Source, we need to generate a unique UUID.
Expand All @@ -71,10 +78,11 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
key.setId(uuidStr);
proposal.setEntityKeyAspect(GenericRecordUtils.serializeAspect(key));
ingestionSourceUrnString = String.format("urn:li:dataHubIngestionSource:%s", uuidStr);
}

// Create the policy info.
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input, ingestionSourceUrnString);
proposal.setEntityType(Constants.INGESTION_SOURCE_ENTITY_NAME);
proposal.setAspectName(Constants.INGESTION_INFO_ASPECT_NAME);
proposal.setAspect(GenericRecordUtils.serializeAspect(info));
Expand All @@ -90,20 +98,24 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
});
}

private DataHubIngestionSourceInfo mapIngestionSourceInfo(final UpdateIngestionSourceInput input) {
private DataHubIngestionSourceInfo mapIngestionSourceInfo(final UpdateIngestionSourceInput input, final String ingestionSourceUrn) {
final DataHubIngestionSourceInfo result = new DataHubIngestionSourceInfo();
result.setType(input.getType());
result.setName(input.getName());
result.setConfig(mapConfig(input.getConfig()));
result.setConfig(mapConfig(input.getConfig(), ingestionSourceUrn));
if (input.getSchedule() != null) {
result.setSchedule(mapSchedule(input.getSchedule()));
}
return result;
}

private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfigInput input) {
private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfigInput input, final String ingestionSourceUrn) {
final DataHubIngestionSourceConfig result = new DataHubIngestionSourceConfig();
result.setRecipe(input.getRecipe());
String recipe = input.getRecipe();
if (recipe != null) {
recipe = optionallySetPipelineName(recipe, ingestionSourceUrn);
}
result.setRecipe(recipe);
if (input.getVersion() != null) {
result.setVersion(input.getVersion());
}
Expand All @@ -119,4 +131,19 @@ private DataHubIngestionSourceSchedule mapSchedule(final UpdateIngestionSourceSc
result.setTimezone(input.getTimezone());
return result;
}

private String optionallySetPipelineName(String recipe, String ingestionSourceUrn) {
try {
JSONObject jsonRecipe = new JSONObject(recipe);
boolean hasPipelineName = jsonRecipe.has("pipeline_name") && jsonRecipe.get("pipeline_name") != null && !jsonRecipe.get("pipeline_name").equals("");

if (!hasPipelineName) {
jsonRecipe.put("pipeline_name", ingestionSourceUrn);
recipe = jsonRecipe.toString();
}
} catch (JSONException e) {
log.warn("Error parsing ingestion recipe in JSON form", e);
}
return recipe;
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package com.linkedin.datahub.graphql.resolvers.mutate;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.CorpuserUrn;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.AddOwnerInput;
import com.linkedin.datahub.graphql.generated.OwnerEntityType;
import com.linkedin.datahub.graphql.generated.OwnerInput;
import com.linkedin.datahub.graphql.generated.OwnershipType;
import com.linkedin.datahub.graphql.generated.ResourceRefInput;
import com.linkedin.datahub.graphql.resolvers.mutate.util.OwnerUtils;
import com.linkedin.metadata.entity.EntityService;
import graphql.schema.DataFetcher;
Expand Down Expand Up @@ -50,11 +53,9 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw
log.debug("Adding Owner. input: {}", input.toString());

Urn actor = CorpuserUrn.createFromString(((QueryContext) environment.getContext()).getActorUrn());
OwnerUtils.addOwner(
ownerUrn,
// Assumption Alert: Assumes that GraphQL ownership type === GMS ownership type
com.linkedin.common.OwnershipType.valueOf(type.name()),
targetUrn,
OwnerUtils.addOwnersToResources(
ImmutableList.of(new OwnerInput(input.getOwnerUrn(), ownerEntityType, type)),
ImmutableList.of(new ResourceRefInput(input.getResourceUrn(), null, null)),
actor,
_entityService
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.linkedin.datahub.graphql.resolvers.mutate;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.CorpuserUrn;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.AddOwnersInput;
import com.linkedin.datahub.graphql.generated.OwnerInput;
import com.linkedin.datahub.graphql.generated.ResourceRefInput;
import com.linkedin.datahub.graphql.resolvers.mutate.util.OwnerUtils;
import com.linkedin.metadata.entity.EntityService;
import graphql.schema.DataFetcher;
Expand Down Expand Up @@ -47,9 +49,9 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw
log.debug("Adding Owners. input: {}", input.toString());

Urn actor = CorpuserUrn.createFromString(((QueryContext) environment.getContext()).getActorUrn());
OwnerUtils.addOwners(
OwnerUtils.addOwnersToResources(
owners,
targetUrn,
ImmutableList.of(new ResourceRefInput(input.getResourceUrn(), null, null)),
actor,
_entityService
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.ResourceRefInput;
import com.linkedin.datahub.graphql.generated.TagAssociationInput;
import com.linkedin.datahub.graphql.resolvers.mutate.util.LabelUtils;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
Expand All @@ -34,12 +36,12 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw
}

return CompletableFuture.supplyAsync(() -> {
LabelUtils.validateInput(
LabelUtils.validateResourceAndLabel(
tagUrn,
targetUrn,
input.getSubResource(),
input.getSubResourceType(),
"tag",
Constants.TAG_ENTITY_NAME,
_entityService,
false
);
Expand All @@ -52,10 +54,9 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw

log.info("Adding Tag. input: {}", input.toString());
Urn actor = CorpuserUrn.createFromString(((QueryContext) environment.getContext()).getActorUrn());
LabelUtils.addTagsToTarget(
LabelUtils.addTagsToResources(
ImmutableList.of(tagUrn),
targetUrn,
input.getSubResource(),
ImmutableList.of(new ResourceRefInput(input.getResourceUrn(), input.getSubResourceType(), input.getSubResource())),
actor,
_entityService
);
Expand Down
Loading

0 comments on commit a5089b8

Please sign in to comment.