diff --git a/eth_portfolio/__init__.py b/eth_portfolio/__init__.py index c25be218..b2f17e4c 100644 --- a/eth_portfolio/__init__.py +++ b/eth_portfolio/__init__.py @@ -1,3 +1,9 @@ + +import a_sync._smart + +a_sync._smart.set_smart_task_factory() + + from eth_portfolio.portfolio import Portfolio, portfolio # make sure we init the extended db before we init ypm somewhere from eth_portfolio._db import utils diff --git a/eth_portfolio/_ledgers/address.py b/eth_portfolio/_ledgers/address.py index c5424b01..6d58ffd2 100644 --- a/eth_portfolio/_ledgers/address.py +++ b/eth_portfolio/_ledgers/address.py @@ -10,10 +10,9 @@ import dank_mids import eth_retry from pandas import DataFrame # type: ignore -from tqdm.asyncio import tqdm_asyncio from y import ERC20 +from y._decorators import stuck_coro_debugger from y.datatypes import Block -from y.decorators import stuck_coro_debugger from y.utils.events import BATCH_SIZE from eth_portfolio import _loaders @@ -22,8 +21,7 @@ from eth_portfolio._loaders.transaction import get_nonce_at_block from eth_portfolio._ydb.token_transfers import TokenTransfers from eth_portfolio.structs import InternalTransfer, TokenTransfer, Transaction -from eth_portfolio.utils import (_AiterMixin, PandableList, _unpack_indicies, - get_buffered_chain_height) +from eth_portfolio.utils import _AiterMixin, PandableList, get_buffered_chain_height if TYPE_CHECKING: from eth_portfolio.address import PortfolioAddress @@ -235,19 +233,18 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T class InternalTransfersList(PandableList[InternalTransfer]): pass -trace_semaphore = a_sync.Semaphore(32, __name__ + ".trace_semaphore") @cache_to_disk # we double stack these because high-volume wallets will likely need it @eth_retry.auto_retry -@eth_retry.auto_retry @stuck_coro_debugger +@a_sync.Semaphore(32, __name__ + ".trace_semaphore") +@eth_retry.auto_retry async def get_traces(params: list) -> List[dict]: - async with trace_semaphore: - traces = await dank_mids.web3.provider.make_request("trace_filter", params) # type: ignore [arg-type, misc] - if 'result' not in traces: - raise BadResponse(traces) - return [trace for trace in traces['result'] if "error" not in trace] + traces = await dank_mids.web3.provider.make_request("trace_filter", params) # type: ignore [arg-type, misc] + if 'result' not in traces: + raise BadResponse(traces) + return [trace for trace in traces['result'] if "error" not in trace] class AddressInternalTransfersLedger(AddressLedgerBase[InternalTransfersList, InternalTransfer]): _list_type = InternalTransfersList diff --git a/eth_portfolio/_loaders/balances.py b/eth_portfolio/_loaders/balances.py index edc86da8..923704b5 100644 --- a/eth_portfolio/_loaders/balances.py +++ b/eth_portfolio/_loaders/balances.py @@ -6,9 +6,9 @@ import dank_mids import eth_retry from y import ERC20, NonStandardERC20, get_price +from y._decorators import stuck_coro_debugger from y.constants import WRAPPED_GAS_COIN from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger from eth_portfolio.typing import Balance from eth_portfolio.utils import _get_price diff --git a/eth_portfolio/_loaders/internal_transfer.py b/eth_portfolio/_loaders/internal_transfer.py index cbd5694d..83395d2b 100644 --- a/eth_portfolio/_loaders/internal_transfer.py +++ b/eth_portfolio/_loaders/internal_transfer.py @@ -3,8 +3,9 @@ from typing import Optional from brownie import chain +from dank_mids.structs import Trace +from y._decorators import stuck_coro_debugger from y.constants import EEE_ADDRESS -from y.decorators import stuck_coro_debugger from eth_portfolio._loaders.utils import checksum, get_transaction_receipt, underscore from eth_portfolio.structs import InternalTransfer @@ -12,73 +13,60 @@ @stuck_coro_debugger -async def load_internal_transfer(transfer: dict, load_prices: bool) -> Optional[InternalTransfer]: - if transfer.get("to") == "0xd9db270c1b5e3bd161e8c8503c55ceabee709552": - # "0xd9Db270c1B5E3Bd161E8c8503c55cEABeE709552": # Gnosis Safe Singleton 1.3.0 +async def load_internal_transfer(trace: Trace, load_prices: bool) -> Optional[InternalTransfer]: + if trace.to == "0xd9Db270c1B5E3Bd161E8c8503c55cEABeE709552": # Gnosis Safe Singleton 1.3.0 # NOTE: Not sure why these appear, but I've yet to come across an internal transfer # that actually transmitted value to the singleton even though they appear to. return None - - if is_block_reward(transfer): - transfer['transactionHash'] = 'block reward' - elif is_uncle_reward(transfer): - transfer['transactionHash'] = 'uncle reward' + + params = {} + + if trace.type == "reward: + if trace.action.rewardType == 'block': + params = {'hash': 'block reward'} + elif trace.action.rewardType == 'uncle': + params = {'hash': 'uncle reward'} + else: + raise NotImplementedError(trace.action.rewardType) else: # NOTE: We don't need to confirm block rewards came from a successful transaction, because they don't come from a transaction # In all other cases, we need to confirm the transaction didn't revert - receipt = await get_transaction_receipt(transfer['transactionHash']) + receipt = await get_transaction_receipt(trace.transactionHash) if receipt.status == 0: return None + params = {'hash': trace.transactionHash} + + params['transaction_index'] = trace.transactionPosition + params['chainid'] = chain.id # Un-nest the action dict - if action := transfer.pop('action', None): + if action := trace.action: for key, value in action.items(): if key == 'author': # for block reward transfers, the recipient is 'author' - transfer['to'] = value + params['to'] = value else: - transfer[key] = value + params[key] = value # Un-nest the result dict - if result := transfer.pop('result', None): + if result := trace.result: for key, value in result.items(): - transfer[key] = value + params[key] = value - # Checksum the addresses - if "from" in transfer: - transfer['from_address'] = checksum(transfer.pop('from')) - if "to" in transfer: - transfer['to_address'] = checksum(transfer.pop('to')) - if "address" in transfer: - transfer['address'] = checksum(transfer.pop('address')) - - transfer['traceAddress'] = str(transfer['traceAddress']) - transfer['value'] = Decimal(int(transfer['value'], 16)) / Decimal(1e18) - transfer['gas'] = 0 if is_block_reward(transfer) or is_uncle_reward(transfer) else int(transfer['gas'], 16) - transfer['gasUsed'] = int(transfer['gasUsed'], 16) if transfer.get('gasUsed') else None + # Remap the addresses + if trace.sender: + params['from_address'] = trace.sender + if trace.to: + params['to_address'] = trace.to - if load_prices: - price = round(Decimal(await _get_price(EEE_ADDRESS, transfer['blockNumber'])), 18) - transfer['price'] = price - transfer['value_usd'] = round(transfer['value'] * price, 18) - - transfer['hash'] = transfer.pop('transactionHash') - transfer['transaction_index'] = transfer.pop('transactionPosition', None) - transfer['chainid'] = chain.id + value = Decimal(trace.value) / 10**18 + params['value'] = value + params['gas'] = 0 if trace.type == "reward" and trace.action.rewardType in ["block", "uncle"] else trace.gas - # We include this data in the hash field, we don't need it anymore - transfer.pop('rewardType', None) + if load_prices: + price = round(Decimal(await _get_price(EEE_ADDRESS, trace.blockNumber)), 18) + params['price'] = price + params['value_usd'] = round(value * price, 18) + + return InternalTransfer(**params) - return InternalTransfer(**{underscore(k): v for k, v in transfer.items()}) - -def is_block_reward(transfer: dict) -> bool: - return transfer['type'] == 'reward' and get_reward_type(transfer) == 'block' - -def is_uncle_reward(transfer: dict) -> bool: - return transfer['type'] == 'reward' and get_reward_type(transfer) == 'uncle' - -def get_reward_type(transfer: dict) -> str: - try: - return transfer.get('rewardType') or transfer['action']['rewardType'] - except KeyError: - raise ValueError('transfer is not reward type') from None diff --git a/eth_portfolio/_loaders/token_transfer.py b/eth_portfolio/_loaders/token_transfer.py index 3a54563f..6d0753f9 100644 --- a/eth_portfolio/_loaders/token_transfer.py +++ b/eth_portfolio/_loaders/token_transfer.py @@ -10,7 +10,7 @@ from brownie.network.event import _EventItem as brownie_EventItem from pony.orm import TransactionIntegrityError from y import ERC20, Contract -from y.decorators import stuck_coro_debugger +from y._decorators import stuck_coro_debugger from y.exceptions import ContractNotVerified, NonStandardERC20 from y.utils.events import decode_logs diff --git a/eth_portfolio/_loaders/transaction.py b/eth_portfolio/_loaders/transaction.py index c403a03a..f5b429da 100644 --- a/eth_portfolio/_loaders/transaction.py +++ b/eth_portfolio/_loaders/transaction.py @@ -11,9 +11,9 @@ from pony.orm import TransactionIntegrityError from web3.types import TxData from y import get_price +from y._decorators import stuck_coro_debugger from y.constants import EEE_ADDRESS from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger from eth_portfolio._db import utils as db from eth_portfolio._loaders.utils import get_transaction_receipt, underscore @@ -105,11 +105,15 @@ async def get_nonce_at_block(address: Address, block: Block) -> int: return -1 raise ValueError(f"For {address} at {block}: {e}") +@alru_cache(ttl=60*60) @eth_retry.auto_retry @stuck_coro_debugger -async def get_block_transactions(block: Block) -> List[TxData]: - async with _full_block_semaphore: - block = await dank_mids.eth.get_block(block, full_transactions=True) - return block.transactions +async def _get_block_transactions(block: Block) -> List[TxData]: + block = await dank_mids.eth.get_block(block, full_transactions=True) + return block.transactions -_full_block_semaphore = a_sync.Semaphore(1_000, name = __name__ + "._full_block_semaphore") +get_block_transactions = a_sync.SmartProcessingQueue( + _get_block_transactions, + num_workers=1_000, + name=__name__ + ".get_block_transactions", +) diff --git a/eth_portfolio/_loaders/utils.py b/eth_portfolio/_loaders/utils.py index d882c556..9fd94d50 100644 --- a/eth_portfolio/_loaders/utils.py +++ b/eth_portfolio/_loaders/utils.py @@ -8,16 +8,20 @@ from async_lru import alru_cache from eth_utils import to_checksum_address from web3.types import TxReceipt -from y.decorators import stuck_coro_debugger +from y._decorators import stuck_coro_debugger -receipt_semaphore = a_sync.Semaphore(1_000) @eth_retry.auto_retry -@alru_cache +@alru_cache(ttl=60*60) @stuck_coro_debugger -async def get_transaction_receipt(txhash: str) -> TxReceipt: - async with receipt_semaphore: - return await dank_mids.eth.get_transaction_receipt(txhash) +async def _get_transaction_receipt(txhash: str) -> TxReceipt: + return await dank_mids.eth.get_transaction_receipt(txhash) + +get_transaction_receipt = a_sync.SmartProcessingQueue( + _get_transaction_receipt, + num_workers=1000, + name=__name__ + ".get_transaction_receipt", +) def checksum(addr: str) -> str: """We keep a mapping here to save cpu cycles, checksumming is arduous.""" diff --git a/eth_portfolio/_ydb/token_transfers.py b/eth_portfolio/_ydb/token_transfers.py index ec5f9b14..fac5f45e 100644 --- a/eth_portfolio/_ydb/token_transfers.py +++ b/eth_portfolio/_ydb/token_transfers.py @@ -25,6 +25,8 @@ logger = logging.getLogger(__name__) +logged = set() + class _TokenTransfers(ProcessedEvents["asyncio.Task[TokenTransfer]"]): """A helper mixin that contains all logic for fetching token transfers for a particular wallet address""" __slots__ = "address", "_load_prices" @@ -41,10 +43,16 @@ def _topics(self) -> List: async def yield_thru_block(self, block) -> AsyncIterator["asyncio.Task[TokenTransfer]"]: logger.debug("%s yielding all objects thru block %s", self, block) async for task in self._objects_thru(block=block): - logger.debug("yielding %s at block %s [thru: %s, lock: %s]", task, task.block, block, self._lock.value) + + if task.block == 20205291 and task not in logged: + logger.info("yielding %s at block %s [thru: %s, lock: %s]", task, task.block, block, self._lock.value) + logged.add(task) + else: + logger.debug("yielding %s at block %s [thru: %s, lock: %s]", task, task.block, block, self._lock.value) + yield task logger.debug("%s yield thru %s complete", self, block) - def _extend(self, objs: List[LogReceipt]) -> None: + async def _extend(self, objs: List[LogReceipt]) -> None: for log in objs: task = asyncio.create_task( coro=_loaders.load_token_transfer(log, self._load_prices), @@ -66,13 +74,13 @@ class InboundTokenTransfers(_TokenTransfers): """A container that fetches and iterates over all inbound token transfers for a particular wallet address""" @property def _topics(self) -> List: - return [TRANSFER_SIGS, None, [encode_address(self.address)]] + return [TRANSFER_SIGS, None, encode_address(self.address)] class OutboundTokenTransfers(_TokenTransfers): """A container that fetches and iterates over all outbound token transfers for a particular wallet address""" @property def _topics(self) -> List: - return [TRANSFER_SIGS, [encode_address(self.address)]] + return [TRANSFER_SIGS, encode_address(self.address)] class TokenTransfers(a_sync.ASyncIterable[TokenTransfer]): """ diff --git a/eth_portfolio/address.py b/eth_portfolio/address.py index afac1b86..bdb17b14 100644 --- a/eth_portfolio/address.py +++ b/eth_portfolio/address.py @@ -6,9 +6,9 @@ import a_sync from a_sync.exceptions import MappingIsEmptyError from y import convert +from y._decorators import stuck_coro_debugger from y.constants import EEE_ADDRESS from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger from eth_portfolio._ledgers.address import (AddressInternalTransfersLedger, AddressLedgerBase, diff --git a/eth_portfolio/constants.py b/eth_portfolio/constants.py index 8ff7a965..8d796abc 100644 --- a/eth_portfolio/constants.py +++ b/eth_portfolio/constants.py @@ -2,7 +2,7 @@ import os from brownie import chain -from a_sync.primitives.executor import PruningThreadPoolExecutor +from a_sync.executor import PruningThreadPoolExecutor from y import Network, convert, weth ERC20_TRANSFER_EVENT_HASH = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef' diff --git a/eth_portfolio/portfolio.py b/eth_portfolio/portfolio.py index 9fd0bd0b..28315aa5 100644 --- a/eth_portfolio/portfolio.py +++ b/eth_portfolio/portfolio.py @@ -2,10 +2,10 @@ import asyncio import logging from functools import cached_property, wraps -from typing import Any, Dict, Iterable, Iterator, List, Tuple +from typing import Any, Dict, Iterable, Iterator, List, Tuple, Union import a_sync -import a_sync.modified +from a_sync.a_sync import ASyncFunction from brownie import web3 from checksum_dict import ChecksumAddressDict from pandas import DataFrame, concat # type: ignore @@ -41,6 +41,8 @@ def __init__(self, portfolio: "Portfolio", addresses: Iterable[Address]) -> None }) def __repr__(self) -> str: return f"<{type(self).__name__} wallets={list(self._wallets.values())}>" + def __contains__(self, address: Union[Address, PortfolioAddress]) -> bool: + return address in self._wallets def __getitem__(self, address: Address) -> PortfolioAddress: return self._wallets[address] def __iter__(self) -> Iterator[PortfolioAddress]: @@ -133,7 +135,7 @@ async def describe(self, block: int) -> PortfolioBalances: return PortfolioBalances(await a_sync.gather({address: address.describe(block, sync=False) for address in self})) -async_functions = {name: obj for name, obj in PortfolioAddress.__dict__.items() if isinstance(obj, a_sync.modified.ASyncFunction)} +async_functions = {name: obj for name, obj in PortfolioAddress.__dict__.items() if isinstance(obj, ASyncFunction)} for func_name, func in async_functions.items(): if not callable(getattr(PortfolioAddress, func_name)): raise RuntimeError(f"A PortfolioAddress object should not have a non-callable attribute suffixed with '_async'") diff --git a/eth_portfolio/protocols/__init__.py b/eth_portfolio/protocols/__init__.py index 7effa75a..c9cbd5c7 100644 --- a/eth_portfolio/protocols/__init__.py +++ b/eth_portfolio/protocols/__init__.py @@ -19,6 +19,8 @@ def __init__(self) -> None: @a_sync.future async def balances(self, address: Address, block: Optional[Block] = None) -> RemoteTokenBalances: + if not self.protocols: + return RemoteTokenBalances() return RemoteTokenBalances({ type(protocol).__name__: protocol_balances async for protocol, protocol_balances diff --git a/eth_portfolio/protocols/_base.py b/eth_portfolio/protocols/_base.py index 86e278fd..cbb217fb 100644 --- a/eth_portfolio/protocols/_base.py +++ b/eth_portfolio/protocols/_base.py @@ -4,15 +4,14 @@ from typing import List, Optional import a_sync -from a_sync.modified import ASyncFunctionSyncDefault -from a_sync.property import HiddenMethod +from a_sync.a_sync import ASyncFunctionSyncDefault, HiddenMethod from brownie.network.contract import ContractCall from eth_portfolio.typing import Balance, TokenBalances from eth_portfolio.utils import Decimal from y import ERC20, Contract +from y._decorators import stuck_coro_debugger from y.contracts import contract_creation_block from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger class ProtocolABC(metaclass=abc.ABCMeta): diff --git a/eth_portfolio/protocols/lending/__init__.py b/eth_portfolio/protocols/lending/__init__.py index fd0ad758..b7689c7d 100644 --- a/eth_portfolio/protocols/lending/__init__.py +++ b/eth_portfolio/protocols/lending/__init__.py @@ -8,8 +8,8 @@ from eth_portfolio.typing import RemoteTokenBalances from eth_portfolio.utils import (_get_protocols_for_submodule, _import_submodules) +from y._decorators import stuck_coro_debugger from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger _import_submodules() @@ -31,6 +31,8 @@ async def collateral(self, address: Address, block: Optional[Block] = None) -> R @a_sync.future @stuck_coro_debugger async def debt(self, address: Address, block: Optional[Block] = None) -> RemoteTokenBalances: + if not self.protocols: + return RemoteTokenBalances() return RemoteTokenBalances({ type(protocol).__name__: token_balances async for protocol, token_balances in a_sync.map(lambda p: p.debt(address, block), self.protocols) diff --git a/eth_portfolio/protocols/lending/compound.py b/eth_portfolio/protocols/lending/compound.py index 3cfc0831..55ae26c8 100644 --- a/eth_portfolio/protocols/lending/compound.py +++ b/eth_portfolio/protocols/lending/compound.py @@ -5,9 +5,9 @@ import a_sync from async_lru import alru_cache from brownie import ZERO_ADDRESS, Contract -from y import ERC20, Contract, get_prices, weth +from y import ERC20, Contract, map_prices, weth +from y._decorators import stuck_coro_debugger from y.datatypes import Block -from y.decorators import stuck_coro_debugger from y.exceptions import ContractNotVerified from y.prices.lending.compound import CToken, compound @@ -65,11 +65,11 @@ async def _debt(self, address: Address, block: Optional[Block] = None) -> TokenB asyncio.gather(*[underlying.__scale__ for underlying in underlyings]), ) - debts = {underlying: Decimal(debt) / scale for underlying, scale, debt in zip(underlyings, underlying_scale, debt_data) if debt} - prices = await get_prices(debts.keys(), block=block, sync=False) balances: TokenBalances = TokenBalances() - for (underlying, debt), price in zip(debts.items(), prices): - balances[underlying] += Balance(debt, debt * Decimal(price)) + if debts := {underlying: Decimal(debt) / scale for underlying, scale, debt in zip(underlyings, underlying_scale, debt_data) if debt}: + async for underlying, price in map_prices(debts, block=block): + debt = debts.pop(underlying) + balances[underlying] += Balance(debt, debt * Decimal(price)) return balances @stuck_coro_debugger diff --git a/eth_portfolio/protocols/lending/liquity.py b/eth_portfolio/protocols/lending/liquity.py index 012ce7bd..9e5aa937 100644 --- a/eth_portfolio/protocols/lending/liquity.py +++ b/eth_portfolio/protocols/lending/liquity.py @@ -3,9 +3,9 @@ from async_lru import alru_cache from y import Contract, Network, get_price +from y._decorators import stuck_coro_debugger from y.constants import EEE_ADDRESS from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger from eth_portfolio.protocols.lending._base import \ LendingProtocolWithLockedCollateral diff --git a/eth_portfolio/protocols/lending/maker.py b/eth_portfolio/protocols/lending/maker.py index fbefeb0e..9626d27a 100644 --- a/eth_portfolio/protocols/lending/maker.py +++ b/eth_portfolio/protocols/lending/maker.py @@ -4,10 +4,10 @@ from async_lru import alru_cache from y import Network, get_price +from y._decorators import stuck_coro_debugger from y.constants import dai from y.contracts import Contract from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger from eth_portfolio.protocols.lending._base import \ LendingProtocolWithLockedCollateral diff --git a/eth_portfolio/protocols/lending/unit.py b/eth_portfolio/protocols/lending/unit.py index fd685cda..b06ae63d 100644 --- a/eth_portfolio/protocols/lending/unit.py +++ b/eth_portfolio/protocols/lending/unit.py @@ -3,8 +3,8 @@ from brownie import chain from y import Contract, Network, get_price +from y._decorators import stuck_coro_debugger from y.datatypes import Address, Block -from y.decorators import stuck_coro_debugger from eth_portfolio.protocols.lending._base import \ LendingProtocolWithLockedCollateral diff --git a/requirements.txt b/requirements.txt index 348341bc..dc938647 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ checksum_dict>=1.1.2 dank_mids>=4.20.85 eth-brownie>=1.19.3,<1.21 eth_retry>=0.1.15,<1 -ez-a-sync>=0.19.5,<0.21 +ez-a-sync>=0.22.1 +numpy<2 pandas>=1.4.3,<1.6 ypricemagic>=3.3.0,<4