Skip to content

Commit

Permalink
fix(struct-prop): fix unintended struct prop ES mutation (datahub-pro…
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Oct 30, 2024
1 parent 55e3d1d commit 88b77b5
Show file tree
Hide file tree
Showing 16 changed files with 187 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.linkedin.datahub.upgrade.config.restoreindices;

import com.linkedin.datahub.upgrade.config.SystemUpdateCondition;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.restoreindices.structuredproperties.PropertyDefinitions;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class PropertyDefinitionsConfig {

@Bean
public NonBlockingSystemUpgrade propertyDefinitions(
final OperationContext opContext,
final EntityService<?> entityService,
final AspectDao aspectDao,
@Value("${systemUpdate.propertyDefinitions.enabled}") final boolean enabled,
@Value("${systemUpdate.propertyDefinitions.batchSize}") final Integer batchSize,
@Value("${systemUpdate.propertyDefinitions.delayMs}") final Integer delayMs,
@Value("${systemUpdate.propertyDefinitions.limit}") final Integer limit) {
return new PropertyDefinitions(
opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.linkedin.datahub.upgrade.config;
package com.linkedin.datahub.upgrade.config.restoreindices;

import com.linkedin.datahub.upgrade.config.SystemUpdateCondition;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.domaindescription.ReindexDomainDescription;
import com.linkedin.datahub.upgrade.system.restoreindices.domaindescription.ReindexDomainDescription;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.linkedin.datahub.upgrade.config.graph;
package com.linkedin.datahub.upgrade.config.restoreindices.graph;

import com.linkedin.datahub.upgrade.config.SystemUpdateCondition;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.graph.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.datahub.upgrade.system.restoreindices.graph.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.linkedin.datahub.upgrade.config.graph;
package com.linkedin.datahub.upgrade.config.restoreindices.graph;

import com.linkedin.datahub.upgrade.config.SystemUpdateCondition;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.graph.edgestatus.ReindexEdgeStatus;
import com.linkedin.datahub.upgrade.system.restoreindices.graph.edgestatus.ReindexEdgeStatus;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.system.domaindescription;
package com.linkedin.datahub.upgrade.system.restoreindices.domaindescription;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.system.domaindescription;
package com.linkedin.datahub.upgrade.system.restoreindices.domaindescription;

import static com.linkedin.metadata.Constants.*;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.system.graph.edgestatus;
package com.linkedin.datahub.upgrade.system.restoreindices.graph.edgestatus;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.system.graph.edgestatus;
package com.linkedin.datahub.upgrade.system.restoreindices.graph.edgestatus;

import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.system.graph.vianodes;
package com.linkedin.datahub.upgrade.system.restoreindices.graph.vianodes;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.system.graph.vianodes;
package com.linkedin.datahub.upgrade.system.restoreindices.graph.vianodes;

import static com.linkedin.metadata.Constants.*;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.linkedin.datahub.upgrade.system.restoreindices.structuredproperties;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

/**
* A job that reindexes all domain aspects as part of reindexing descriptions This is required to
* fix the analytics for domains
*/
@Slf4j
public class PropertyDefinitions implements NonBlockingSystemUpgrade {

private final List<UpgradeStep> _steps;

public PropertyDefinitions(
@Nonnull OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
boolean enabled,
Integer batchSize,
Integer batchDelayMs,
Integer limit) {
if (enabled) {
_steps =
ImmutableList.of(
new PropertyDefinitionsStep(
opContext, entityService, aspectDao, batchSize, batchDelayMs, limit));
} else {
_steps = ImmutableList.of();
}
}

@Override
public String id() {
return this.getClass().getName();
}

@Override
public List<UpgradeStep> steps() {
return _steps;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.linkedin.datahub.upgrade.system.restoreindices.structuredproperties;

import static com.linkedin.metadata.Constants.*;

import com.linkedin.datahub.upgrade.system.AbstractMCLStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;

@Slf4j
public class PropertyDefinitionsStep extends AbstractMCLStep {

public PropertyDefinitionsStep(
OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
Integer batchSize,
Integer batchDelayMs,
Integer limit) {
super(opContext, entityService, aspectDao, batchSize, batchDelayMs, limit);
}

@Override
public String id() {
return "structured-property-definitions-v1";
}

@Nonnull
@Override
protected String getAspectName() {
return STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME;
}

@Nullable
@Override
protected String getUrnLike() {
return "urn:li:" + STRUCTURED_PROPERTY_ENTITY_NAME + ":%";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCPStep;
import com.linkedin.datahub.upgrade.system.graph.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.datahub.upgrade.system.restoreindices.graph.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,12 @@ private void handleUpdateChangeEvent(
updateSystemMetadata(event.getSystemMetadata(), urn, aspectSpec, aspect);
}

// Step 1. Handle StructuredProperties Index Mapping changes
updateIndexMappings(urn, entitySpec, aspectSpec, aspect, previousAspect);
try {
// Step 1. Handle StructuredProperties Index Mapping changes
updateIndexMappings(urn, entitySpec, aspectSpec, aspect, previousAspect);
} catch (Exception e) {
log.error("Issue with updating index mappings for structured property change", e);
}

// Step 2. For all aspects, attempt to update Search
updateSearchService(opContext, event);
Expand All @@ -192,7 +196,8 @@ public void updateIndexMappings(
EntitySpec entitySpec,
AspectSpec aspectSpec,
RecordTemplate newValue,
RecordTemplate oldValue) {
RecordTemplate oldValue)
throws CloneNotSupportedException {
if (structuredPropertiesHookEnabled
&& STRUCTURED_PROPERTY_ENTITY_NAME.equals(entitySpec.getName())
&& STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME.equals(aspectSpec.getName())) {
Expand All @@ -205,7 +210,7 @@ public void updateIndexMappings(
.orElse(new UrnArray());

StructuredPropertyDefinition newDefinition =
new StructuredPropertyDefinition(newValue.data());
new StructuredPropertyDefinition(newValue.data().copy());
newDefinition.getEntityTypes().removeAll(oldEntityTypes);

if (newDefinition.getEntityTypes().size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.InputField;
Expand Down Expand Up @@ -56,6 +57,7 @@
import com.linkedin.schema.NumberType;
import com.linkedin.schema.SchemaField;
import com.linkedin.schema.SchemaFieldDataType;
import com.linkedin.structured.StructuredPropertyDefinition;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -101,6 +103,7 @@ public class UpdateIndicesHookTest {
private Urn actorUrn;
private UpdateIndicesService updateIndicesService;
private UpdateIndicesHook reprocessUIHook;
private OperationContext opContext;

@Value("${elasticsearch.index.maxArrayLength}")
private int maxArrayLength;
Expand Down Expand Up @@ -133,13 +136,12 @@ public void setupTest() {
mockEntityIndexBuilders,
"MD5");

OperationContext systemOperationContext =
TestOperationContexts.systemContextNoSearchAuthorization();
opContext = TestOperationContexts.systemContextNoSearchAuthorization();

updateIndicesHook = new UpdateIndicesHook(updateIndicesService, true, false);
updateIndicesHook.init(systemOperationContext);
updateIndicesHook.init(opContext);
reprocessUIHook = new UpdateIndicesHook(updateIndicesService, true, true);
reprocessUIHook.init(systemOperationContext);
reprocessUIHook.init(opContext);
}

@Test
Expand Down Expand Up @@ -478,6 +480,38 @@ public void testMCLUIPreProcessedReprocess() throws Exception {
.upsertDocument(any(OperationContext.class), Mockito.any(), Mockito.any(), Mockito.any());
}

@Test
public void testUpdateIndexMappings() throws CloneNotSupportedException {
// ensure no mutation
EntitySpec entitySpec =
opContext.getEntityRegistry().getEntitySpec(STRUCTURED_PROPERTY_ENTITY_NAME);
AspectSpec aspectSpec = entitySpec.getAspectSpec(STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME);

StructuredPropertyDefinition oldValueOrigin =
new StructuredPropertyDefinition()
.setEntityTypes(
new UrnArray(
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,foo1,PROD)"),
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,foo2,PROD)")));
StructuredPropertyDefinition newValueOrigin =
new StructuredPropertyDefinition()
.setEntityTypes(
new UrnArray(
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,foo2,PROD)"),
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,foo3,PROD)")));

StructuredPropertyDefinition oldValue =
new StructuredPropertyDefinition(oldValueOrigin.data().copy());
StructuredPropertyDefinition newValue =
new StructuredPropertyDefinition(newValueOrigin.data().copy());

updateIndicesService.updateIndexMappings(
UrnUtils.getUrn(TEST_DATASET_URN), entitySpec, aspectSpec, newValue, oldValue);

assertEquals(oldValue, oldValueOrigin, "Ensure no mutation to input objects");
assertEquals(newValue, newValueOrigin, "Ensure no mutation to input objects");
}

private EntityRegistry createMockEntityRegistry() {
// need to mock this registry instead of using test-entity-registry.yml because inputFields does
// not work due to a known bug
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@ systemUpdate:
batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_EDGE_STATUS_BATCH_SIZE:1000}
delayMs: ${BOOTSTRAP_SYSTEM_UPDATE_EDGE_STATUS_DELAY_MS:5000}
limit: ${BOOTSTRAP_SYSTEM_UPDATE_EDGE_STATUS_LIMIT:0}
propertyDefinitions:
enabled: ${BOOTSTRAP_SYSTEM_UPDATE_PROPERTY_DEFINITIONS_ENABLED:true}
batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_PROPERTY_DEFINITIONS_BATCH_SIZE:500}
delayMs: ${BOOTSTRAP_SYSTEM_UPDATE_PROPERTY_DEFINITIONS_DELAY_MS:1000}
limit: ${BOOTSTRAP_SYSTEM_UPDATE_PROPERTY_DEFINITIONS_CLL_LIMIT:0}

structuredProperties:
enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings
Expand Down

0 comments on commit 88b77b5

Please sign in to comment.