Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): add last workflow execution to context #1029

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions keep/api/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,21 @@ def get_last_workflow_workflow_to_alert_executions(
return latest_workflow_to_alert_executions


def get_last_workflow_execution_by_workflow_id(
workflow_id: str, tenant_id: str
) -> Optional[WorkflowExecution]:
with Session(engine) as session:
workflow_execution = (
session.query(WorkflowExecution)
.filter(WorkflowExecution.workflow_id == workflow_id)
.filter(WorkflowExecution.tenant_id == tenant_id)
.filter(WorkflowExecution.status == "success")
.order_by(WorkflowExecution.started.desc())
.first()
)
return workflow_execution


def get_workflows_with_last_execution(tenant_id: str) -> List[dict]:
with Session(engine) as session:
latest_execution_cte = (
Expand Down
18 changes: 17 additions & 1 deletion keep/contextmanager/contextmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import click
from pympler.asizeof import asizeof

from keep.api.core.db import get_session
from keep.api.core.db import get_last_workflow_execution_by_workflow_id, get_session
from keep.api.logging import WorkflowLoggerAdapter


Expand All @@ -24,10 +24,25 @@ def __init__(self, tenant_id, workflow_id=None, workflow_execution_id=None):
self.foreach_context = {
"value": None,
}
# cli context
try:
self.click_context = click.get_current_context()
except RuntimeError:
self.click_context = {}
# last workflow context
self.last_workflow_execution_results = {}
if self.workflow_id:
try:
last_workflow_execution = get_last_workflow_execution_by_workflow_id(
workflow_id, tenant_id
)
if last_workflow_execution is not None:
self.last_workflow_execution_results = (
last_workflow_execution.results
)
except Exception:
self.logger.exception("Failed to get last workflow execution")
pass
self.aliases = {}
# dependencies are used so iohandler will be able to use the output class of the providers
# e.g. let's say bigquery_provider results are google.cloud.bigquery.Row
Expand Down Expand Up @@ -86,6 +101,7 @@ def get_full_context(self, exclude_providers=False, exclude_env=False):
"steps": self.steps_context,
"foreach": self.foreach_context,
"event": self.event_context,
"last_workflow_results": self.last_workflow_execution_results,
"alert": self.event_context, # this is an alias so workflows will be able to use alert.source
}

Expand Down
Loading