Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛Computational backend: if a pipeline raises, it should not prevent handling of other pipelines #6295

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from pydantic import PositiveInt
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient
from servicelib.utils import logged_gather
from servicelib.utils import limited_gather

from ...constants import UNDEFINED_STR_METADATA
from ...core.errors import (
Expand Down Expand Up @@ -220,7 +220,7 @@ async def stop_pipeline(
async def schedule_all_pipelines(self) -> None:
self.wake_up_event.clear()
# if one of the task throws, the other are NOT cancelled which is what we want
await logged_gather(
await limited_gather(
*(
self._schedule_pipeline(
user_id=user_id,
Expand All @@ -234,8 +234,10 @@ async def schedule_all_pipelines(self) -> None:
iteration,
), pipeline_params in self.scheduled_pipelines.items()
),
reraise=False,
log=_logger,
max_concurrency=40,
limit=40,
tasks_group_prefix="computational-scheduled-pipeline",
)

async def _get_pipeline_dag(self, project_id: ProjectID) -> nx.DiGraph:
Expand Down
Loading