Skip to content

Commit

Permalink
Update ledger from api immediately, cruft cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Jul 21, 2022
1 parent a6aabb9 commit 2d0f926
Showing 1 changed file with 23 additions and 39 deletions.
62 changes: 23 additions & 39 deletions piker/brokers/kraken/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from typing import (
Any,
AsyncIterator,
# Optional,
Union,
)

Expand All @@ -46,7 +45,6 @@
Position,
PpTable,
Transaction,
# update_pps_conf,
open_trade_ledger,
open_pps,
)
Expand Down Expand Up @@ -391,6 +389,7 @@ async def trades_dialogue(
# most recent 50 trades and assume that by ordering we
# already have those records in the ledger.
tids2trades = await client.get_trades()
ledger_dict.update(tids2trades)
api_trans = norm_trade_records(tids2trades)

# retrieve kraken reported balances
Expand Down Expand Up @@ -448,12 +447,13 @@ def has_pp(dst: str) -> Position | bool:
ppmsgs = trades2pps(
table,
acctid,
# new_trans,
)
await ctx.started((ppmsgs, [acc_name]))

# XXX: not fucking clue but putting this finally block
# will suppress errors inside the direct await below!?!
# likely something to do with the exist stack inside
# the nobsws stuff...
# try:

# Get websocket token for authenticated data stream
Expand Down Expand Up @@ -494,26 +494,19 @@ def has_pp(dst: str) -> Position | bool:
)

# enter relay loop
# try:
try:
await handle_order_updates(
ws,
stream,
ems_stream,
apiflows,
ids,
reqids2txids,
table,
api_trans,
acctid,
acc_name,
token,
)
# except:
# await tractor.breakpoint()
finally:
# always update ledger on exit
ledger_dict.update(tids2trades)
await handle_order_updates(
ws,
stream,
ems_stream,
apiflows,
ids,
reqids2txids,
table,
api_trans,
acctid,
acc_name,
token,
)


async def handle_order_updates(
Expand Down Expand Up @@ -561,9 +554,13 @@ async def handle_order_updates(
f'ownTrades update_{seq}:\n'
f'{pformat(trades_msgs)}'
)
# XXX: a fix / todo
# see the comment in the caller about weird error
# suppression around a commented `try:`
# assert 0

# format as tid -> trade event map
# eg. msg
# eg. received msg format,
# [{'TOKWHY-SMTUB-G5DOI6': {'cost': '95.29047',
# 'fee': '0.24776',
# 'margin': '0.00000',
Expand All @@ -579,15 +576,10 @@ async def handle_order_updates(
tid: trade
for entry in trades_msgs
for (tid, trade) in entry.items()

# don't re-process datums we've already seen
if tid not in ledger_trans
}

# if tid in ledger_trans:
# # skip already seen transactions
# log.info(f'Skipping already seen trade {trade}')
# continue

# await tractor.breakpoint()
for tid, trade in trades.items():
txid = trade['ordertxid']

Expand Down Expand Up @@ -642,11 +634,6 @@ async def handle_order_updates(
)
await ems_stream.send(filled_msg)

# if not trades:
# # skip pp emissions if we have already
# # processed all trades in this msg.
# continue

new_trans = norm_trade_records(trades)
ppmsgs = trades2pps(
table,
Expand Down Expand Up @@ -897,9 +884,6 @@ async def handle_order_updates(
chain = apiflows[reqid]
chain.maps.append(event)

# pretxid = chain['txid']
# print(f'pretxid: {pretxid}')

resps, errored = process_status(
event,
oid,
Expand Down

0 comments on commit 2d0f926

Please sign in to comment.