From 5a07542e7ead90dd8cada63eaf8091f44720d9f0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 13 Mar 2023 12:29:17 -0400 Subject: [PATCH] `binance`: wrap streamer async-gen in `aclosing()` --- piker/brokers/binance.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 0c177db85e..5c47282d39 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -26,6 +26,7 @@ ) import time +from async_generator import aclosing import trio from trio_typing import TaskStatus import pendulum @@ -529,14 +530,14 @@ async def subscribe(ws: wsproto.WSConnection): # XXX: do we need to ack the unsub? # await ws.recv_msg() - async with open_autorecon_ws( - 'wss://stream.binance.com/ws', - fixture=subscribe, - ) as ws: - + async with ( + open_autorecon_ws( + 'wss://stream.binance.com/ws', + fixture=subscribe, + ) as ws, + aclosing(stream_messages(ws)) as msg_gen, + ): # pull a first quote and deliver - msg_gen = stream_messages(ws) - typ, quote = await msg_gen.__anext__() while typ != 'trade':