Skip to content

Commit

Permalink
Error log brokerd msgs that have .reqid == None
Browse files Browse the repository at this point in the history
Relates to the bug discovered in #310, this should avoid out-of-order
msgs which do not have a `.reqid` set to be error logged to console.
Further, add `pformat()` to kraken logging of ems msging.
  • Loading branch information
goodboy committed May 21, 2022
1 parent db88ac2 commit 7b9ffaa
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 8 deletions.
10 changes: 9 additions & 1 deletion piker/brokers/kraken.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from contextlib import asynccontextmanager as acm
from dataclasses import asdict, field
from datetime import datetime
from pprint import pformat
from typing import Any, Optional, AsyncIterator, Callable, Union
import time

Expand Down Expand Up @@ -569,7 +570,10 @@ async def handle_order_requests(
order: BrokerdOrder

async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
log.info(
'Received order request:\n'
f'{pformat(request_msg)}'
)

action = request_msg['action']

Expand Down Expand Up @@ -628,6 +632,7 @@ async def handle_order_requests(
# update the internal pairing of oid to krakens
# txid with the new txid that is returned on edit
reqid = resp['result']['txid']

# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
Expand Down Expand Up @@ -788,7 +793,10 @@ async def subscribe(ws: wsproto.WSConnection, token: str):
# Get websocket token for authenticated data stream
# Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {})

# lol wtf is this..
assert resp['error'] == []

token = resp['result']['token']

async with (
Expand Down
26 changes: 20 additions & 6 deletions piker/clearing/_ems.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,10 @@ async def translate_and_relay_brokerd_events(

name = brokerd_msg['name']

log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}')
log.info(
f'Received broker trade event:\n'
f'{pformat(brokerd_msg)}'
)

if name == 'position':

Expand Down Expand Up @@ -613,19 +616,28 @@ async def translate_and_relay_brokerd_events(
# packed at submission since we already know it ahead of
# time
paper = brokerd_msg['broker_details'].get('paper_info')
ext = brokerd_msg['broker_details'].get('external')
if paper:
# paperboi keeps the ems id up front
oid = paper['oid']

else:
elif ext:
# may be an order msg specified as "external" to the
# piker ems flow (i.e. generated by some other
# external broker backend client (like tws for ib)
ext = brokerd_msg['broker_details'].get('external')
if ext:
log.error(f"External trade event {ext}")
log.error(f"External trade event {ext}")

continue

else:
# something is out of order, we don't have an oid for
# this broker-side message.
log.error(
'Unknown oid:{oid} for msg:\n'
f'{pformat(brokerd_msg)}'
'Unable to relay message to client side!?'
)

else:
# check for existing live flow entry
entry = book._ems_entries.get(oid)
Expand Down Expand Up @@ -823,7 +835,9 @@ async def process_client_order_cmds(
if reqid:

# send cancel to brokerd immediately!
log.info("Submitting cancel for live order {reqid}")
log.info(
f'Submitting cancel for live order {reqid}'
)

await brokerd_order_stream.send(msg.dict())

Expand Down
4 changes: 3 additions & 1 deletion piker/ui/order_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,9 @@ async def process_trades_and_update_ui(
mode.lines.remove_line(uuid=oid)

# each clearing tick is responded individually
elif resp in ('broker_filled',):
elif resp in (
'broker_filled',
):

known_order = book._sent_orders.get(oid)
if not known_order:
Expand Down

0 comments on commit 7b9ffaa

Please sign in to comment.