From e82ea65db43de6cf3b8995c0550dd6650c99c453 Mon Sep 17 00:00:00 2001 From: Chris Collins Date: Mon, 14 Nov 2022 12:48:15 -0500 Subject: [PATCH 1/2] add upgrade step for upstreamLineage and inputFields --- .../factories/BootstrapManagerFactory.java | 5 +- .../steps/RestoreColumnLineageIndices.java | 182 +++++++++++ .../RestoreColumnLineageIndicesTest.java | 289 ++++++++++++++++++ 3 files changed, 475 insertions(+), 1 deletion(-) create mode 100644 metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java create mode 100644 metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java index 3e26a2e6e8362a..fbfefea8348760 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java @@ -15,6 +15,7 @@ import com.linkedin.metadata.boot.steps.IngestRolesStep; import com.linkedin.metadata.boot.steps.IngestRootUserStep; import com.linkedin.metadata.boot.steps.RemoveClientIdAspectStep; +import com.linkedin.metadata.boot.steps.RestoreColumnLineageIndices; import com.linkedin.metadata.boot.steps.RestoreDbtSiblingsIndices; import com.linkedin.metadata.boot.steps.RestoreGlossaryIndices; import com.linkedin.metadata.boot.steps.UpgradeDefaultBrowsePathsStep; @@ -85,10 +86,12 @@ protected BootstrapManager createInstance() { final RestoreDbtSiblingsIndices restoreDbtSiblingsIndices = new RestoreDbtSiblingsIndices(_entityService, _entityRegistry); final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService); + final RestoreColumnLineageIndices restoreColumnLineageIndices = + new RestoreColumnLineageIndices(_entityService, _entitySearchService, _entityRegistry); final List finalSteps = new ArrayList<>(ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestRolesStep, ingestDataPlatformsStep, ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, - removeClientIdAspectStep, restoreDbtSiblingsIndices, indexDataPlatformsStep)); + removeClientIdAspectStep, restoreDbtSiblingsIndices, indexDataPlatformsStep, restoreColumnLineageIndices)); if (_upgradeDefaultBrowsePathsEnabled) { finalSteps.add(new UpgradeDefaultBrowsePathsStep(_entityService)); diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java new file mode 100644 index 00000000000000..5882010b9aac58 --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java @@ -0,0 +1,182 @@ +package com.linkedin.metadata.boot.steps; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.InputFields; +import com.linkedin.common.urn.Urn; +import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.entity.EntityResponse; +import com.linkedin.entity.EnvelopedAspectMap; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.boot.UpgradeStep; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.search.EntitySearchService; +import com.linkedin.metadata.search.SearchEntity; +import com.linkedin.metadata.search.SearchResult; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nonnull; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME; + +@Slf4j +public class RestoreColumnLineageIndices extends UpgradeStep { + private static final String VERSION = "1"; + private static final String UPGRADE_ID = "restore-column-lineage-indices"; + private static final Integer BATCH_SIZE = 1000; + + private final EntitySearchService _entitySearchService; + private final EntityRegistry _entityRegistry; + + public RestoreColumnLineageIndices(EntityService entityService, EntitySearchService entitySearchService, + EntityRegistry entityRegistry) { + super(entityService, VERSION, UPGRADE_ID); + _entitySearchService = entitySearchService; + _entityRegistry = entityRegistry; + } + + @Override + public void upgrade() throws Exception { + final AuditStamp auditStamp = + new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); + + final int totalUpstreamLineageCount = getAndRestoreUpstreamLineageIndices(0, auditStamp); + int upstreamLineageCount = BATCH_SIZE; + while (upstreamLineageCount < totalUpstreamLineageCount) { + getAndRestoreUpstreamLineageIndices(upstreamLineageCount, auditStamp); + upstreamLineageCount += BATCH_SIZE; + } + + final int totalChartInputFieldsCount = getAndRestoreInputFieldsIndices(Constants.CHART_ENTITY_NAME, 0, auditStamp); + int chartInputFieldsCount = BATCH_SIZE; + while (chartInputFieldsCount < totalChartInputFieldsCount) { + getAndRestoreInputFieldsIndices(Constants.CHART_ENTITY_NAME, chartInputFieldsCount, auditStamp); + chartInputFieldsCount += BATCH_SIZE; + } + + final int totalDashboardInputFieldsCount = getAndRestoreInputFieldsIndices(Constants.DASHBOARD_ENTITY_NAME, 0, auditStamp); + int dashboardInputFieldsCount = BATCH_SIZE; + while (dashboardInputFieldsCount < totalDashboardInputFieldsCount) { + getAndRestoreInputFieldsIndices(Constants.DASHBOARD_ENTITY_NAME, dashboardInputFieldsCount, auditStamp); + dashboardInputFieldsCount += BATCH_SIZE; + } + } + + @Nonnull + @Override + public ExecutionMode getExecutionMode() { + return ExecutionMode.ASYNC; + } + + private int getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp) + throws Exception { + final AspectSpec upstreamLineageAspectSpec = _entityRegistry.getEntitySpec(Constants.DATASET_ENTITY_NAME) + .getAspectSpec(Constants.UPSTREAM_LINEAGE_ASPECT_NAME); + SearchResult datasetResult = + _entitySearchService.search(Constants.DATASET_ENTITY_NAME, "", null, null, start, BATCH_SIZE); + List datasetUrns = datasetResult.getEntities().stream().map(SearchEntity::getEntity).collect(Collectors.toList()); + if (datasetUrns.size() == 0) { + return 0; + } + final Map upstreamLineageResponses = + _entityService.getEntitiesV2(Constants.DATASET_ENTITY_NAME, new HashSet<>(datasetUrns), + Collections.singleton(Constants.UPSTREAM_LINEAGE_ASPECT_NAME) + ); + + // Loop over Datasets and produce changelog + for (Urn datasetUrn : datasetUrns) { + EntityResponse upstreamLineageResponse = upstreamLineageResponses.get(datasetUrn); + if (upstreamLineageResponse == null) { + log.warn("Dataset not in set of entity responses {}", datasetUrn); + continue; + } + UpstreamLineage upstreamLineage = mapUpstreamLineage(upstreamLineageResponse); + if (upstreamLineage == null) { + log.warn("Received null upstreamLineage for urn {}", datasetUrn); + continue; + } + + _entityService.produceMetadataChangeLog( + datasetUrn, + Constants.DATASET_ENTITY_NAME, + Constants.UPSTREAM_LINEAGE_ASPECT_NAME, + upstreamLineageAspectSpec, + null, + upstreamLineage, + null, + null, + auditStamp, + ChangeType.RESTATE); + } + + return datasetResult.getNumEntities(); + } + + private int getAndRestoreInputFieldsIndices(String entityName, int start, AuditStamp auditStamp) throws Exception { + final AspectSpec inputFieldsAspectSpec = _entityRegistry.getEntitySpec(entityName) + .getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME); + SearchResult entityResult = + _entitySearchService.search(entityName, "", null, null, start, BATCH_SIZE); + List entityUrns = entityResult.getEntities().stream().map(SearchEntity::getEntity).collect(Collectors.toList()); + if (entityUrns.size() == 0) { + return 0; + } + final Map inputFieldsResponses = + _entityService.getEntitiesV2(entityName, new HashSet<>(entityUrns), + Collections.singleton(Constants.INPUT_FIELDS_ASPECT_NAME) + ); + + // Loop over Datasets and produce changelog + for (Urn entityUrn : entityUrns) { + EntityResponse inputFieldsResponse = inputFieldsResponses.get(entityUrn); + if (inputFieldsResponse == null) { + log.warn("Entity not in set of entity responses {}", entityUrn); + continue; + } + InputFields inputFields = mapInputFields(inputFieldsResponse); + if (inputFields == null) { + log.warn("Received null inputFields for urn {}", entityUrn); + continue; + } + + _entityService.produceMetadataChangeLog( + entityUrn, + entityName, + Constants.INPUT_FIELDS_ASPECT_NAME, + inputFieldsAspectSpec, + null, + inputFields, + null, + null, + auditStamp, + ChangeType.RESTATE); + } + + return entityResult.getNumEntities(); + } + + private UpstreamLineage mapUpstreamLineage(EntityResponse entityResponse) { + EnvelopedAspectMap aspectMap = entityResponse.getAspects(); + if (!aspectMap.containsKey(UPSTREAM_LINEAGE_ASPECT_NAME)) { + return null; + } + + return new UpstreamLineage(aspectMap.get(Constants.UPSTREAM_LINEAGE_ASPECT_NAME).getValue().data()); + } + + private InputFields mapInputFields(EntityResponse entityResponse) { + EnvelopedAspectMap aspectMap = entityResponse.getAspects(); + if (!aspectMap.containsKey(Constants.INPUT_FIELDS_ASPECT_NAME)) { + return null; + } + + return new InputFields(aspectMap.get(Constants.INPUT_FIELDS_ASPECT_NAME).getValue().data()); + } +} diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java new file mode 100644 index 00000000000000..45ddce3131ebe1 --- /dev/null +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java @@ -0,0 +1,289 @@ +package com.linkedin.metadata.boot.steps; + +import com.google.common.collect.ImmutableList; +import com.linkedin.common.AuditStamp; +import com.linkedin.common.InputFields; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.entity.Aspect; +import com.linkedin.entity.EntityResponse; +import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.entity.EnvelopedAspectMap; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.search.EntitySearchService; +import com.linkedin.metadata.search.SearchEntity; +import com.linkedin.metadata.search.SearchEntityArray; +import com.linkedin.metadata.search.SearchResult; +import com.linkedin.mxe.MetadataChangeProposal; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +public class RestoreColumnLineageIndicesTest { + + private static final String VERSION_1 = "1"; + private static final String VERSION_2 = "2"; + private static final String COLUMN_LINEAGE_UPGRADE_URN = + String.format("urn:li:%s:%s", Constants.DATA_HUB_UPGRADE_ENTITY_NAME, "restore-column-lineage-indices"); + private final Urn datasetUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)"); + private final Urn chartUrn = UrnUtils.getUrn("urn:li:chart:(looker,dashboard_elements.1)"); + private final Urn dashboardUrn = UrnUtils.getUrn("urn:li:dashboard:(looker,dashboards.thelook::web_analytics_overview)"); + + @Test + public void testExecuteFirstTime() throws Exception { + final EntityService mockService = Mockito.mock(EntityService.class); + final EntitySearchService mockSearchService = Mockito.mock(EntitySearchService.class); + final EntityRegistry mockRegistry = Mockito.mock(EntityRegistry.class); + + mockGetUpgradeStep(false, VERSION_1, mockService); + mockGetUpstreamLineage(datasetUrn, mockSearchService, mockService); + mockGetInputFields(chartUrn, Constants.CHART_ENTITY_NAME, mockSearchService, mockService); + mockGetInputFields(dashboardUrn, Constants.DASHBOARD_ENTITY_NAME, mockSearchService, mockService); + + AspectSpec aspectSpec = mockAspectSpecs(mockRegistry); + + RestoreColumnLineageIndices restoreIndicesStep = new RestoreColumnLineageIndices(mockService, mockSearchService, mockRegistry); + restoreIndicesStep.execute(); + + Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.DATASET_ENTITY_NAME); + Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.CHART_ENTITY_NAME); + Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.DASHBOARD_ENTITY_NAME); + // creates upgradeRequest and upgradeResult aspects + Mockito.verify(mockService, Mockito.times(2)).ingestProposal( + Mockito.any(MetadataChangeProposal.class), + Mockito.any(AuditStamp.class), + Mockito.eq(false) + ); + Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( + Mockito.eq(datasetUrn), + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.UPSTREAM_LINEAGE_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( + Mockito.eq(chartUrn), + Mockito.eq(Constants.CHART_ENTITY_NAME), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( + Mockito.eq(dashboardUrn), + Mockito.eq(Constants.DASHBOARD_ENTITY_NAME), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + } + + @Test + public void testExecuteWithNewVersion() throws Exception { + final EntityService mockService = Mockito.mock(EntityService.class); + final EntitySearchService mockSearchService = Mockito.mock(EntitySearchService.class); + final EntityRegistry mockRegistry = Mockito.mock(EntityRegistry.class); + + mockGetUpgradeStep(true, VERSION_2, mockService); + mockGetUpstreamLineage(datasetUrn, mockSearchService, mockService); + mockGetInputFields(chartUrn, Constants.CHART_ENTITY_NAME, mockSearchService, mockService); + mockGetInputFields(dashboardUrn, Constants.DASHBOARD_ENTITY_NAME, mockSearchService, mockService); + + AspectSpec aspectSpec = mockAspectSpecs(mockRegistry); + + RestoreColumnLineageIndices restoreIndicesStep = new RestoreColumnLineageIndices(mockService, mockSearchService, mockRegistry); + restoreIndicesStep.execute(); + + Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.DATASET_ENTITY_NAME); + Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.CHART_ENTITY_NAME); + Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.DASHBOARD_ENTITY_NAME); + // creates upgradeRequest and upgradeResult aspects + Mockito.verify(mockService, Mockito.times(2)).ingestProposal( + Mockito.any(MetadataChangeProposal.class), + Mockito.any(AuditStamp.class), + Mockito.eq(false) + ); + Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( + Mockito.eq(datasetUrn), + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.UPSTREAM_LINEAGE_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( + Mockito.eq(chartUrn), + Mockito.eq(Constants.CHART_ENTITY_NAME), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + Mockito.verify(mockService, Mockito.times(1)).produceMetadataChangeLog( + Mockito.eq(dashboardUrn), + Mockito.eq(Constants.DASHBOARD_ENTITY_NAME), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + } + + @Test + public void testDoesNotExecuteWithSameVersion() throws Exception { + final EntityService mockService = Mockito.mock(EntityService.class); + final EntitySearchService mockSearchService = Mockito.mock(EntitySearchService.class); + final EntityRegistry mockRegistry = Mockito.mock(EntityRegistry.class); + + mockGetUpgradeStep(true, VERSION_1, mockService); + mockGetUpstreamLineage(datasetUrn, mockSearchService, mockService); + mockGetInputFields(chartUrn, Constants.CHART_ENTITY_NAME, mockSearchService, mockService); + mockGetInputFields(dashboardUrn, Constants.DASHBOARD_ENTITY_NAME, mockSearchService, mockService); + + AspectSpec aspectSpec = mockAspectSpecs(mockRegistry); + + RestoreColumnLineageIndices restoreIndicesStep = new RestoreColumnLineageIndices(mockService, mockSearchService, mockRegistry); + restoreIndicesStep.execute(); + + Mockito.verify(mockRegistry, Mockito.times(0)).getEntitySpec(Constants.DATASET_ENTITY_NAME); + Mockito.verify(mockRegistry, Mockito.times(0)).getEntitySpec(Constants.CHART_ENTITY_NAME); + Mockito.verify(mockRegistry, Mockito.times(0)).getEntitySpec(Constants.DASHBOARD_ENTITY_NAME); + // creates upgradeRequest and upgradeResult aspects + Mockito.verify(mockService, Mockito.times(0)).ingestProposal( + Mockito.any(MetadataChangeProposal.class), + Mockito.any(AuditStamp.class), + Mockito.eq(false) + ); + Mockito.verify(mockService, Mockito.times(0)).produceMetadataChangeLog( + Mockito.eq(datasetUrn), + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.UPSTREAM_LINEAGE_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + Mockito.verify(mockService, Mockito.times(0)).produceMetadataChangeLog( + Mockito.eq(chartUrn), + Mockito.eq(Constants.CHART_ENTITY_NAME), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + Mockito.verify(mockService, Mockito.times(0)).produceMetadataChangeLog( + Mockito.eq(dashboardUrn), + Mockito.eq(Constants.DASHBOARD_ENTITY_NAME), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(aspectSpec), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(null), + Mockito.eq(null), + Mockito.any(), + Mockito.eq(ChangeType.RESTATE) + ); + } + + private void mockGetUpstreamLineage(Urn datasetUrn, EntitySearchService mockSearchService, EntityService mockService) throws Exception { + Map upstreamLineageAspects = new HashMap<>(); + upstreamLineageAspects.put(Constants.UPSTREAM_LINEAGE_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(new UpstreamLineage().data()))); + Map upstreamLineageResponses = new HashMap<>(); + upstreamLineageResponses.put(datasetUrn, new EntityResponse().setUrn(datasetUrn).setAspects(new EnvelopedAspectMap(upstreamLineageAspects))); + Mockito.when(mockSearchService.search(Constants.DATASET_ENTITY_NAME, "", null, null, 0, 1000)) + .thenReturn(new SearchResult().setNumEntities(1).setEntities(new SearchEntityArray(ImmutableList.of(new SearchEntity().setEntity(datasetUrn))))); + Mockito.when(mockService.getEntitiesV2( + Constants.DATASET_ENTITY_NAME, + new HashSet<>(Collections.singleton(datasetUrn)), + Collections.singleton(Constants.UPSTREAM_LINEAGE_ASPECT_NAME))) + .thenReturn(upstreamLineageResponses); + } + + private void mockGetInputFields(Urn entityUrn, String entityName, EntitySearchService mockSearchService, EntityService mockService) throws Exception { + Map inputFieldsAspects = new HashMap<>(); + inputFieldsAspects.put(Constants.INPUT_FIELDS_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(new InputFields().data()))); + Map inputFieldsResponses = new HashMap<>(); + inputFieldsResponses.put(entityUrn, new EntityResponse().setUrn(entityUrn).setAspects(new EnvelopedAspectMap(inputFieldsAspects))); + Mockito.when(mockSearchService.search(entityName, "", null, null, 0, 1000)) + .thenReturn(new SearchResult().setNumEntities(1).setEntities(new SearchEntityArray(ImmutableList.of(new SearchEntity().setEntity(entityUrn))))); + Mockito.when(mockService.getEntitiesV2( + entityName, + new HashSet<>(Collections.singleton(entityUrn)), + Collections.singleton(Constants.INPUT_FIELDS_ASPECT_NAME) + )) + .thenReturn(inputFieldsResponses); + } + + private AspectSpec mockAspectSpecs(EntityRegistry mockRegistry) { + EntitySpec entitySpec = Mockito.mock(EntitySpec.class); + AspectSpec aspectSpec = Mockito.mock(AspectSpec.class); + // Mock for upstreamLineage + Mockito.when(mockRegistry.getEntitySpec(Constants.DATASET_ENTITY_NAME)).thenReturn(entitySpec); + Mockito.when(entitySpec.getAspectSpec(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)).thenReturn(aspectSpec); + // Mock inputFields for charts + Mockito.when(mockRegistry.getEntitySpec(Constants.CHART_ENTITY_NAME)).thenReturn(entitySpec); + Mockito.when(entitySpec.getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME)).thenReturn(aspectSpec); + // Mock inputFields for dashboards + Mockito.when(mockRegistry.getEntitySpec(Constants.DASHBOARD_ENTITY_NAME)).thenReturn(entitySpec); + Mockito.when(entitySpec.getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME)).thenReturn(aspectSpec); + + return aspectSpec; + } + + private void mockGetUpgradeStep(boolean shouldReturnResponse, String version, EntityService mockService) throws Exception { + + final Urn upgradeEntityUrn = UrnUtils.getUrn(COLUMN_LINEAGE_UPGRADE_URN); + com.linkedin.upgrade.DataHubUpgradeRequest upgradeRequest = new com.linkedin.upgrade.DataHubUpgradeRequest().setVersion(version); + Map upgradeRequestAspects = new HashMap<>(); + upgradeRequestAspects.put(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(upgradeRequest.data()))); + EntityResponse response = new EntityResponse().setAspects(new EnvelopedAspectMap(upgradeRequestAspects)); + Mockito.when(mockService.getEntityV2( + Constants.DATA_HUB_UPGRADE_ENTITY_NAME, + upgradeEntityUrn, + Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME) + )).thenReturn(shouldReturnResponse ? response : null); + } +} From 21434f1156e531d917c6867214638b96dcbe6ecb Mon Sep 17 00:00:00 2001 From: Chris Collins Date: Tue, 15 Nov 2022 13:55:30 -0500 Subject: [PATCH 2/2] PR edits --- .../factories/BootstrapManagerFactory.java | 3 +- .../steps/RestoreColumnLineageIndices.java | 140 ++++++++---------- .../RestoreColumnLineageIndicesTest.java | 125 +++++++++------- 3 files changed, 134 insertions(+), 134 deletions(-) diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java index fbfefea8348760..a102e632e2e52d 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java @@ -86,8 +86,7 @@ protected BootstrapManager createInstance() { final RestoreDbtSiblingsIndices restoreDbtSiblingsIndices = new RestoreDbtSiblingsIndices(_entityService, _entityRegistry); final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService); - final RestoreColumnLineageIndices restoreColumnLineageIndices = - new RestoreColumnLineageIndices(_entityService, _entitySearchService, _entityRegistry); + final RestoreColumnLineageIndices restoreColumnLineageIndices = new RestoreColumnLineageIndices(_entityService, _entityRegistry); final List finalSteps = new ArrayList<>(ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestRolesStep, ingestDataPlatformsStep, ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java index 5882010b9aac58..6e1522051bfab0 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndices.java @@ -3,28 +3,20 @@ import com.linkedin.common.AuditStamp; import com.linkedin.common.InputFields; import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; import com.linkedin.dataset.UpstreamLineage; -import com.linkedin.entity.EntityResponse; -import com.linkedin.entity.EnvelopedAspectMap; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; import com.linkedin.metadata.boot.UpgradeStep; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ListResult; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.registry.EntityRegistry; -import com.linkedin.metadata.search.EntitySearchService; -import com.linkedin.metadata.search.SearchEntity; -import com.linkedin.metadata.search.SearchResult; +import com.linkedin.metadata.query.ExtraInfo; import lombok.extern.slf4j.Slf4j; import javax.annotation.Nonnull; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME; +import java.util.Objects; @Slf4j public class RestoreColumnLineageIndices extends UpgradeStep { @@ -32,14 +24,11 @@ public class RestoreColumnLineageIndices extends UpgradeStep { private static final String UPGRADE_ID = "restore-column-lineage-indices"; private static final Integer BATCH_SIZE = 1000; - private final EntitySearchService _entitySearchService; private final EntityRegistry _entityRegistry; - public RestoreColumnLineageIndices(EntityService entityService, EntitySearchService entitySearchService, - EntityRegistry entityRegistry) { + public RestoreColumnLineageIndices(@Nonnull final EntityService entityService, @Nonnull final EntityRegistry entityRegistry) { super(entityService, VERSION, UPGRADE_ID); - _entitySearchService = entitySearchService; - _entityRegistry = entityRegistry; + _entityRegistry = Objects.requireNonNull(entityRegistry, "entityRegistry must not be null"); } @Override @@ -75,36 +64,43 @@ public ExecutionMode getExecutionMode() { return ExecutionMode.ASYNC; } - private int getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp) - throws Exception { + private int getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp) { final AspectSpec upstreamLineageAspectSpec = _entityRegistry.getEntitySpec(Constants.DATASET_ENTITY_NAME) .getAspectSpec(Constants.UPSTREAM_LINEAGE_ASPECT_NAME); - SearchResult datasetResult = - _entitySearchService.search(Constants.DATASET_ENTITY_NAME, "", null, null, start, BATCH_SIZE); - List datasetUrns = datasetResult.getEntities().stream().map(SearchEntity::getEntity).collect(Collectors.toList()); - if (datasetUrns.size() == 0) { + + final ListResult latestAspects = _entityService.listLatestAspects( + Constants.DATASET_ENTITY_NAME, + Constants.UPSTREAM_LINEAGE_ASPECT_NAME, + start, + BATCH_SIZE); + + if (latestAspects.getTotalCount() == 0 || latestAspects.getValues() == null || latestAspects.getMetadata() == null) { + log.debug("Found 0 upstreamLineage aspects for datasets. Skipping migration."); return 0; } - final Map upstreamLineageResponses = - _entityService.getEntitiesV2(Constants.DATASET_ENTITY_NAME, new HashSet<>(datasetUrns), - Collections.singleton(Constants.UPSTREAM_LINEAGE_ASPECT_NAME) - ); - - // Loop over Datasets and produce changelog - for (Urn datasetUrn : datasetUrns) { - EntityResponse upstreamLineageResponse = upstreamLineageResponses.get(datasetUrn); - if (upstreamLineageResponse == null) { - log.warn("Dataset not in set of entity responses {}", datasetUrn); - continue; - } - UpstreamLineage upstreamLineage = mapUpstreamLineage(upstreamLineageResponse); + + if (latestAspects.getValues().size() != latestAspects.getMetadata().getExtraInfos().size()) { + // Bad result -- we should log that we cannot migrate this batch of upstreamLineages. + log.warn("Failed to match upstreamLineage aspects with corresponding urns. Found mismatched length between aspects ({})" + + "and metadata ({}) for metadata {}", + latestAspects.getValues().size(), + latestAspects.getMetadata().getExtraInfos().size(), + latestAspects.getMetadata()); + return latestAspects.getTotalCount(); + } + + for (int i = 0; i < latestAspects.getValues().size(); i++) { + ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i); + RecordTemplate upstreamLineageRecord = latestAspects.getValues().get(i); + Urn urn = info.getUrn(); + UpstreamLineage upstreamLineage = (UpstreamLineage) upstreamLineageRecord; if (upstreamLineage == null) { - log.warn("Received null upstreamLineage for urn {}", datasetUrn); + log.warn("Received null upstreamLineage for urn {}", urn); continue; } _entityService.produceMetadataChangeLog( - datasetUrn, + urn, Constants.DATASET_ENTITY_NAME, Constants.UPSTREAM_LINEAGE_ASPECT_NAME, upstreamLineageAspectSpec, @@ -116,38 +112,46 @@ private int getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp ChangeType.RESTATE); } - return datasetResult.getNumEntities(); + return latestAspects.getTotalCount(); } private int getAndRestoreInputFieldsIndices(String entityName, int start, AuditStamp auditStamp) throws Exception { final AspectSpec inputFieldsAspectSpec = _entityRegistry.getEntitySpec(entityName) .getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME); - SearchResult entityResult = - _entitySearchService.search(entityName, "", null, null, start, BATCH_SIZE); - List entityUrns = entityResult.getEntities().stream().map(SearchEntity::getEntity).collect(Collectors.toList()); - if (entityUrns.size() == 0) { + + final ListResult latestAspects = _entityService.listLatestAspects( + entityName, + Constants.INPUT_FIELDS_ASPECT_NAME, + start, + BATCH_SIZE); + + if (latestAspects.getTotalCount() == 0 || latestAspects.getValues() == null || latestAspects.getMetadata() == null) { + log.debug("Found 0 inputFields aspects. Skipping migration."); return 0; } - final Map inputFieldsResponses = - _entityService.getEntitiesV2(entityName, new HashSet<>(entityUrns), - Collections.singleton(Constants.INPUT_FIELDS_ASPECT_NAME) - ); - - // Loop over Datasets and produce changelog - for (Urn entityUrn : entityUrns) { - EntityResponse inputFieldsResponse = inputFieldsResponses.get(entityUrn); - if (inputFieldsResponse == null) { - log.warn("Entity not in set of entity responses {}", entityUrn); - continue; - } - InputFields inputFields = mapInputFields(inputFieldsResponse); + + if (latestAspects.getValues().size() != latestAspects.getMetadata().getExtraInfos().size()) { + // Bad result -- we should log that we cannot migrate this batch of inputFields. + log.warn("Failed to match inputFields aspects with corresponding urns. Found mismatched length between aspects ({})" + + "and metadata ({}) for metadata {}", + latestAspects.getValues().size(), + latestAspects.getMetadata().getExtraInfos().size(), + latestAspects.getMetadata()); + return latestAspects.getTotalCount(); + } + + for (int i = 0; i < latestAspects.getValues().size(); i++) { + ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i); + RecordTemplate inputFieldsRecord = latestAspects.getValues().get(i); + Urn urn = info.getUrn(); + InputFields inputFields = (InputFields) inputFieldsRecord; if (inputFields == null) { - log.warn("Received null inputFields for urn {}", entityUrn); + log.warn("Received null inputFields for urn {}", urn); continue; } _entityService.produceMetadataChangeLog( - entityUrn, + urn, entityName, Constants.INPUT_FIELDS_ASPECT_NAME, inputFieldsAspectSpec, @@ -159,24 +163,6 @@ private int getAndRestoreInputFieldsIndices(String entityName, int start, AuditS ChangeType.RESTATE); } - return entityResult.getNumEntities(); - } - - private UpstreamLineage mapUpstreamLineage(EntityResponse entityResponse) { - EnvelopedAspectMap aspectMap = entityResponse.getAspects(); - if (!aspectMap.containsKey(UPSTREAM_LINEAGE_ASPECT_NAME)) { - return null; - } - - return new UpstreamLineage(aspectMap.get(Constants.UPSTREAM_LINEAGE_ASPECT_NAME).getValue().data()); - } - - private InputFields mapInputFields(EntityResponse entityResponse) { - EnvelopedAspectMap aspectMap = entityResponse.getAspects(); - if (!aspectMap.containsKey(Constants.INPUT_FIELDS_ASPECT_NAME)) { - return null; - } - - return new InputFields(aspectMap.get(Constants.INPUT_FIELDS_ASPECT_NAME).getValue().data()); + return latestAspects.getTotalCount(); } } diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java index 45ddce3131ebe1..b73e749142863f 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java @@ -13,20 +13,21 @@ import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ListResult; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.registry.EntityRegistry; -import com.linkedin.metadata.search.EntitySearchService; -import com.linkedin.metadata.search.SearchEntity; -import com.linkedin.metadata.search.SearchEntityArray; -import com.linkedin.metadata.search.SearchResult; +import com.linkedin.metadata.query.ExtraInfo; +import com.linkedin.metadata.query.ExtraInfoArray; +import com.linkedin.metadata.query.ListResultMetadata; import com.linkedin.mxe.MetadataChangeProposal; import org.mockito.Mockito; import org.testng.annotations.Test; +import javax.annotation.Nonnull; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; +import java.util.List; import java.util.Map; public class RestoreColumnLineageIndicesTest { @@ -42,17 +43,16 @@ public class RestoreColumnLineageIndicesTest { @Test public void testExecuteFirstTime() throws Exception { final EntityService mockService = Mockito.mock(EntityService.class); - final EntitySearchService mockSearchService = Mockito.mock(EntitySearchService.class); final EntityRegistry mockRegistry = Mockito.mock(EntityRegistry.class); mockGetUpgradeStep(false, VERSION_1, mockService); - mockGetUpstreamLineage(datasetUrn, mockSearchService, mockService); - mockGetInputFields(chartUrn, Constants.CHART_ENTITY_NAME, mockSearchService, mockService); - mockGetInputFields(dashboardUrn, Constants.DASHBOARD_ENTITY_NAME, mockSearchService, mockService); + mockGetUpstreamLineage(datasetUrn, mockService); + mockGetInputFields(chartUrn, Constants.CHART_ENTITY_NAME, mockService); + mockGetInputFields(dashboardUrn, Constants.DASHBOARD_ENTITY_NAME, mockService); - AspectSpec aspectSpec = mockAspectSpecs(mockRegistry); + final AspectSpec aspectSpec = mockAspectSpecs(mockRegistry); - RestoreColumnLineageIndices restoreIndicesStep = new RestoreColumnLineageIndices(mockService, mockSearchService, mockRegistry); + final RestoreColumnLineageIndices restoreIndicesStep = new RestoreColumnLineageIndices(mockService, mockRegistry); restoreIndicesStep.execute(); Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.DATASET_ENTITY_NAME); @@ -105,17 +105,16 @@ public void testExecuteFirstTime() throws Exception { @Test public void testExecuteWithNewVersion() throws Exception { final EntityService mockService = Mockito.mock(EntityService.class); - final EntitySearchService mockSearchService = Mockito.mock(EntitySearchService.class); final EntityRegistry mockRegistry = Mockito.mock(EntityRegistry.class); mockGetUpgradeStep(true, VERSION_2, mockService); - mockGetUpstreamLineage(datasetUrn, mockSearchService, mockService); - mockGetInputFields(chartUrn, Constants.CHART_ENTITY_NAME, mockSearchService, mockService); - mockGetInputFields(dashboardUrn, Constants.DASHBOARD_ENTITY_NAME, mockSearchService, mockService); + mockGetUpstreamLineage(datasetUrn, mockService); + mockGetInputFields(chartUrn, Constants.CHART_ENTITY_NAME, mockService); + mockGetInputFields(dashboardUrn, Constants.DASHBOARD_ENTITY_NAME, mockService); - AspectSpec aspectSpec = mockAspectSpecs(mockRegistry); + final AspectSpec aspectSpec = mockAspectSpecs(mockRegistry); - RestoreColumnLineageIndices restoreIndicesStep = new RestoreColumnLineageIndices(mockService, mockSearchService, mockRegistry); + final RestoreColumnLineageIndices restoreIndicesStep = new RestoreColumnLineageIndices(mockService, mockRegistry); restoreIndicesStep.execute(); Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.DATASET_ENTITY_NAME); @@ -168,17 +167,16 @@ public void testExecuteWithNewVersion() throws Exception { @Test public void testDoesNotExecuteWithSameVersion() throws Exception { final EntityService mockService = Mockito.mock(EntityService.class); - final EntitySearchService mockSearchService = Mockito.mock(EntitySearchService.class); final EntityRegistry mockRegistry = Mockito.mock(EntityRegistry.class); mockGetUpgradeStep(true, VERSION_1, mockService); - mockGetUpstreamLineage(datasetUrn, mockSearchService, mockService); - mockGetInputFields(chartUrn, Constants.CHART_ENTITY_NAME, mockSearchService, mockService); - mockGetInputFields(dashboardUrn, Constants.DASHBOARD_ENTITY_NAME, mockSearchService, mockService); + mockGetUpstreamLineage(datasetUrn, mockService); + mockGetInputFields(chartUrn, Constants.CHART_ENTITY_NAME, mockService); + mockGetInputFields(dashboardUrn, Constants.DASHBOARD_ENTITY_NAME, mockService); - AspectSpec aspectSpec = mockAspectSpecs(mockRegistry); + final AspectSpec aspectSpec = mockAspectSpecs(mockRegistry); - RestoreColumnLineageIndices restoreIndicesStep = new RestoreColumnLineageIndices(mockService, mockSearchService, mockRegistry); + final RestoreColumnLineageIndices restoreIndicesStep = new RestoreColumnLineageIndices(mockService, mockRegistry); restoreIndicesStep.execute(); Mockito.verify(mockRegistry, Mockito.times(0)).getEntitySpec(Constants.DATASET_ENTITY_NAME); @@ -228,38 +226,55 @@ public void testDoesNotExecuteWithSameVersion() throws Exception { ); } - private void mockGetUpstreamLineage(Urn datasetUrn, EntitySearchService mockSearchService, EntityService mockService) throws Exception { - Map upstreamLineageAspects = new HashMap<>(); - upstreamLineageAspects.put(Constants.UPSTREAM_LINEAGE_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(new UpstreamLineage().data()))); - Map upstreamLineageResponses = new HashMap<>(); - upstreamLineageResponses.put(datasetUrn, new EntityResponse().setUrn(datasetUrn).setAspects(new EnvelopedAspectMap(upstreamLineageAspects))); - Mockito.when(mockSearchService.search(Constants.DATASET_ENTITY_NAME, "", null, null, 0, 1000)) - .thenReturn(new SearchResult().setNumEntities(1).setEntities(new SearchEntityArray(ImmutableList.of(new SearchEntity().setEntity(datasetUrn))))); - Mockito.when(mockService.getEntitiesV2( - Constants.DATASET_ENTITY_NAME, - new HashSet<>(Collections.singleton(datasetUrn)), - Collections.singleton(Constants.UPSTREAM_LINEAGE_ASPECT_NAME))) - .thenReturn(upstreamLineageResponses); + private void mockGetUpstreamLineage(@Nonnull Urn datasetUrn, @Nonnull EntityService mockService) { + final List extraInfos = ImmutableList.of( + new ExtraInfo() + .setUrn(datasetUrn) + .setVersion(0L) + .setAudit(new AuditStamp().setActor(UrnUtils.getUrn("urn:li:corpuser:test")).setTime(0L)) + ); + + Mockito.when(mockService.listLatestAspects( + Mockito.eq(Constants.DATASET_ENTITY_NAME), + Mockito.eq(Constants.UPSTREAM_LINEAGE_ASPECT_NAME), + Mockito.eq(0), + Mockito.eq(1000) + )).thenReturn(new ListResult<>( + ImmutableList.of(new UpstreamLineage()), + new ListResultMetadata().setExtraInfos(new ExtraInfoArray(extraInfos)), + 1, + false, + 1, + 1, + 1)); } - private void mockGetInputFields(Urn entityUrn, String entityName, EntitySearchService mockSearchService, EntityService mockService) throws Exception { - Map inputFieldsAspects = new HashMap<>(); - inputFieldsAspects.put(Constants.INPUT_FIELDS_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(new InputFields().data()))); - Map inputFieldsResponses = new HashMap<>(); - inputFieldsResponses.put(entityUrn, new EntityResponse().setUrn(entityUrn).setAspects(new EnvelopedAspectMap(inputFieldsAspects))); - Mockito.when(mockSearchService.search(entityName, "", null, null, 0, 1000)) - .thenReturn(new SearchResult().setNumEntities(1).setEntities(new SearchEntityArray(ImmutableList.of(new SearchEntity().setEntity(entityUrn))))); - Mockito.when(mockService.getEntitiesV2( - entityName, - new HashSet<>(Collections.singleton(entityUrn)), - Collections.singleton(Constants.INPUT_FIELDS_ASPECT_NAME) - )) - .thenReturn(inputFieldsResponses); + private void mockGetInputFields(@Nonnull Urn entityUrn, @Nonnull String entityName, @Nonnull EntityService mockService) { + final List extraInfos = ImmutableList.of( + new ExtraInfo() + .setUrn(entityUrn) + .setVersion(0L) + .setAudit(new AuditStamp().setActor(UrnUtils.getUrn("urn:li:corpuser:test")).setTime(0L)) + ); + + Mockito.when(mockService.listLatestAspects( + Mockito.eq(entityName), + Mockito.eq(Constants.INPUT_FIELDS_ASPECT_NAME), + Mockito.eq(0), + Mockito.eq(1000) + )).thenReturn(new ListResult<>( + ImmutableList.of(new InputFields()), + new ListResultMetadata().setExtraInfos(new ExtraInfoArray(extraInfos)), + 1, + false, + 1, + 1, + 1)); } - private AspectSpec mockAspectSpecs(EntityRegistry mockRegistry) { - EntitySpec entitySpec = Mockito.mock(EntitySpec.class); - AspectSpec aspectSpec = Mockito.mock(AspectSpec.class); + private AspectSpec mockAspectSpecs(@Nonnull EntityRegistry mockRegistry) { + final EntitySpec entitySpec = Mockito.mock(EntitySpec.class); + final AspectSpec aspectSpec = Mockito.mock(AspectSpec.class); // Mock for upstreamLineage Mockito.when(mockRegistry.getEntitySpec(Constants.DATASET_ENTITY_NAME)).thenReturn(entitySpec); Mockito.when(entitySpec.getAspectSpec(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)).thenReturn(aspectSpec); @@ -273,13 +288,13 @@ private AspectSpec mockAspectSpecs(EntityRegistry mockRegistry) { return aspectSpec; } - private void mockGetUpgradeStep(boolean shouldReturnResponse, String version, EntityService mockService) throws Exception { + private void mockGetUpgradeStep(boolean shouldReturnResponse, @Nonnull String version, @Nonnull EntityService mockService) throws Exception { final Urn upgradeEntityUrn = UrnUtils.getUrn(COLUMN_LINEAGE_UPGRADE_URN); - com.linkedin.upgrade.DataHubUpgradeRequest upgradeRequest = new com.linkedin.upgrade.DataHubUpgradeRequest().setVersion(version); - Map upgradeRequestAspects = new HashMap<>(); + final com.linkedin.upgrade.DataHubUpgradeRequest upgradeRequest = new com.linkedin.upgrade.DataHubUpgradeRequest().setVersion(version); + final Map upgradeRequestAspects = new HashMap<>(); upgradeRequestAspects.put(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(upgradeRequest.data()))); - EntityResponse response = new EntityResponse().setAspects(new EnvelopedAspectMap(upgradeRequestAspects)); + final EntityResponse response = new EntityResponse().setAspects(new EnvelopedAspectMap(upgradeRequestAspects)); Mockito.when(mockService.getEntityV2( Constants.DATA_HUB_UPGRADE_ENTITY_NAME, upgradeEntityUrn,