Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

binance: use built-in anext() add note about new ws ep URL, fix agen streaming within NoBsWs usage #473

Merged
merged 4 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)
import time

from trio_util import trio_async_generator
import trio
from trio_typing import TaskStatus
import pendulum
Expand Down Expand Up @@ -317,7 +318,10 @@ class AggTrade(Struct):
M: bool # Ignore


async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
@trio_async_generator
async def stream_messages(
ws: NoBsWs,
) -> AsyncGenerator[NoBsWs, dict]:

timeouts = 0
while True:
Expand Down Expand Up @@ -529,19 +533,23 @@ async def subscribe(ws: wsproto.WSConnection):
# XXX: do we need to ack the unsub?
# await ws.recv_msg()

async with open_autorecon_ws(
'wss://stream.binance.com/ws',
fixture=subscribe,
) as ws:
async with (
open_autorecon_ws(
# XXX: see api docs which show diff addr?
# https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information
# 'wss://ws-api.binance.com:443/ws-api/v3',
'wss://stream.binance.com/ws',
fixture=subscribe,
) as ws,

# pull a first quote and deliver
msg_gen = stream_messages(ws)

typ, quote = await msg_gen.__anext__()
# avoid stream-gen closure from breaking trio..
stream_messages(ws) as msg_gen,
):
typ, quote = await anext(msg_gen)

# pull a first quote and deliver
while typ != 'trade':
# TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__()
typ, quote = await anext(msg_gen)

task_status.started((init_msgs, quote))

Expand Down
90 changes: 47 additions & 43 deletions piker/brokers/ib/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

import tractor

from piker.log import get_logger

log = get_logger(__name__)


_reset_tech: Literal[
'vnc',
Expand Down Expand Up @@ -134,54 +138,54 @@ def i3ipc_xdotool_manual_click_hack() -> None:
# 'IB', # gw running in i3 (newer version?)
]

for name in win_names:
results = t.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height

# TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of
# most recent commit history:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool

# TODO: only run the reconnect (2nd) kc on a detected
# disconnect?
for key_combo, timeout in [
# only required if we need a connection reset.
# ('ctrl+alt+r', 12),
# data feed reset.
('ctrl+alt+f', 6)
]:
subprocess.call([
'xdotool',
'windowactivate', '--sync', win_id,

# move mouse to bottom left of window (where there should
# be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4),

# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id,
'--repeat', '3', '1',

# hackzorzes
'key', key_combo,
],
timeout=timeout,
)
try:
for name in win_names:
results = t.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height

# TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of
# most recent commit history:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool

# TODO: only run the reconnect (2nd) kc on a detected
# disconnect?
for key_combo, timeout in [
# only required if we need a connection reset.
# ('ctrl+alt+r', 12),
# data feed reset.
('ctrl+alt+f', 6)
]:
subprocess.call([
'xdotool',
'windowactivate', '--sync', win_id,

# move mouse to bottom left of window (where
# there should be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4),

# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id,
'--repeat', '3', '1',

# hackzorzes
'key', key_combo,
],
timeout=timeout,
)

# re-activate and focus original window
try:
subprocess.call([
'xdotool',
'windowactivate', '--sync', str(orig_win_id),
'click', '--window', str(orig_win_id), '1',
])
except subprocess.TimeoutExpired:
log.exception(f'xdotool timed out?')
log.exception('xdotool timed out?')
10 changes: 8 additions & 2 deletions piker/brokers/kraken/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
)
import time

from async_generator import aclosing
from fuzzywuzzy import process as fuzzy
import numpy as np
import pendulum
from trio_typing import TaskStatus
from trio_util import trio_async_generator
import tractor
import trio

Expand Down Expand Up @@ -122,6 +122,7 @@ async def stream_messages(
yield msg


@trio_async_generator
async def process_data_feed_msgs(
ws: NoBsWs,
):
Expand Down Expand Up @@ -378,7 +379,12 @@ async def subscribe(ws: NoBsWs):
'wss://ws.kraken.com/',
fixture=subscribe,
) as ws,
aclosing(process_data_feed_msgs(ws)) as msg_gen,

# avoid stream-gen closure from breaking trio..
# NOTE: not sure this actually works XD particularly
# if we call `ws._connect()` manally in the streaming
# async gen..
process_data_feed_msgs(ws) as msg_gen,
):
# pull a first quote and deliver
typ, ohlc_last = await anext(msg_gen)
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
# async
'trio',
'trio-websocket',
'trio-util',
'async_generator',

# from github currently (see requirements.txt)
Expand Down