Skip to content

Commit

Permalink
♻️✨ Comp backend task state reporting fixed (#4775)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Sep 21, 2023
1 parent 4af4b44 commit 0fb7103
Show file tree
Hide file tree
Showing 16 changed files with 512 additions and 276 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,19 @@ def upgrade():
"payments_transactions", sa.Column("state_message", sa.Text(), nullable=True)
)
connection.execute(
"UPDATE payments_transactions SET state = 'SUCCESS' WHERE success = true"
sa.DDL(
"UPDATE payments_transactions SET state = 'SUCCESS' WHERE success = true"
)
)
connection.execute(
"UPDATE payments_transactions SET state = 'FAILED' WHERE success = false"
sa.DDL(
"UPDATE payments_transactions SET state = 'FAILED' WHERE success = false"
)
)
connection.execute(
"UPDATE payments_transactions SET state = 'PENDING' WHERE success IS NULL"
sa.DDL(
"UPDATE payments_transactions SET state = 'PENDING' WHERE success IS NULL"
)
)
connection.execute("UPDATE payments_transactions SET state_message = errors")

Expand All @@ -72,13 +78,19 @@ def downgrade():

connection = op.get_bind()
connection.execute(
"UPDATE payments_transactions SET success = true WHERE state = 'SUCCESS'"
sa.DDL(
"UPDATE payments_transactions SET success = true WHERE state = 'SUCCESS'"
)
)
connection.execute(
"UPDATE payments_transactions SET success = false WHERE completed_at IS NOT NULL AND state != 'SUCCESS'"
sa.DDL(
"UPDATE payments_transactions SET success = false WHERE completed_at IS NOT NULL AND state != 'SUCCESS'"
)
)
connection.execute(
"UPDATE payments_transactions SET success = NULL WHERE completed_at IS NULL AND state != 'SUCCESS'"
sa.DDL(
"UPDATE payments_transactions SET success = NULL WHERE completed_at IS NULL AND state != 'SUCCESS'"
)
)

op.drop_column("payments_transactions", "state_message")
Expand Down
10 changes: 6 additions & 4 deletions packages/service-library/src/servicelib/rabbitmq/_rpc_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ async def wrapper(*args, **kwargs):
msg=f"calling {func.__name__} with {args}, {kwargs}",
):
try:
result = await func(*args, **kwargs)
return result
return await func(*args, **kwargs)
except asyncio.CancelledError:
_logger.debug("call was cancelled")
raise
except Exception as exc: # pylint: disable=broad-except
_logger.exception("Unhandled exception:")
# NOTE: we do not return internal exceptions over RPC
raise RPCServerError(
method_name=func.__name__, exc_type=type(exc), msg=f"{exc}"
) from exc
method_name=func.__name__,
exc_type=f"{type(exc)}",
msg=f"{exc}",
) from None

self.routes[RPCMethodName(func.__name__)] = wrapper
return func
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,14 @@ async def create_computation( # noqa: C901, PLR0912
publish=computation.start_pipeline or False,
)
assert computation.product_name # nosec
min_computation_nodes: list[NodeID] = [
NodeID(n) for n in minimal_computational_dag.nodes()
]
inserted_comp_tasks = await comp_tasks_repo.upsert_tasks_from_project(
project,
catalog_client,
director_client,
published_nodes=list(minimal_computational_dag.nodes())
if computation.start_pipeline
else [],
published_nodes=min_computation_nodes if computation.start_pipeline else [],
user_id=computation.user_id,
product_name=computation.product_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class ComputationalRunNotFoundError(PydanticErrorMixin, DirectorException):
msg_template = "Computational run not found"


class ComputationalTaskNotFoundError(PydanticErrorMixin, DirectorException):
    """Error signalling that a computational task could not be found.

    The message template expects a ``node_id`` field identifying the task.
    """

    msg_template = "Computational task {node_id} not found"


class NodeRightsAcquireError(PydanticErrorMixin, DirectorException):
msg_template = "Could not acquire a lock for {docker_node_id} since all {slots} slots are used."

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from enum import Enum


# NOTE: the values are spelled out explicitly instead of using StrAutoEnum, because with StrAutoEnum mypy fails with:
# src/simcore_service_director_v2/modules/dask_client.py:101:5: error: Dict entry 0 has incompatible type "str": "auto"; expected "Any": "DaskClientTaskState" [dict-item]
class DaskClientTaskState(str, Enum):
    """Enumeration of dask client task states.

    Every member is also a ``str`` instance, and its value is identical to
    its name, so members compare equal to the corresponding plain string.
    """

    PENDING = "PENDING"
    NO_WORKER = "NO_WORKER"
    PENDING_OR_STARTED = "PENDING_OR_STARTED"
    LOST = "LOST"
    ERRED = "ERRED"
    ABORTED = "ABORTED"
    SUCCESS = "SUCCESS"
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
RemoteMethodNotRegisteredError,
RPCMethodName,
RPCNamespace,
RPCServerError,
)

from ..core.errors import (
Expand Down Expand Up @@ -59,3 +60,5 @@ async def get_or_create_on_demand_cluster(
except RemoteMethodNotRegisteredError as exc:
# no clusters-keeper, that is not going to work!
raise ComputationalBackendOnDemandClustersKeeperNotReadyError from exc
except RPCServerError as exc:
raise ComputationalBackendOnDemandClustersKeeperNotReadyError from exc
Loading

0 comments on commit 0fb7103

Please sign in to comment.