diff --git a/sdk/python/packages/flet/src/flet/fastapi/flet_app.py b/sdk/python/packages/flet/src/flet/fastapi/flet_app.py index 1990f4728..d0362ea66 100644 --- a/sdk/python/packages/flet/src/flet/fastapi/flet_app.py +++ b/sdk/python/packages/flet/src/flet/fastapi/flet_app.py @@ -33,9 +33,6 @@ DEFAULT_FLET_SESSION_TIMEOUT = 3600 DEFAULT_FLET_OAUTH_STATE_TIMEOUT = 600 -_pubsubhubs_lock = asyncio.Lock() -_pubsubhubs = {} - class FletApp(LocalConnection): def __init__( @@ -98,13 +95,9 @@ async def handle(self, websocket: WebSocket): else "" ) - async with _pubsubhubs_lock: - psh = _pubsubhubs.get(self.__session_handler, None) - if psh is None: - psh = PubSubHub(loop=self.__loop, executor=app_manager.executor) - _pubsubhubs[self.__session_handler] = psh - self.pubsubhub = psh - + self.pubsubhub = app_manager.get_pubsubhub( + self.__session_handler, loop=self.__loop + ) self.page_url = str(websocket.url).rsplit("/", 1)[0] self.page_name = websocket.url.path.rsplit("/", 1)[0].lstrip("/") diff --git a/sdk/python/packages/flet/src/flet/fastapi/flet_app_manager.py b/sdk/python/packages/flet/src/flet/fastapi/flet_app_manager.py index a663401b9..66f0281a4 100644 --- a/sdk/python/packages/flet/src/flet/fastapi/flet_app_manager.py +++ b/sdk/python/packages/flet/src/flet/fastapi/flet_app_manager.py @@ -12,6 +12,7 @@ from flet_core.connection import Connection from flet_core.locks import NopeLock from flet_core.page import Page +from flet_core.pubsub.pubsub_hub import PubSubHub from flet_core.utils.concurrency_utils import is_pyodide logger = logging.getLogger(flet_fastapi.__name__) @@ -31,11 +32,26 @@ def __init__(self): self.__evict_oauth_states_task = None self.__temp_dirs = {} self.__executor = ThreadPoolExecutor(thread_name_prefix="flet_fastapi") + self.__pubsubhubs_lock = threading.Lock() if not is_pyodide() else NopeLock() + self.__pubsubhubs = {} @property def executor(self): return self.__executor + def get_pubsubhub( + self, session_handler, loop: Optional[asyncio.AbstractEventLoop] = None + ): + with self.__pubsubhubs_lock: + psh = self.__pubsubhubs.get(session_handler, None) + if psh is None: + psh = PubSubHub( + loop=loop or asyncio.get_running_loop(), + executor=self.__executor, + ) + self.__pubsubhubs[session_handler] = psh + return psh + async def start(self): """ Background task evicting expired app data. Must be called at FastAPI application startup.