diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 86d30f41f..3d08f92b8 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -18,7 +18,6 @@ Order api and machinery ''' -from collections import ChainMap, defaultdict from contextlib import ( asynccontextmanager as acm, aclosing, @@ -52,6 +51,9 @@ from piker.accounting._mktinfo import ( MktPair, ) +from piker.clearing import( + OrderDialogs, +) from piker.clearing._messages import ( Order, Status, @@ -124,7 +126,7 @@ async def handle_order_requests( client: Client, ems_order_stream: tractor.MsgStream, token: str, - apiflows: dict[int, ChainMap[dict[str, dict]]], + apiflows: OrderDialogs, ids: bidict[str, int], reqids2txids: dict[int, str], @@ -188,6 +190,7 @@ async def handle_order_requests( try: txid: str = reqids2txids[reqid] except KeyError: + # XXX: not sure if this block ever gets hit now? log.error('TOO FAST EDIT') reqids2txids[reqid] = TooFastEdit(reqid) @@ -221,7 +224,11 @@ async def handle_order_requests( 'type': order.action, } - psym: str = order.symbol.upper() + # XXX strip any . token which should + # ONLY ever be '.spot' rn, until we support + # futes. + bs_fqme: str = order.symbol.rstrip('.spot') + psym: str = bs_fqme.upper() pair: str = f'{psym[:3]}/{psym[3:]}' # XXX: ACK the request **immediately** before sending @@ -260,7 +267,7 @@ async def handle_order_requests( await ws.send_msg(req) # placehold for sanity checking in relay loop - apiflows[reqid].maps.append(msg) + apiflows.add_msg(reqid, msg) case _: account = msg.get('account') @@ -440,10 +447,7 @@ async def open_trade_dialog( acc_name = 'kraken.' + acctid # task local msg dialog tracking - apiflows: defaultdict[ - int, - ChainMap[dict[str, dict]], - ] = defaultdict(ChainMap) + apiflows = OrderDialogs() # 2way map for ems ids to kraken int reqids.. ids: bidict[str, int] = bidict() @@ -706,7 +710,7 @@ async def handle_order_updates( ws: NoBsWs, ws_stream: AsyncIterator, ems_stream: tractor.MsgStream, - apiflows: dict[int, ChainMap[dict[str, dict]]], + apiflows: OrderDialogs, ids: bidict[str, int], reqids2txids: bidict[int, str], table: PpTable, @@ -921,7 +925,7 @@ async def handle_order_updates( ), src='kraken', ) - apiflows[reqid].maps.append(status_msg.to_dict()) + apiflows.add_msg(reqid, status_msg.to_dict()) await ems_stream.send(status_msg) continue @@ -1057,7 +1061,7 @@ async def handle_order_updates( ), ) - apiflows[reqid].maps.append(update_msg) + apiflows.add_msg(reqid, update_msg) await ems_stream.send(resp) # fill msg. @@ -1136,9 +1140,8 @@ async def handle_order_updates( ) continue - # update the msg chain - chain = apiflows[reqid] - chain.maps.append(event) + # update the msg history + apiflows.add_msg(reqid, event) if status == 'error': # any of ``{'add', 'edit', 'cancel'}`` @@ -1148,11 +1151,16 @@ async def handle_order_updates( f'Failed to {action} order {reqid}:\n' f'{errmsg}' ) + + symbol: str = 'N/A' + if chain := apiflows.get(reqid): + symbol: str = chain.get('symbol', 'N/A') + await ems_stream.send(BrokerdError( oid=oid, # XXX: use old reqid in case it changed? reqid=reqid, - symbol=chain.get('symbol', 'N/A'), + symbol=symbol, reason=f'Failed {action}:\n{errmsg}', broker_details=event