Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add async option to ingest proposal endpoint #6097

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static void persistAspect(Urn urn, String aspectName, RecordTemplate aspe
proposal.setAspectName(aspectName);
proposal.setAspect(GenericRecordUtils.serializeAspect(aspect));
proposal.setChangeType(ChangeType.UPSERT);
entityService.ingestProposal(proposal, getAuditStamp(actor));
entityService.ingestProposal(proposal, getAuditStamp(actor), false);
}

public static MetadataChangeProposal buildMetadataChangeProposal(Urn urn, String aspectName, RecordTemplate aspect, Urn actor, EntityService entityService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw
MetadataChangeProposal proposal =
buildMetadataChangeProposal(actor, CORP_USER_SETTINGS_ASPECT_NAME, newSettings, actor, _entityService);

_entityService.ingestProposal(proposal, getAuditStamp(actor));
_entityService.ingestProposal(proposal, getAuditStamp(actor), false);

return true;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private static MetadataChangeProposal buildSoftDeleteProposal(
private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor));
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private static MetadataChangeProposal buildUpdateDeprecationProposal(
private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor));
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static void validateDomain(Urn domainUrn, EntityService entityService) {
private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor));
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ private static GlossaryTermAssociationArray removeTermsIfExists(GlossaryTerms te
private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor));
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public static Boolean validateRemoveInput(
private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor));
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public Chart update(@Nonnull String urn, @Nonnull ChartUpdateInput input, @Nonnu
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public Dashboard update(@Nonnull String urn, @Nonnull DashboardUpdateInput input
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public DataFlow update(@Nonnull String urn, @Nonnull DataFlowUpdateInput input,
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public DataJob update(@Nonnull String urn, @Nonnull DataJobUpdateInput input, @N
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public List<Dataset> batchUpdate(@Nonnull BatchDatasetUpdateInput[] input, @Nonn
final List<String> urns = Arrays.stream(input).map(BatchDatasetUpdateInput::getUrn).collect(Collectors.toList());

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urns), e);
}
Expand All @@ -224,7 +224,7 @@ public Dataset update(@Nonnull String urn, @Nonnull DatasetUpdateInput input, @N
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public Notebook update(@Nonnull String urn, @Nonnull NotebookUpdateInput input,
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));

try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Tag update(@Nonnull String urn, @Nonnull TagUpdateInput input, @Nonnull Q
final Collection<MetadataChangeProposal> proposals = TagUpdateInputMapper.map(input, actor);
proposals.forEach(proposal -> proposal.setEntityUrn(UrnUtils.getUrn(urn)));
try {
_entityClient.batchIngestProposals(proposals, context.getAuthentication());
_entityClient.batchIngestProposals(proposals, context.getAuthentication(), false);
} catch (RemoteInvocationException e) {
throw new RuntimeException(String.format("Failed to write entity with urn %s", urn), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import com.datahub.authentication.Authentication;
import com.datahub.authorization.AuthorizationResult;
import com.datahub.authorization.Authorizer;
import com.linkedin.common.AuditStamp;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.mxe.MetadataChangeProposal;
import org.mockito.Mockito;


Expand Down Expand Up @@ -36,5 +39,27 @@ public static QueryContext getMockDenyContext() {
return mockContext;
}

public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations, MetadataChangeProposal proposal) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal(
Mockito.eq(proposal),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
);
}

public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal(
Mockito.any(MetadataChangeProposal.class),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
);
}

public static void verifyNoIngestProposal(EntityService mockService) {
Mockito.verify(mockService, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class), Mockito.anyBoolean());
}

private TestUtils() { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ public void testGetSuccessNoExistingStatus() throws Exception {
proposal1.setAspect(GenericRecordUtils.serializeAspect(newStatus));
proposal1.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal1),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = new MetadataChangeProposal();
proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2));
Expand All @@ -78,10 +75,7 @@ public void testGetSuccessNoExistingStatus() throws Exception {
proposal2.setAspect(GenericRecordUtils.serializeAspect(newStatus));
proposal2.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal2),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal2);
}

@Test
Expand Down Expand Up @@ -124,10 +118,7 @@ public void testGetSuccessExistingStatus() throws Exception {
proposal1.setAspect(GenericRecordUtils.serializeAspect(newStatus));
proposal1.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal1),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = new MetadataChangeProposal();
proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2));
Expand All @@ -136,10 +127,7 @@ public void testGetSuccessExistingStatus() throws Exception {
proposal2.setAspect(GenericRecordUtils.serializeAspect(newStatus));
proposal2.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal2),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal2);
}

@Test
Expand Down Expand Up @@ -171,9 +159,7 @@ public void testGetFailureResourceDoesNotExist() throws Exception {
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockService, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class));
verifyNoIngestProposal(mockService);
}

@Test
Expand All @@ -191,9 +177,7 @@ public void testGetUnauthorized() throws Exception {
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockService, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class));
verifyNoIngestProposal(mockService);
}

@Test
Expand All @@ -202,7 +186,7 @@ public void testGetEntityClientException() throws Exception {

Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class));
Mockito.any(AuditStamp.class), Mockito.anyBoolean());

BatchUpdateSoftDeletedResolver resolver = new BatchUpdateSoftDeletedResolver(mockService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ public void testGetSuccessNoExistingDeprecation() throws Exception {
proposal1.setAspect(GenericRecordUtils.serializeAspect(newDeprecation));
proposal1.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal1),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = new MetadataChangeProposal();
proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2));
Expand All @@ -85,10 +82,7 @@ public void testGetSuccessNoExistingDeprecation() throws Exception {
proposal2.setAspect(GenericRecordUtils.serializeAspect(newDeprecation));
proposal2.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal2),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal2);
}

@Test
Expand Down Expand Up @@ -140,10 +134,7 @@ public void testGetSuccessExistingDeprecation() throws Exception {
proposal1.setAspect(GenericRecordUtils.serializeAspect(newDeprecation));
proposal1.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal1),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = new MetadataChangeProposal();
proposal2.setEntityUrn(Urn.createFromString(TEST_ENTITY_URN_2));
Expand All @@ -152,10 +143,7 @@ public void testGetSuccessExistingDeprecation() throws Exception {
proposal2.setAspect(GenericRecordUtils.serializeAspect(newDeprecation));
proposal2.setChangeType(ChangeType.UPSERT);

Mockito.verify(mockService, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal2),
Mockito.any(AuditStamp.class)
);
verifyIngestProposal(mockService, 1, proposal2);
}

@Test
Expand Down Expand Up @@ -188,9 +176,7 @@ public void testGetFailureResourceDoesNotExist() throws Exception {
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockService, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class));
verifyNoIngestProposal(mockService);
}

@Test
Expand All @@ -209,9 +195,7 @@ public void testGetUnauthorized() throws Exception {
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockService, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class));
verifyNoIngestProposal(mockService);
}

@Test
Expand All @@ -220,7 +204,7 @@ public void testGetEntityClientException() throws Exception {

Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class));
Mockito.any(AuditStamp.class), Mockito.anyBoolean());

BatchUpdateDeprecationResolver resolver = new BatchUpdateDeprecationResolver(mockService);

Expand Down
Loading