From aadd468cc5e4c0f66bfc181c53d070d52c57d84b Mon Sep 17 00:00:00 2001 From: Alex Walker Date: Mon, 28 Aug 2023 12:02:21 -0700 Subject: [PATCH 1/2] feat: add timeout to controller.py Signed-off-by: Alex Walker --- controller/controller.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/controller/controller.py b/controller/controller.py index 6201b91..0189134 100644 --- a/controller/controller.py +++ b/controller/controller.py @@ -556,15 +556,22 @@ async def record_with_values( topic: str, *, record_type: Optional[Type[T]] = None, + timeout: float = 5.0, # seconds **values, ) -> Union[T, Mapping[str, Any]]: """Get a record from an event with values matching those passed in.""" try: - event = await self.event_queue.get( - lambda event: event.topic == topic - and all( - [event.payload.get(key) == value for key, value in values.items()] - ) + event = await asyncio.wait_for( + self.event_queue.get( + lambda event: event.topic == topic + and all( + [ + event.payload.get(key) == value + for key, value in values.items() + ] + ) + ), + timeout=timeout, ) except asyncio.TimeoutError: raise ControllerError( From 85413aab01d64bc638241068ff3b7aad41c79577 Mon Sep 17 00:00:00 2001 From: Alex Walker Date: Tue, 29 Aug 2023 13:21:41 -0700 Subject: [PATCH 2/2] fix: event_queue has get method with own int timeout Signed-off-by: Alex Walker --- controller/controller.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/controller/controller.py b/controller/controller.py index 0189134..bb4b0e5 100644 --- a/controller/controller.py +++ b/controller/controller.py @@ -556,20 +556,15 @@ async def record_with_values( topic: str, *, record_type: Optional[Type[T]] = None, - timeout: float = 5.0, # seconds + timeout: Optional[int] = None, # seconds **values, ) -> Union[T, Mapping[str, Any]]: """Get a record from an event with values matching those passed in.""" try: - event = await asyncio.wait_for( - self.event_queue.get( - lambda event: event.topic == topic - and all( - [ - event.payload.get(key) == value - for key, value in values.items() - ] - ) + event = await self.event_queue.get( + lambda event: event.topic == topic + and all( + [event.payload.get(key) == value for key, value in values.items()] ), timeout=timeout, )