Skip to content

Commit

Permalink
Copy pythonGH-98137, make some changes, add benchmarking script, tryi…
Browse files Browse the repository at this point in the history
…ng to get it working
  • Loading branch information
itamaro committed Feb 1, 2023
1 parent 95fb0e0 commit 0ff6c93
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 15 deletions.
29 changes: 28 additions & 1 deletion Lib/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,34 @@
tasks.__all__ +
threads.__all__ +
timeouts.__all__ +
transports.__all__)
transports.__all__ + (
'create_eager_task_factory',
'eager_task_factory',
))

def create_eager_task_factory(custom_task_constructor):
    """Build a task factory that starts coroutines eagerly.

    The returned ``factory(loop, coro, *, name=None, context=None)`` runs
    *coro* synchronously up to its first suspension point:

    * if the coroutine returns without suspending, a future created by
      ``loop.create_future()`` holding the return value is returned;
    * if it raises an ``Exception`` without suspending, a future holding
      that exception is returned;
    * otherwise ``custom_task_constructor(coro, loop=loop, name=name,
      context=context, yield_result=<first yielded value>)`` is called and
      its result is returned.

    Exceptions raised by the first step that do not derive from
    ``Exception`` are not caught here and propagate to the caller of the
    factory.
    """

    def factory(loop, coro, *, name=None, context=None):
        loop._check_closed()
        try:
            # Drive the coroutine to its first suspension point (or to
            # completion) right now instead of scheduling it for later.
            result = coro.send(None)
        except StopIteration as si:
            # Completed without suspending: hand back a finished future.
            fut = loop.create_future()
            fut.set_result(si.value)
            return fut
        except Exception as ex:
            # Failed without suspending: hand back a failed future.
            fut = loop.create_future()
            fut.set_exception(ex)
            return fut
        # The coroutine suspended; the constructor receives the value it
        # yielded so the task can continue from there.
        return custom_task_constructor(
            coro, loop=loop, name=name, context=context, yield_result=result)

    return factory

# Ready-made eager factory that constructs tasks with the `Task` class
# that is in scope in this module.
eager_task_factory = create_eager_task_factory(Task)

if sys.platform == 'win32': # pragma: no cover
from .windows_events import *
Expand Down
32 changes: 32 additions & 0 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,38 @@ def create_task(self, coro, *, name=None, context=None):

return task

def eager_task_factory(self, coro, *, name=None, context=None):
    """Eagerly start *coro* and return a future or a task for it.

    When ``tasks.Task`` is the pure-Python ``tasks._PyTask``, the
    coroutine is stepped once synchronously.  If that first step finishes
    the coroutine (by returning or by raising), a future carrying the
    result or the exception is returned.  If the coroutine suspends, a
    task is created that receives the yielded value via ``yield_result``.

    When ``tasks.Task`` is any other class, the task is constructed
    without the eager first step.
    """
    self._check_closed()
    # This method plays the role of the task factory itself, so the task
    # class is called directly rather than through create_task().
    if tasks.Task is tasks._PyTask:
        try:
            first_yield = coro.send(None)
        except StopIteration as finished:
            # Ran to completion without suspending.
            done = self.create_future()
            done.set_result(finished.value)
            return done
        except BaseException as error:
            # Any other exception, StopAsyncIteration included, is stored
            # on the future as-is.
            failed = self.create_future()
            failed.set_exception(error)
            return failed
        task = tasks.Task(coro, loop=self, name=name, context=context,
                          yield_result=first_yield)
    else:
        # NOTE(review): presumably only the pure-Python Task understands
        # ``yield_result`` -- confirm before enabling the eager path here.
        task = tasks.Task(coro, loop=self, name=name, context=context)
    if task._source_traceback:
        # Drop the newest entry (this method's own frame) from the
        # recorded creation traceback.
        del task._source_traceback[-1]
    return task

def set_task_factory(self, factory):
"""Set a task factory that will be used by loop.create_task().
Expand Down
7 changes: 5 additions & 2 deletions Lib/asyncio/taskgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,14 @@ def create_task(self, coro, *, name=None, context=None):
raise RuntimeError(f"TaskGroup {self!r} is finished")
if self._aborting:
raise RuntimeError(f"TaskGroup {self!r} is shutting down")
if context is None:
if hasattr(self._loop, "eager_task_factory"):
task = self._loop.eager_task_factory(coro, name=name, context=context)
elif context is None:
task = self._loop.create_task(coro)
else:
task = self._loop.create_task(coro, context=context)
tasks._set_task_name(task, name)
if not task.done(): # If it's done already, it's a future
tasks._set_task_name(task, name)
task.add_done_callback(self._on_task_done)
self._tasks.add(task)
return task
Expand Down
33 changes: 21 additions & 12 deletions Lib/asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ def _set_task_name(task, name):
set_name(name)


_NOT_SET = object()

class Task(futures._PyFuture): # Inherit Python Task implementation
# from a Python Future implementation.

Expand All @@ -93,7 +95,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
# status is still pending
_log_destroy_pending = True

def __init__(self, coro, *, loop=None, name=None, context=None):
def __init__(self, coro, *, loop=None, name=None, context=None,
coro_result=None, yield_result=_NOT_SET):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
Expand All @@ -117,7 +120,10 @@ def __init__(self, coro, *, loop=None, name=None, context=None):
else:
self._context = context

self._loop.call_soon(self.__step, context=self._context)
if yield_result is _NOT_SET:
self._loop.call_soon(self.__step, context=self._context)
else:
self.__step2(yield_result)
_register_task(self)

def __del__(self):
Expand Down Expand Up @@ -287,6 +293,12 @@ def __step(self, exc=None):
except BaseException as exc:
super().set_exception(exc)
else:
self.__step2(result)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.

def __step2(self, result):
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
# Yielded Future must come from Future.__iter__().
Expand Down Expand Up @@ -333,9 +345,6 @@ def __step(self, exc=None):
new_exc = RuntimeError(f'Task got bad yield: {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.

def __wakeup(self, future):
try:
Expand All @@ -357,13 +366,13 @@ def __wakeup(self, future):
_PyTask = Task


try:
import _asyncio
except ImportError:
pass
else:
# _CTask is needed for tests.
Task = _CTask = _asyncio.Task
# try:
# import _asyncio
# except ImportError:
# pass
# else:
# # _CTask is needed for tests.
# Task = _CTask = _asyncio.Task


def create_task(coro, *, name=None, context=None):
Expand Down
187 changes: 187 additions & 0 deletions async_tree.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com)
"""
Benchmark script for recursive async tree workloads. This script includes the
following microbenchmark scenarios:
1) "no_suspension": No suspension in the async tree.
2) "suspense_all": Suspension (simulating IO) at all leaf nodes in the async tree.
3) "memoization": Simulated IO calls at all leaf nodes, but with memoization. Only
un-memoized IO calls will result in suspensions.
4) "cpu_io_mixed": A mix of CPU-bound workload and IO-bound workload (with
memoization) at the leaf nodes.
Use the commandline flag or choose the corresponding <Scenario>AsyncTree class
to run the desired microbenchmark scenario.
"""


import asyncio
import math
import random
import time
from argparse import ArgumentParser


# Depth of the async tree: the level passed to the root recurse() call.
NUM_RECURSE_LEVELS = 6
# Fan-out: number of child coroutines gathered at every non-leaf node.
NUM_RECURSE_BRANCHES = 6
# Delay handed to asyncio.sleep() by each simulated IO call.
IO_SLEEP_TIME = 0.05
# Default for --memoizable-percentage: leaf data values at or below this
# number (out of 1-100) are cached after their first IO call.
DEFAULT_MEMOIZABLE_PERCENTAGE = 90
# Default for --cpu-probability: chance that a leaf in the "cpu_io_mixed"
# scenario does CPU work instead of a simulated IO call.
DEFAULT_CPU_PROBABILITY = 0.5
# Argument to math.factorial() used as the mock CPU-bound workload.
FACTORIAL_N = 500


def parse_args(argv=None):
    """Parse the benchmark's command-line options.

    Args:
        argv: Optional list of argument strings to parse.  When ``None``
            (the default) argparse reads ``sys.argv[1:]``, which keeps the
            original zero-argument call working unchanged.

    Returns:
        The ``argparse.Namespace`` with ``scenario``,
        ``memoizable_percentage``, ``cpu_probability`` and ``print``.
    """
    parser = ArgumentParser(
        description="""\
Benchmark script for recursive async tree workloads. It can be run as a standalone
script, in which case you can specify the microbenchmark scenario to run and whether
to print the results.
"""
    )
    parser.add_argument(
        "-s",
        "--scenario",
        choices=["no_suspension", "suspense_all", "memoization", "cpu_io_mixed"],
        default="no_suspension",
        help="""\
Determines which microbenchmark scenario to run. Defaults to no_suspension. Options:
1) "no_suspension": No suspension in the async tree.
2) "suspense_all": Suspension (simulating IO) at all leaf nodes in the async tree.
3) "memoization": Simulated IO calls at all leaf nodes, but with memoization. Only
un-memoized IO calls will result in suspensions.
4) "cpu_io_mixed": A mix of CPU-bound workload and IO-bound workload (with
memoization) at the leaf nodes.
""",
    )
    parser.add_argument(
        "-m",
        "--memoizable-percentage",
        type=int,
        default=DEFAULT_MEMOIZABLE_PERCENTAGE,
        help="""\
Sets the percentage (0-100) of the data that should be memoized, defaults to 90. For
example, at the default 90 percent, data 1-90 will be memoized and data 91-100 will not.
""",
    )
    parser.add_argument(
        "-c",
        "--cpu-probability",
        type=float,
        default=DEFAULT_CPU_PROBABILITY,
        help="""\
Sets the probability (0-1) that a leaf node will execute a cpu-bound workload instead
of an io-bound workload. Defaults to 0.5. Only applies to the "cpu_io_mixed"
microbenchmark scenario.
""",
    )
    parser.add_argument(
        "-p",
        "--print",
        action="store_true",
        default=False,
        help="Print the results (runtime and number of Tasks created).",
    )
    return parser.parse_args(argv)


class AsyncTree:
    """Base class for the recursive async-tree microbenchmarks.

    A tree of coroutines ``NUM_RECURSE_LEVELS`` deep with
    ``NUM_RECURSE_BRANCHES`` children per node is awaited with
    ``asyncio.gather``; every leaf calls ``suspense_func()``, which each
    scenario subclass overrides.
    """

    def __init__(
        self,
        memoizable_percentage=DEFAULT_MEMOIZABLE_PERCENTAGE,
        cpu_probability=DEFAULT_CPU_PROBABILITY,
    ):
        """Store the scenario parameters and reset counters and cache."""
        # Number of simulated IO calls performed (see mock_io_call).
        self.suspense_count = 0
        # Number of tasks built through create_task().
        self.task_count = 0
        self.memoizable_percentage = memoizable_percentage
        self.cpu_probability = cpu_probability
        self.cache = {}
        # set to deterministic random, so that the results are reproducible
        random.seed(0)

    async def mock_io_call(self):
        """Simulate an IO call by sleeping for IO_SLEEP_TIME."""
        self.suspense_count += 1
        await asyncio.sleep(IO_SLEEP_TIME)

    def create_task(self, loop, coro):
        """Create an ``asyncio.Task`` for *coro* on *loop* and count it."""
        self.task_count += 1
        return asyncio.Task(coro, loop=loop)

    async def suspense_func(self):
        """Leaf-node workload; must be overridden by each scenario."""
        raise NotImplementedError(
            "To be implemented by each microbenchmark's derived class."
        )

    async def recurse(self, recurse_level):
        """Await the subtree of the given depth; level 0 is a leaf."""
        if recurse_level == 0:
            await self.suspense_func()
            return

        await asyncio.gather(
            *[self.recurse(recurse_level - 1) for _ in range(NUM_RECURSE_BRANCHES)]
        )

    def run(self):
        """Run the whole tree on a fresh event loop using eager tasks."""
        loop = asyncio.new_event_loop()
        try:
            # NOTE: create_task() is not installed as the task factory
            # here, so this method never increments task_count.
            loop.set_task_factory(asyncio.eager_task_factory)
            loop.run_until_complete(self.recurse(NUM_RECURSE_LEVELS))
        finally:
            # Close the loop even when the benchmark raises, so it is not
            # leaked.
            loop.close()


class NoSuspensionAsyncTree(AsyncTree):
    """Scenario whose leaf nodes finish without awaiting anything."""

    async def suspense_func(self):
        # Nothing to wait for: the leaf completes immediately.
        return None


class SuspenseAllAsyncTree(AsyncTree):
    """Scenario whose every leaf node performs one simulated IO call."""

    async def suspense_func(self):
        io_call = self.mock_io_call()
        await io_call


class MemoizationAsyncTree(AsyncTree):
    """Scenario with simulated IO at the leaves, skipped on cache hits."""

    async def suspense_func(self):
        # Drawn from the module-level generator seeded in __init__, so the
        # sequence is reproducible.
        data = random.randint(1, 100)
        memoizable = data <= self.memoizable_percentage

        if memoizable and self.cache.get(data):
            # Already fetched once: no IO call, no suspension.
            return data
        if memoizable:
            self.cache[data] = True

        await self.mock_io_call()
        return data


class CpuIoMixedAsyncTree(MemoizationAsyncTree):
    """Scenario mixing CPU-bound leaves with memoized simulated-IO leaves."""

    async def suspense_func(self):
        cpu_bound = random.random() < self.cpu_probability
        if cpu_bound:
            # mock cpu-bound call
            return math.factorial(FACTORIAL_N)
        # Otherwise behave exactly like the memoized IO leaf.
        return await super().suspense_func()


if __name__ == "__main__":
    args = parse_args()
    scenario = args.scenario

    # Map each --scenario choice to the tree class that implements it.
    trees = dict(
        no_suspension=NoSuspensionAsyncTree,
        suspense_all=SuspenseAllAsyncTree,
        memoization=MemoizationAsyncTree,
        cpu_io_mixed=CpuIoMixedAsyncTree,
    )
    async_tree = trees[scenario](args.memoizable_percentage, args.cpu_probability)

    # Only the run itself is timed; construction happens before the clock
    # starts.
    start_time = time.perf_counter()
    async_tree.run()
    elapsed = time.perf_counter() - start_time

    if args.print:
        report = (
            f"Scenario: {scenario}",
            f"Time: {elapsed} s",
            f"Tasks created: {async_tree.task_count}",
            f"Suspense called: {async_tree.suspense_count}",
        )
        for line in report:
            print(line)

0 comments on commit 0ff6c93

Please sign in to comment.