Skip to content

Commit

Permalink
Merge pull request #473 from pikers/binance_ws_ep_update
Browse files Browse the repository at this point in the history
`binance`: use built-in `anext()` add note about new ws ep URL, fix agen streaming within `NoBsWs` usage
  • Loading branch information
goodboy authored Apr 12, 2023
2 parents f3b04f2 + 609b91e commit 70db20b
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 56 deletions.
30 changes: 19 additions & 11 deletions piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)
import time

from trio_util import trio_async_generator
import trio
from trio_typing import TaskStatus
import pendulum
Expand Down Expand Up @@ -317,7 +318,10 @@ class AggTrade(Struct):
M: bool # Ignore


async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
@trio_async_generator
async def stream_messages(
ws: NoBsWs,
) -> AsyncGenerator[NoBsWs, dict]:

timeouts = 0
while True:
Expand Down Expand Up @@ -529,19 +533,23 @@ async def subscribe(ws: wsproto.WSConnection):
# XXX: do we need to ack the unsub?
# await ws.recv_msg()

async with open_autorecon_ws(
'wss://stream.binance.com/ws',
fixture=subscribe,
) as ws:
async with (
open_autorecon_ws(
# XXX: see api docs which show diff addr?
# https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information
# 'wss://ws-api.binance.com:443/ws-api/v3',
'wss://stream.binance.com/ws',
fixture=subscribe,
) as ws,

# pull a first quote and deliver
msg_gen = stream_messages(ws)

typ, quote = await msg_gen.__anext__()
# avoid stream-gen closure from breaking trio..
stream_messages(ws) as msg_gen,
):
typ, quote = await anext(msg_gen)

# pull a first quote and deliver
while typ != 'trade':
# TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__()
typ, quote = await anext(msg_gen)

task_status.started((init_msgs, quote))

Expand Down
90 changes: 47 additions & 43 deletions piker/brokers/ib/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

import tractor

from piker.log import get_logger

log = get_logger(__name__)


_reset_tech: Literal[
'vnc',
Expand Down Expand Up @@ -134,54 +138,54 @@ def i3ipc_xdotool_manual_click_hack() -> None:
# 'IB', # gw running in i3 (newer version?)
]

for name in win_names:
results = t.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height

# TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of
# most recent commit history:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool

# TODO: only run the reconnect (2nd) kc on a detected
# disconnect?
for key_combo, timeout in [
# only required if we need a connection reset.
# ('ctrl+alt+r', 12),
# data feed reset.
('ctrl+alt+f', 6)
]:
subprocess.call([
'xdotool',
'windowactivate', '--sync', win_id,

# move mouse to bottom left of window (where there should
# be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4),

# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id,
'--repeat', '3', '1',

# hackzorzes
'key', key_combo,
],
timeout=timeout,
)
try:
for name in win_names:
results = t.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height

# TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of
# most recent commit history:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool

# TODO: only run the reconnect (2nd) kc on a detected
# disconnect?
for key_combo, timeout in [
# only required if we need a connection reset.
# ('ctrl+alt+r', 12),
# data feed reset.
('ctrl+alt+f', 6)
]:
subprocess.call([
'xdotool',
'windowactivate', '--sync', win_id,

# move mouse to bottom left of window (where
# there should be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4),

# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id,
'--repeat', '3', '1',

# hackzorzes
'key', key_combo,
],
timeout=timeout,
)

# re-activate and focus original window
try:
subprocess.call([
'xdotool',
'windowactivate', '--sync', str(orig_win_id),
'click', '--window', str(orig_win_id), '1',
])
except subprocess.TimeoutExpired:
log.exception(f'xdotool timed out?')
log.exception('xdotool timed out?')
10 changes: 8 additions & 2 deletions piker/brokers/kraken/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
)
import time

from async_generator import aclosing
from fuzzywuzzy import process as fuzzy
import numpy as np
import pendulum
from trio_typing import TaskStatus
from trio_util import trio_async_generator
import tractor
import trio

Expand Down Expand Up @@ -122,6 +122,7 @@ async def stream_messages(
yield msg


@trio_async_generator
async def process_data_feed_msgs(
ws: NoBsWs,
):
Expand Down Expand Up @@ -378,7 +379,12 @@ async def subscribe(ws: NoBsWs):
'wss://ws.kraken.com/',
fixture=subscribe,
) as ws,
aclosing(process_data_feed_msgs(ws)) as msg_gen,

# avoid stream-gen closure from breaking trio..
# NOTE: not sure this actually works XD particularly
# if we call `ws._connect()` manally in the streaming
# async gen..
process_data_feed_msgs(ws) as msg_gen,
):
# pull a first quote and deliver
typ, ohlc_last = await anext(msg_gen)
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
# async
'trio',
'trio-websocket',
'trio-util',
'async_generator',

# from github currently (see requirements.txt)
Expand Down

0 comments on commit 70db20b

Please sign in to comment.