Feature suggestion: strict parameter for zip #118

Closed
smheidrich opened this issue Jul 8, 2024 · 3 comments · Fixed by #119

Comments

@smheidrich
Contributor

smheidrich commented Jul 8, 2024

Python 3.10 introduced the strict parameter for the built-in zip function. When set to True, it helps spot bugs by raising a ValueError when the supplied iterables turn out to have different lengths (i.e. when one raises StopIteration after fewer iterations than the others).

I think it would be useful if aiostream's zip function supported this parameter as well.

Extra context: the strict parameter of Python's built-in zip has proven useful enough that linters such as Ruff, or flake8 with the bugbear plugin, have a rule (B905) that complains when you neither use it nor explicitly disable it by setting strict=False.

@vxgmichel
Owner

vxgmichel commented Jul 9, 2024

Hi @smheidrich, thanks for the suggestion 👍

I think it would be useful too; feel free to create a PR if you'd like to contribute to the project. The implementation is over there:

```python
import asyncio
import builtins
from contextlib import AsyncExitStack
from typing import AsyncIterable, AsyncIterator, TypeVar

# sources_operator, streamcontext and anext are aiostream internals (not shown).

T = TypeVar("T")


@sources_operator
async def zip(*sources: AsyncIterable[T]) -> AsyncIterator[tuple[T, ...]]:
    """Combine and forward the elements of several asynchronous sequences.

    Each generated value is a tuple of elements, using the same order as
    their respective sources. The generation continues until the shortest
    sequence is exhausted.

    Note: the different sequences are awaited in parallel, so that their
    waiting times don't add up.
    """
    # No sources
    if not sources:
        return
    # One source
    if len(sources) == 1:
        (source,) = sources
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield (item,)
        return
    # N sources
    async with AsyncExitStack() as stack:
        # Handle resources
        streamers = [
            await stack.enter_async_context(streamcontext(source)) for source in sources
        ]
        # Loop over items
        while True:
            try:
                coros = builtins.map(anext, streamers)
                items = await asyncio.gather(*coros)
            except StopAsyncIteration:
                break
            else:
                yield tuple(items)
```

The implementation relies on asyncio.gather, which exposes a return_exceptions argument. However, I don't think we can use it, since we still want to propagate any exception other than StopAsyncIteration as soon as possible. Instead, we probably want to turn StopAsyncIteration into a sentinel with a dedicated function:

```python
async_stop_iteration_sentinel = object()
if strict:
    # Turn exhaustion into a sentinel so gather() still collects every result
    async def next_item(streamer):
        try:
            return await anext(streamer)
        except StopAsyncIteration:
            return async_stop_iteration_sentinel
else:
    next_item = anext

[...]
try:
    coros = builtins.map(next_item, streamers)
    items = await asyncio.gather(*coros)
except StopAsyncIteration:
    break
if strict:
    # Compare by identity: the sentinel is a unique object
    if all(item is async_stop_iteration_sentinel for item in items):
        break
    if any(item is async_stop_iteration_sentinel for item in items):
        raise ValueError("zip() sources have different lengths")
[...]
```

Otherwise, I'll do it myself when I find the time :)
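To make the remark about return_exceptions concrete, here is a small standalone comparison (the coroutine names are hypothetical). By default, asyncio.gather propagates the first exception immediately, without cancelling the other awaitables; with return_exceptions=True it only returns once every awaitable has finished, handing exceptions back as ordinary results. That waiting is why the sentinel trick is limited to StopAsyncIteration.

```python
import asyncio


async def fails_fast() -> None:
    raise RuntimeError("boom")


async def slow(log: list[str]) -> str:
    await asyncio.sleep(0.1)
    log.append("slow finished")
    return "slow result"


async def compare() -> tuple[list[str], list[object]]:
    # Default gather: the error surfaces while `slow` is still sleeping.
    log: list[str] = []
    seen_when_raised: list[str] = []
    try:
        await asyncio.gather(fails_fast(), slow(log))
    except RuntimeError:
        seen_when_raised = list(log)  # snapshot taken when the error arrived

    # return_exceptions=True: gather waits for everything, then returns the
    # exception alongside the normal results.
    results = await asyncio.gather(fails_fast(), slow([]), return_exceptions=True)
    return seen_when_raised, results


seen_when_raised, results = asyncio.run(compare())
print(seen_when_raised)  # []
print(results)           # [RuntimeError('boom'), 'slow result']
```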

smheidrich added a commit to smheidrich/aiostream that referenced this issue Jul 9, 2024
vxgmichel added a commit that referenced this issue Jul 21, 2024
* Add strict parameter to stream.zip (issue #118)

* Use shortcut for anext called without default

* Add tests for exception passthrough in zip

* Add (failing) test case for early exit from zip

* Exit from non-strict zip as early as possible

Fixes failing test from previous commit.

* Make UNSET an enum

Co-authored-by: Vincent Michel <[email protected]>

* Make STOP_SENTINEL an enum

Co-authored-by: Vincent Michel <[email protected]>

* Fix imports for enums

* Move strict condition further up & fix typing

* Update aiostream/stream/combine.py

Fix Python 3.8 compat

Co-authored-by: Vincent Michel <[email protected]>

* Move STOP_SENTINEL construction out of function

* Type inner anext wrapper function

* Improve un-overloaded anext() type signature

* Use ellipsis instead of pass

---------

Co-authored-by: Vincent Michel <[email protected]>
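Several of the commits above turn sentinels (UNSET, STOP_SENTINEL) into enums. One likely reason, which is an assumption since the commit messages don't say, is typing: PEP 484 recommends a single-member Enum for sentinel values so that type checkers can narrow a Union with an `is` check, which a bare `object()` does not allow. A generic sketch of that pattern, with hypothetical names rather than aiostream's actual definitions:

```python
import enum
from typing import Final, Union


class StopSentinel(enum.Enum):
    """Hypothetical single-member enum used as a typed 'no more items' marker."""

    STOP = enum.auto()


STOP_SENTINEL: Final = StopSentinel.STOP


def describe(item: Union[int, StopSentinel]) -> str:
    if item is STOP_SENTINEL:
        # Type checkers can narrow `item` to the sentinel here...
        return "stopped"
    # ...and to plain `int` here, so the arithmetic below type-checks.
    return f"got {item + 1}"


print(describe(41))             # got 42
print(describe(STOP_SENTINEL))  # stopped
```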
@vxgmichel
Owner

vxgmichel commented Jul 21, 2024

Done and released in v0.6.2 🎉

Thanks a lot @smheidrich for your valuable contribution!

@smheidrich
Contributor Author

Great!! Thanks for the reviews & merge! 👍
