Skip to content

Commit

Permalink
Merge pull request #7092 from popa-raluca/ols
Browse files Browse the repository at this point in the history
OLS - add ability to query for element hierarchy
  • Loading branch information
popa-raluca authored Nov 22, 2022
2 parents 65607b3 + 0ba6e5a commit 992952d
Show file tree
Hide file tree
Showing 14 changed files with 673 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import org.odpi.openmetadata.governanceservers.openlineage.model.LineageVerticesAndEdges;
import org.odpi.openmetadata.governanceservers.openlineage.model.NodeNamesSearchCriteria;
import org.odpi.openmetadata.governanceservers.openlineage.model.Scope;
import org.odpi.openmetadata.governanceservers.openlineage.requests.ElementHierarchyRequest;
import org.odpi.openmetadata.governanceservers.openlineage.requests.HierarchyType;
import org.odpi.openmetadata.governanceservers.openlineage.requests.LineageSearchRequest;
import org.odpi.openmetadata.governanceservers.openlineage.requests.Node;
import org.odpi.openmetadata.governanceservers.openlineage.responses.LineageNodeNamesResponse;
import org.odpi.openmetadata.governanceservers.openlineage.responses.LineageResponse;
import org.odpi.openmetadata.governanceservers.openlineage.responses.LineageTypesResponse;
import org.odpi.openmetadata.governanceservers.openlineage.responses.LineageSearchResponse;
import org.odpi.openmetadata.governanceservers.openlineage.responses.LineageTypesResponse;
import org.odpi.openmetadata.governanceservers.openlineage.responses.LineageVertexResponse;
import org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.model.ffdc.JanusConnectorException;
import org.slf4j.Logger;
Expand All @@ -45,6 +47,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.hasLabel;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.inE;
Expand All @@ -55,23 +58,32 @@
import static org.odpi.openmetadata.governanceservers.openlineage.ffdc.OpenLineageServerErrorCode.ERROR_TYPES_NOT_FOUND;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.model.JanusConnectorErrorCode.CLASSIFICATION_NOT_FOUND;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.model.JanusConnectorErrorCode.COULD_NOT_RETRIEVE_VERTEX;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.model.JanusConnectorErrorCode.HIERARCHY_ERROR;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.model.JanusConnectorErrorCode.LINEAGE_NOT_FOUND;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.model.JanusConnectorErrorCode.NODES_NOT_FOUND;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.model.JanusConnectorErrorCode.TYPES_NOT_FOUND;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.model.JanusConnectorErrorCode.SEARCH_ERROR;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.model.JanusConnectorErrorCode.TYPES_NOT_FOUND;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.ASSETS;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.ASSET_LINEAGE_VARIABLES;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.ASSET_SCHEMA_TYPE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.ATTRIBUTE_FOR_SCHEMA;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.AVRO_FILE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.CLASSIFICATION_GRAPH;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.COLUMN_SPACE_DELIMITER;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.COMMA_SPACE_DELIMITER;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.CONNECTION_ENDPOINT;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.CONNECTION_TO_ASSET;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.CSV_FILE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.DATA_CONTENT_FOR_DATA_SET;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.DATA_FILE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.DATA_FILE_AND_SUBTYPES;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.DOCUMENT;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.EMBEDDED_PROPERTIES;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.EMPTY_STRING;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.EVENT_SCHEMA_ATTRIBUTE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.FILE_FOLDER;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.GLOSSARY;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.GLOSSARY_CATEGORY;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.GLOSSARY_TERM;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.GLOSSARY_TERM_AND_CLASSIFICATION_EDGES;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.INCOMPLETE;
Expand All @@ -80,6 +92,8 @@
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.LINEAGE_MAPPING;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.LOG_FILE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.MEDIA_FILE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.NESTED_FILE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.NESTED_SCHEMA_ATTRIBUTE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.PROCESS;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.RELATIONAL_COLUMN;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.RELATIONAL_COLUMN_AND_CLASSIFICATION_EDGES;
Expand All @@ -90,10 +104,9 @@
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.TABULAR_COLUMN;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.TABULAR_COLUMN_AND_CLASSIFICATION_EDGES;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.TABULAR_FILE_COLUMN;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.TERM_ANCHOR;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.TERM_CATEGORIZATION;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.TOPIC;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.GLOSSARY;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.GLOSSARY_CATEGORY;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.GraphConstants.CONDENSED_NODE_DISPLAY_NAME;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.GraphConstants.DESTINATION_CONDENSATION;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.GraphConstants.EDGE_LABEL_CLASSIFICATION;
Expand Down Expand Up @@ -285,7 +298,7 @@ private List<Vertex> querySources(GraphTraversalSource g, String guid, List<Stri
/**
* Returns the ultimate destination graph of queried entity, which can be a column or a table. In case of tables,
* relationships of type LineageMapping will be traversed forwards, all the way to the destination. If no vertices
* are found, than DataFlow relationships are used for traversal. In case of columns, DataFlow relationships are
* are found, then DataFlow relationships are used for traversal. In case of columns, DataFlow relationships are
* directly used
*
* @param guid queried entity
Expand Down Expand Up @@ -611,11 +624,77 @@ public LineageNodeNamesResponse getNodes(NodeNamesSearchCriteria searchCriteria)
return nodeNamesResponse;
}

@Override
public LineageResponse getElementHierarchy(ElementHierarchyRequest elementHierarchyRequest) {
LineageVerticesAndEdges elementHierarchy = graphHelper.getResult(this::getElementHierarchy, elementHierarchyRequest, this::handleGetElementHierarchyException);
return new LineageResponse(elementHierarchy);
}

private LineageVerticesAndEdges getElementHierarchy(GraphTraversalSource g, ElementHierarchyRequest elementHierarchyRequest) {
List<String> hierarchyEdgeLabels = getHierarchyEdgeLabels(elementHierarchyRequest);
if (CollectionUtils.isEmpty(hierarchyEdgeLabels)) {
return new LineageVerticesAndEdges();
}
Graph hierarchyGraph = this.graphHelper.getResult(this::queryEndToEnd, elementHierarchyRequest.getGuid(), hierarchyEdgeLabels,
this::handleLineageNotFoundException);
if (hierarchyGraph == null || !hierarchyGraph.vertices().hasNext()) {
return new LineageVerticesAndEdges();
}

return this.lineageGraphQueryHelper.getLineageVerticesAndEdges(hierarchyGraph, true);
}

private List<String> getHierarchyEdgeLabels(ElementHierarchyRequest elementHierarchyRequest) {
Vertex queriedVertex =
this.graphHelper.getResult(this::getQueriedVertex, elementHierarchyRequest.getGuid(), this::handleGetQueriedVertexException);
String label = queriedVertex.label();

List<String> upwardEdgeLabels = Collections.emptyList();
List<String> downwardEdgeLabels = Collections.emptyList();
if (DATA_FILE_AND_SUBTYPES.contains(label)) {
upwardEdgeLabels = List.of(CONNECTION_TO_ASSET, NESTED_FILE, CONNECTION_ENDPOINT);
downwardEdgeLabels = List.of(ATTRIBUTE_FOR_SCHEMA, ASSET_SCHEMA_TYPE);
}
switch (label) {
case RELATIONAL_TABLE:
upwardEdgeLabels = List.of(ATTRIBUTE_FOR_SCHEMA, ASSET_SCHEMA_TYPE, DATA_CONTENT_FOR_DATA_SET, CONNECTION_TO_ASSET,
CONNECTION_ENDPOINT);
downwardEdgeLabels = List.of(NESTED_SCHEMA_ATTRIBUTE);
break;
case TABULAR_COLUMN:
case TABULAR_FILE_COLUMN:
upwardEdgeLabels = List.of(ATTRIBUTE_FOR_SCHEMA, ASSET_SCHEMA_TYPE, CONNECTION_TO_ASSET, NESTED_FILE, CONNECTION_ENDPOINT);
break;
case RELATIONAL_COLUMN:
upwardEdgeLabels = List.of(NESTED_SCHEMA_ATTRIBUTE, ATTRIBUTE_FOR_SCHEMA, ASSET_SCHEMA_TYPE, DATA_CONTENT_FOR_DATA_SET, CONNECTION_TO_ASSET,
CONNECTION_ENDPOINT);
break;
case GLOSSARY_TERM:
upwardEdgeLabels = List.of(TERM_CATEGORIZATION, TERM_ANCHOR);
downwardEdgeLabels = List.of(SEMANTIC_ASSIGNMENT);
break;
case GLOSSARY:
downwardEdgeLabels = List.of(TERM_CATEGORIZATION, TERM_ANCHOR);
break;
}

HierarchyType hierarchyType = elementHierarchyRequest.getHierarchyType();
switch (hierarchyType) {
case UPWARD:
return upwardEdgeLabels;
case DOWNWARD:
return downwardEdgeLabels;
case ALL:
return Stream.concat(upwardEdgeLabels.stream(), downwardEdgeLabels.stream()).collect(Collectors.toList());
default:
return Collections.emptyList();
}
}

private List<String> getNodes(GraphTraversalSource g, NodeNamesSearchCriteria searchCriteria) {
return g.V().has(PROPERTY_KEY_LABEL, searchCriteria.getType()).values(PROPERTY_KEY_INSTANCEPROP_DISPLAY_NAME)
.filter(x -> x.toString().toLowerCase().contains(searchCriteria.getSearchValue().toLowerCase()))
.limit(searchCriteria.getLimit()).map(Object::toString).toList();

}

/**
Expand Down Expand Up @@ -678,6 +757,7 @@ private Map<String, String> retrieveAllProperties(Vertex vertex) {
* @param lineageSearchRequest the criteria for the search
* @return all the entities in the graph that match the criteria
*/
@Override
public LineageSearchResponse search(LineageSearchRequest lineageSearchRequest) {
List<LineageVertex> result = this.graphHelper.getResult(this::getSearchResult, lineageSearchRequest, this::handleSearchError);
return new LineageSearchResponse(result);
Expand Down Expand Up @@ -731,4 +811,7 @@ private void handleSearchError(Exception e, LineageSearchRequest lineageSearchRe
auditLog.logException(SEARCH_ERROR.getFormattedErrorMessage(), SEARCH_ERROR.getMessageDefinition(lineageSearchRequest.toString()), e);
}

private void handleGetElementHierarchyException(Exception e, ElementHierarchyRequest elementHierarchyRequest) {
auditLog.logException(HIERARCHY_ERROR.getFormattedErrorMessage(), SEARCH_ERROR.getMessageDefinition(elementHierarchyRequest.toString()), e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,18 @@ public enum JanusConnectorErrorCode implements AuditLogMessageSet {
OMRSAuditLogRecordSeverity.ERROR,
"Could not execute search request {0}",
"Searching in the database produced an error",
"Check the search input and try again")
;
"Check the search input and try again"),
HIERARCHY_ERROR("OPEN-LINEAGE-SERVICES-039",
OMRSAuditLogRecordSeverity.ERROR,
"Could not execute hierarchy request {0}",
"Retrieving the element hierarchy produced an error",
"Check the input and try again");

private static final Logger log = LoggerFactory.getLogger(JanusConnectorErrorCode.class);
private String errorMessageId;
private String errorMessage;
private String systemAction;
private String userAction;
private final String errorMessageId;
private final String errorMessage;
private final String systemAction;
private final String userAction;
private OMRSAuditLogRecordSeverity severity;
AuditLogMessageDefinition auditLogMessageDefinition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ private Constants() {
public static final String DATA_CONTENT_FOR_DATA_SET = "DataContentForDataSet";
public static final String SEMANTIC_ASSIGNMENT = "SemanticAssignment";
public static final String TERM_CATEGORIZATION = "TermCategorization";
public static final String TERM_ANCHOR = "TermAnchor";
public static final String PORT_DELEGATION = "PortDelegation";
public static final String PROCESS_PORT = "ProcessPort";
public static final String PORT_IMPLEMENTATION = "PortImplementation";
Expand Down
Loading

0 comments on commit 992952d

Please sign in to comment.