Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client now retries .map() failures #2734

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
64 changes: 58 additions & 6 deletions modal/_utils/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing import (
Any,
Callable,
Generic,
Optional,
TypeVar,
Union,
Expand All @@ -26,6 +27,10 @@
from ..exception import InvalidError
from .logger import logger

T = TypeVar("T")
P = ParamSpec("P")
V = TypeVar("V")

synchronizer = synchronicity.Synchronizer()


Expand Down Expand Up @@ -260,7 +265,59 @@ def run_coro_blocking(coro):
return fut.result()


async def queue_batch_iterator(q: asyncio.Queue, max_batch_size=100, debounce_time=0.015):
class TimestampPriorityQueue(Generic[T]):
    """
    A priority queue that schedules items to be processed at specific timestamps.

    Entries are stored as ``(timestamp, item)`` pairs ordered by timestamp, and
    ``get()`` only returns an item once the wall clock (``time.time()``) has
    reached that item's timestamp. ``close()`` enqueues a sentinel entry with
    infinite priority, which makes ``get()`` return ``None`` after all real
    items have been consumed.

    NOTE(review): ``get()`` removes the head entry and re-inserts it while
    waiting for it to become due, so this structure assumes a single consumer —
    confirm callers never run concurrent ``get()`` calls.
    """

    # Sentinel priority used by close(); sorts after every real (finite) timestamp.
    _MAX_PRIORITY = float("inf")

    def __init__(self, maxsize: int = 0):
        # Wakes a waiting get() whenever a new (possibly earlier) item arrives.
        self.condition = asyncio.Condition()
        self._queue: asyncio.PriorityQueue[tuple[float, Union[T, None]]] = asyncio.PriorityQueue(maxsize=maxsize)

    async def close(self):
        """Signal consumers that no more items will be added."""
        await self.put(self._MAX_PRIORITY, None)

    async def put(self, timestamp: float, item: Union[T, None]):
        """
        Add an item to the queue to be processed at a specific timestamp.
        """
        await self._queue.put((timestamp, item))
        async with self.condition:
            self.condition.notify_all()  # wake any waiting get() so it re-checks the head

    async def get(self) -> Union[T, None]:
        """
        Get the next item from the queue that is ready to be processed.

        Blocks until the earliest item's timestamp has passed. Returns ``None``
        once the close() sentinel is reached.
        """
        while True:
            async with self.condition:
                while self.empty():
                    await self.condition.wait()
                # Remove the earliest entry; if it is not due yet we re-insert it below.
                timestamp, item = await self._queue.get()
                if timestamp == self._MAX_PRIORITY:
                    # close() sentinel: all real items have been drained.
                    return None
                now = time.time()
                if timestamp <= now:
                    # Due now (or in the past) — ready to process. Using <= avoids a
                    # useless zero-timeout wait cycle when the deadline is exactly now.
                    return item
                # Not ready yet: put it back, then wait until either it becomes due
                # or a new (possibly earlier) item is enqueued, whichever is first.
                self._queue.put_nowait((timestamp, item))
                try:
                    await asyncio.wait_for(self.condition.wait(), timeout=timestamp - now)
                except asyncio.TimeoutError:
                    continue

    def empty(self) -> bool:
        """Return True when no entries (including the close sentinel) are queued."""
        return self._queue.empty()


async def queue_batch_iterator(
q: Union[asyncio.Queue, TimestampPriorityQueue], max_batch_size=100, debounce_time=0.015
):
"""
Read from a queue but return lists of items when queue is large

Expand Down Expand Up @@ -401,11 +458,6 @@ async def wrapper():
_shutdown_tasks.append(asyncio.create_task(wrapper()))


T = TypeVar("T")
P = ParamSpec("P")
V = TypeVar("V")


def asyncify(f: Callable[P, T]) -> Callable[P, typing.Coroutine[None, None, T]]:
"""Convert a blocking function into one that runs in the current loop's executor."""

Expand Down
12 changes: 11 additions & 1 deletion modal/functions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright Modal Labs 2023
import asyncio
import dataclasses
import inspect
import textwrap
Expand Down Expand Up @@ -256,7 +257,10 @@ async def run_function(self) -> Any:
try:
return await self._get_single_output(ctx.input_jwt)
except (UserCodeException, FunctionTimeoutError) as exc:
await user_retry_manager.raise_or_sleep(exc)
delay_ms = user_retry_manager.get_delay_ms()
if delay_ms is None:
raise exc
await asyncio.sleep(delay_ms / 1000)
except InternalFailure:
# For system failures on the server, we retry immediately.
pass
Expand Down Expand Up @@ -1244,6 +1248,11 @@ async def _map(
else:
count_update_callback = None

if config.get("client_retries"):
function_call_invocation_type = api_pb2.FUNCTION_CALL_INVOCATION_TYPE_SYNC
else:
function_call_invocation_type = api_pb2.FUNCTION_CALL_INVOCATION_TYPE_SYNC_LEGACY

async with aclosing(
_map_invocation(
self, # type: ignore
Expand All @@ -1252,6 +1261,7 @@ async def _map(
order_outputs,
return_exceptions,
count_update_callback,
function_call_invocation_type,
)
) as stream:
async for item in stream:
Expand Down
Loading
Loading