Skip to content

Commit

Permalink
feat(ingest): add async option to ingest proposal endpoint (#6097)
Browse files Browse the repository at this point in the history
* feat(ingest): add async option to ingest proposal endpoint

* small tweak to validate before write to K, also keep existing path for timeseries aspects

* avoid double convert

Co-authored-by: Shirshanka Das <[email protected]>
  • Loading branch information
RyanHolstien and shirshanka authored Oct 4, 2022
1 parent 5fb875a commit bfb903c
Show file tree
Hide file tree
Showing 59 changed files with 396 additions and 405 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static void persistAspect(Urn urn, String aspectName, RecordTemplate aspe
proposal.setAspectName(aspectName);
proposal.setAspect(GenericRecordUtils.serializeAspect(aspect));
proposal.setChangeType(ChangeType.UPSERT);
entityService.ingestProposal(proposal, getAuditStamp(actor));
entityService.ingestProposal(proposal, getAuditStamp(actor), false);
}

public static MetadataChangeProposal buildMetadataChangeProposal(Urn urn, String aspectName, RecordTemplate aspect, Urn actor, EntityService entityService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw
MetadataChangeProposal proposal =
buildMetadataChangeProposal(actor, CORP_USER_SETTINGS_ASPECT_NAME, newSettings, actor, _entityService);

_entityService.ingestProposal(proposal, getAuditStamp(actor));
_entityService.ingestProposal(proposal, getAuditStamp(actor), false);

return true;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private static MetadataChangeProposal buildSoftDeleteProposal(
private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor));
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private static MetadataChangeProposal buildUpdateDeprecationProposal(
private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor));
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static void validateDomain(Urn domainUrn, EntityService entityService) {
private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor));
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ private static GlossaryTermAssociationArray removeTermsIfExists(GlossaryTerms te
private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor));
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public static Boolean validateRemoveInput(
private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor));
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public Chart update(@Nonnull String urn, @Nonnull ChartUpdateInput input, @Nonnu
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public Dashboard update(@Nonnull String urn, @Nonnull DashboardUpdateInput input
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public DataFlow update(@Nonnull String urn, @Nonnull DataFlowUpdateInput input,
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public DataJob update(@Nonnull String urn, @Nonnull DataJobUpdateInput input, @N
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public List<Dataset> batchUpdate(@Nonnull BatchDatasetUpdateInput[] input, @Nonn
final List<String> urns = Arrays.stream(input).map(BatchDatasetUpdateInput::getUrn).collect(Collectors.toList());

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urns), e);
}
Expand All @@ -224,7 +224,7 @@ public Dataset update(@Nonnull String urn, @Nonnull DatasetUpdateInput input, @N
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public Notebook update(@Nonnull String urn, @Nonnull NotebookUpdateInput input,
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Tag update(@Nonnull String urn, @Nonnull TagUpdateInput input, @Nonnull Q
final Collection<MetadataChangeProposal> proposals = TagUpdateInputMapper.map(input, actor);
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));
try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import com.datahub.authentication.Authentication;
import com.datahub.authorization.AuthorizationResult;
import com.datahub.authorization.Authorizer;
import com.linkedin.common.AuditStamp;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.mxe.MetadataChangeProposal;
import org.mockito.Mockito;


Expand Down Expand Up @@ -36,5 +39,27 @@ public static QueryContext getMockDenyContext() {
return mockContext;
}

public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations, MetadataChangeProposal proposal) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal(
Mockito.eq(proposal),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
);
}

public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal(
Mockito.any(MetadataChangeProposal.class),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
);
}

public static void verifyNoIngestProposal(EntityService mockService) {
Mockito.verify(mockService, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class), Mockito.anyBoolean());
}

private TestUtils() { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ public void testGetSuccessNoExistingStatus() throws Exception {
proposal1.setAspect(GenericRecordUtils.serializeAspect(newStatus));
proposal1.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal1),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = new MetadataChangeProposal();
proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2));
Expand All @@ -78,10 +75,7 @@ public void testGetSuccessNoExistingStatus() throws Exception {
proposal2.setAspect(GenericRecordUtils.serializeAspect(newStatus));
proposal2.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal2),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal2);
}

@Test
Expand Down Expand Up @@ -124,10 +118,7 @@ public void testGetSuccessExistingStatus() throws Exception {
proposal1.setAspect(GenericRecordUtils.serializeAspect(newStatus));
proposal1.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal1),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = new MetadataChangeProposal();
proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2));
Expand All @@ -136,10 +127,7 @@ public void testGetSuccessExistingStatus() throws Exception {
proposal2.setAspect(GenericRecordUtils.serializeAspect(newStatus));
proposal2.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal2),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal2);
}

@Test
Expand Down Expand Up @@ -171,9 +159,7 @@ public void testGetFailureResourceDoesNotExist() throws Exception {
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockService, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class));
verifyNoIngestProposal(mockService);
}

@Test
Expand All @@ -191,9 +177,7 @@ public void testGetUnauthorized() throws Exception {
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockService, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class));
verifyNoIngestProposal(mockService);
}

@Test
Expand All @@ -202,7 +186,7 @@ public void testGetEntityClientException() throws Exception {

Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class));
Mockito.any(AuditStamp.class), Mockito.anyBoolean());

BatchUpdateSoftDeletedResolver resolver = new BatchUpdateSoftDeletedResolver(mockService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ public void testGetSuccessNoExistingDeprecation() throws Exception {
proposal1.setAspect(GenericRecordUtils.serializeAspect(newDeprecation));
proposal1.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal1),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = new MetadataChangeProposal();
proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2));
Expand All @@ -85,10 +82,7 @@ public void testGetSuccessNoExistingDeprecation() throws Exception {
proposal2.setAspect(GenericRecordUtils.serializeAspect(newDeprecation));
proposal2.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal2),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal2);
}

@Test
Expand Down Expand Up @@ -140,10 +134,7 @@ public void testGetSuccessExistingDeprecation() throws Exception {
proposal1.setAspect(GenericRecordUtils.serializeAspect(newDeprecation));
proposal1.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal1),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = new MetadataChangeProposal();
proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2));
Expand All @@ -152,10 +143,7 @@ public void testGetSuccessExistingDeprecation() throws Exception {
proposal2.setAspect(GenericRecordUtils.serializeAspect(newDeprecation));
proposal2.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal2),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal2);
}

@Test
Expand Down Expand Up @@ -188,9 +176,7 @@ public void testGetFailureResourceDoesNotExist() throws Exception {
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockService, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class));
verifyNoIngestProposal(mockService);
}

@Test
Expand All @@ -209,9 +195,7 @@ public void testGetUnauthorized() throws Exception {
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockService, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class));
verifyNoIngestProposal(mockService);
}

@Test
Expand All @@ -220,7 +204,7 @@ public void testGetEntityClientException() throws Exception {

Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class));
Mockito.any(AuditStamp.class), Mockito.anyBoolean());

BatchUpdateDeprecationResolver resolver = new BatchUpdateDeprecationResolver(mockService);

Expand Down
Loading

0 comments on commit bfb903c

Please sign in to comment.