Skip to content

Commit

Permalink
Factor out msg unwrapping into a func
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Nov 29, 2021
1 parent 0e7234a commit f6de7e0
Showing 1 changed file with 31 additions and 28 deletions.
59 changes: 31 additions & 28 deletions tractor/_portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from ._exceptions import (
unpack_error,
NoResult,
RemoteActorError,
# RemoteActorError,
ContextCancelled,
)
from ._streaming import Context, ReceiveMsgStream
Expand Down Expand Up @@ -54,8 +54,23 @@ def func_deats(func: Callable) -> Tuple[str, str]:
)


def _unwrap_msg(

msg: dict[str, Any],
channel: Channel

) -> Any:
try:
return msg['return']
except KeyError:
# internal error should never get here
assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, channel)


class Portal:
"""A 'portal' to a(n) (remote) ``Actor``.
'''
A 'portal' to a(n) (remote) ``Actor``.
A portal is "opened" (and eventually closed) by one side of an
inter-actor communication context. The side which opens the portal
Expand All @@ -71,7 +86,7 @@ class Portal:
function calling semantics are supported transparently; hence it is
like having a "portal" between the seperate actor memory spaces.
"""
'''
def __init__(self, channel: Channel) -> None:
self.channel = channel
# when this is set to a tuple returned from ``_submit()`` then
Expand Down Expand Up @@ -130,16 +145,11 @@ async def _return_once(
resptype: str,
first_msg: dict

) -> tuple[Any, dict[str, Any]]:
assert resptype == 'asyncfunc' # single response
) -> dict[str, Any]:

assert resptype == 'asyncfunc' # single response
msg = await recv_chan.receive()
try:
return msg['return'], msg
except KeyError:
# internal error should never get here
assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, self.channel)
return msg

async def result(self) -> Any:
"""Return the result(s) from the remote actor's "main" task.
Expand All @@ -162,19 +172,9 @@ async def result(self) -> Any:
assert self._expect_result

if self._result_msg is None:
try:
result, self._result_msg = await self._return_once(
*self._expect_result)
except RemoteActorError as err:
result = err
else:
result = self._result_msg['return']
self._result_msg = await self._return_once(*self._expect_result)

# re-raise error on every call
if isinstance(result, RemoteActorError):
raise result

return result
return _unwrap_msg(self._result_msg, self.channel)

async def _cancel_streams(self):
# terminate all locally running async generator
Expand Down Expand Up @@ -249,10 +249,10 @@ async def run_from_ns(
instance methods in the remote runtime. Currently this should only
be used for `tractor` internals.
"""
value, _ = await self._return_once(
msg = await self._return_once(
*(await self._submit(namespace_path, function_name, kwargs))
)
return value
return _unwrap_msg(msg, self.channel)

async def run(
self,
Expand Down Expand Up @@ -293,9 +293,12 @@ async def run(

fn_mod_path, fn_name = func_deats(func)

return (await self._return_once(
*(await self._submit(fn_mod_path, fn_name, kwargs))
))[0]
return _unwrap_msg(
await self._return_once(
*(await self._submit(fn_mod_path, fn_name, kwargs)),
),
self.channel,
)

@asynccontextmanager
async def open_stream_from(
Expand Down

0 comments on commit f6de7e0

Please sign in to comment.