forked from julep-ai/julep
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Feat(agents-api): Replace Retry Policies with Temporal Interceptors &…
… Add more non-retryable errors (julep-ai#612) <!-- ELLIPSIS_HIDDEN --> > [!IMPORTANT] > Replaces retry policies with Temporal interceptors to handle non-retryable errors and updates worker to use these interceptors. > > - **Interceptors**: > - Add `CustomActivityInterceptor` and `CustomWorkflowInterceptor` in `interceptors.py` to handle non-retryable errors by raising `ApplicationError`. > - `CustomInterceptor` combines both activity and workflow interceptors. > - **Non-Retryable Errors**: > - Define `NON_RETRYABLE_ERROR_TYPES` and `is_non_retryable_error()` in `exceptions/tasks.py` to identify non-retryable errors. > - **Retry Policy**: > - Set `DEFAULT_RETRY_POLICY` to `None` in `retry_policies.py` due to conflict with interceptors. > - **Worker**: > - Update `create_worker()` in `worker.py` to use `CustomInterceptor`. > > <sup>This description was created by </sup>[<img alt="Ellipsis" src="https://img.shields.io/badge/Ellipsis-blue?color=175173">](https://www.ellipsis.dev?ref=julep-ai%2Fjulep&utm_source=github&utm_medium=referral)<sup> for 7a50b72. It will automatically update as commits are pushed.</sup> <!-- ELLIPSIS_HIDDEN --> --------- Co-authored-by: HamadaSalhab <[email protected]>
- Loading branch information
1 parent
0424843
commit d2759b3
Showing
4 changed files
with
213 additions
and
61 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
""" | ||
This module defines non-retryable error types and provides a function to check | ||
if a given error is non-retryable. These are used in conjunction with custom | ||
Temporal interceptors to prevent unnecessary retries of certain error types. | ||
""" | ||
|
||
import asyncio | ||
|
||
import beartype | ||
import beartype.roar | ||
import box | ||
import box.exceptions | ||
import fastapi | ||
import httpx | ||
import jinja2 | ||
import jsonschema.exceptions | ||
import pydantic | ||
import requests | ||
import temporalio.exceptions | ||
|
||
# List of error types that should not be retried | ||
NON_RETRYABLE_ERROR_TYPES = [ | ||
# Temporal-specific errors | ||
temporalio.exceptions.WorkflowAlreadyStartedError, | ||
temporalio.exceptions.TerminatedError, | ||
temporalio.exceptions.CancelledError, | ||
# | ||
# Built-in Python exceptions | ||
TypeError, | ||
AssertionError, | ||
SyntaxError, | ||
ValueError, | ||
ZeroDivisionError, | ||
IndexError, | ||
AttributeError, | ||
LookupError, | ||
BufferError, | ||
ArithmeticError, | ||
KeyError, | ||
NameError, | ||
NotImplementedError, | ||
RecursionError, | ||
RuntimeError, | ||
StopIteration, | ||
StopAsyncIteration, | ||
IndentationError, | ||
TabError, | ||
# | ||
# Unicode-related errors | ||
UnicodeError, | ||
UnicodeEncodeError, | ||
UnicodeDecodeError, | ||
UnicodeTranslateError, | ||
# | ||
# HTTP and API-related errors | ||
fastapi.exceptions.HTTPException, | ||
fastapi.exceptions.RequestValidationError, | ||
httpx.RequestError, | ||
httpx.HTTPStatusError, | ||
# | ||
# Asynchronous programming errors | ||
asyncio.CancelledError, | ||
asyncio.InvalidStateError, | ||
GeneratorExit, | ||
# | ||
# Third-party library exceptions | ||
jinja2.exceptions.TemplateSyntaxError, | ||
jinja2.exceptions.TemplateNotFound, | ||
jsonschema.exceptions.ValidationError, | ||
pydantic.ValidationError, | ||
requests.exceptions.InvalidURL, | ||
requests.exceptions.MissingSchema, | ||
# Box exceptions | ||
box.exceptions.BoxKeyError, | ||
box.exceptions.BoxTypeError, | ||
box.exceptions.BoxValueError, | ||
# Beartype exceptions | ||
beartype.roar.BeartypeException, | ||
beartype.roar.BeartypeDecorException, | ||
beartype.roar.BeartypeDecorHintException, | ||
beartype.roar.BeartypeDecorHintNonpepException, | ||
beartype.roar.BeartypeDecorHintPepException, | ||
beartype.roar.BeartypeDecorHintPepUnsupportedException, | ||
beartype.roar.BeartypeDecorHintTypeException, | ||
beartype.roar.BeartypeDecorParamException, | ||
beartype.roar.BeartypeDecorParamNameException, | ||
beartype.roar.BeartypeCallHintParamViolation, | ||
beartype.roar.BeartypeCallHintReturnViolation, | ||
beartype.roar.BeartypeDecorHintParamDefaultViolation, | ||
beartype.roar.BeartypeDoorHintViolation, | ||
] | ||
|
||
|
||
def is_non_retryable_error(error: Exception) -> bool: | ||
""" | ||
Determines if the given error is non-retryable. | ||
This function checks if the error is an instance of any of the error types | ||
defined in NON_RETRYABLE_ERROR_TYPES. | ||
Args: | ||
error (Exception): The error to check. | ||
Returns: | ||
bool: True if the error is non-retryable, False otherwise. | ||
""" | ||
return isinstance(error, tuple(NON_RETRYABLE_ERROR_TYPES)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
""" | ||
This module defines custom interceptors for Temporal activities and workflows. | ||
The main purpose of these interceptors is to handle errors and prevent retrying | ||
certain types of errors that are known to be non-retryable. | ||
""" | ||
|
||
from typing import Optional, Type | ||
|
||
from temporalio.exceptions import ApplicationError | ||
from temporalio.worker import ( | ||
ActivityInboundInterceptor, | ||
ExecuteActivityInput, | ||
ExecuteWorkflowInput, | ||
Interceptor, | ||
WorkflowInboundInterceptor, | ||
WorkflowInterceptorClassInput, | ||
) | ||
|
||
from .exceptions.tasks import is_non_retryable_error | ||
|
||
|
||
class CustomActivityInterceptor(ActivityInboundInterceptor): | ||
""" | ||
Custom interceptor for Temporal activities. | ||
This interceptor catches exceptions during activity execution and | ||
raises them as non-retryable ApplicationErrors if they are identified | ||
as non-retryable errors. | ||
""" | ||
|
||
async def execute_activity(self, input: ExecuteActivityInput): | ||
try: | ||
return await super().execute_activity(input) | ||
except Exception as e: | ||
if is_non_retryable_error(e): | ||
raise ApplicationError( | ||
str(e), | ||
type=type(e).__name__, | ||
non_retryable=True, | ||
) | ||
raise | ||
|
||
|
||
class CustomWorkflowInterceptor(WorkflowInboundInterceptor): | ||
""" | ||
Custom interceptor for Temporal workflows. | ||
This interceptor catches exceptions during workflow execution and | ||
raises them as non-retryable ApplicationErrors if they are identified | ||
as non-retryable errors. | ||
""" | ||
|
||
async def execute_workflow(self, input: ExecuteWorkflowInput): | ||
try: | ||
return await super().execute_workflow(input) | ||
except Exception as e: | ||
if is_non_retryable_error(e): | ||
raise ApplicationError( | ||
str(e), | ||
type=type(e).__name__, | ||
non_retryable=True, | ||
) | ||
raise | ||
|
||
|
||
class CustomInterceptor(Interceptor): | ||
""" | ||
Custom Interceptor that combines both activity and workflow interceptors. | ||
This class is responsible for creating and returning the custom | ||
interceptors for both activities and workflows. | ||
""" | ||
|
||
def intercept_activity( | ||
self, next: ActivityInboundInterceptor | ||
) -> ActivityInboundInterceptor: | ||
""" | ||
Creates and returns a CustomActivityInterceptor. | ||
This method is called by Temporal to intercept activity executions. | ||
""" | ||
return CustomActivityInterceptor(super().intercept_activity(next)) | ||
|
||
def workflow_interceptor_class( | ||
self, input: WorkflowInterceptorClassInput | ||
) -> Optional[Type[WorkflowInboundInterceptor]]: | ||
""" | ||
Returns the CustomWorkflowInterceptor class. | ||
This method is called by Temporal to get the workflow interceptor class. | ||
""" | ||
return CustomWorkflowInterceptor |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,63 +1,14 @@ | ||
from datetime import timedelta | ||
# from datetime import timedelta | ||
|
||
from temporalio.common import RetryPolicy | ||
# from temporalio.common import RetryPolicy | ||
|
||
DEFAULT_RETRY_POLICY = RetryPolicy( | ||
initial_interval=timedelta(seconds=1), | ||
backoff_coefficient=2, | ||
maximum_attempts=25, | ||
maximum_interval=timedelta(seconds=300), | ||
non_retryable_error_types=[ | ||
# Temporal-specific errors | ||
"WorkflowExecutionAlreadyStarted", | ||
"temporalio.exceptions.TerminalFailure", | ||
"temporalio.exceptions.CanceledError", | ||
# | ||
# Built-in Python exceptions | ||
"TypeError", | ||
"AssertionError", | ||
"SyntaxError", | ||
"ValueError", | ||
"ZeroDivisionError", | ||
"IndexError", | ||
"AttributeError", | ||
"LookupError", | ||
"BufferError", | ||
"ArithmeticError", | ||
"KeyError", | ||
"NameError", | ||
"NotImplementedError", | ||
"RecursionError", | ||
"RuntimeError", | ||
"StopIteration", | ||
"StopAsyncIteration", | ||
"IndentationError", | ||
"TabError", | ||
# | ||
# Unicode-related errors | ||
"UnicodeError", | ||
"UnicodeEncodeError", | ||
"UnicodeDecodeError", | ||
"UnicodeTranslateError", | ||
# | ||
# HTTP and API-related errors | ||
"HTTPException", | ||
"fastapi.exceptions.HTTPException", | ||
"fastapi.exceptions.RequestValidationError", | ||
"httpx.RequestError", | ||
"httpx.HTTPStatusError", | ||
# | ||
# Asynchronous programming errors | ||
"asyncio.CancelledError", | ||
"asyncio.InvalidStateError", | ||
"GeneratorExit", | ||
# | ||
# Third-party library exceptions | ||
"jinja2.exceptions.TemplateSyntaxError", | ||
"jinja2.exceptions.TemplateNotFound", | ||
"jsonschema.exceptions.ValidationError", | ||
"pydantic.ValidationError", | ||
"requests.exceptions.InvalidURL", | ||
"requests.exceptions.MissingSchema", | ||
], | ||
) | ||
# DEFAULT_RETRY_POLICY = RetryPolicy( | ||
# initial_interval=timedelta(seconds=1), | ||
# backoff_coefficient=2, | ||
# maximum_attempts=25, | ||
# maximum_interval=timedelta(seconds=300), | ||
# ) | ||
|
||
# FIXME: Adding both interceptors and retry policy (even with `non_retryable_errors` not set) | ||
# is causing the errors to be retried. We need to find a workaround for this. | ||
DEFAULT_RETRY_POLICY = None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters