Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️✨ Comp backend task state reporting fixed #4775

Merged
merged 17 commits into from
Sep 21, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,19 @@ def upgrade():
"payments_transactions", sa.Column("state_message", sa.Text(), nullable=True)
)
connection.execute(
"UPDATE payments_transactions SET state = 'SUCCESS' WHERE success = true"
sa.DDL(
"UPDATE payments_transactions SET state = 'SUCCESS' WHERE success = true"
)
)
connection.execute(
"UPDATE payments_transactions SET state = 'FAILED' WHERE success = false"
sa.DDL(
"UPDATE payments_transactions SET state = 'FAILED' WHERE success = false"
)
)
connection.execute(
"UPDATE payments_transactions SET state = 'PENDING' WHERE success IS NULL"
sa.DDL(
"UPDATE payments_transactions SET state = 'PENDING' WHERE success IS NULL"
)
)
connection.execute("UPDATE payments_transactions SET state_message = errors")

Expand All @@ -72,13 +78,19 @@ def downgrade():

connection = op.get_bind()
connection.execute(
"UPDATE payments_transactions SET success = true WHERE state = 'SUCCESS'"
sa.DDL(
"UPDATE payments_transactions SET success = true WHERE state = 'SUCCESS'"
)
)
connection.execute(
"UPDATE payments_transactions SET success = false WHERE completed_at IS NOT NULL AND state != 'SUCCESS'"
sa.DDL(
"UPDATE payments_transactions SET success = false WHERE completed_at IS NOT NULL AND state != 'SUCCESS'"
)
)
connection.execute(
"UPDATE payments_transactions SET success = NULL WHERE completed_at IS NULL AND state != 'SUCCESS'"
sa.DDL(
"UPDATE payments_transactions SET success = NULL WHERE completed_at IS NULL AND state != 'SUCCESS'"
)
)

op.drop_column("payments_transactions", "state_message")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ async def wrapper(*args, **kwargs):
msg=f"calling {func.__name__} with {args}, {kwargs}",
):
try:
result = await func(*args, **kwargs)
return result
return await func(*args, **kwargs)
except asyncio.CancelledError:
_logger.debug("call was cancelled")
raise
except Exception as exc: # pylint: disable=broad-except
_logger.exception("Unhandled exception:")
# NOTE: we do not return internal exceptions over RPC
raise RPCServerError(
method_name=func.__name__, exc_type=type(exc), msg=f"{exc}"
) from exc
method_name=func.__name__,
exc_type=f"{type(exc)}",
msg=f"{exc}",
) from None

self.routes[RPCMethodName(func.__name__)] = wrapper
return func
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,14 @@ async def create_computation( # noqa: C901, PLR0912
publish=computation.start_pipeline or False,
)
assert computation.product_name # nosec
min_computation_nodes: list[NodeID] = [
NodeID(n) for n in minimal_computational_dag.nodes()
]
inserted_comp_tasks = await comp_tasks_repo.upsert_tasks_from_project(
project,
catalog_client,
director_client,
published_nodes=list(minimal_computational_dag.nodes())
if computation.start_pipeline
else [],
published_nodes=min_computation_nodes if computation.start_pipeline else [],
user_id=computation.user_id,
product_name=computation.product_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class ComputationalRunNotFoundError(PydanticErrorMixin, DirectorException):
msg_template = "Computational run not found"


class ComputationalTaskNotFoundError(PydanticErrorMixin, DirectorException):
msg_template = "Computational task {node_id} not found"


class NodeRightsAcquireError(PydanticErrorMixin, DirectorException):
msg_template = "Could not acquire a lock for {docker_node_id} since all {slots} slots are used."

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from enum import Enum


# NOTE: mypy fails with src/simcore_service_director_v2/modules/dask_client.py:101:5: error: Dict entry 0 has incompatible type "str": "auto"; expected "Any": "DaskClientTaskState" [dict-item]
# when using StrAutoEnum
class DaskClientTaskState(str, Enum):
PENDING = "PENDING"
NO_WORKER = "NO_WORKER"
PENDING_OR_STARTED = "PENDING_OR_STARTED"
LOST = "LOST"
ERRED = "ERRED"
ABORTED = "ABORTED"
SUCCESS = "SUCCESS"
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
RemoteMethodNotRegisteredError,
RPCMethodName,
RPCNamespace,
RPCServerError,
)

from ..core.errors import (
Expand Down Expand Up @@ -59,3 +60,5 @@ async def get_or_create_on_demand_cluster(
except RemoteMethodNotRegisteredError as exc:
# no clusters-keeper, that is not going to work!
raise ComputationalBackendOnDemandClustersKeeperNotReadyError from exc
except RPCServerError as exc:
raise ComputationalBackendOnDemandClustersKeeperNotReadyError from exc
Loading