Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove sync notifiers for a major speedup #714

Merged
merged 4 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Changes

- Allow ``janus.Queue()`` instantiation without running asyncio event loop #710

- Remove sync notifiers for a major speedup #714

1.1.0 (2024-10-30)
------------------

Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ time-tested, but has some limitations.
threads. If you do not properly close the queue,
`asyncio may generate error messages
<https://github.com/aio-libs/janus/issues/574>`_.
* The library has acceptable performance only when used as intended, that is,
* The library has quite good performance only when used as intended, that is,
for communication between synchronous code and asynchronous one.
For sync-only and async-only cases, use queues from
`queue <https://docs.python.org/3/library/queue.html>`_ and
Expand Down
30 changes: 6 additions & 24 deletions janus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,24 +203,6 @@ def _put_internal(self, item: T) -> None:
self._unfinished_tasks += 1
self._finished.clear()

def _sync_not_empty_notifier(self) -> None:
with self._sync_mutex:
self._sync_not_empty.notify()

def _notify_sync_not_empty(self, loop: asyncio.AbstractEventLoop) -> None:
fut = loop.run_in_executor(None, self._sync_not_empty_notifier)
fut.add_done_callback(self._pending.remove)
self._pending.append(fut)

def _sync_not_full_notifier(self) -> None:
with self._sync_mutex:
self._sync_not_full.notify()

def _notify_sync_not_full(self, loop: asyncio.AbstractEventLoop) -> None:
fut = loop.run_in_executor(None, self._sync_not_full_notifier)
fut.add_done_callback(self._pending.remove)
self._pending.append(fut)

async def _async_not_empty_notifier(self) -> None:
async with self._async_mutex:
self._async_not_empty.notify()
Expand Down Expand Up @@ -506,7 +488,7 @@ async def put(self, item: T) -> None:
async with parent._async_not_full:
parent._sync_mutex.acquire()
locked = True
loop = parent._get_loop()
parent._get_loop() # check the event loop
try:
if parent._maxsize > 0:
do_wait = True
Expand All @@ -527,7 +509,7 @@ async def put(self, item: T) -> None:
if parent._async_not_empty_waiting:
parent._async_not_empty.notify()
if parent._sync_not_empty_waiting:
parent._notify_sync_not_empty(loop)
parent._sync_not_empty.notify()
finally:
if locked:
parent._sync_mutex.release()
Expand All @@ -549,7 +531,7 @@ def put_nowait(self, item: T) -> None:
if parent._async_not_empty_waiting:
parent._make_async_not_empty_notifier(loop)
if parent._sync_not_empty_waiting:
parent._notify_sync_not_empty(loop)
parent._sync_not_empty.notify()

async def get(self) -> T:
"""Remove and return an item from the queue.
Expand All @@ -563,7 +545,7 @@ async def get(self) -> T:
async with parent._async_not_empty:
parent._sync_mutex.acquire()
locked = True
loop = parent._get_loop()
parent._get_loop() # check the event loop
try:
do_wait = True
while do_wait:
Expand All @@ -584,7 +566,7 @@ async def get(self) -> T:
if parent._async_not_full_waiting:
parent._async_not_full.notify()
if parent._sync_not_full_waiting:
parent._notify_sync_not_full(loop)
parent._sync_not_full.notify()
return item
finally:
if locked:
Expand All @@ -607,7 +589,7 @@ def get_nowait(self) -> T:
if parent._async_not_full_waiting:
parent._make_async_not_full_notifier(loop)
if parent._sync_not_full_waiting:
parent._notify_sync_not_full(loop)
parent._sync_not_full.notify()
return item

def task_done(self) -> None:
Expand Down
Loading