diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java index 3e26a2e6e8362a..a102e632e2e52d 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java @@ -15,6 +15,7 @@ import com.linkedin.metadata.boot.steps.IngestRolesStep; import com.linkedin.metadata.boot.steps.IngestRootUserStep; import com.linkedin.metadata.boot.steps.RemoveClientIdAspectStep; +import com.linkedin.metadata.boot.steps.RestoreColumnLineageIndices; import com.linkedin.metadata.boot.steps.RestoreDbtSiblingsIndices; import com.linkedin.metadata.boot.steps.RestoreGlossaryIndices; import com.linkedin.metadata.boot.steps.UpgradeDefaultBrowsePathsStep; @@ -85,10 +86,11 @@ protected BootstrapManager createInstance() { final RestoreDbtSiblingsIndices restoreDbtSiblingsIndices = new RestoreDbtSiblingsIndices(_entityService, _entityRegistry); final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService); + final RestoreColumnLineageIndices restoreColumnLineageIndices = new RestoreColumnLineageIndices(_entityService, _entityRegistry); final List finalSteps = new ArrayList<>(ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestRolesStep, ingestDataPlatformsStep, ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, - removeClientIdAspectStep, restoreDbtSiblingsIndices, indexDataPlatformsStep)); + removeClientIdAspectStep, restoreDbtSiblingsIndices, indexDataPlatformsStep, restoreColumnLineageIndices)); if (_upgradeDefaultBrowsePathsEnabled) { finalSteps.add(new UpgradeDefaultBrowsePathsStep(_entityService)); diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java new file mode 100644 index 00000000000000..6e1522051bfab0 --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java @@ -0,0 +1,168 @@ +package com.linkedin.metadata.boot.steps; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.InputFields; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.boot.UpgradeStep; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ListResult; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.query.ExtraInfo; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nonnull; +import java.util.Objects; + +@Slf4j +public class RestoreColumnLineageIndices extends UpgradeStep { + private static final String VERSION = "1"; + private static final String UPGRADE_ID = "restore-column-lineage-indices"; + private static final Integer BATCH_SIZE = 1000; + + private final EntityRegistry _entityRegistry; + + public RestoreColumnLineageIndices(@Nonnull final EntityService entityService, @Nonnull final EntityRegistry entityRegistry) { + super(entityService, VERSION, UPGRADE_ID); + _entityRegistry = Objects.requireNonNull(entityRegistry, "entityRegistry must not be null"); + } + + @Override + public void upgrade() throws Exception { + final AuditStamp auditStamp = + new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); + + final int totalUpstreamLineageCount = getAndRestoreUpstreamLineageIndices(0, auditStamp); + int upstreamLineageCount = BATCH_SIZE; + while (upstreamLineageCount < totalUpstreamLineageCount) { + getAndRestoreUpstreamLineageIndices(upstreamLineageCount, auditStamp); + upstreamLineageCount += BATCH_SIZE; + } + + final int totalChartInputFieldsCount = getAndRestoreInputFieldsIndices(Constants.CHART_ENTITY_NAME, 0, auditStamp); + int chartInputFieldsCount = BATCH_SIZE; + while (chartInputFieldsCount < totalChartInputFieldsCount) { + getAndRestoreInputFieldsIndices(Constants.CHART_ENTITY_NAME, chartInputFieldsCount, auditStamp); + chartInputFieldsCount += BATCH_SIZE; + } + + final int totalDashboardInputFieldsCount = getAndRestoreInputFieldsIndices(Constants.DASHBOARD_ENTITY_NAME, 0, auditStamp); + int dashboardInputFieldsCount = BATCH_SIZE; + while (dashboardInputFieldsCount < totalDashboardInputFieldsCount) { + getAndRestoreInputFieldsIndices(Constants.DASHBOARD_ENTITY_NAME, dashboardInputFieldsCount, auditStamp); + dashboardInputFieldsCount += BATCH_SIZE; + } + } + + @Nonnull + @Override + public ExecutionMode getExecutionMode() { + return ExecutionMode.ASYNC; + } + + private int getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp) { + final AspectSpec upstreamLineageAspectSpec = _entityRegistry.getEntitySpec(Constants.DATASET_ENTITY_NAME) + .getAspectSpec(Constants.UPSTREAM_LINEAGE_ASPECT_NAME); + + final ListResult latestAspects = _entityService.listLatestAspects( + Constants.DATASET_ENTITY_NAME, + Constants.UPSTREAM_LINEAGE_ASPECT_NAME, + start, + BATCH_SIZE); + + if (latestAspects.getTotalCount() == 0 || latestAspects.getValues() == null || latestAspects.getMetadata() == null) { + log.debug("Found 0 upstreamLineage aspects for datasets. Skipping migration."); + return 0; + } + + if (latestAspects.getValues().size() != latestAspects.getMetadata().getExtraInfos().size()) { + // Bad result -- we should log that we cannot migrate this batch of upstreamLineages. + log.warn("Failed to match upstreamLineage aspects with corresponding urns. Found mismatched length between aspects ({})" + + "and metadata ({}) for metadata {}", + latestAspects.getValues().size(), + latestAspects.getMetadata().getExtraInfos().size(), + latestAspects.getMetadata()); + return latestAspects.getTotalCount(); + } + + for (int i = 0; i < latestAspects.getValues().size(); i++) { + ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i); + RecordTemplate upstreamLineageRecord = latestAspects.getValues().get(i); + Urn urn = info.getUrn(); + UpstreamLineage upstreamLineage = (UpstreamLineage) upstreamLineageRecord; + if (upstreamLineage == null) { + log.warn("Received null upstreamLineage for urn {}", urn); + continue; + } + + _entityService.produceMetadataChangeLog( + urn, + Constants.DATASET_ENTITY_NAME, + Constants.UPSTREAM_LINEAGE_ASPECT_NAME, + upstreamLineageAspectSpec, + null, + upstreamLineage, + null, + null, + auditStamp, + ChangeType.RESTATE); + } + + return latestAspects.getTotalCount(); + } + + private int getAndRestoreInputFieldsIndices(String entityName, int start, AuditStamp auditStamp) throws Exception { + final AspectSpec inputFieldsAspectSpec = _entityRegistry.getEntitySpec(entityName) + .getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME); + + final ListResult latestAspects = _entityService.listLatestAspects( + entityName, + Constants.INPUT_FIELDS_ASPECT_NAME, + start, + BATCH_SIZE); + + if (latestAspects.getTotalCount() == 0 || latestAspects.getValues() == null || latestAspects.getMetadata() == null) { + log.debug("Found 0 inputFields aspects. Skipping migration."); + return 0; + } + + if (latestAspects.getValues().size() != latestAspects.getMetadata().getExtraInfos().size()) { + // Bad result -- we should log that we cannot migrate this batch of inputFields. + log.warn("Failed to match inputFields aspects with corresponding urns. Found mismatched length between aspects ({})" + + "and metadata ({}) for metadata {}", + latestAspects.getValues().size(), + latestAspects.getMetadata().getExtraInfos().size(), + latestAspects.getMetadata()); + return latestAspects.getTotalCount(); + } + + for (int i = 0; i < latestAspects.getValues().size(); i++) { + ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i); + RecordTemplate inputFieldsRecord = latestAspects.getValues().get(i); + Urn urn = info.getUrn(); + InputFields inputFields = (InputFields) inputFieldsRecord; + if (inputFields == null) { + log.warn("Received null inputFields for urn {}", urn); + continue; + } + + _entityService.produceMetadataChangeLog( + urn, + entityName, + Constants.INPUT_FIELDS_ASPECT_NAME, + inputFieldsAspectSpec, + null, + inputFields, + null, + null, + auditStamp, + ChangeType.RESTATE); + } + + return latestAspects.getTotalCount(); + } +} diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java new file mode 100644 index 00000000000000..b73e749142863f --- /dev/null +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java @@ -0,0 +1,304 @@ +package com.linkedin.metadata.boot.steps; + +import com.google.common.collect.ImmutableList; +import com.linkedin.common.AuditStamp; +import com.linkedin.common.InputFields; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.entity.Aspect; +import com.linkedin.entity.EntityResponse; +import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.entity.EnvelopedAspectMap; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ListResult; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.query.ExtraInfo; +import com.linkedin.metadata.query.ExtraInfoArray; +import com.linkedin.metadata.query.ListResultMetadata; +import com.linkedin.mxe.MetadataChangeProposal; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import javax.annotation.Nonnull; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class RestoreColumnLineageIndicesTest { + + private static final String VERSION_1 = "1"; + private static final String VERSION_2 = "2"; + private static final String COLUMN_LINEAGE_UPGRADE_URN = + String.format("urn:li:%s:%s", Constants.DATA_HUB_UPGRADE_ENTITY_NAME, "restore-column-lineage-indices"); + private final Urn datasetUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)"); + private final Urn chartUrn = UrnUtils.getUrn("urn:li:chart:(looker,dashboard_elements.1)"); + private final Urn dashboardUrn = UrnUtils.getUrn("urn:li:dashboard:(looker,dashboards.thelook::web_analytics_overview)"); + + @Test + public void testExecuteFirstTime() throws Exception { + final EntityService mockService = Mockito.mock(EntityService.class); + final EntityRegistry mockRegistry = Mockito.mock(EntityRegistry.class); + + mockGetUpgradeStep(false, VERSION_1, mockService); + mockGetUpstreamLineage(datasetUrn, mockService); + mockGetInputFields(chartUrn, Constants.CHART_ENTITY_NAME, mockService); + mockGetInputFields(dashboardUrn, Constants.DASHBOARD_ENTITY_NAME, mockService); + + final AspectSpec aspectSpec = mockAspectSpecs(mockRegistry); + + final RestoreColumnLineageIndices restoreIndicesStep = new RestoreColumnLineageIndices(mockService, mockRegistry); + restoreIndicesStep.execute(); + + Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.DATASET_ENTITY_NAME); + Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.CHART_ENTITY_NAME); + Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.DASHBOARD_ENTITY_NAME); + // creates upgradeRequest and upgradeResult aspects + Mockito.verify(mockService, Mockito.times(2)).ingestProposal( + Mockito.any(MetadataChangeProposal.class), + Mockito.any(AuditStamp.class), + Mockito.eq(false) + ); + Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( + Mockito.eq(datasetUrn), + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.UPSTREAM_LINEAGE_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( + Mockito.eq(chartUrn), + Mockito.eq(Constants.CHART_ENTITY_NAME), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( + Mockito.eq(dashboardUrn), + Mockito.eq(Constants.DASHBOARD_ENTITY_NAME), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + } + + @Test + public void testExecuteWithNewVersion() throws Exception { + final EntityService mockService = Mockito.mock(EntityService.class); + final EntityRegistry mockRegistry = Mockito.mock(EntityRegistry.class); + + mockGetUpgradeStep(true, VERSION_2, mockService); + mockGetUpstreamLineage(datasetUrn, mockService); + mockGetInputFields(chartUrn, Constants.CHART_ENTITY_NAME, mockService); + mockGetInputFields(dashboardUrn, Constants.DASHBOARD_ENTITY_NAME, mockService); + + final AspectSpec aspectSpec = mockAspectSpecs(mockRegistry); + + final RestoreColumnLineageIndices restoreIndicesStep = new RestoreColumnLineageIndices(mockService, mockRegistry); + restoreIndicesStep.execute(); + + Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.DATASET_ENTITY_NAME); + Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.CHART_ENTITY_NAME); + Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.DASHBOARD_ENTITY_NAME); + // creates upgradeRequest and upgradeResult aspects + Mockito.verify(mockService, Mockito.times(2)).ingestProposal( + Mockito.any(MetadataChangeProposal.class), + Mockito.any(AuditStamp.class), + Mockito.eq(false) + ); + Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( + Mockito.eq(datasetUrn), + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.UPSTREAM_LINEAGE_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( + Mockito.eq(chartUrn), + Mockito.eq(Constants.CHART_ENTITY_NAME), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( + Mockito.eq(dashboardUrn), + Mockito.eq(Constants.DASHBOARD_ENTITY_NAME), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + } + + @Test + public void testDoesNotExecuteWithSameVersion() throws Exception { + final EntityService mockService = Mockito.mock(EntityService.class); + final EntityRegistry mockRegistry = Mockito.mock(EntityRegistry.class); + + mockGetUpgradeStep(true, VERSION_1, mockService); + mockGetUpstreamLineage(datasetUrn, mockService); + mockGetInputFields(chartUrn, Constants.CHART_ENTITY_NAME, mockService); + mockGetInputFields(dashboardUrn, Constants.DASHBOARD_ENTITY_NAME, mockService); + + final AspectSpec aspectSpec = mockAspectSpecs(mockRegistry); + + final RestoreColumnLineageIndices restoreIndicesStep = new RestoreColumnLineageIndices(mockService, mockRegistry); + restoreIndicesStep.execute(); + + Mockito.verify(mockRegistry, Mockito.times(0)).getEntitySpec(Constants.DATASET_ENTITY_NAME); + Mockito.verify(mockRegistry, Mockito.times(0)).getEntitySpec(Constants.CHART_ENTITY_NAME); + Mockito.verify(mockRegistry, Mockito.times(0)).getEntitySpec(Constants.DASHBOARD_ENTITY_NAME); + // creates upgradeRequest and upgradeResult aspects + Mockito.verify(mockService, Mockito.times(0)).ingestProposal( + Mockito.any(MetadataChangeProposal.class), + Mockito.any(AuditStamp.class), + Mockito.eq(false) + ); + Mockito.verify(mockService, Mockito.times(0)).produceMetadataChangeLog( + Mockito.eq(datasetUrn), + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.UPSTREAM_LINEAGE_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + Mockito.verify(mockService, Mockito.times(0)).produceMetadataChangeLog( + Mockito.eq(chartUrn), + Mockito.eq(Constants.CHART_ENTITY_NAME), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + Mockito.verify(mockService, Mockito.times(0)).produceMetadataChangeLog( + Mockito.eq(dashboardUrn), + Mockito.eq(Constants.DASHBOARD_ENTITY_NAME), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + } + + private void mockGetUpstreamLineage(@Nonnull Urn datasetUrn, @Nonnull EntityService mockService) { + final List extraInfos = ImmutableList.of( + new ExtraInfo() + .setUrn(datasetUrn) + .setVersion(0L) + .setAudit(new AuditStamp().setActor(UrnUtils.getUrn("urn:li:corpuser:test")).setTime(0L)) + ); + + Mockito.when(mockService.listLatestAspects( + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.UPSTREAM_LINEAGE_ASPECT_NAME), + Mockito.eq(0), + Mockito.eq(1000) + )).thenReturn(new ListResult<>( + ImmutableList.of(new UpstreamLineage()), + new ListResultMetadata().setExtraInfos(new ExtraInfoArray(extraInfos)), + 1, + false, + 1, + 1, + 1)); + } + + private void mockGetInputFields(@Nonnull Urn entityUrn, @Nonnull String entityName, @Nonnull EntityService mockService) { + final List extraInfos = ImmutableList.of( + new ExtraInfo() + .setUrn(entityUrn) + .setVersion(0L) + .setAudit(new AuditStamp().setActor(UrnUtils.getUrn("urn:li:corpuser:test")).setTime(0L)) + ); + + Mockito.when(mockService.listLatestAspects( + Mockito.eq(entityName), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(0), + Mockito.eq(1000) + )).thenReturn(new ListResult<>( + ImmutableList.of(new InputFields()), + new ListResultMetadata().setExtraInfos(new ExtraInfoArray(extraInfos)), + 1, + false, + 1, + 1, + 1)); + } + + private AspectSpec mockAspectSpecs(@Nonnull EntityRegistry mockRegistry) { + final EntitySpec entitySpec = Mockito.mock(EntitySpec.class); + final AspectSpec aspectSpec = Mockito.mock(AspectSpec.class); + // Mock for upstreamLineage + Mockito.when(mockRegistry.getEntitySpec(Constants.DATASET_ENTITY_NAME)).thenReturn(entitySpec); + Mockito.when(entitySpec.getAspectSpec(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)).thenReturn(aspectSpec); + // Mock inputFields for charts + Mockito.when(mockRegistry.getEntitySpec(Constants.CHART_ENTITY_NAME)).thenReturn(entitySpec); + Mockito.when(entitySpec.getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME)).thenReturn(aspectSpec); + // Mock inputFields for dashboards + Mockito.when(mockRegistry.getEntitySpec(Constants.DASHBOARD_ENTITY_NAME)).thenReturn(entitySpec); + Mockito.when(entitySpec.getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME)).thenReturn(aspectSpec); + + return aspectSpec; + } + + private void mockGetUpgradeStep(boolean shouldReturnResponse, @Nonnull String version, @Nonnull EntityService mockService) throws Exception { + + final Urn upgradeEntityUrn = UrnUtils.getUrn(COLUMN_LINEAGE_UPGRADE_URN); + final com.linkedin.upgrade.DataHubUpgradeRequest upgradeRequest = new com.linkedin.upgrade.DataHubUpgradeRequest().setVersion(version); + final Map upgradeRequestAspects = new HashMap<>(); + upgradeRequestAspects.put(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(upgradeRequest.data()))); + final EntityResponse response = new EntityResponse().setAspects(new EnvelopedAspectMap(upgradeRequestAspects)); + Mockito.when(mockService.getEntityV2( + Constants.DATA_HUB_UPGRADE_ENTITY_NAME, + upgradeEntityUrn, + Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME) + )).thenReturn(shouldReturnResponse ? response : null); + } +}