Skip to content

Commit

Permalink
Only emit pps msg for trade triggering instrument
Browse files Browse the repository at this point in the history
We can probably make this better (and with less file sys accesses) later
such that we keep a consistent pps state in mem and only write async
maybe from another side-task?
  • Loading branch information
goodboy committed Jun 16, 2022
1 parent b38c17d commit a95eafb
Showing 1 changed file with 15 additions and 19 deletions.
34 changes: 15 additions & 19 deletions piker/brokers/ib/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
Client,
MethodProxy,
)
# from .feed import open_data_client


def pack_position(
Expand Down Expand Up @@ -285,7 +284,6 @@ def push_tradesies(

async def update_ledger_from_api_trades(
trade_entries: list[dict[str, Any]],
ib_pp_msgs: dict[int, BrokerdPosition], # conid -> msg
client: Union[Client, MethodProxy],

) -> dict[str, Any]:
Expand Down Expand Up @@ -478,7 +476,6 @@ async def open_stream(
trades = await proxy.trades()
await update_ledger_from_api_trades(
trades,
cids2pps, # pass these in to map to correct fqsns..
proxy,
)

Expand Down Expand Up @@ -542,25 +539,26 @@ async def emit_pp_update(
proxy = proxies[acctid]
await update_ledger_from_api_trades(
[trade_entry],
cids2pps, # pass these in to map to correct fqsns..
proxy,
)
# load all positions from `pps.toml`, cross check with
# ib's positions data, and relay re-formatted pps as
# msgs to the ems.
for acctid, by_fqsn in pp.get_pps(
'ib',
acctids={acctid},
).items():

# should only be one right?
msgs = await update_and_audit(
by_fqsn,
cids2pps,
validate=False,
)
for msg in msgs:
await ems_stream.send(msg.dict())
by_acct = pp.get_pps('ib', acctids={acctid})
by_fqsn = by_acct[acctid.strip('ib.')]

for fqsn, p in by_fqsn.items():
if p.bsuid == trade_entry['contract']['conId']:
# should only be one right?
msgs = await update_and_audit(
{fqsn: p},
cids2pps,
validate=False,
)
msg = msgs[0]
break

await ems_stream.send(msg.dict())


async def deliver_trade_events(
Expand Down Expand Up @@ -714,7 +712,6 @@ async def deliver_trade_events(

if comms or cost_already_rx:
# only send a pp update once we have a cost report
print("EMITTING PP")
await emit_pp_update(
ems_stream,
trade_entry,
Expand All @@ -739,7 +736,6 @@ async def deliver_trade_events(
)

if fill_already_rx:
print("EMITTING PP")
await emit_pp_update(
ems_stream,
trade_entry,
Expand Down

0 comments on commit a95eafb

Please sign in to comment.