diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index 78b864139..270088b25 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -61,7 +61,11 @@ Status, Order, ) -from .venues import Pair +from .venues import ( + Pair, + _futes_ws, + _testnet_futes_ws, +) from .api import Client log = get_logger('piker.brokers.binance') @@ -213,33 +217,41 @@ async def open_trade_dialog( ) -> AsyncIterator[dict[str, Any]]: + # TODO: how do we set this from the EMS such that + # positions are loaded from the correct venue on the user + # stream at startup? (that is in an attempt to support both + # spot and futes markets?) + # - I guess we just want to instead start 2 separate user + # stream tasks right? unless we want another actor pool? + # XXX: see issue: + venue_name: str = 'futes' + venue_mode: str = 'usdtm_futes' + account_name: str = 'usdtm' + use_testnet: bool = False + async with open_cached_client('binance') as client: - for key, subconf in client.conf.items(): - if subconf.get('api_key'): - break + subconf: dict = client.conf[venue_name] + use_testnet = subconf.get('use_testnet', False) # XXX: if no futes.api_key or spot.api_key has been set we # always fall back to the paper engine! - else: + if not subconf.get('api_key'): await ctx.started('paper') return async with ( open_cached_client('binance') as client, ): - client.mkt_mode: str = 'usdtm_futes' + client.mkt_mode: str = venue_mode - # if client. - venue: str = client.mkt_mode + # TODO: map these wss urls depending on spot or futes + # setting passed when this task is spawned? + wss_url: str = _futes_ws if not use_testnet else _testnet_futes_ws wss: NoBsWs async with ( client.manage_listen_key() as listen_key, - open_autorecon_ws( - f'wss://stream.binancefuture.com/ws/{listen_key}', - # f'wss://stream.binance.com:9443/ws/{listen_key}', - ) as wss, - + open_autorecon_ws(f'{wss_url}/ws/{listen_key}') as wss, ): nsid: int = time_ns() await wss.send_msg({ @@ -270,7 +282,7 @@ async def open_trade_dialog( positions: list[BrokerdPosition] = [] for resp_dict in msg['result']: - resp = resp_dict['res'] + resp: dict = resp_dict['res'] req: str = resp_dict['req'] # @account response should be something like: @@ -329,7 +341,9 @@ async def open_trade_dialog( bs_mktid: str = entry['symbol'] entry_size: float = float(entry['positionAmt']) - pair: Pair | None = client._venue2pairs[venue].get(bs_mktid) + pair: Pair | None = client._venue2pairs[ + venue_mode + ].get(bs_mktid) if ( pair and entry_size > 0 @@ -338,7 +352,7 @@ async def open_trade_dialog( ppmsg = BrokerdPosition( broker='binance', - account='binance.usdtm', + account=f'binance.{account_name}', # TODO: maybe we should be passing back # a `MktPair` here? @@ -357,6 +371,13 @@ async def open_trade_dialog( await ctx.started((positions, list(accounts))) + # TODO: package more state tracking into the dialogs API? + # - hmm maybe we could include `OrderDialogs.dids: + # bidict` as part of the interface and then ask for + # a reqid field to be passed at init? + # |-> `OrderDialog(reqid_field='orderId')` kinda thing? + # - also maybe bundle in some kind of dialog to account + # table? dialogs = OrderDialogs() dids: dict[str, int] = bidict() @@ -404,7 +425,8 @@ async def open_trade_dialog( ) tn.start_soon( handle_order_updates, - venue, + venue_mode, + account_name, client, ems_stream, wss, @@ -417,6 +439,7 @@ async def open_trade_dialog( async def handle_order_updates( venue: str, + account_name: str, client: Client, ems_stream: tractor.MsgStream, wss: NoBsWs, @@ -574,6 +597,9 @@ async def handle_order_updates( time_ns=time_ns(), # reqid=reqid, reqid=oid, + + # TODO: i feel like we don't need to make the + # ems and upstream clients aware of this? # account='binance.usdtm', status=status, @@ -622,7 +648,7 @@ async def handle_order_updates( pair: Pair | None = client._venue2pairs[venue].get(bs_mktid) ppmsg = BrokerdPosition( broker='binance', - account='binance.usdtm', + account=f'binance.{account_name}', # TODO: maybe we should be passing back # a `MktPair` here?