From d3ea3eef8a5cf317335f1a7fc6423502cd714b61 Mon Sep 17 00:00:00 2001 From: uhbrar Date: Mon, 25 Nov 2024 13:00:53 -0500 Subject: [PATCH] update documentations --- README.md | 6 ++++++ app/server.py | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/README.md b/README.md index 6ecb418..10478c1 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,11 @@ # Workflow runner +Workflow Runner is a tool that utlizes the Operations and Workflows language to perform queries. A request can be sent to the service containing a TRAPI message and workflow. Workflow Runner will then perform all of the operations included in the workflow on the message. Workflows are defined sequentially, so each operation is performed on the result message of the operation directly preceeding it. + +Workflow Runner is dependent on operation providers, which are the services that are able to perform each operation. These operation providers are discoverd through SmartAPI registry, and require services to self report which operations they support. The list of operation providers can be found through the /services endpoint. + +A complete list of operations can be found here: https://github.com/NCATSTranslator/OperationsAndWorkflows?tab=readme-ov-file + ## deployment ### Live service diff --git a/app/server.py b/app/server.py index a628943..77399e0 100644 --- a/app/server.py +++ b/app/server.py @@ -102,11 +102,13 @@ async def run_workflow( logger.setLevel(logging._nameToLevel[log_level]) completed_workflow = [] + # Run operations in workflow sequentially, using output from one operation as input for next for operation in workflow: operation_services = [] runner_parameters = operation.pop("runner_parameters", {}) operation_timeout = runner_parameters.get("timeout", 60.0) async with httpx.AsyncClient(verify=False, timeout=operation_timeout) as client: + # If allowlist included, only use those services if "allowlist" in runner_parameters.keys(): for service in SERVICES.get(operation["id"], []): if service["infores"] in runner_parameters["allowlist"]: @@ -114,6 +116,7 @@ async def run_workflow( else: for service in SERVICES.get(operation["id"], []): operation_services.append(service) + # Don't use anything in the denylist if "denylist" in runner_parameters.keys(): for service in operation_services: if service["infores"] in runner_parameters["denylist"]: @@ -131,6 +134,7 @@ async def run_workflow( service_operation_responses = [] + # Run operation on each operation provider for service in operation_services: url = service["url"] service_name = service["title"] @@ -152,6 +156,7 @@ async def run_workflow( ) logger.debug(f"Received operation '{operation}' from {service_name}...") + # Normalize responses so we can merge try: response = await post_safely( NORMALIZER_URL + "/query",