Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Update async utils to remove upcoming deprecated call #6297

Merged
merged 1 commit into from
Nov 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions samcli/lib/utils/async_utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
"""
Contains asyncio related methods and helpers
"""
import asyncio
import logging
from asyncio import AbstractEventLoop, gather, new_event_loop
from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial
from typing import Callable, List, Optional

LOG = logging.getLogger(__name__)


async def _run_given_tasks_async(tasks, event_loop=asyncio.get_event_loop(), executor=None):
async def _run_given_tasks_async(
tasks: List[Callable], event_loop: AbstractEventLoop, executor: Optional[ThreadPoolExecutor] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we use the default value at all?

Copy link
Contributor Author

@lucashuy lucashuy Nov 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, since the creation of this util the event loop was always specified and the helper methods are only called within this file:

event_loop = asyncio.new_event_loop()
if not default_executor:
with ThreadPoolExecutor() as self.executor:
return run_given_tasks_async(self._async_tasks, event_loop, self.executor)
return run_given_tasks_async(self._async_tasks, event_loop)

) -> list:
"""
Given list of Task objects, this method executes all tasks in the given event loop (or default one)
and returns list of the results.
Expand Down Expand Up @@ -50,7 +53,7 @@ async def _run_given_tasks_async(tasks, event_loop=asyncio.get_event_loop(), exe

LOG.debug("Waiting for async results")

for result in await asyncio.gather(*async_tasks, return_exceptions=True):
for result in await gather(*async_tasks, return_exceptions=True):
# for each task, wait for them to complete
if isinstance(result, Exception):
LOG.debug("Exception raised during the execution")
Expand All @@ -68,7 +71,9 @@ async def _run_given_tasks_async(tasks, event_loop=asyncio.get_event_loop(), exe
return results


def run_given_tasks_async(tasks, event_loop=asyncio.get_event_loop(), executor=None):
def run_given_tasks_async(
tasks: List[Callable], event_loop: AbstractEventLoop, executor: Optional[ThreadPoolExecutor] = None
) -> list:
"""
Runs the given list of tasks in the given (or default) event loop.
This function will wait for execution to be completed
Expand Down Expand Up @@ -99,7 +104,7 @@ def __init__(self):
self._async_tasks = []
self.executor = None

def add_async_task(self, function, *args):
def add_async_task(self, function: Callable, *args) -> None:
"""
Add a function definition and its args to the the async context, which will be executed later

Expand All @@ -111,7 +116,7 @@ def add_async_task(self, function, *args):
"""
self._async_tasks.append(partial(function, *args))

def run_async(self, default_executor=True):
def run_async(self, default_executor: bool = True) -> list:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding type definitions!

"""
Will run all collected functions in async context, and return their results in order

Expand All @@ -124,7 +129,7 @@ def run_async(self, default_executor=True):
-------
List of result of the executions in order
"""
event_loop = asyncio.new_event_loop()
event_loop = new_event_loop()
if not default_executor:
with ThreadPoolExecutor() as self.executor:
return run_given_tasks_async(self._async_tasks, event_loop, self.executor)
Expand Down
Loading