Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement price API #542

Merged
merged 2 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ package_dir =
=src
# DON'T CHANGE THE FOLLOWING LINE! IT WILL BE UPDATED BY PYSCAFFOLD!
setup_requires =
pyscaffold>=3.1a0,<3.2a0
setuptools_scm>=8.0.4
pytest-runner>=2.0,<3dev

# Note: eth/web3 dependencies updates are sensitive and can trigger a lot of dependency conflicts.
Expand All @@ -40,7 +40,7 @@ install_requires =
aiohttp==3.8.4
aioipfs@git+https://github.com/aleph-im/aioipfs.git@d671c79b2871bb4d6c8877ba1e7f3ffbe7d20b71
alembic==1.12.1
aleph-message==0.4.0
aleph-message==0.4.2rc1
aleph-p2p-client@git+https://github.com/aleph-im/p2p-service-client-python@2c04af39c566217f629fd89505ffc3270fba8676
aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@32dd1749a4773da494275709060632cbeba9a51b
asyncpg==0.28.0
Expand All @@ -51,6 +51,7 @@ install_requires =
cosmospy==6.0.0
dataclasses_json==0.5.6
eth_account==0.10.0
eth-typing~=4.0
gunicorn==21.2.0
hexbytes==0.2.2
msgpack==1.0.3 # required by aiocache
Expand Down
6 changes: 3 additions & 3 deletions src/aleph/db/accessors/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from ..models.pending_messages import PendingMessageDb


def get_message_by_item_hash(session: DbSession, item_hash: str) -> Optional[MessageDb]:
def get_message_by_item_hash(session: DbSession, item_hash: ItemHash) -> Optional[MessageDb]:
select_stmt = (
select(MessageDb)
.where(MessageDb.item_hash == item_hash)
Expand Down Expand Up @@ -289,10 +289,10 @@ def make_confirmation_upsert_query(item_hash: str, tx_hash: str) -> Insert:
)


def get_message_status(session: DbSession, item_hash: str) -> Optional[MessageStatusDb]:
def get_message_status(session: DbSession, item_hash: ItemHash) -> Optional[MessageStatusDb]:
return (
session.execute(
select(MessageStatusDb).where(MessageStatusDb.item_hash == item_hash)
select(MessageStatusDb).where(MessageStatusDb.item_hash == str(item_hash))
)
).scalar()

Expand Down
3 changes: 3 additions & 0 deletions src/aleph/handlers/content/vm.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ async def check_balance(self, session: DbSession, message: MessageDb) -> None:
if not content.on.persistent:
return

if content.payment and content.payment.is_stream:
return

required_tokens = compute_cost(session=session, content=content)

current_balance = (
Expand Down
92 changes: 83 additions & 9 deletions src/aleph/services/cost.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@
from aleph.types.files import FileTag


MINUTE = 60
HOUR = 60 * MINUTE

COMPUTE_UNIT_TOKEN_TO_HOLD_ON_DEMAND = Decimal("200")
COMPUTE_UNIT_TOKEN_TO_HOLD_PERSISTENT = Decimal("2000")
COMPUTE_UNIT_PRICE_PER_HOUR_ON_DEMAND = Decimal("0.011")
COMPUTE_UNIT_PRICE_PER_HOUR_PERSISTENT = Decimal("0.11")
STORAGE_INCLUDED_PER_COMPUTE_UNIT_ON_DEMAND = Decimal("2") * GiB
STORAGE_INCLUDED_PER_COMPUTE_UNIT_PERSISTENT = Decimal("20") * GiB

EXTRA_STORAGE_TOKEN_TO_HOLD = 1 / (Decimal('20') * MiB) # Hold 1 token for 20 MiB
EXTRA_STORAGE_PRICE_PER_HOUR = Decimal("0.000000977")
EXTRA_STORAGE_PRICE_PER_SECOND = EXTRA_STORAGE_PRICE_PER_HOUR / Decimal(HOUR)


def _get_file_from_ref(
session: DbSession, ref: str, use_latest: bool
) -> Optional[StoredFileDb]:
Expand Down Expand Up @@ -64,16 +79,20 @@ def get_volume_size(session: DbSession, content: ExecutableContent) -> int:
def get_additional_storage_price(
content: ExecutableContent, session: DbSession
) -> Decimal:
is_microvm = isinstance(content, ProgramContent) and not content.on.persistent
nb_compute_units = content.resources.vcpus
free_storage_per_compute_unit = 2 * GiB if is_microvm else 20 * GiB

is_on_demand = isinstance(content, ProgramContent) and not content.on.persistent
included_storage_per_compute_unit = (
STORAGE_INCLUDED_PER_COMPUTE_UNIT_ON_DEMAND
if is_on_demand
else STORAGE_INCLUDED_PER_COMPUTE_UNIT_PERSISTENT
)

total_volume_size = get_volume_size(session, content)
additional_storage = max(
total_volume_size - (free_storage_per_compute_unit * nb_compute_units), 0
total_volume_size - (included_storage_per_compute_unit * nb_compute_units), 0
)
price = Decimal(additional_storage) / 20 / MiB
return price
return Decimal(additional_storage) * EXTRA_STORAGE_TOKEN_TO_HOLD


def _get_nb_compute_units(content: ExecutableContent) -> int:
Expand All @@ -85,15 +104,22 @@ def _get_nb_compute_units(content: ExecutableContent) -> int:

def _get_compute_unit_multiplier(content: ExecutableContent) -> int:
compute_unit_multiplier = 1
if isinstance(content, ProgramContent) and not content.on.persistent and content.environment.internet:
if (
isinstance(content, ProgramContent)
and not content.on.persistent
and content.environment.internet
):
compute_unit_multiplier += 1
return compute_unit_multiplier


def compute_cost(session: DbSession, content: ExecutableContent) -> Decimal:
is_microvm = isinstance(content, ProgramContent) and not content.on.persistent

compute_unit_cost = 200 if is_microvm else 2000
is_on_demand = isinstance(content, ProgramContent) and not content.on.persistent
compute_unit_cost = (
COMPUTE_UNIT_TOKEN_TO_HOLD_ON_DEMAND
if is_on_demand
else COMPUTE_UNIT_TOKEN_TO_HOLD_PERSISTENT
)

compute_units_required = _get_nb_compute_units(content)
compute_unit_multiplier = _get_compute_unit_multiplier(content)
Expand All @@ -103,3 +129,51 @@ def compute_cost(session: DbSession, content: ExecutableContent) -> Decimal:
)
price = compute_unit_price + get_additional_storage_price(content, session)
return Decimal(price)


def compute_flow_cost(session: DbSession, content: ExecutableContent) -> Decimal:
# TODO: Use PAYMENT_PRICING_AGGREGATE when possible
is_on_demand = isinstance(content, ProgramContent) and not content.on.persistent
compute_unit_cost_hour = (
COMPUTE_UNIT_PRICE_PER_HOUR_PERSISTENT
if is_on_demand
else COMPUTE_UNIT_PRICE_PER_HOUR_ON_DEMAND
)

compute_unit_cost_second = compute_unit_cost_hour / HOUR

compute_units_required = _get_nb_compute_units(content)
compute_unit_multiplier = _get_compute_unit_multiplier(content)

compute_unit_price = (
Decimal(compute_units_required)
* Decimal(compute_unit_multiplier)
* Decimal(compute_unit_cost_second)
)

additional_storage_flow_price = get_additional_storage_flow_price(content, session)
price = compute_unit_price + additional_storage_flow_price
return Decimal(price)


def get_additional_storage_flow_price(
content: ExecutableContent, session: DbSession
) -> Decimal:
# TODO: Use PAYMENT_PRICING_AGGREGATE when possible
nb_compute_units = _get_nb_compute_units(content)

is_on_demand = isinstance(content, ProgramContent) and not content.on.persistent
included_storage_per_compute_unit = (
STORAGE_INCLUDED_PER_COMPUTE_UNIT_ON_DEMAND
if is_on_demand
else STORAGE_INCLUDED_PER_COMPUTE_UNIT_PERSISTENT
)

total_volume_size = get_volume_size(session, content)
additional_storage = max(
Decimal(total_volume_size)
- (Decimal(included_storage_per_compute_unit) * Decimal(nb_compute_units)),
Decimal(0),
)
price = additional_storage / EXTRA_STORAGE_PRICE_PER_SECOND / Decimal(MiB)
return price
91 changes: 91 additions & 0 deletions src/aleph/web/controllers/prices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import logging
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional

from aiohttp import web
from aiohttp.web_exceptions import HTTPException
from aleph_message.models import ExecutableContent, ItemHash, MessageType
from dataclasses_json import DataClassJsonMixin

from aleph.db.accessors.messages import get_message_by_item_hash, get_message_status
from aleph.db.models import MessageDb, MessageStatusDb
from aleph.services.cost import compute_cost, compute_flow_cost
from aleph.types.db_session import DbSession, DbSessionFactory
from aleph.types.message_status import MessageStatus

LOGGER = logging.getLogger(__name__)


# This is not defined in aiohttp.web_exceptions
class HTTPProcessing(HTTPException):
status_code = 102


# Mapping between message statuses to their corresponding exceptions and messages
MESSAGE_STATUS_EXCEPTIONS = {
MessageStatus.PENDING: (HTTPProcessing, "Message still pending"),
MessageStatus.REJECTED: (web.HTTPNotFound, "This message was rejected"),
MessageStatus.FORGOTTEN: (
web.HTTPGone,
"This message has been forgotten",
),
}


@dataclass
class MessagePrice(DataClassJsonMixin):
"""Dataclass used to expose message required tokens."""

required_tokens: Optional[Decimal] = None


async def get_executable_message(session: DbSession, item_hash_str: str) -> MessageDb:
"""Attempt to get an executable message from the database.
Raises an HTTP exception if the message is not found, not processed or is not an executable message.
"""

# Parse the item_hash_str into an ItemHash object
try:
item_hash = ItemHash(item_hash_str)
except ValueError:
raise web.HTTPBadRequest(body=f"Invalid message hash: {item_hash_str}")

# Get the message status from the database
message_status_db = get_message_status(session=session, item_hash=item_hash)
if not message_status_db:
raise web.HTTPNotFound(body=f"Message not found with hash: {item_hash}")
# Loop through the status_exceptions to find a match and raise the corresponding exception
if message_status_db.status in MESSAGE_STATUS_EXCEPTIONS:
exception, error_message = MESSAGE_STATUS_EXCEPTIONS[message_status_db.status]
raise exception(body=f"{error_message}: {item_hash_str}")
assert message_status_db.status == MessageStatus.PROCESSED

# Get the message from the database
message: Optional[MessageDb] = get_message_by_item_hash(session, item_hash)
if not message:
raise web.HTTPNotFound(body="Message not found, despite appearing as processed")
if message.type not in (MessageType.instance, MessageType.program):
raise web.HTTPBadRequest(
body=f"Message is not an executable message: {item_hash_str}"
)

return message


async def message_price(request: web.Request):
"""Returns the price of an executable message."""

session_factory: DbSessionFactory = request.app["session_factory"]
with session_factory() as session:
message = await get_executable_message(session, request.match_info["item_hash"])

content: ExecutableContent = message.parsed_content

if content.payment and content.payment.is_stream:
required_tokens = compute_flow_cost(session=session, content=content)
else:
required_tokens = compute_cost(session=session, content=content)

return web.json_response({"required_tokens": float(required_tokens),
"payment_type": content.payment.type if content.payment else None})
3 changes: 3 additions & 0 deletions src/aleph/web/controllers/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
messages,
p2p,
posts,
prices,
storage,
version,
)
Expand Down Expand Up @@ -57,6 +58,8 @@ def register_routes(app: web.Application):
app.router.add_get("/api/v1/posts.json", posts.view_posts_list_v1)
app.router.add_get("/api/v1/posts/page/{page}.json", posts.view_posts_list_v1)

app.router.add_get("/api/v0/price/{item_hash}", prices.message_price)

app.router.add_get("/api/v0/addresses/stats.json", accounts.addresses_stats_view)
app.router.add_get(
"/api/v0/addresses/{address}/balance", accounts.get_account_balance
Expand Down
2 changes: 1 addition & 1 deletion tests/schemas/test_pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def test_parse_program_message():
content = json.loads(message_dict["item_content"])
assert message.content.address == content["address"]
assert message.content.time == content["time"]
assert message.content.code == content["code"]
assert message.content.code.dict(exclude_none=True) == content["code"]
assert message.content.type == content["type"]


Expand Down
Loading