Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug temp #51

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
6 changes: 6 additions & 0 deletions eth_portfolio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@

import a_sync._smart

a_sync._smart.set_smart_task_factory()


from eth_portfolio.portfolio import Portfolio, portfolio
# make sure we init the extended db before we init ypm somewhere
from eth_portfolio._db import utils
Expand Down
19 changes: 8 additions & 11 deletions eth_portfolio/_ledgers/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import dank_mids
import eth_retry
from pandas import DataFrame # type: ignore
from tqdm.asyncio import tqdm_asyncio
from y import ERC20
from y._decorators import stuck_coro_debugger
from y.datatypes import Block
from y.decorators import stuck_coro_debugger
from y.utils.events import BATCH_SIZE

from eth_portfolio import _loaders
Expand All @@ -22,8 +21,7 @@
from eth_portfolio._loaders.transaction import get_nonce_at_block
from eth_portfolio._ydb.token_transfers import TokenTransfers
from eth_portfolio.structs import InternalTransfer, TokenTransfer, Transaction
from eth_portfolio.utils import (_AiterMixin, PandableList, _unpack_indicies,
get_buffered_chain_height)
from eth_portfolio.utils import _AiterMixin, PandableList, get_buffered_chain_height

if TYPE_CHECKING:
from eth_portfolio.address import PortfolioAddress
Expand Down Expand Up @@ -235,19 +233,18 @@ async def _load_new_objects(self, _: Block, end_block: Block) -> AsyncIterator[T
class InternalTransfersList(PandableList[InternalTransfer]):
pass

trace_semaphore = a_sync.Semaphore(32, __name__ + ".trace_semaphore")

@cache_to_disk
# we double stack these because high-volume wallets will likely need it
@eth_retry.auto_retry
@eth_retry.auto_retry
@stuck_coro_debugger
@a_sync.Semaphore(32, __name__ + ".trace_semaphore")
@eth_retry.auto_retry
async def get_traces(params: list) -> List[dict]:
async with trace_semaphore:
traces = await dank_mids.web3.provider.make_request("trace_filter", params) # type: ignore [arg-type, misc]
if 'result' not in traces:
raise BadResponse(traces)
return [trace for trace in traces['result'] if "error" not in trace]
traces = await dank_mids.web3.provider.make_request("trace_filter", params) # type: ignore [arg-type, misc]
if 'result' not in traces:
raise BadResponse(traces)
return [trace for trace in traces['result'] if "error" not in trace]

class AddressInternalTransfersLedger(AddressLedgerBase[InternalTransfersList, InternalTransfer]):
_list_type = InternalTransfersList
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/_loaders/balances.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import dank_mids
import eth_retry
from y import ERC20, NonStandardERC20, get_price
from y._decorators import stuck_coro_debugger
from y.constants import WRAPPED_GAS_COIN
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger

from eth_portfolio.typing import Balance
from eth_portfolio.utils import _get_price
Expand Down
88 changes: 38 additions & 50 deletions eth_portfolio/_loaders/internal_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,82 +3,70 @@
from typing import Optional

from brownie import chain
from dank_mids.structs import Trace
from y._decorators import stuck_coro_debugger
from y.constants import EEE_ADDRESS
from y.decorators import stuck_coro_debugger

from eth_portfolio._loaders.utils import checksum, get_transaction_receipt, underscore
from eth_portfolio.structs import InternalTransfer
from eth_portfolio.utils import _get_price


@stuck_coro_debugger
async def load_internal_transfer(transfer: dict, load_prices: bool) -> Optional[InternalTransfer]:
if transfer.get("to") == "0xd9db270c1b5e3bd161e8c8503c55ceabee709552":
# "0xd9Db270c1B5E3Bd161E8c8503c55cEABeE709552": # Gnosis Safe Singleton 1.3.0
async def load_internal_transfer(trace: Trace, load_prices: bool) -> Optional[InternalTransfer]:
if trace.to == "0xd9Db270c1B5E3Bd161E8c8503c55cEABeE709552": # Gnosis Safe Singleton 1.3.0
# NOTE: Not sure why these appear, but I've yet to come across an internal transfer
# that actually transmitted value to the singleton even though they appear to.
return None

if is_block_reward(transfer):
transfer['transactionHash'] = 'block reward'
elif is_uncle_reward(transfer):
transfer['transactionHash'] = 'uncle reward'

params = {}

if trace.type == "reward:
if trace.action.rewardType == 'block':
params = {'hash': 'block reward'}
elif trace.action.rewardType == 'uncle':
params = {'hash': 'uncle reward'}
else:
raise NotImplementedError(trace.action.rewardType)
else:
# NOTE: We don't need to confirm block rewards came from a successful transaction, because they don't come from a transaction
# In all other cases, we need to confirm the transaction didn't revert
receipt = await get_transaction_receipt(transfer['transactionHash'])
receipt = await get_transaction_receipt(trace.transactionHash)
if receipt.status == 0:
return None
params = {'hash': trace.transactionHash}

params['transaction_index'] = trace.transactionPosition
params['chainid'] = chain.id

# Un-nest the action dict
if action := transfer.pop('action', None):
if action := trace.action:
for key, value in action.items():
if key == 'author':
# for block reward transfers, the recipient is 'author'
transfer['to'] = value
params['to'] = value
else:
transfer[key] = value
params[key] = value

# Un-nest the result dict
if result := transfer.pop('result', None):
if result := trace.result:
for key, value in result.items():
transfer[key] = value
params[key] = value

# Checksum the addresses
if "from" in transfer:
transfer['from_address'] = checksum(transfer.pop('from'))
if "to" in transfer:
transfer['to_address'] = checksum(transfer.pop('to'))
if "address" in transfer:
transfer['address'] = checksum(transfer.pop('address'))

transfer['traceAddress'] = str(transfer['traceAddress'])
transfer['value'] = Decimal(int(transfer['value'], 16)) / Decimal(1e18)
transfer['gas'] = 0 if is_block_reward(transfer) or is_uncle_reward(transfer) else int(transfer['gas'], 16)
transfer['gasUsed'] = int(transfer['gasUsed'], 16) if transfer.get('gasUsed') else None
# Remap the addresses
if trace.sender:
params['from_address'] = trace.sender
if trace.to:
params['to_address'] = trace.to

if load_prices:
price = round(Decimal(await _get_price(EEE_ADDRESS, transfer['blockNumber'])), 18)
transfer['price'] = price
transfer['value_usd'] = round(transfer['value'] * price, 18)

transfer['hash'] = transfer.pop('transactionHash')
transfer['transaction_index'] = transfer.pop('transactionPosition', None)
transfer['chainid'] = chain.id
value = Decimal(trace.value) / 10**18
params['value'] = value
params['gas'] = 0 if trace.type == "reward" and trace.action.rewardType in ["block", "uncle"] else trace.gas

# We include this data in the hash field, we don't need it anymore
transfer.pop('rewardType', None)
if load_prices:
price = round(Decimal(await _get_price(EEE_ADDRESS, trace.blockNumber)), 18)
params['price'] = price
params['value_usd'] = round(value * price, 18)

return InternalTransfer(**params)

return InternalTransfer(**{underscore(k): v for k, v in transfer.items()})

def is_block_reward(transfer: dict) -> bool:
return transfer['type'] == 'reward' and get_reward_type(transfer) == 'block'

def is_uncle_reward(transfer: dict) -> bool:
return transfer['type'] == 'reward' and get_reward_type(transfer) == 'uncle'

def get_reward_type(transfer: dict) -> str:
try:
return transfer.get('rewardType') or transfer['action']['rewardType']
except KeyError:
raise ValueError('transfer is not reward type') from None
2 changes: 1 addition & 1 deletion eth_portfolio/_loaders/token_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from brownie.network.event import _EventItem as brownie_EventItem
from pony.orm import TransactionIntegrityError
from y import ERC20, Contract
from y.decorators import stuck_coro_debugger
from y._decorators import stuck_coro_debugger
from y.exceptions import ContractNotVerified, NonStandardERC20
from y.utils.events import decode_logs

Expand Down
16 changes: 10 additions & 6 deletions eth_portfolio/_loaders/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
from pony.orm import TransactionIntegrityError
from web3.types import TxData
from y import get_price
from y._decorators import stuck_coro_debugger
from y.constants import EEE_ADDRESS
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger

from eth_portfolio._db import utils as db
from eth_portfolio._loaders.utils import get_transaction_receipt, underscore
Expand Down Expand Up @@ -105,11 +105,15 @@ async def get_nonce_at_block(address: Address, block: Block) -> int:
return -1
raise ValueError(f"For {address} at {block}: {e}")

@alru_cache(ttl=60*60)
@eth_retry.auto_retry
@stuck_coro_debugger
async def get_block_transactions(block: Block) -> List[TxData]:
async with _full_block_semaphore:
block = await dank_mids.eth.get_block(block, full_transactions=True)
return block.transactions
async def _get_block_transactions(block: Block) -> List[TxData]:
block = await dank_mids.eth.get_block(block, full_transactions=True)
return block.transactions

_full_block_semaphore = a_sync.Semaphore(1_000, name = __name__ + "._full_block_semaphore")
get_block_transactions = a_sync.SmartProcessingQueue(
_get_block_transactions,
num_workers=1_000,
name=__name__ + ".get_block_transactions",
)
16 changes: 10 additions & 6 deletions eth_portfolio/_loaders/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@
from async_lru import alru_cache
from eth_utils import to_checksum_address
from web3.types import TxReceipt
from y.decorators import stuck_coro_debugger
from y._decorators import stuck_coro_debugger

receipt_semaphore = a_sync.Semaphore(1_000)

@eth_retry.auto_retry
@alru_cache
@alru_cache(ttl=60*60)
@stuck_coro_debugger
async def get_transaction_receipt(txhash: str) -> TxReceipt:
async with receipt_semaphore:
return await dank_mids.eth.get_transaction_receipt(txhash)
async def _get_transaction_receipt(txhash: str) -> TxReceipt:
return await dank_mids.eth.get_transaction_receipt(txhash)

get_transaction_receipt = a_sync.SmartProcessingQueue(
_get_transaction_receipt,
num_workers=1000,
name=__name__ + ".get_transaction_receipt",
)

def checksum(addr: str) -> str:
"""We keep a mapping here to save cpu cycles, checksumming is arduous."""
Expand Down
16 changes: 12 additions & 4 deletions eth_portfolio/_ydb/token_transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

logger = logging.getLogger(__name__)

logged = set()

class _TokenTransfers(ProcessedEvents["asyncio.Task[TokenTransfer]"]):
"""A helper mixin that contains all logic for fetching token transfers for a particular wallet address"""
__slots__ = "address", "_load_prices"
Expand All @@ -41,10 +43,16 @@ def _topics(self) -> List:
async def yield_thru_block(self, block) -> AsyncIterator["asyncio.Task[TokenTransfer]"]:
logger.debug("%s yielding all objects thru block %s", self, block)
async for task in self._objects_thru(block=block):
logger.debug("yielding %s at block %s [thru: %s, lock: %s]", task, task.block, block, self._lock.value)

if task.block == 20205291 and task not in logged:
logger.info("yielding %s at block %s [thru: %s, lock: %s]", task, task.block, block, self._lock.value)
logged.add(task)
else:
logger.debug("yielding %s at block %s [thru: %s, lock: %s]", task, task.block, block, self._lock.value)

yield task
logger.debug("%s yield thru %s complete", self, block)
def _extend(self, objs: List[LogReceipt]) -> None:
async def _extend(self, objs: List[LogReceipt]) -> None:
for log in objs:
task = asyncio.create_task(
coro=_loaders.load_token_transfer(log, self._load_prices),
Expand All @@ -66,13 +74,13 @@ class InboundTokenTransfers(_TokenTransfers):
"""A container that fetches and iterates over all inbound token transfers for a particular wallet address"""
@property
def _topics(self) -> List:
return [TRANSFER_SIGS, None, [encode_address(self.address)]]
return [TRANSFER_SIGS, None, encode_address(self.address)]

class OutboundTokenTransfers(_TokenTransfers):
"""A container that fetches and iterates over all outbound token transfers for a particular wallet address"""
@property
def _topics(self) -> List:
return [TRANSFER_SIGS, [encode_address(self.address)]]
return [TRANSFER_SIGS, encode_address(self.address)]

class TokenTransfers(a_sync.ASyncIterable[TokenTransfer]):
"""
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import a_sync
from a_sync.exceptions import MappingIsEmptyError
from y import convert
from y._decorators import stuck_coro_debugger
from y.constants import EEE_ADDRESS
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger

from eth_portfolio._ledgers.address import (AddressInternalTransfersLedger,
AddressLedgerBase,
Expand Down
2 changes: 1 addition & 1 deletion eth_portfolio/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os

from brownie import chain
from a_sync.primitives.executor import PruningThreadPoolExecutor
from a_sync.executor import PruningThreadPoolExecutor
from y import Network, convert, weth

ERC20_TRANSFER_EVENT_HASH = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
Expand Down
8 changes: 5 additions & 3 deletions eth_portfolio/portfolio.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import asyncio
import logging
from functools import cached_property, wraps
from typing import Any, Dict, Iterable, Iterator, List, Tuple
from typing import Any, Dict, Iterable, Iterator, List, Tuple, Union

import a_sync
import a_sync.modified
from a_sync.a_sync import ASyncFunction
from brownie import web3
from checksum_dict import ChecksumAddressDict
from pandas import DataFrame, concat # type: ignore
Expand Down Expand Up @@ -41,6 +41,8 @@ def __init__(self, portfolio: "Portfolio", addresses: Iterable[Address]) -> None
})
def __repr__(self) -> str:
return f"<{type(self).__name__} wallets={list(self._wallets.values())}>"
def __contains__(self, address: Union[Address, PortfolioAddress]) -> bool:
return address in self._wallets
def __getitem__(self, address: Address) -> PortfolioAddress:
return self._wallets[address]
def __iter__(self) -> Iterator[PortfolioAddress]:
Expand Down Expand Up @@ -133,7 +135,7 @@ async def describe(self, block: int) -> PortfolioBalances:
return PortfolioBalances(await a_sync.gather({address: address.describe(block, sync=False) for address in self}))


async_functions = {name: obj for name, obj in PortfolioAddress.__dict__.items() if isinstance(obj, a_sync.modified.ASyncFunction)}
async_functions = {name: obj for name, obj in PortfolioAddress.__dict__.items() if isinstance(obj, ASyncFunction)}
for func_name, func in async_functions.items():
if not callable(getattr(PortfolioAddress, func_name)):
raise RuntimeError(f"A PortfolioAddress object should not have a non-callable attribute suffixed with '_async'")
Expand Down
2 changes: 2 additions & 0 deletions eth_portfolio/protocols/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ def __init__(self) -> None:

@a_sync.future
async def balances(self, address: Address, block: Optional[Block] = None) -> RemoteTokenBalances:
if not self.protocols:
return RemoteTokenBalances()
return RemoteTokenBalances({
type(protocol).__name__: protocol_balances
async for protocol, protocol_balances
Expand Down
5 changes: 2 additions & 3 deletions eth_portfolio/protocols/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
from typing import List, Optional

import a_sync
from a_sync.modified import ASyncFunctionSyncDefault
from a_sync.property import HiddenMethod
from a_sync.a_sync import ASyncFunctionSyncDefault, HiddenMethod
from brownie.network.contract import ContractCall
from eth_portfolio.typing import Balance, TokenBalances
from eth_portfolio.utils import Decimal
from y import ERC20, Contract
from y._decorators import stuck_coro_debugger
from y.contracts import contract_creation_block
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger


class ProtocolABC(metaclass=abc.ABCMeta):
Expand Down
4 changes: 3 additions & 1 deletion eth_portfolio/protocols/lending/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from eth_portfolio.typing import RemoteTokenBalances
from eth_portfolio.utils import (_get_protocols_for_submodule,
_import_submodules)
from y._decorators import stuck_coro_debugger
from y.datatypes import Address, Block
from y.decorators import stuck_coro_debugger

_import_submodules()

Expand All @@ -31,6 +31,8 @@ async def collateral(self, address: Address, block: Optional[Block] = None) -> R
@a_sync.future
@stuck_coro_debugger
async def debt(self, address: Address, block: Optional[Block] = None) -> RemoteTokenBalances:
if not self.protocols:
return RemoteTokenBalances()
return RemoteTokenBalances({
type(protocol).__name__: token_balances
async for protocol, token_balances in a_sync.map(lambda p: p.debt(address, block), self.protocols)
Expand Down
Loading