Commit 61b6dee: PR edits

Chris Collins authored and Chris Collins committed Nov 15, 2022
1 parent b783864 commit 61b6dee
Showing 3 changed files with 225 additions and 13 deletions.
UpdateIndicesHook.java

@@ -20,6 +20,7 @@
 import com.linkedin.metadata.Constants;
 import com.linkedin.metadata.graph.Edge;
 import com.linkedin.metadata.graph.GraphService;
+import com.linkedin.metadata.key.SchemaFieldKey;
 import com.linkedin.metadata.models.AspectSpec;
 import com.linkedin.metadata.models.EntitySpec;
 import com.linkedin.metadata.models.RelationshipFieldSpec;
@@ -60,6 +61,7 @@

 import static com.linkedin.metadata.search.utils.QueryUtils.*;

+// TODO: Backfill tests for this class in UpdateIndicesHookTest.java
 @Slf4j
 @Component
 @Import({GraphServiceFactory.class, EntitySearchServiceFactory.class, TimeseriesAspectServiceFactory.class,
@@ -183,26 +185,25 @@ private void updateFineGrainedEdgesAndRelationships(
     }
   }

-  private Urn generateSchemaFieldUrn(@Nonnull String resourceUrn, @Nonnull String fieldPath) {
+  private Urn generateSchemaFieldUrn(@Nonnull final String resourceUrn, @Nonnull final String fieldPath) {
     // we rely on schemaField fieldPaths to be encoded since we do that with fineGrainedLineage on the ingestion side
-    String encodedFieldPath = fieldPath.replaceAll("\\(", "%28").replaceAll("\\)", "%29").replaceAll(",", "%2C");
-    String urnString = String.format("urn:li:schemaField:(%s,%s)", resourceUrn, encodedFieldPath);
-    return UrnUtils.getUrn(urnString);
+    final String encodedFieldPath = fieldPath.replaceAll("\\(", "%28").replaceAll("\\)", "%29").replaceAll(",", "%2C");
+    final SchemaFieldKey key = new SchemaFieldKey().setParent(UrnUtils.getUrn(resourceUrn)).setFieldPath(encodedFieldPath);
+    return EntityKeyUtils.convertEntityKeyToUrn(key, Constants.SCHEMA_FIELD_ENTITY_NAME);
   }

   private void updateInputFieldEdgesAndRelationships(
-      Urn urn,
-      RecordTemplate aspect,
-      List<Edge> edgesToAdd,
-      HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded
+      @Nonnull final Urn urn,
+      @Nonnull final InputFields inputFields,
+      @Nonnull final List<Edge> edgesToAdd,
+      @Nonnull final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded
   ) {
-    InputFields inputFields = new InputFields(aspect.data());
     if (inputFields.hasFields()) {
-      for (InputField field : inputFields.getFields()) {
+      for (final InputField field : inputFields.getFields()) {
         if (field.hasSchemaFieldUrn() && field.hasSchemaField() && field.getSchemaField().hasFieldPath()) {
-          Urn sourceFieldUrn = generateSchemaFieldUrn(urn.toString(), field.getSchemaField().getFieldPath());
+          final Urn sourceFieldUrn = generateSchemaFieldUrn(urn.toString(), field.getSchemaField().getFieldPath());
           edgesToAdd.add(new Edge(sourceFieldUrn, field.getSchemaFieldUrn(), DOWNSTREAM_OF));
-          Set<String> relationshipTypes = urnToRelationshipTypesBeingAdded.getOrDefault(sourceFieldUrn, new HashSet<>());
+          final Set<String> relationshipTypes = urnToRelationshipTypesBeingAdded.getOrDefault(sourceFieldUrn, new HashSet<>());
           relationshipTypes.add(DOWNSTREAM_OF);
           urnToRelationshipTypesBeingAdded.put(sourceFieldUrn, relationshipTypes);
         }
@@ -220,7 +221,8 @@ private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypes
       updateFineGrainedEdgesAndRelationships(aspect, edgesToAdd, urnToRelationshipTypesBeingAdded);
     }
     if (aspectSpec.getName().equals(Constants.INPUT_FIELDS_ASPECT_NAME)) {
-      updateInputFieldEdgesAndRelationships(urn, aspect, edgesToAdd, urnToRelationshipTypesBeingAdded);
+      final InputFields inputFields = new InputFields(aspect.data());
+      updateInputFieldEdgesAndRelationships(urn, inputFields, edgesToAdd, urnToRelationshipTypesBeingAdded);
     }

     Map<RelationshipFieldSpec, List<Object>> extractedFields =
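The substantive change in UpdateIndicesHook.java is that generateSchemaFieldUrn now builds the schemaField URN through the typed SchemaFieldKey record rather than by raw string formatting, and updateInputFieldEdgesAndRelationships now receives the already-deserialized InputFields aspect. Below is a minimal sketch of the old and new URN construction paths, using hypothetical inputs; the import location of EntityKeyUtils is assumed, and both paths are expected to yield the same URN.

import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.key.SchemaFieldKey;
import com.linkedin.metadata.utils.EntityKeyUtils; // assumed package for EntityKeyUtils

public class SchemaFieldUrnSketch {
  public static void main(String[] args) {
    // Hypothetical inputs, not taken from the commit.
    String resourceUrn = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)";
    String fieldPath = "address.(lat,lng)";

    // Parentheses and commas are percent-encoded so the fieldPath cannot break URN parsing.
    String encodedFieldPath = fieldPath.replaceAll("\\(", "%28").replaceAll("\\)", "%29").replaceAll(",", "%2C");
    // encodedFieldPath == "address.%28lat%2Clng%29"

    // Old path: raw string formatting.
    Urn viaString = UrnUtils.getUrn(String.format("urn:li:schemaField:(%s,%s)", resourceUrn, encodedFieldPath));

    // New path: build the typed key record and convert it to a URN.
    SchemaFieldKey key = new SchemaFieldKey()
        .setParent(UrnUtils.getUrn(resourceUrn))
        .setFieldPath(encodedFieldPath);
    Urn viaKey = EntityKeyUtils.convertEntityKeyToUrn(key, Constants.SCHEMA_FIELD_ENTITY_NAME);

    // Both are expected to print:
    // urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),address.%28lat%2Clng%29)
    System.out.println(viaString);
    System.out.println(viaKey);
  }
}

Going through the typed key record centralizes schemaField URN serialization in one place, so the format is less likely to drift between this hook and other producers of schemaField entities.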
UpdateIndicesHookTest.java (new file)

@@ -0,0 +1,202 @@
package com.linkedin.metadata.kafka.hook;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.InputField;
import com.linkedin.common.InputFieldArray;
import com.linkedin.common.InputFields;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.DatasetLineageType;
import com.linkedin.dataset.FineGrainedLineage;
import com.linkedin.dataset.FineGrainedLineageArray;
import com.linkedin.dataset.Upstream;
import com.linkedin.dataset.UpstreamArray;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.graph.Edge;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.key.ChartKey;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.schema.SchemaField;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Collections;

import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static com.linkedin.metadata.search.utils.QueryUtils.newRelationshipFilter;

public class UpdateIndicesHookTest {
  // Tests below cover an upstreamLineage aspect with fineGrainedLineages: check that we call _graphService.addEdge
  // for each edge, as well as _graphService.removeEdgesFromNode for each field and its relationships.

  private static final long EVENT_TIME = 123L;
  private static final String TEST_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)";
  private static final String TEST_CHART_URN = "urn:li:chart:(looker,dashboard_elements.1)";
  private static final String TEST_ACTOR_URN = "urn:li:corpuser:test";
  private static final String DOWNSTREAM_OF = "DownstreamOf";
  private UpdateIndicesHook _updateIndicesHook;
  private GraphService _mockGraphService;
  private EntitySearchService _mockEntitySearchService;
  private TimeseriesAspectService _mockTimeseriesAspectService;
  private SystemMetadataService _mockSystemMetadataService;
  private SearchDocumentTransformer _mockSearchDocumentTransformer;
  private Urn _actorUrn;

  @BeforeMethod
  public void setupTest() {
    _actorUrn = UrnUtils.getUrn(TEST_ACTOR_URN);
    EntityRegistry registry = new ConfigEntityRegistry(
        UpdateIndicesHookTest.class.getClassLoader().getResourceAsStream("test-entity-registry.yml"));
    _mockGraphService = Mockito.mock(GraphService.class);
    _mockEntitySearchService = Mockito.mock(EntitySearchService.class);
    _mockTimeseriesAspectService = Mockito.mock(TimeseriesAspectService.class);
    _mockSystemMetadataService = Mockito.mock(SystemMetadataService.class);
    _mockSearchDocumentTransformer = Mockito.mock(SearchDocumentTransformer.class);
    _updateIndicesHook = new UpdateIndicesHook(
        _mockGraphService,
        _mockEntitySearchService,
        _mockTimeseriesAspectService,
        _mockSystemMetadataService,
        registry,
        _mockSearchDocumentTransformer
    );
  }

  @Test
  public void testFineGrainedLineageEdgesAreAdded() throws Exception {
    Urn upstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)");
    Urn downstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)");
    MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn);
    _updateIndicesHook.invoke(event);

    Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF);
    Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge));
    Mockito.verify(_mockGraphService, Mockito.times(1)).removeEdgesFromNode(
        Mockito.eq(downstreamUrn),
        Mockito.eq(new ArrayList<>(Collections.singleton(DOWNSTREAM_OF))),
        Mockito.eq(newRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING))
    );
  }

  @Test
  public void testInputFieldsEdgesAreAdded() throws Exception {
    Urn upstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,thelook.explore.order_items,PROD),users.count)");
    String downstreamFieldPath = "users.count";
    MetadataChangeLog event = createInputFieldsMCL(upstreamUrn, downstreamFieldPath);
    EntityRegistry mockEntityRegistry = createMockEntityRegistry();
    _updateIndicesHook = new UpdateIndicesHook(
        _mockGraphService,
        _mockEntitySearchService,
        _mockTimeseriesAspectService,
        _mockSystemMetadataService,
        mockEntityRegistry,
        _mockSearchDocumentTransformer
    );

    _updateIndicesHook.invoke(event);

    Urn downstreamUrn = UrnUtils.getUrn(String.format("urn:li:schemaField:(%s,%s)", TEST_CHART_URN, downstreamFieldPath));

    Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF);
    Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge));
    Mockito.verify(_mockGraphService, Mockito.times(1)).removeEdgesFromNode(
        Mockito.eq(downstreamUrn),
        Mockito.eq(new ArrayList<>(Collections.singleton(DOWNSTREAM_OF))),
        Mockito.eq(newRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING))
    );
  }

  private EntityRegistry createMockEntityRegistry() {
    // need to mock this registry instead of using test-entity-registry.yml because inputFields does not work due to a known bug
    EntityRegistry mockEntityRegistry = Mockito.mock(EntityRegistry.class);
    EntitySpec entitySpec = Mockito.mock(EntitySpec.class);
    AspectSpec aspectSpec = createMockAspectSpec(InputFields.class, InputFields.dataSchema());
    Mockito.when(mockEntityRegistry.getEntitySpec(Constants.CHART_ENTITY_NAME)).thenReturn(entitySpec);
    Mockito.when(entitySpec.getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME)).thenReturn(aspectSpec);
    Mockito.when(aspectSpec.isTimeseries()).thenReturn(false);
    Mockito.when(aspectSpec.getName()).thenReturn(Constants.INPUT_FIELDS_ASPECT_NAME);
    AspectSpec chartKeyAspectSpec = createMockAspectSpec(ChartKey.class, ChartKey.dataSchema());
    Mockito.when(entitySpec.getKeyAspectSpec()).thenReturn(chartKeyAspectSpec);
    return mockEntityRegistry;
  }

  private <T extends RecordTemplate> AspectSpec createMockAspectSpec(Class<T> clazz, RecordDataSchema schema) {
    AspectSpec mockSpec = Mockito.mock(AspectSpec.class);
    Mockito.when(mockSpec.getDataTemplateClass()).thenReturn((Class<RecordTemplate>) clazz);
    Mockito.when(mockSpec.getPegasusSchema()).thenReturn(schema);
    return mockSpec;
  }

  private MetadataChangeLog createUpstreamLineageMCL(Urn upstreamUrn, Urn downstreamUrn) throws Exception {
    MetadataChangeLog event = new MetadataChangeLog();
    event.setEntityType(Constants.DATASET_ENTITY_NAME);
    event.setAspectName(Constants.UPSTREAM_LINEAGE_ASPECT_NAME);
    event.setChangeType(ChangeType.UPSERT);

    UpstreamLineage upstreamLineage = new UpstreamLineage();
    FineGrainedLineageArray fineGrainedLineages = new FineGrainedLineageArray();
    FineGrainedLineage fineGrainedLineage = new FineGrainedLineage();
    UrnArray upstreamUrns = new UrnArray();
    upstreamUrns.add(upstreamUrn);
    fineGrainedLineage.setUpstreams(upstreamUrns);
    UrnArray downstreamUrns = new UrnArray();
    downstreamUrns.add(downstreamUrn);
    fineGrainedLineage.setDownstreams(downstreamUrns);
    fineGrainedLineages.add(fineGrainedLineage);
    upstreamLineage.setFineGrainedLineages(fineGrainedLineages);
    final UpstreamArray upstreamArray = new UpstreamArray();
    final Upstream upstream = new Upstream();
    upstream.setType(DatasetLineageType.TRANSFORMED);
    upstream.setDataset(DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD)"));
    upstreamArray.add(upstream);
    upstreamLineage.setUpstreams(upstreamArray);

    event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage));
    event.setEntityUrn(Urn.createFromString(TEST_DATASET_URN));
    event.setEntityType(DATASET_ENTITY_NAME);
    event.setCreated(new AuditStamp().setActor(_actorUrn).setTime(EVENT_TIME));
    return event;
  }

  private MetadataChangeLog createInputFieldsMCL(Urn upstreamUrn, String downstreamFieldPath) throws Exception {
    MetadataChangeLog event = new MetadataChangeLog();
    event.setEntityType(Constants.CHART_ENTITY_NAME);
    event.setAspectName(Constants.INPUT_FIELDS_ASPECT_NAME);
    event.setChangeType(ChangeType.UPSERT);
    InputFields inputFields = new InputFields();
    InputFieldArray inputFieldsArray = new InputFieldArray();
    InputField inputField = new InputField();
    inputField.setSchemaFieldUrn(upstreamUrn);
    SchemaField schemaField = new SchemaField();
    schemaField.setFieldPath(downstreamFieldPath);
    inputField.setSchemaField(schemaField);
    inputFieldsArray.add(inputField);
    inputFields.setFields(inputFieldsArray);

    event.setAspect(GenericRecordUtils.serializeAspect(inputFields));
    event.setEntityUrn(Urn.createFromString(TEST_CHART_URN));
    event.setEntityType(Constants.CHART_ENTITY_NAME);
    event.setCreated(new AuditStamp().setActor(_actorUrn).setTime(EVENT_TIME));
    return event;
  }
}
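Assuming the new test class and test-entity-registry.yml live in a standard Gradle test source set (setupTest loads the registry from the classpath), the suite can be run with Gradle's test filtering; :<module> below is a placeholder for whichever module contains UpdateIndicesHook, not a path from the commit:

./gradlew :<module>:test --tests "com.linkedin.metadata.kafka.hook.UpdateIndicesHookTest"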
test-entity-registry.yml

@@ -13,5 +13,13 @@ entities:
       - dataHubExecutionRequestInput
       - dataHubExecutionRequestSignal
       - dataHubExecutionRequestResult
+  - name: dataset
+    keyAspect: datasetKey
+    aspects:
+      - upstreamLineage
+  - name: chart
+    keyAspect: chartKey
+    aspects:
+      - domains
 events:
   - name: entityChangeEvent
