Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use DataFlow relationships instead of LineageMappings for capturing lineage #7318

Merged
merged 22 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
37f77b7
DE OMAS - replace LineageMappings with DataFlows
popa-raluca Dec 21, 2022
9797015
DE OMAS - replace LineageMappings with DataFlows
popa-raluca Dec 21, 2022
41ce218
AL OMAS - replace LineageMappings with DataFlows
popa-raluca Dec 21, 2022
9fadb99
OLS - replace LineageMappings with DataFlows
popa-raluca Jan 4, 2023
67565cb
Merge branch 'main' of https://github.com/odpi/egeria into data-flows
popa-raluca Jan 10, 2023
d86503e
DE OMAS - update Postman collections with DataFlows
popa-raluca Jan 11, 2023
d0d17bb
DE OMAS - update Postman collections with DataFlows
popa-raluca Jan 12, 2023
5ee42e3
DE OMAS - update sample events with DataFlows
popa-raluca Jan 12, 2023
6d54774
OLS - fix path separator
popa-raluca Jan 12, 2023
d9e6d26
DE OMAS - update documentation model with DataFlows
popa-raluca Jan 12, 2023
d60abd2
Merge branch 'main' into data-flows
popa-raluca Jan 13, 2023
56e389c
DE OMAS - fix javadoc
popa-raluca Jan 13, 2023
c6323c6
DE OMAS - fix fvt
popa-raluca Jan 13, 2023
8b2a258
DE OMAS - fix fvt
popa-raluca Jan 17, 2023
e6fd929
Merge branch 'main' into data-flows
popa-raluca Jan 20, 2023
1080db6
DE OMAS - fix sonar warn
popa-raluca Jan 20, 2023
adfaaa0
Merge branch 'main' into data-flows
Jan 25, 2023
3d82325
Merge branch 'main' into data-flows
Jan 26, 2023
a2bac58
Fix conflict
Jan 26, 2023
de47c96
Merge branch 'main' into data-flows
popa-raluca Jan 31, 2023
47df15c
DE OMAS - fix sonar warn
popa-raluca Feb 1, 2023
6ac1df8
Merge branch 'main' into data-flows
popa-raluca Feb 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ The Glossary Term Context Event contains the description of the term plus the fu
Glossary Category or Schema Elements that are involved in lineage relationship with the processed term.
This is send only when the Semantic Assigment or Term Categorization relationships are created.

The event sent for Process entities includes information about the lineage process, the lineage mappings and context of the Schema Elements.
The event sent for Process entities includes information about the lineage process, the data flows and context of the Schema Elements.
The full context of the Process is first time sent when the status of the entity is changed from DRAFT to ACTIVE, for the rest of the lineage entity types
the Lineage Event sent contains only the entity that has been changed.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public enum AssetLineageEventType implements Serializable {
CATEGORY_ANCHORS_EVENT(12, "GlossaryCategoriesEvent", "Has the categorizations for an anchor"),
COLUMN_CONTEXT_EVENT(13, "ColumnContextEvent", "Has the context for a column"),
ASSET_CONTEXT_EVENT(14, "AssetContextEvent", "Has the asset context for a relational table or a data file"),
LINEAGE_MAPPINGS_EVENT(15, "LineageMappingsEvent", "Has the lineage mappings for a column"),
DATA_FLOWS_EVENT(15, "DataFlowsEvent", "Has the data flows for a column"),
LINEAGE_SYNC_EVENT(99, "LineageSyncEvent","AssetLineage internal processing information shared with external software components like governance servers."),
UNKNOWN_ASSET_LINEAGE_EVENT(100, "UnknownAssetLineageEvent", "An AssetLineage OMAS event that is not recognized by the local handlers.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.ATTRIBUTE_FOR_SCHEMA;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.COLLECTION_MEMBERSHIP;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.LINEAGE_MAPPING;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATA_FLOW;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT_ALIAS;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT_DELEGATION;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT_IMPLEMENTATION;
Expand Down Expand Up @@ -102,15 +102,15 @@ public Multimap<String, RelationshipsContext> buildProcessContext(String userId,
.map(GraphContext::getToVertex).collect(Collectors.toSet());

for (LineageEntity tabularColumn : tabularColumns) {
addLineageContextForColumn(userId, context, tabularColumn.getGuid(), tabularColumn.getTypeDefName());
addDataFlowsContextForColumn(userId, context, tabularColumn.getGuid(), tabularColumn.getTypeDefName());
}
}
return context;
}

/**
* Adds lineage context for the tabular column. It adds the lineage mappings for the column and the column context for all the technical assets
* that have lineage mappings to it.
* Adds data flows context for the tabular column. It adds the data flows for the column and the column context for all the technical assets
* that have data flows to it.
*
* @param userId userId of user making request.
* @param context the context to be updated
Expand All @@ -119,16 +119,16 @@ public Multimap<String, RelationshipsContext> buildProcessContext(String userId,
*
* @throws OCFCheckedExceptionBase checked exception for reporting errors found when using OCF connectors
*/
private void addLineageContextForColumn(String userId, Multimap<String, RelationshipsContext> context, String columnGUID,
String typeDefName) throws OCFCheckedExceptionBase {
List<Relationship> lineageMappings = handlerHelper.getRelationshipsByType(userId, columnGUID, LINEAGE_MAPPING, typeDefName);
private void addDataFlowsContextForColumn(String userId, Multimap<String, RelationshipsContext> context, String columnGUID,
String typeDefName) throws OCFCheckedExceptionBase {
List<Relationship> dataFlows = handlerHelper.getRelationshipsByType(userId, columnGUID, DATA_FLOW, typeDefName);

context.put(AssetLineageEventType.LINEAGE_MAPPINGS_EVENT.getEventTypeName(),
handlerHelper.buildContextForRelationships(userId, columnGUID, lineageMappings));
context.put(AssetLineageEventType.DATA_FLOWS_EVENT.getEventTypeName(),
handlerHelper.buildContextForRelationships(userId, columnGUID, dataFlows));

for (Relationship lineageMapping : lineageMappings) {
for (Relationship dataFlow : dataFlows) {
context.putAll(Multimaps.forMap(assetContextHandler.buildSchemaElementContext(userId, handlerHelper.getEntityAtTheEnd(userId,
columnGUID, lineageMapping))));
columnGUID, dataFlow))));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.LINEAGE_MAPPING;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATA_FLOW;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PROCESS;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PROCESS_HIERARCHY;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.SEMANTIC_ASSIGNMENT;
Expand Down Expand Up @@ -314,8 +314,8 @@ private void processNewRelationshipEvent(Relationship relationship) throws OCFCh
publisher.publishLineageRelationshipEvent(converter.createLineageRelationship(relationship),
AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
break;
case LINEAGE_MAPPING:
publisher.publishLineageMappingRelationshipEvent(converter.createLineageRelationship(relationship),
case DATA_FLOW:
publisher.publishDataFlowRelationshipEvent(converter.createLineageRelationship(relationship),
AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,25 +363,25 @@ public void publishLineageEntityEvent(LineageEntity lineageEntity, AssetLineageE
* @throws ConnectorCheckedException unable to send the event due to connectivity issue
* @throws JsonProcessingException exception parsing the event json
*/
public void publishLineageMappingRelationshipEvent(LineageRelationship lineageRelationship, AssetLineageEventType eventType) throws
public void publishDataFlowRelationshipEvent(LineageRelationship lineageRelationship, AssetLineageEventType eventType) throws
OCFCheckedExceptionBase,
JsonProcessingException {
publishLineageRelationshipEvent(lineageRelationship, eventType);

publishLineageMappingContext(lineageRelationship.getSourceEntity());
publishLineageMappingContext(lineageRelationship.getTargetEntity());
publishDataFlowContext(lineageRelationship.getSourceEntity());
publishDataFlowContext(lineageRelationship.getTargetEntity());
}

/**
* Publishes the context for an entity involved in a lineage mapping. If the entity is of type column, it will publish the column context.
* Publishes the context for an entity involved in a data flow. If the entity is of type column, it will publish the column context.
* If the entity is of type asset, it will publish the asset context.
*
* @param lineageEntity the lineage entity
*
* @throws ConnectorCheckedException unable to send the event due to connectivity issue
* @throws JsonProcessingException exception parsing the event json
*/
private void publishLineageMappingContext(LineageEntity lineageEntity) throws JsonProcessingException, OCFCheckedExceptionBase {
private void publishDataFlowContext(LineageEntity lineageEntity) throws JsonProcessingException, OCFCheckedExceptionBase {
publishLineageRelationshipsEvents(Multimaps.forMap(assetContextHandler.buildColumnContext(serverUserName, lineageEntity)));
publishLineageRelationshipsEvents(Multimaps.forMap(assetContextHandler.buildAssetContext(serverUserName, lineageEntity)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,6 @@
/* Copyright Contributors to the ODPi Egeria project. */
package org.odpi.openmetadata.accessservices.assetlineage.util;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Constants for Open Metadata Types names used to build lineage functionality
*/
Expand Down Expand Up @@ -57,7 +51,7 @@ public final class AssetLineageConstants {
public static final String PORT_DELEGATION = "PortDelegation";
public static final String PROCESS_PORT = "ProcessPort";
public static final String COLLECTION_MEMBERSHIP = "CollectionMembership";
public static final String LINEAGE_MAPPING = "LineageMapping";
public static final String DATA_FLOW = "DataFlow";
public static final String PORT_SCHEMA = "PortSchema";
public static final String NESTED_FILE = "NestedFile";
public static final String FOLDER_HIERARCHY = "FolderHierarchy";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.FILE_FOLDER;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.FOLDER_HIERARCHY;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.GLOSSARY_TERM;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.LINEAGE_MAPPING;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATA_FLOW;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.NESTED_FILE;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.NESTED_SCHEMA_ATTRIBUTE;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT;
Expand Down Expand Up @@ -77,7 +77,7 @@ public AssetLineageTypesValidator(OMRSRepositoryHelper repositoryHelper, Map<Str
final Set<String> defaultTopicRelationships =
Set.of(ATTRIBUTE_FOR_SCHEMA, SCHEMA_TYPE_OPTION, ASSET_SCHEMA_TYPE);
final Set<String> defaultProcessRelationships =
Set.of(ATTRIBUTE_FOR_SCHEMA, ASSET_SCHEMA_TYPE, PORT_SCHEMA, PORT_DELEGATION, PROCESS_PORT, PROCESS_HIERARCHY, LINEAGE_MAPPING);
Set.of(ATTRIBUTE_FOR_SCHEMA, ASSET_SCHEMA_TYPE, PORT_SCHEMA, PORT_DELEGATION, PROCESS_PORT, PROCESS_HIERARCHY, DATA_FLOW);
final Set<String> defaultGlossaryTermRelationships =
Set.of(SEMANTIC_ASSIGNMENT, TERM_CATEGORIZATION);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import static org.mockito.Mockito.when;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.ATTRIBUTE_FOR_SCHEMA;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.COLLECTION_MEMBERSHIP;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.LINEAGE_MAPPING;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATA_FLOW;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT_ALIAS;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT_DELEGATION;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT_IMPLEMENTATION;
Expand Down Expand Up @@ -106,13 +106,13 @@ void buildProcessContext_withPortImplementation() throws OCFCheckedExceptionBase
EntityDetail tabularSchemaType = mock(EntityDetail.class);
when(handlerHelper.addContextForRelationships(USER, portEntity, PORT_SCHEMA, portContext)).thenReturn(tabularSchemaType);

List<Relationship> lineageMappings = mockLineageMappings(portContext.stream().findFirst().get());
EntityDetail entityDetail = mockEntityAtTheEnd(lineageMappings.get(0));
List<Relationship> dataFlows = mockDataFlows(portContext.stream().findFirst().get());
EntityDetail entityDetail = mockEntityAtTheEnd(dataFlows.get(0));

processContextHandler.buildProcessContext(USER, process);
verify(handlerHelper, times(1)).addContextForRelationships(USER, collectionEntity, COLLECTION_MEMBERSHIP, collectionContext);
verify(handlerHelper, times(1)).addContextForRelationships(USER, tabularSchemaType, ATTRIBUTE_FOR_SCHEMA, portContext);
verify(handlerHelper, times(1)).buildContextForRelationships(USER, GUID, lineageMappings);
verify(handlerHelper, times(1)).buildContextForRelationships(USER, GUID, dataFlows);
verify(assetContextHandler, times(1)).buildSchemaElementContext(USER, entityDetail);
}

Expand All @@ -128,13 +128,13 @@ private Set<GraphContext> mockGraphContext(List<Relationship> collection, String
return context;
}

private List<Relationship> mockLineageMappings(GraphContext graphContext) throws OCFCheckedExceptionBase {
private List<Relationship> mockDataFlows(GraphContext graphContext) throws OCFCheckedExceptionBase {
LineageEntity lineageEntity = mock(LineageEntity.class);
when(lineageEntity.getGuid()).thenReturn(GUID);
when(lineageEntity.getTypeDefName()).thenReturn(RELATIONAL_COLUMN);
when(graphContext.getToVertex()).thenReturn(lineageEntity);

return mockGetRelationships(LINEAGE_MAPPING, RELATIONAL_COLUMN);
return mockGetRelationships(DATA_FLOW, RELATIONAL_COLUMN);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.ASSET_SCHEMA_TYPE;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.CLASSIFICATION_NAME_ASSET_ZONE_MEMBERSHIP;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATABASE;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.LINEAGE_MAPPING;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATA_FLOW;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PROCESS;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PROCESS_HIERARCHY;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.RELATIONAL_TABLE;
Expand Down Expand Up @@ -334,25 +334,25 @@ void processInstanceEvent_newRelationship_NotSupportedRelationshipType() throws
AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
verify(assetLineagePublisher, times(0)).publishGlossaryContext(GUID);
verify(assetLineagePublisher, times(0))
.publishLineageMappingRelationshipEvent(lineageRelationship, AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
.publishDataFlowRelationshipEvent(lineageRelationship, AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
}

@Test
void processInstanceEvent_newRelationship_LineageMapping() throws OCFCheckedExceptionBase, JsonProcessingException {
Relationship relationship = mockRelationship(LINEAGE_MAPPING);
void processInstanceEvent_newRelationship_DataFlow() throws OCFCheckedExceptionBase, JsonProcessingException {
Relationship relationship = mockRelationship(DATA_FLOW);
OMRSInstanceEvent instanceEvent = mockInstanceEvent(relationship, OMRSInstanceEventType.NEW_RELATIONSHIP_EVENT);
LineageRelationship lineageRelationship = mockLineageRelationship(relationship);
when(assetLineageTypesValidator.isValidLineageRelationshipType(relationship)).thenReturn(true);

assetLineageOMRSTopicListener.processInstanceEvent(instanceEvent);

verify(assetLineagePublisher, times(1))
.publishLineageMappingRelationshipEvent(lineageRelationship, AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
.publishDataFlowRelationshipEvent(lineageRelationship, AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
}

@Test
void processInstanceEvent_updatedRelationship() throws OCFCheckedExceptionBase, JsonProcessingException {
Relationship relationship = mockRelationship(LINEAGE_MAPPING);
Relationship relationship = mockRelationship(DATA_FLOW);
OMRSInstanceEvent instanceEvent = mockInstanceEvent(relationship, OMRSInstanceEventType.UPDATED_RELATIONSHIP_EVENT);
LineageRelationship lineageRelationship = mockLineageRelationship(relationship);
when(assetLineageTypesValidator.isValidLineageRelationshipType(relationship)).thenReturn(true);
Expand All @@ -365,7 +365,7 @@ void processInstanceEvent_updatedRelationship() throws OCFCheckedExceptionBase,

@Test
void processInstanceEvent_deletedRelationship() throws OCFCheckedExceptionBase, JsonProcessingException {
Relationship relationship = mockRelationship(LINEAGE_MAPPING);
Relationship relationship = mockRelationship(DATA_FLOW);
OMRSInstanceEvent instanceEvent = mockInstanceEvent(relationship, OMRSInstanceEventType.DELETED_RELATIONSHIP_EVENT);
LineageRelationship lineageRelationship = mockLineageRelationship(relationship);
when(assetLineageTypesValidator.isValidLineageRelationshipType(relationship)).thenReturn(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static org.mockito.Mockito.when;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.ASSET;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.CLASSIFICATION_NAME_INCOMPLETE;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.LINEAGE_MAPPING;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATA_FLOW;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.REFERENCEABLE;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -61,7 +61,7 @@ void hasValidClassificationTypes_false() {
void isValidLineageRelationshipType() {
Relationship relationship = mock(Relationship.class);
InstanceType instanceType = mock(InstanceType.class);
when(instanceType.getTypeDefName()).thenReturn(LINEAGE_MAPPING);
when(instanceType.getTypeDefName()).thenReturn(DATA_FLOW);
when(relationship.getType()).thenReturn(instanceType);
EntityProxy entityOneProxy = mock(EntityProxy.class);
when(entityOneProxy.getType()).thenReturn(instanceType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
property = "class")
@JsonSubTypes({
@JsonSubTypes.Type(value = DataEngineRegistrationEvent.class, name = "DataEngineRegistrationEvent"),
@JsonSubTypes.Type(value = LineageMappingsEvent.class, name = "LineageMappingsEvent"),
@JsonSubTypes.Type(value = DataFlowsEvent.class, name = "DataFlowsEvent"),
@JsonSubTypes.Type(value = PortAliasEvent.class, name = "PortAliasEvent"),
@JsonSubTypes.Type(value = PortImplementationEvent.class, name = "PortImplementationEvent"),
@JsonSubTypes.Type(value = ProcessEvent.class, name = "ProcessEvent"),
Expand Down
Loading