Skip to content

Commit

Permalink
Merge pull request #325 from EyalCMX/pr-branch
Browse files Browse the repository at this point in the history
add timeout to ActivateJobs request
  • Loading branch information
dimastbk authored Sep 26, 2024
2 parents ef89bf1 + ab33a86 commit bed8bf6
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 3 deletions.
4 changes: 4 additions & 0 deletions pyzeebe/errors/zeebe_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class ZeebeInternalError(PyZeebeError):
pass


class ZeebeDeadlineExceeded(PyZeebeError):
pass


class UnknownGrpcStatusCodeError(PyZeebeError):
def __init__(self, grpc_error: grpc.aio.AioRpcError):
super().__init__()
Expand Down
5 changes: 4 additions & 1 deletion pyzeebe/grpc_internals/zeebe_adapter_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pyzeebe.errors import (
UnknownGrpcStatusCodeError,
ZeebeBackPressureError,
ZeebeDeadlineExceeded,
ZeebeGatewayUnavailableError,
ZeebeInternalError,
)
Expand All @@ -32,7 +33,7 @@ async def _handle_grpc_error(self, grpc_error: grpc.aio.AioRpcError) -> NoReturn
try:
pyzeebe_error = _create_pyzeebe_error_from_grpc_error(grpc_error)
raise pyzeebe_error
except (ZeebeGatewayUnavailableError, ZeebeInternalError):
except (ZeebeGatewayUnavailableError, ZeebeInternalError, ZeebeDeadlineExceeded):
self._current_connection_retries += 1
if not self._should_retry():
await self._close()
Expand All @@ -52,4 +53,6 @@ def _create_pyzeebe_error_from_grpc_error(grpc_error: grpc.aio.AioRpcError) -> P
return ZeebeGatewayUnavailableError()
if is_error_status(grpc_error, grpc.StatusCode.INTERNAL):
return ZeebeInternalError()
elif is_error_status(grpc_error, grpc.StatusCode.DEADLINE_EXCEEDED):
return ZeebeDeadlineExceeded()
return UnknownGrpcStatusCodeError(grpc_error)
6 changes: 5 additions & 1 deletion pyzeebe/grpc_internals/zeebe_job_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

logger = logging.getLogger(__name__)

DEFAULT_GRPC_REQUEST_TIMEOUT = 20 # This constant represents the fallback timeout value


class ZeebeJobAdapter(ZeebeAdapterBase):
async def activate_jobs(
Expand All @@ -39,6 +41,7 @@ async def activate_jobs(
tenant_ids: Optional[Iterable[str]] = None,
) -> AsyncGenerator[Job, None]:
try:
grpc_request_timeout = request_timeout / 1000 * 2 if request_timeout > 0 else DEFAULT_GRPC_REQUEST_TIMEOUT
async for response in self._gateway_stub.ActivateJobs(
ActivateJobsRequest(
type=task_type,
Expand All @@ -48,7 +51,8 @@ async def activate_jobs(
fetchVariable=variables_to_fetch,
requestTimeout=request_timeout,
tenantIds=tenant_ids,
)
),
timeout=grpc_request_timeout,
):
for raw_job in response.jobs:
job = self._create_job_from_raw_job(raw_job)
Expand Down
8 changes: 7 additions & 1 deletion pyzeebe/worker/job_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pyzeebe.errors import (
ActivateJobsRequestInvalidError,
ZeebeBackPressureError,
ZeebeDeadlineExceeded,
ZeebeGatewayUnavailableError,
ZeebeInternalError,
)
Expand Down Expand Up @@ -70,7 +71,12 @@ async def poll_once(self) -> None:
except ActivateJobsRequestInvalidError:
logger.warning("Activate job requests was invalid for task %s", self.task.type)
raise
except (ZeebeBackPressureError, ZeebeGatewayUnavailableError, ZeebeInternalError) as error:
except (
ZeebeBackPressureError,
ZeebeGatewayUnavailableError,
ZeebeInternalError,
ZeebeDeadlineExceeded,
) as error:
logger.warning(
"Failed to activate jobs from the gateway. Exception: %s. Retrying in 5 seconds...",
repr(error),
Expand Down

0 comments on commit bed8bf6

Please sign in to comment.