Skip to content

Commit

Permalink
fix: Prevent race condition over the asyncio event loop in ThreadPool…
Browse files Browse the repository at this point in the history
…Executor
  • Loading branch information
asgeirrr committed Dec 11, 2024
1 parent ee66d37 commit 6a4d922
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions rossum_api/elis_api_client_sync.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import threading
import typing
from concurrent.futures import ThreadPoolExecutor
from queue import Queue as ThreadSafeQueue
Expand Down Expand Up @@ -46,6 +47,20 @@

T = TypeVar("T")

thread_local = threading.local()


def get_or_create_event_loop():
if not hasattr(thread_local, "loop"):
thread_local.loop = asyncio.new_event_loop()
asyncio.set_event_loop(thread_local.loop)
return thread_local.loop


def run_coroutine_in_thread(coroutine):
loop = get_or_create_event_loop()
return loop.run_until_complete(coroutine)


class Sideload:
pass
Expand Down Expand Up @@ -93,7 +108,7 @@ async def async_iter_to_list(ait: AsyncIterator[T], queue: ThreadSafeQueue):
finally:
queue.put(None) # Signal iterator was consumed

future = self.executor.submit(asyncio.run, async_iter_to_list(ait, queue)) # type: ignore
future = self.executor.submit(run_coroutine_in_thread, async_iter_to_list(ait, queue)) # type: ignore

# Consume the queue until completion to retain the iterator nature even in sync context
while True:
Expand All @@ -105,8 +120,8 @@ async def async_iter_to_list(ait: AsyncIterator[T], queue: ThreadSafeQueue):
future.result()

def _run_coroutine(self, coroutine):
future = self.executor.submit(asyncio.run, coroutine)
return future.result() # Wait for the coroutine to complete
future = self.executor.submit(run_coroutine_in_thread, coroutine)
return future.result()

# ##### QUEUE #####
def retrieve_queue(self, queue_id: int) -> Queue:
Expand Down

0 comments on commit 6a4d922

Please sign in to comment.