feat(lineage) Implement CLL impact analysis for inputFields #6426
UpdateIndicesHook.java
@@ -2,8 +2,11 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.linkedin.common.InputField;
import com.linkedin.common.InputFields;
import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.FineGrainedLineage;
import com.linkedin.dataset.UpstreamLineage;
@@ -17,6 +20,7 @@
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.graph.Edge;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.key.SchemaFieldKey;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.RelationshipFieldSpec;
@@ -57,6 +61,7 @@
import static com.linkedin.metadata.search.utils.QueryUtils.*;

// TODO: Backfill tests for this class in UpdateIndicesHookTest.java
@Slf4j
@Component
@Import({GraphServiceFactory.class, EntitySearchServiceFactory.class, TimeseriesAspectServiceFactory.class,
@@ -180,15 +185,45 @@ private void updateFineGrainedEdgesAndRelationships(
    }
  }

  private Urn generateSchemaFieldUrn(@Nonnull final String resourceUrn, @Nonnull final String fieldPath) {
    // we rely on schemaField fieldPaths to be encoded since we do that with fineGrainedLineage on the ingestion side
    final String encodedFieldPath = fieldPath.replaceAll("\\(", "%28").replaceAll("\\)", "%29").replaceAll(",", "%2C");
    final SchemaFieldKey key = new SchemaFieldKey().setParent(UrnUtils.getUrn(resourceUrn)).setFieldPath(encodedFieldPath);
    return EntityKeyUtils.convertEntityKeyToUrn(key, Constants.SCHEMA_FIELD_ENTITY_NAME);
  }

  private void updateInputFieldEdgesAndRelationships(
      @Nonnull final Urn urn,
      @Nonnull final InputFields inputFields,
      @Nonnull final List<Edge> edgesToAdd,
      @Nonnull final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded
  ) {
    if (inputFields.hasFields()) {
      for (final InputField field : inputFields.getFields()) {
        if (field.hasSchemaFieldUrn() && field.hasSchemaField() && field.getSchemaField().hasFieldPath()) {
[Review comment] Doesn't this already have a schemaFieldUrn in this case? Why cannot we use this URN? Is it not encoded?
[Reply] this is part of the confusing modeling thing here -
[Reply] and the
[Reply] got it ty
          final Urn sourceFieldUrn = generateSchemaFieldUrn(urn.toString(), field.getSchemaField().getFieldPath());
          edgesToAdd.add(new Edge(sourceFieldUrn, field.getSchemaFieldUrn(), DOWNSTREAM_OF));
          final Set<String> relationshipTypes = urnToRelationshipTypesBeingAdded.getOrDefault(sourceFieldUrn, new HashSet<>());
          relationshipTypes.add(DOWNSTREAM_OF);
          urnToRelationshipTypesBeingAdded.put(sourceFieldUrn, relationshipTypes);
        }
      }
    }
  }

  private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypesFromAspect(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) {
    final List<Edge> edgesToAdd = new ArrayList<>();
    final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = new HashMap<>();

    // we need to manually set schemaField <-> schemaField edges for fineGrainedLineage and inputFields
[Review comment] Minor: Ideally this domain-specific schema field logic does not reside inside a much more generic UpdateIndicesHook. There should be some abstraction for encapsulating such special case logic and a way to register this logic with the index updater.
[Reply] (For a future refactor) The idea of UpdateIndicesHook is to be completely agnostic of domain-specific logic that exists
[Reply] yeah this all makes sense to me and is good to call out
(A hedged sketch of one possible shape for such an abstraction follows this file's diff.)
    // since @Relationship only links between the parent entity urn and something else.
    if (aspectSpec.getName().equals(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) {
      // we need to manually set schemaField <-> schemaField edges for fineGrainedLineage since
      // @Relationship only links between the parent entity urn and something else.
      updateFineGrainedEdgesAndRelationships(aspect, edgesToAdd, urnToRelationshipTypesBeingAdded);
    }
    if (aspectSpec.getName().equals(Constants.INPUT_FIELDS_ASPECT_NAME)) {
[Review comment] Qq - Do we have unit tests for this class? If not, we absolutely need to backfill since it's so important
[Reply] we do not have anything for this class...
[Reply] Can we add a TODO to do this on this file?
[Reply] yeah I'm about to push up the beginning of a test file for this, but will add a TODO at the top of the class to backfill the rest of the functionality!
      final InputFields inputFields = new InputFields(aspect.data());
      updateInputFieldEdgesAndRelationships(urn, inputFields, edgesToAdd, urnToRelationshipTypesBeingAdded);
    }

    Map<RelationshipFieldSpec, List<Object>> extractedFields =
        FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs());
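As the review thread above suggests, the aspect-specific branches in getEdgesAndRelationshipTypesFromAspect could eventually be factored out of the hook. The following is a minimal sketch of one possible shape for that abstraction; it is not part of this PR, and the AspectEdgeExtractor name and its methods are illustrative assumptions only.

// Hypothetical sketch only -- not part of this PR; names are illustrative.
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.graph.Edge;
import java.util.List;

public interface AspectEdgeExtractor {
  // The aspect this extractor handles, e.g. Constants.INPUT_FIELDS_ASPECT_NAME.
  String supportedAspectName();

  // Extra edges implied by this aspect that @Relationship annotations cannot express,
  // such as the schemaField <-> schemaField edges built above.
  List<Edge> extractEdges(Urn entityUrn, RecordTemplate aspect);
}

With something like this, UpdateIndicesHook could hold a list of registered extractors and dispatch on aspectSpec.getName() instead of hard-coding the upstreamLineage and inputFields branches; how the extractors get registered (Spring wiring or plain constructor arguments) is left open here.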
UpdateIndicesHookTest.java
@@ -0,0 +1,202 @@
package com.linkedin.metadata.kafka.hook;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.InputField;
import com.linkedin.common.InputFieldArray;
import com.linkedin.common.InputFields;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.DatasetLineageType;
import com.linkedin.dataset.FineGrainedLineage;
import com.linkedin.dataset.FineGrainedLineageArray;
import com.linkedin.dataset.Upstream;
import com.linkedin.dataset.UpstreamArray;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.graph.Edge;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.key.ChartKey;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.schema.SchemaField;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Collections;

import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static com.linkedin.metadata.search.utils.QueryUtils.newRelationshipFilter;

public class UpdateIndicesHookTest {
[Review comment] Thanks for writing these!
  // going to want a test where we have an upstreamLineage aspect with finegrained, check that we call _graphService.addEdge for each edge
  // as well as _graphService.removeEdgesFromNode for each field and their relationships

  private static final long EVENT_TIME = 123L;
  private static final String TEST_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)";
  private static final String TEST_CHART_URN = "urn:li:chart:(looker,dashboard_elements.1)";
  private static final String TEST_ACTOR_URN = "urn:li:corpuser:test";
  private static final String DOWNSTREAM_OF = "DownstreamOf";
  private UpdateIndicesHook _updateIndicesHook;
  private GraphService _mockGraphService;
  private EntitySearchService _mockEntitySearchService;
  private TimeseriesAspectService _mockTimeseriesAspectService;
  private SystemMetadataService _mockSystemMetadataService;
  private SearchDocumentTransformer _mockSearchDocumentTransformer;
  private Urn _actorUrn;

  @BeforeMethod
  public void setupTest() {
    _actorUrn = UrnUtils.getUrn(TEST_ACTOR_URN);
    EntityRegistry registry = new ConfigEntityRegistry(
        UpdateIndicesHookTest.class.getClassLoader().getResourceAsStream("test-entity-registry.yml"));
    _mockGraphService = Mockito.mock(GraphService.class);
    _mockEntitySearchService = Mockito.mock(EntitySearchService.class);
    _mockTimeseriesAspectService = Mockito.mock(TimeseriesAspectService.class);
    _mockSystemMetadataService = Mockito.mock(SystemMetadataService.class);
    _mockSearchDocumentTransformer = Mockito.mock(SearchDocumentTransformer.class);
    _updateIndicesHook = new UpdateIndicesHook(
        _mockGraphService,
        _mockEntitySearchService,
        _mockTimeseriesAspectService,
        _mockSystemMetadataService,
        registry,
        _mockSearchDocumentTransformer
    );
  }
  @Test
  public void testFineGrainedLineageEdgesAreAdded() throws Exception {
    Urn upstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)");
    Urn downstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)");
    MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn);
    _updateIndicesHook.invoke(event);

    Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF);
    Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge));
    Mockito.verify(_mockGraphService, Mockito.times(1)).removeEdgesFromNode(
        Mockito.eq(downstreamUrn),
        Mockito.eq(new ArrayList<>(Collections.singleton(DOWNSTREAM_OF))),
        Mockito.eq(newRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING))
    );
  }

  @Test
  public void testInputFieldsEdgesAreAdded() throws Exception {
    Urn upstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,thelook.explore.order_items,PROD),users.count)");
    String downstreamFieldPath = "users.count";
    MetadataChangeLog event = createInputFieldsMCL(upstreamUrn, downstreamFieldPath);
    EntityRegistry mockEntityRegistry = createMockEntityRegistry();
    _updateIndicesHook = new UpdateIndicesHook(
        _mockGraphService,
        _mockEntitySearchService,
        _mockTimeseriesAspectService,
        _mockSystemMetadataService,
        mockEntityRegistry,
        _mockSearchDocumentTransformer
    );

    _updateIndicesHook.invoke(event);

    Urn downstreamUrn = UrnUtils.getUrn(String.format("urn:li:schemaField:(%s,%s)", TEST_CHART_URN, downstreamFieldPath));

    Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF);
    Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge));
    Mockito.verify(_mockGraphService, Mockito.times(1)).removeEdgesFromNode(
        Mockito.eq(downstreamUrn),
        Mockito.eq(new ArrayList<>(Collections.singleton(DOWNSTREAM_OF))),
        Mockito.eq(newRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING))
    );
  }
  private EntityRegistry createMockEntityRegistry() {
    // need to mock this registry instead of using test-entity-registry.yml because inputFields does not work due to a known bug
    EntityRegistry mockEntityRegistry = Mockito.mock(EntityRegistry.class);
    EntitySpec entitySpec = Mockito.mock(EntitySpec.class);
    AspectSpec aspectSpec = createMockAspectSpec(InputFields.class, InputFields.dataSchema());
    Mockito.when(mockEntityRegistry.getEntitySpec(Constants.CHART_ENTITY_NAME)).thenReturn(entitySpec);
    Mockito.when(entitySpec.getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME)).thenReturn(aspectSpec);
    Mockito.when(aspectSpec.isTimeseries()).thenReturn(false);
    Mockito.when(aspectSpec.getName()).thenReturn(Constants.INPUT_FIELDS_ASPECT_NAME);
    AspectSpec chartKeyAspectSpec = createMockAspectSpec(ChartKey.class, ChartKey.dataSchema());
    Mockito.when(entitySpec.getKeyAspectSpec()).thenReturn(chartKeyAspectSpec);
    return mockEntityRegistry;
  }

  private <T extends RecordTemplate> AspectSpec createMockAspectSpec(Class<T> clazz, RecordDataSchema schema) {
    AspectSpec mockSpec = Mockito.mock(AspectSpec.class);
    Mockito.when(mockSpec.getDataTemplateClass()).thenReturn((Class<RecordTemplate>) clazz);
    Mockito.when(mockSpec.getPegasusSchema()).thenReturn(schema);
    return mockSpec;
  }
  private MetadataChangeLog createUpstreamLineageMCL(Urn upstreamUrn, Urn downstreamUrn) throws Exception {
    MetadataChangeLog event = new MetadataChangeLog();
    event.setEntityType(Constants.DATASET_ENTITY_NAME);
    event.setAspectName(Constants.UPSTREAM_LINEAGE_ASPECT_NAME);
    event.setChangeType(ChangeType.UPSERT);

    UpstreamLineage upstreamLineage = new UpstreamLineage();
    FineGrainedLineageArray fineGrainedLineages = new FineGrainedLineageArray();
    FineGrainedLineage fineGrainedLineage = new FineGrainedLineage();
    UrnArray upstreamUrns = new UrnArray();
    upstreamUrns.add(upstreamUrn);
    fineGrainedLineage.setUpstreams(upstreamUrns);
    UrnArray downstreamUrns = new UrnArray();
    downstreamUrns.add(downstreamUrn);
    fineGrainedLineage.setDownstreams(downstreamUrns);
    fineGrainedLineages.add(fineGrainedLineage);
    upstreamLineage.setFineGrainedLineages(fineGrainedLineages);
    final UpstreamArray upstreamArray = new UpstreamArray();
    final Upstream upstream = new Upstream();
    upstream.setType(DatasetLineageType.TRANSFORMED);
    upstream.setDataset(DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD)"));
    upstreamArray.add(upstream);
    upstreamLineage.setUpstreams(upstreamArray);

    event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage));
    event.setEntityUrn(Urn.createFromString(TEST_DATASET_URN));
    event.setEntityType(DATASET_ENTITY_NAME);
    event.setCreated(new AuditStamp().setActor(_actorUrn).setTime(EVENT_TIME));
    return event;
  }
|
||
private MetadataChangeLog createInputFieldsMCL(Urn upstreamUrn, String downstreamFieldPath) throws Exception { | ||
MetadataChangeLog event = new MetadataChangeLog(); | ||
event.setEntityType(Constants.CHART_ENTITY_NAME); | ||
event.setAspectName(Constants.INPUT_FIELDS_ASPECT_NAME); | ||
event.setChangeType(ChangeType.UPSERT); | ||
InputFields inputFields = new InputFields(); | ||
InputFieldArray inputFieldsArray = new InputFieldArray(); | ||
InputField inputField = new InputField(); | ||
inputField.setSchemaFieldUrn(upstreamUrn); | ||
SchemaField schemaField = new SchemaField(); | ||
schemaField.setFieldPath(downstreamFieldPath); | ||
inputField.setSchemaField(schemaField); | ||
inputFieldsArray.add(inputField); | ||
inputFields.setFields(inputFieldsArray); | ||
|
||
event.setAspect(GenericRecordUtils.serializeAspect(inputFields)); | ||
event.setEntityUrn(Urn.createFromString(TEST_CHART_URN)); | ||
event.setEntityType(Constants.CHART_ENTITY_NAME); | ||
event.setCreated(new AuditStamp().setActor(_actorUrn).setTime(EVENT_TIME)); | ||
return event; | ||
} | ||
} |
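The field paths used in the tests above ("users.count") contain none of the characters that generateSchemaFieldUrn encodes. As a rough illustration of that encoding (the dataset URN and field path below are assumed example values, not taken from this PR), a path containing parentheses and commas would be rewritten like this before the schemaField URN is built:

// Illustrative example only -- the field path below is an assumption for demonstration.
public class SchemaFieldUrnEncodingExample {
  public static void main(String[] args) {
    String resourceUrn = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)";
    String fieldPath = "features.map(key,value)";
    // The same replacements generateSchemaFieldUrn applies before building the SchemaFieldKey.
    String encodedFieldPath = fieldPath.replaceAll("\\(", "%28").replaceAll("\\)", "%29").replaceAll(",", "%2C");
    System.out.println(encodedFieldPath);
    // prints: features.map%28key%2Cvalue%29
    // The resulting schemaField URN has the same shape as the URNs asserted in the tests above.
    System.out.println(String.format("urn:li:schemaField:(%s,%s)", resourceUrn, encodedFieldPath));
    // prints: urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),features.map%28key%2Cvalue%29)
  }
}

Encoding these characters keeps the field path from being misread as part of the URN's own tuple syntax, which is why the ingestion side already stores fineGrainedLineage field paths in this encoded form.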
[Review comment] Nice - thanks for the explanation