Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Template Manager OMVS #8339

Merged
merged 2 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@

@ucURL=http://localhost:8080
#@ucURL=http://egeria.pdr-associates.com:8070

@baseURL=https://localhost:9443
@viewServer=view-server

Expand Down Expand Up @@ -548,6 +550,25 @@ Content-Type: application/json
}


###
# @name initiateGovernanceActionProcess certify-hospital
# Using the named governance action process as a template, initiate a chain of engine actions.

POST {{baseURL}}/servers/{{viewServer}}/api/open-metadata/automated-curation/governance-action-processes/initiate
Authorization: Bearer {{token}}
Content-Type: application/json

{
"processQualifiedName": "ClinicalTrials:PROJ-CT-TBDF:certify-hospital",
"actionTargets": [
{
"class" : "NewActionTarget",
"actionTargetName": "hospital",
"actionTargetGUID": "{{hamptonHospitalGUID}}"
}]
}


###
#===============================================
# Onboard the hospitals
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
###
# Create the basic structure of the CocoDataLake
#
@ucURL=http://localhost:8080
#@ucURL=http://egeria.pdr-associates.com:8070
#@ucURL=http://localhost:8080
@ucURL=http://egeria.pdr-associates.com:8070

@cocoDataLake=/Users/amandachessell/coco_data_lake

Expand Down Expand Up @@ -67,16 +67,6 @@ Content-Type: application/json
}


###
# @name create schema
POST {{ucURL}}/api/2.1/unity-catalog/schemas
Content-Type: application/json

{
"name": "teddy_bear_drop_foot",
"catalog_name" : "clinical_trials",
"comment": "Observational study on the Teddy Bear Drop Foot condition."
}

###
#----------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,19 @@
import org.odpi.openmetadata.accessservices.assetmanager.events.AssetManagerOutTopicEvent;
import org.odpi.openmetadata.adapters.connectors.integration.openlineage.ffdc.OpenLineageIntegrationConnectorAuditCode;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.openmetadata.metadataelements.ElementClassification;
import org.odpi.openmetadata.frameworks.openmetadata.metadataelements.ElementHeader;
import org.odpi.openmetadata.frameworks.governanceaction.properties.ActionTargetElement;
import org.odpi.openmetadata.frameworks.governanceaction.properties.EngineActionElement;
import org.odpi.openmetadata.frameworks.openmetadata.enums.EngineActionStatus;
import org.odpi.openmetadata.frameworks.openmetadata.metadataelements.ElementHeader;
import org.odpi.openmetadata.frameworks.openmetadata.types.OpenMetadataProperty;
import org.odpi.openmetadata.frameworks.openmetadata.types.OpenMetadataType;
import org.odpi.openmetadata.integrationservices.lineage.connector.LineageIntegratorConnector;
import org.odpi.openmetadata.integrationservices.lineage.connector.LineageIntegratorContext;
import org.odpi.openmetadata.integrationservices.lineage.properties.OpenLineageInputDataSet;
import org.odpi.openmetadata.integrationservices.lineage.properties.OpenLineageJob;
import org.odpi.openmetadata.integrationservices.lineage.properties.OpenLineageNominalTimeRunFacet;
import org.odpi.openmetadata.integrationservices.lineage.properties.OpenLineageOutputDataSet;
import org.odpi.openmetadata.integrationservices.lineage.properties.OpenLineageParentRunFacet;
import org.odpi.openmetadata.integrationservices.lineage.properties.OpenLineageParentRunFacetJob;
import org.odpi.openmetadata.integrationservices.lineage.properties.OpenLineageParentRunFacetRun;
import org.odpi.openmetadata.integrationservices.lineage.properties.OpenLineageRun;
import org.odpi.openmetadata.integrationservices.lineage.properties.OpenLineageRunEvent;
import org.odpi.openmetadata.integrationservices.lineage.properties.OpenLineageRunFacets;
import org.odpi.openmetadata.integrationservices.lineage.properties.*;

import java.net.URI;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.*;


/**
Expand All @@ -40,14 +29,8 @@
*/
public class GovernanceActionOpenLineageIntegrationConnector extends LineageIntegratorConnector implements AssetManagerEventListener
{
private static final String inProgressGovernanceActionStatus = "InProgress";
private static final String actionedGovernanceActionStatus = "Actioned";
private static final String invalidGovernanceActionStatus = "Invalid";
private static final String failedGovernanceActionStatus = "Failed";

private static final URI producer = URI.create("https://egeria-project.org/");
private final ZoneId zoneId = ZoneId.systemDefault();
private LineageIntegratorContext myContext = null;


/**
Expand All @@ -70,25 +53,20 @@ public synchronized void start() throws ConnectorCheckedException

final String methodName = "start";

myContext = super.getContext();

if (myContext != null)
try
{
try
super.getContext().registerListener(this);
}
catch (Exception error)
{
if (auditLog != null)
{
myContext.registerListener(this);
}
catch (Exception error)
{
if (auditLog != null)
{
auditLog.logException(methodName,
OpenLineageIntegrationConnectorAuditCode.UNEXPECTED_EXCEPTION.getMessageDefinition(connectorName,
error.getClass().getName(),
methodName,
error.getMessage()),
error);
}
auditLog.logException(methodName,
OpenLineageIntegrationConnectorAuditCode.UNEXPECTED_EXCEPTION.getMessageDefinition(connectorName,
error.getClass().getName(),
methodName,
error.getMessage()),
error);
}
}
}
Expand Down Expand Up @@ -118,35 +96,28 @@ public void processEvent(AssetManagerOutTopicEvent event)
ElementHeader elementHeader = event.getElementHeader();

if (((event.getEventType() == AssetManagerEventType.NEW_ELEMENT_CREATED) ||
(event.getEventType() == AssetManagerEventType.REFRESH_ELEMENT_EVENT) ||
(event.getEventType() == AssetManagerEventType.ELEMENT_UPDATED)) &&
("GovernanceAction".equals(elementHeader.getType().getTypeName())))
(propertyHelper.isTypeOf(elementHeader, OpenMetadataType.ENGINE_ACTION.typeName)))
{
try
{
String previousActionStatus = "";

if (event.getPreviousElementProperties() != null)
{
previousActionStatus = event.getPreviousElementProperties().get("actionStatus").toString();
}

String currentActionStatus = event.getElementProperties().get("actionStatus").toString();
String previousActionStatus = getActionStatus(event.getPreviousElementProperties());
String currentActionStatus = getActionStatus(event.getPreviousElementProperties());

/*
* Only output an event if the status has changed.
*/
if (! previousActionStatus.equals(currentActionStatus))
{
if ((inProgressGovernanceActionStatus.equals(currentActionStatus)) ||
(actionedGovernanceActionStatus.equals(currentActionStatus)) ||
(failedGovernanceActionStatus.equals(currentActionStatus)) ||
(invalidGovernanceActionStatus.equals(currentActionStatus)))
if ((EngineActionStatus.IN_PROGRESS.getName().equals(currentActionStatus)) ||
(EngineActionStatus.ACTIONED.getName().equals(currentActionStatus)) ||
(EngineActionStatus.FAILED.getName().equals(currentActionStatus)) ||
(EngineActionStatus.INVALID.getName().equals(currentActionStatus)))
{
EngineActionElement engineAction = myContext.getEngineAction(elementHeader.getGUID());
EngineActionElement engineAction = super.getContext().getEngineAction(elementHeader.getGUID());

publishOpenLineageEvent(currentActionStatus,
event.getEventTime(),
engineAction);
publishOpenLineageEvent(currentActionStatus, event.getEventTime(), engineAction);
}
}

Expand All @@ -169,38 +140,53 @@ public void processEvent(AssetManagerOutTopicEvent event)
}
}

/**
* Return the action status from the event.
*
* @param elementProperties properties for the engine action
* @return action status as a string
*/
private String getActionStatus(Map<String, Object> elementProperties)
{
if ((elementProperties != null) && (elementProperties.get(OpenMetadataProperty.ACTION_STATUS.name) != null))
{
return elementProperties.get(OpenMetadataProperty.ACTION_STATUS.name).toString();
}

return "<null>";
}


/**
* Add information about the governance action into an OpenLineage event and publish it.
*
* @param governanceActionStatus the status from the entity at the time of the event
* @param engineActionStatus the status from the entity at the time of the event
* @param eventTime the time of the change to the entity
* @param engineAction source information
* @exception ConnectorCheckedException connector has been asked to stop
*/
private void publishOpenLineageEvent(String governanceActionStatus,
private void publishOpenLineageEvent(String engineActionStatus,
Date eventTime,
EngineActionElement engineAction)
EngineActionElement engineAction) throws ConnectorCheckedException
{
OpenLineageRunEvent event = new OpenLineageRunEvent();

event.setProducer(producer);
event.setEventTime(getTimeStamp(eventTime));



if (inProgressGovernanceActionStatus.equals(governanceActionStatus))
if (EngineActionStatus.IN_PROGRESS.getName().equals(engineActionStatus))
{
event.setEventType("START");
}
else if (actionedGovernanceActionStatus.equals(governanceActionStatus))
else if (EngineActionStatus.ACTIONED.getName().equals(engineActionStatus))
{
event.setEventType("COMPLETE");
}
else if (failedGovernanceActionStatus.equals(governanceActionStatus))
else if (EngineActionStatus.FAILED.getName().equals(engineActionStatus))
{
event.setEventType("FAIL");
}
else if (invalidGovernanceActionStatus.equals(governanceActionStatus))
else if (EngineActionStatus.INVALID.getName().equals(engineActionStatus))
{
event.setEventType("ABORT");
}
Expand Down Expand Up @@ -236,29 +222,12 @@ else if (invalidGovernanceActionStatus.equals(governanceActionStatus))

run.setRunId(UUID.fromString(engineAction.getElementHeader().getGUID()));

String anchorGUID = null;

List<ElementClassification> classifications = engineAction.getElementHeader().getClassifications();

if (classifications != null)
{
for (ElementClassification classification : classifications)
{
if ((classification != null) && ("Anchors".equals(classification.getClassificationName())))
{
if (classification.getClassificationProperties() != null)
{
anchorGUID = classification.getClassificationProperties().get("anchorGUID").toString();
}
}
}
}
String anchorGUID = propertyHelper.getAnchorGUID(engineAction.getElementHeader());

OpenLineageRunFacets runFacets = new OpenLineageRunFacets();

if (anchorGUID != null)
{

OpenLineageParentRunFacet parentRunFacet = new OpenLineageParentRunFacet();
OpenLineageParentRunFacetJob parentRunFacetJob = new OpenLineageParentRunFacetJob();
OpenLineageParentRunFacetRun parentRunFacetRun = new OpenLineageParentRunFacetRun();
Expand Down Expand Up @@ -376,7 +345,7 @@ else if (invalidGovernanceActionStatus.equals(governanceActionStatus))
event.setOutputs(outputDataSets);
}

myContext.publishOpenLineageRunEvent(event);
super.getContext().publishOpenLineageRunEvent(event);
}


Expand Down
Loading
Loading