diff --git a/setup.cfg b/setup.cfg index 7a5fbcf2f..40390676f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,7 +27,7 @@ package_dir = =src # DON'T CHANGE THE FOLLOWING LINE! IT WILL BE UPDATED BY PYSCAFFOLD! setup_requires = - pyscaffold>=3.1a0,<3.2a0 + setuptools_scm>=8.0.4 pytest-runner>=2.0,<3dev # Note: eth/web3 dependencies updates are sensitive and can trigger a lot of dependency conflicts. @@ -40,7 +40,7 @@ install_requires = aiohttp==3.8.4 aioipfs@git+https://github.com/aleph-im/aioipfs.git@d671c79b2871bb4d6c8877ba1e7f3ffbe7d20b71 alembic==1.12.1 - aleph-message==0.4.0 + aleph-message==0.4.2rc1 aleph-p2p-client@git+https://github.com/aleph-im/p2p-service-client-python@2c04af39c566217f629fd89505ffc3270fba8676 aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@32dd1749a4773da494275709060632cbeba9a51b asyncpg==0.28.0 @@ -51,6 +51,7 @@ install_requires = cosmospy==6.0.0 dataclasses_json==0.5.6 eth_account==0.10.0 + eth-typing~=4.0 gunicorn==21.2.0 hexbytes==0.2.2 msgpack==1.0.3 # required by aiocache diff --git a/src/aleph/db/accessors/messages.py b/src/aleph/db/accessors/messages.py index 6f7dcdd4a..198ecb08a 100644 --- a/src/aleph/db/accessors/messages.py +++ b/src/aleph/db/accessors/messages.py @@ -30,7 +30,7 @@ from ..models.pending_messages import PendingMessageDb -def get_message_by_item_hash(session: DbSession, item_hash: str) -> Optional[MessageDb]: +def get_message_by_item_hash(session: DbSession, item_hash: ItemHash) -> Optional[MessageDb]: select_stmt = ( select(MessageDb) .where(MessageDb.item_hash == item_hash) @@ -289,10 +289,10 @@ def make_confirmation_upsert_query(item_hash: str, tx_hash: str) -> Insert: ) -def get_message_status(session: DbSession, item_hash: str) -> Optional[MessageStatusDb]: +def get_message_status(session: DbSession, item_hash: ItemHash) -> Optional[MessageStatusDb]: return ( session.execute( - select(MessageStatusDb).where(MessageStatusDb.item_hash == item_hash) + select(MessageStatusDb).where(MessageStatusDb.item_hash == str(item_hash)) ) ).scalar() diff --git a/src/aleph/handlers/content/vm.py b/src/aleph/handlers/content/vm.py index f1400d41b..6e8318450 100644 --- a/src/aleph/handlers/content/vm.py +++ b/src/aleph/handlers/content/vm.py @@ -323,6 +323,9 @@ async def check_balance(self, session: DbSession, message: MessageDb) -> None: if not content.on.persistent: return + if content.payment and content.payment.is_stream: + return + required_tokens = compute_cost(session=session, content=content) current_balance = ( diff --git a/src/aleph/services/cost.py b/src/aleph/services/cost.py index f38a71f03..66464b251 100644 --- a/src/aleph/services/cost.py +++ b/src/aleph/services/cost.py @@ -12,6 +12,21 @@ from aleph.types.files import FileTag +MINUTE = 60 +HOUR = 60 * MINUTE + +COMPUTE_UNIT_TOKEN_TO_HOLD_ON_DEMAND = Decimal("200") +COMPUTE_UNIT_TOKEN_TO_HOLD_PERSISTENT = Decimal("2000") +COMPUTE_UNIT_PRICE_PER_HOUR_ON_DEMAND = Decimal("0.011") +COMPUTE_UNIT_PRICE_PER_HOUR_PERSISTENT = Decimal("0.11") +STORAGE_INCLUDED_PER_COMPUTE_UNIT_ON_DEMAND = Decimal("2") * GiB +STORAGE_INCLUDED_PER_COMPUTE_UNIT_PERSISTENT = Decimal("20") * GiB + +EXTRA_STORAGE_TOKEN_TO_HOLD = 1 / (Decimal('20') * MiB) # Hold 1 token for 20 MiB +EXTRA_STORAGE_PRICE_PER_HOUR = Decimal("0.000000977") +EXTRA_STORAGE_PRICE_PER_SECOND = EXTRA_STORAGE_PRICE_PER_HOUR / Decimal(HOUR) + + def _get_file_from_ref( session: DbSession, ref: str, use_latest: bool ) -> Optional[StoredFileDb]: @@ -64,16 +79,20 @@ def get_volume_size(session: DbSession, content: ExecutableContent) -> int: def get_additional_storage_price( content: ExecutableContent, session: DbSession ) -> Decimal: - is_microvm = isinstance(content, ProgramContent) and not content.on.persistent nb_compute_units = content.resources.vcpus - free_storage_per_compute_unit = 2 * GiB if is_microvm else 20 * GiB + + is_on_demand = isinstance(content, ProgramContent) and not content.on.persistent + included_storage_per_compute_unit = ( + STORAGE_INCLUDED_PER_COMPUTE_UNIT_ON_DEMAND + if is_on_demand + else STORAGE_INCLUDED_PER_COMPUTE_UNIT_PERSISTENT + ) total_volume_size = get_volume_size(session, content) additional_storage = max( - total_volume_size - (free_storage_per_compute_unit * nb_compute_units), 0 + total_volume_size - (included_storage_per_compute_unit * nb_compute_units), 0 ) - price = Decimal(additional_storage) / 20 / MiB - return price + return Decimal(additional_storage) * EXTRA_STORAGE_TOKEN_TO_HOLD def _get_nb_compute_units(content: ExecutableContent) -> int: @@ -85,15 +104,22 @@ def _get_nb_compute_units(content: ExecutableContent) -> int: def _get_compute_unit_multiplier(content: ExecutableContent) -> int: compute_unit_multiplier = 1 - if isinstance(content, ProgramContent) and not content.on.persistent and content.environment.internet: + if ( + isinstance(content, ProgramContent) + and not content.on.persistent + and content.environment.internet + ): compute_unit_multiplier += 1 return compute_unit_multiplier def compute_cost(session: DbSession, content: ExecutableContent) -> Decimal: - is_microvm = isinstance(content, ProgramContent) and not content.on.persistent - - compute_unit_cost = 200 if is_microvm else 2000 + is_on_demand = isinstance(content, ProgramContent) and not content.on.persistent + compute_unit_cost = ( + COMPUTE_UNIT_TOKEN_TO_HOLD_ON_DEMAND + if is_on_demand + else COMPUTE_UNIT_TOKEN_TO_HOLD_PERSISTENT + ) compute_units_required = _get_nb_compute_units(content) compute_unit_multiplier = _get_compute_unit_multiplier(content) @@ -103,3 +129,51 @@ def compute_cost(session: DbSession, content: ExecutableContent) -> Decimal: ) price = compute_unit_price + get_additional_storage_price(content, session) return Decimal(price) + + +def compute_flow_cost(session: DbSession, content: ExecutableContent) -> Decimal: + # TODO: Use PAYMENT_PRICING_AGGREGATE when possible + is_on_demand = isinstance(content, ProgramContent) and not content.on.persistent + compute_unit_cost_hour = ( + COMPUTE_UNIT_PRICE_PER_HOUR_PERSISTENT + if is_on_demand + else COMPUTE_UNIT_PRICE_PER_HOUR_ON_DEMAND + ) + + compute_unit_cost_second = compute_unit_cost_hour / HOUR + + compute_units_required = _get_nb_compute_units(content) + compute_unit_multiplier = _get_compute_unit_multiplier(content) + + compute_unit_price = ( + Decimal(compute_units_required) + * Decimal(compute_unit_multiplier) + * Decimal(compute_unit_cost_second) + ) + + additional_storage_flow_price = get_additional_storage_flow_price(content, session) + price = compute_unit_price + additional_storage_flow_price + return Decimal(price) + + +def get_additional_storage_flow_price( + content: ExecutableContent, session: DbSession +) -> Decimal: + # TODO: Use PAYMENT_PRICING_AGGREGATE when possible + nb_compute_units = _get_nb_compute_units(content) + + is_on_demand = isinstance(content, ProgramContent) and not content.on.persistent + included_storage_per_compute_unit = ( + STORAGE_INCLUDED_PER_COMPUTE_UNIT_ON_DEMAND + if is_on_demand + else STORAGE_INCLUDED_PER_COMPUTE_UNIT_PERSISTENT + ) + + total_volume_size = get_volume_size(session, content) + additional_storage = max( + Decimal(total_volume_size) + - (Decimal(included_storage_per_compute_unit) * Decimal(nb_compute_units)), + Decimal(0), + ) + price = additional_storage / EXTRA_STORAGE_PRICE_PER_SECOND / Decimal(MiB) + return price diff --git a/src/aleph/web/controllers/prices.py b/src/aleph/web/controllers/prices.py new file mode 100644 index 000000000..913e11cba --- /dev/null +++ b/src/aleph/web/controllers/prices.py @@ -0,0 +1,91 @@ +import logging +from dataclasses import dataclass +from decimal import Decimal +from typing import Optional + +from aiohttp import web +from aiohttp.web_exceptions import HTTPException +from aleph_message.models import ExecutableContent, ItemHash, MessageType +from dataclasses_json import DataClassJsonMixin + +from aleph.db.accessors.messages import get_message_by_item_hash, get_message_status +from aleph.db.models import MessageDb, MessageStatusDb +from aleph.services.cost import compute_cost, compute_flow_cost +from aleph.types.db_session import DbSession, DbSessionFactory +from aleph.types.message_status import MessageStatus + +LOGGER = logging.getLogger(__name__) + + +# This is not defined in aiohttp.web_exceptions +class HTTPProcessing(HTTPException): + status_code = 102 + + +# Mapping between message statuses to their corresponding exceptions and messages +MESSAGE_STATUS_EXCEPTIONS = { + MessageStatus.PENDING: (HTTPProcessing, "Message still pending"), + MessageStatus.REJECTED: (web.HTTPNotFound, "This message was rejected"), + MessageStatus.FORGOTTEN: ( + web.HTTPGone, + "This message has been forgotten", + ), +} + + +@dataclass +class MessagePrice(DataClassJsonMixin): + """Dataclass used to expose message required tokens.""" + + required_tokens: Optional[Decimal] = None + + +async def get_executable_message(session: DbSession, item_hash_str: str) -> MessageDb: + """Attempt to get an executable message from the database. + Raises an HTTP exception if the message is not found, not processed or is not an executable message. + """ + + # Parse the item_hash_str into an ItemHash object + try: + item_hash = ItemHash(item_hash_str) + except ValueError: + raise web.HTTPBadRequest(body=f"Invalid message hash: {item_hash_str}") + + # Get the message status from the database + message_status_db = get_message_status(session=session, item_hash=item_hash) + if not message_status_db: + raise web.HTTPNotFound(body=f"Message not found with hash: {item_hash}") + # Loop through the status_exceptions to find a match and raise the corresponding exception + if message_status_db.status in MESSAGE_STATUS_EXCEPTIONS: + exception, error_message = MESSAGE_STATUS_EXCEPTIONS[message_status_db.status] + raise exception(body=f"{error_message}: {item_hash_str}") + assert message_status_db.status == MessageStatus.PROCESSED + + # Get the message from the database + message: Optional[MessageDb] = get_message_by_item_hash(session, item_hash) + if not message: + raise web.HTTPNotFound(body="Message not found, despite appearing as processed") + if message.type not in (MessageType.instance, MessageType.program): + raise web.HTTPBadRequest( + body=f"Message is not an executable message: {item_hash_str}" + ) + + return message + + +async def message_price(request: web.Request): + """Returns the price of an executable message.""" + + session_factory: DbSessionFactory = request.app["session_factory"] + with session_factory() as session: + message = await get_executable_message(session, request.match_info["item_hash"]) + + content: ExecutableContent = message.parsed_content + + if content.payment and content.payment.is_stream: + required_tokens = compute_flow_cost(session=session, content=content) + else: + required_tokens = compute_cost(session=session, content=content) + + return web.json_response({"required_tokens": float(required_tokens), + "payment_type": content.payment.type if content.payment else None}) diff --git a/src/aleph/web/controllers/routes.py b/src/aleph/web/controllers/routes.py index e09ebd0fe..7ab22728a 100644 --- a/src/aleph/web/controllers/routes.py +++ b/src/aleph/web/controllers/routes.py @@ -11,6 +11,7 @@ messages, p2p, posts, + prices, storage, version, ) @@ -57,6 +58,8 @@ def register_routes(app: web.Application): app.router.add_get("/api/v1/posts.json", posts.view_posts_list_v1) app.router.add_get("/api/v1/posts/page/{page}.json", posts.view_posts_list_v1) + app.router.add_get("/api/v0/price/{item_hash}", prices.message_price) + app.router.add_get("/api/v0/addresses/stats.json", accounts.addresses_stats_view) app.router.add_get( "/api/v0/addresses/{address}/balance", accounts.get_account_balance diff --git a/tests/schemas/test_pending_messages.py b/tests/schemas/test_pending_messages.py index ddce70f88..e48ee076e 100644 --- a/tests/schemas/test_pending_messages.py +++ b/tests/schemas/test_pending_messages.py @@ -170,7 +170,7 @@ def test_parse_program_message(): content = json.loads(message_dict["item_content"]) assert message.content.address == content["address"] assert message.content.time == content["time"] - assert message.content.code == content["code"] + assert message.content.code.dict(exclude_none=True) == content["code"] assert message.content.type == content["type"]