Skip to content

Commit

Permalink
feat(gms): add ingestProposalBatch endpoint (datahub-project#10706)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and sleeperdeep committed Jun 25, 2024
1 parent 55f151a commit d6395dc
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 30 deletions.
10 changes: 10 additions & 0 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,16 @@ def emit_mcp(

self._emit_generic(url, payload)

def emit_mcps(
    self, mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]]
) -> None:
    """Emit several metadata change proposals in a single request.

    Every proposal is converted with ``to_obj()`` and passed through
    ``pre_json_transform``; the resulting objects are serialized as a JSON
    document under the ``"proposals"`` key and handed to ``_emit_generic``
    together with the ``ingestProposalBatch`` action URL of the configured
    server.
    """
    endpoint = f"{self._gms_server}/aspects?action=ingestProposalBatch"

    serialized = []
    for proposal in mcps:
        serialized.append(pre_json_transform(proposal.to_obj()))

    body = json.dumps({"proposals": serialized})
    self._emit_generic(endpoint, body)

@deprecated
def emit_usage(self, usageStats: UsageAggregation) -> None:
url = f"{self._gms_server}/usageStats?action=batchIngest"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@
"default" : "unset"
} ],
"returns" : "string"
}, {
"name" : "ingestProposalBatch",
"javaMethodName" : "ingestProposalBatch",
"parameters" : [ {
"name" : "proposals",
"type" : "{ \"type\" : \"array\", \"items\" : \"com.linkedin.mxe.MetadataChangeProposal\" }"
}, {
"name" : "async",
"type" : "string",
"default" : "unset"
} ],
"returns" : "string"
}, {
"name" : "restoreIndices",
"javaMethodName" : "restoreIndices",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4122,6 +4122,18 @@
"default" : "unset"
} ],
"returns" : "string"
}, {
"name" : "ingestProposalBatch",
"javaMethodName" : "ingestProposalBatch",
"parameters" : [ {
"name" : "proposals",
"type" : "{ \"type\" : \"array\", \"items\" : \"com.linkedin.mxe.MetadataChangeProposal\" }"
}, {
"name" : "async",
"type" : "string",
"default" : "unset"
} ],
"returns" : "string"
}, {
"name" : "restoreIndices",
"javaMethodName" : "restoreIndices",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.opentelemetry.extension.annotations.WithSpan;
import java.net.URISyntaxException;
import java.time.Clock;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -69,10 +70,12 @@ public class AspectResource extends CollectionResourceTaskTemplate<String, Versi

// Action names. ACTION_INGEST_PROPOSAL_BATCH is the name bound via @Action on
// ingestProposalBatch; the remaining names are presumably bound the same way on
// actions not visible in this excerpt (TODO confirm).
private static final String ACTION_GET_TIMESERIES_ASPECT = "getTimeseriesAspectValues";
private static final String ACTION_INGEST_PROPOSAL = "ingestProposal";
private static final String ACTION_INGEST_PROPOSAL_BATCH = "ingestProposalBatch";
private static final String ACTION_GET_COUNT = "getCount";
// Action parameter names. PARAM_PROPOSAL (single proposal) and PARAM_PROPOSALS
// (array of proposals) are bound via @ActionParam on the ingest actions; the other
// parameter names presumably belong to actions outside this excerpt (TODO confirm).
private static final String PARAM_ENTITY = "entity";
private static final String PARAM_ASPECT = "aspect";
private static final String PARAM_PROPOSAL = "proposal";
private static final String PARAM_PROPOSALS = "proposals";
private static final String PARAM_START_TIME_MILLIS = "startTimeMillis";
private static final String PARAM_END_TIME_MILLIS = "endTimeMillis";
private static final String PARAM_LATEST_VALUE = "latestValue";
Expand Down Expand Up @@ -235,56 +238,87 @@ public Task<String> ingestProposal(
@ActionParam(PARAM_PROPOSAL) @Nonnull MetadataChangeProposal metadataChangeProposal,
@ActionParam(PARAM_ASYNC) @Optional(UNSET) String async)
throws URISyntaxException {
log.info("INGEST PROPOSAL proposal: {}", metadataChangeProposal);
log.info("INGEST PROPOSAL proposal: {}", metadataChangeProposal);

final boolean asyncBool;
if (UNSET.equals(async)) {
asyncBool = Boolean.parseBoolean(System.getenv(ASYNC_INGEST_DEFAULT_NAME));
} else {
asyncBool = Boolean.parseBoolean(async);
}

return ingestProposals(List.of(metadataChangeProposal), asyncBool);
}

/**
 * Action {@code ingestProposalBatch}: ingests an array of metadata change proposals in one call
 * by delegating to {@code ingestProposals}.
 *
 * @param metadataChangeProposals the proposals to ingest, passed on as a list view of the array
 * @param async when equal to {@code UNSET}, the flag is read from the environment variable named
 *     by {@code ASYNC_INGEST_DEFAULT_NAME}; otherwise the given string is parsed with
 *     {@link Boolean#parseBoolean(String)}
 * @return the task returned by {@code ingestProposals}
 * @throws URISyntaxException declared because {@code ingestProposals} declares it
 */
@Action(name = ACTION_INGEST_PROPOSAL_BATCH)
@Nonnull
@WithSpan
public Task<String> ingestProposalBatch(
    @ActionParam(PARAM_PROPOSALS) @Nonnull MetadataChangeProposal[] metadataChangeProposals,
    @ActionParam(PARAM_ASYNC) @Optional(UNSET) String async)
    throws URISyntaxException {
  // One list view over the incoming array, shared by the log statement and the delegate call.
  final List<MetadataChangeProposal> proposalList = Arrays.asList(metadataChangeProposals);
  log.info("INGEST PROPOSAL BATCH proposals: {}", proposalList);

  // An unset flag falls back to the environment default; parseBoolean(null) yields false.
  final String asyncSetting = UNSET.equals(async) ? System.getenv(ASYNC_INGEST_DEFAULT_NAME) : async;
  final boolean asyncBool = Boolean.parseBoolean(asyncSetting);

  return ingestProposals(proposalList, asyncBool);
}

final boolean asyncBool;
if (UNSET.equals(async)) {
asyncBool = Boolean.parseBoolean(System.getenv(ASYNC_INGEST_DEFAULT_NAME));
} else {
asyncBool = Boolean.parseBoolean(async);
}

private Task<String> ingestProposals(
@Nonnull List<MetadataChangeProposal> metadataChangeProposals,
boolean asyncBool)
throws URISyntaxException {
Authentication authentication = AuthenticationContext.getAuthentication();
final OperationContext opContext = OperationContext.asSession(
systemOperationContext, RequestContext.builder().buildRestli(ACTION_INGEST_PROPOSAL, metadataChangeProposal.getEntityType()), _authorizer, authentication, true);

/*
Ingest Authorization Checks
*/
List<Pair<MetadataChangeProposal, Integer>> exceptions = isAPIAuthorized(authentication, _authorizer, ENTITY,
opContext.getEntityRegistry(), List.of(metadataChangeProposal))

Set<String> entityTypes = metadataChangeProposals.stream()
.map(MetadataChangeProposal::getEntityType)
.collect(Collectors.toSet());
final OperationContext opContext = OperationContext.asSession(
systemOperationContext, RequestContext.builder().buildRestli(ACTION_INGEST_PROPOSAL, entityTypes), _authorizer, authentication, true);

// Ingest Authorization Checks
List<Pair<MetadataChangeProposal, Integer>> exceptions = isAPIAuthorized(authentication, _authorizer, ENTITY,
opContext.getEntityRegistry(), metadataChangeProposals)
.stream().filter(p -> p.getSecond() != HttpStatus.S_200_OK.getCode())
.collect(Collectors.toList());
if (!exceptions.isEmpty()) {
throw new RestLiServiceException(
HttpStatus.S_403_FORBIDDEN, "User is unauthorized to modify entity: " + exceptions.stream()
if (!exceptions.isEmpty()) {
String errorMessages = exceptions.stream()
.map(ex -> String.format("HttpStatus: %s Urn: %s", ex.getSecond(), ex.getFirst().getEntityUrn()))
.collect(Collectors.toList()));
}

.collect(Collectors.joining(", "));
throw new RestLiServiceException(
HttpStatus.S_403_FORBIDDEN, "User is unauthorized to modify entity: " + errorMessages);
}
String actorUrnStr = authentication.getActor().toUrnStr();
final AuditStamp auditStamp =
new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(actorUrnStr));

return RestliUtil.toTask(() -> {
log.debug("Proposal: {}", metadataChangeProposal);
log.debug("Proposals: {}", metadataChangeProposals);
try {
final AspectsBatch batch = AspectsBatchImpl.builder()
.mcps(List.of(metadataChangeProposal), auditStamp, opContext.getRetrieverContext().get())
.mcps(metadataChangeProposals, auditStamp, opContext.getRetrieverContext().get())
.build();

Set<IngestResult> results =
_entityService.ingestProposal(opContext, batch, asyncBool);

java.util.Optional<IngestResult> one = results.stream().findFirst();
for (IngestResult result : results) {
// Update runIds, only works for existing documents, so ES document must exist
Urn resultUrn = result.getUrn();

// Update runIds, only works for existing documents, so ES document must exist
Urn resultUrn = one.map(IngestResult::getUrn).orElse(metadataChangeProposal.getEntityUrn());
if (one.map(result -> result.isProcessedMCL() || result.isUpdate()).orElse(false)) {
tryIndexRunId(opContext,
resultUrn, metadataChangeProposal.getSystemMetadata(), entitySearchService);
if (resultUrn != null && (result.isProcessedMCL() || result.isUpdate())) {
tryIndexRunId(opContext, resultUrn, result.getRequest().getSystemMetadata(), entitySearchService);
}
}
return resultUrn.toString();

// TODO: We don't actually use this return value anywhere. Maybe we should just stop returning it altogether?
return "success";
} catch (ValidationException e) {
throw new RestLiServiceException(HttpStatus.S_422_UNPROCESSABLE_ENTITY, e.getMessage());
}
Expand Down

0 comments on commit d6395dc

Please sign in to comment.