Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Renames and deprecations for #68 #324

Merged
merged 9 commits into from
Sep 16, 2017
Prev Previous commit
Next Next commit
Add BlockingTrioPortal; deprecate {run,await}_in_trio_thread
  • Loading branch information
njsmith committed Sep 15, 2017
commit 1bed1d0083050dcea03a90012ac80197abd48c44
72 changes: 41 additions & 31 deletions trio/_threads.py
Original file line number Diff line number Diff line change
@@ -6,61 +6,71 @@

from . import _core
from ._sync import CapacityLimiter
from ._deprecate import deprecated_alias
from ._deprecate import deprecated

__all__ = [
"current_await_in_trio_thread",
"current_run_in_trio_thread",
"run_sync_in_worker_thread",
"current_default_worker_thread_limiter",
"BlockingTrioPortal",
]


def _await_in_trio_thread_cb(q, afn, args):
@_core.disable_ki_protection
async def unprotected_afn():
return await afn(*args)
class BlockingTrioPortal:
def __init__(self, trio_token=None):
if trio_token is None:
trio_token = _core.current_trio_token()
self._trio_token = trio_token

async def await_in_trio_thread_task():
q.put_nowait(await _core.Result.acapture(unprotected_afn))
# This is the part that runs in the trio thread
def _run_cb(self, q, afn, args):
@_core.disable_ki_protection
async def unprotected_afn():
return await afn(*args)

_core.spawn_system_task(await_in_trio_thread_task, name=afn)
async def await_in_trio_thread_task():
q.put_nowait(await _core.Result.acapture(unprotected_afn))

_core.spawn_system_task(await_in_trio_thread_task, name=afn)

def _run_in_trio_thread_cb(q, fn, args):
@_core.disable_ki_protection
def unprotected_fn():
return fn(*args)
# This is the part that runs in the trio thread
def _run_sync_cb(self, q, fn, args):
@_core.disable_ki_protection
def unprotected_fn():
return fn(*args)

res = _core.Result.capture(unprotected_fn)
q.put_nowait(res)
res = _core.Result.capture(unprotected_fn)
q.put_nowait(res)


def _current_do_in_trio_thread(name, cb):
token = _core.current_trio_token()
trio_thread = threading.current_thread()

def do_in_trio_thread(fn, *args):
if threading.current_thread() == trio_thread:
raise RuntimeError("must be called from a thread")
def _do_it(self, cb, fn, *args):
try:
_core.current_task()
except RuntimeError:
pass
else:
raise RuntimeError(
"this is a blocking function; call it from a thread"
)
q = stdlib_queue.Queue()
token.run_sync_soon(cb, q, fn, args)
self._trio_token.run_sync_soon(cb, q, fn, args)
return q.get().unwrap()

do_in_trio_thread.__name__ = name
return do_in_trio_thread
def run(self, afn, *args):
return self._do_it(self._run_cb, afn, *args)

def run_sync(self, fn, *args):
return self._do_it(self._run_sync_cb, fn, *args)


@deprecated("0.2.0", issue=68, instead=BlockingTrioPortal.run_sync)
def current_run_in_trio_thread():
return _current_do_in_trio_thread(
"run_in_trio_thread", _run_in_trio_thread_cb
)
return BlockingTrioPortal().run_sync


@deprecated("0.2.0", issue=68, instead=BlockingTrioPortal.run)
def current_await_in_trio_thread():
return _current_do_in_trio_thread(
"await_in_trio_thread", _await_in_trio_thread_cb
)
return BlockingTrioPortal().run


################################################################