diff --git a/src/aleph/db/accessors/vms.py b/src/aleph/db/accessors/vms.py index 1e4cb252..480d9ce7 100644 --- a/src/aleph/db/accessors/vms.py +++ b/src/aleph/db/accessors/vms.py @@ -1,10 +1,20 @@ import datetime as dt from typing import Iterable, Optional -from sqlalchemy import delete, func, select +from sqlalchemy import delete, func, or_, select from sqlalchemy.dialects.postgresql import insert -from aleph.db.models.vms import ProgramDb, VmBaseDb, VmInstanceDb, VmVersionDb, MachineVolumeBaseDb +from aleph.db.models.vms import ( + CodeVolumeDb, + DataVolumeDb, + ImmutableVolumeDb, + ProgramDb, + RootfsVolumeDb, + RuntimeDb, + VmBaseDb, + VmInstanceDb, + VmVersionDb, +) from aleph.types.db_session import DbSession from aleph.types.vms import VmVersion @@ -51,10 +61,39 @@ def get_vm_version(session: DbSession, vm_hash: str) -> Optional[VmVersionDb]: ).scalar_one_or_none() -def get_machine_volumes(session: DbSession, volume_hash: str) -> Optional[VmBaseDb]: - return session.execute( - select(MachineVolumeBaseDb).where(MachineVolumeBaseDb.ref == volume_hash) - ).scalar_one_or_none() +def get_vms_dependent_volumes( + session: DbSession, volume_hash: str +) -> Optional[VmBaseDb]: + statement = ( + select(VmBaseDb) + .join( + ImmutableVolumeDb, + ImmutableVolumeDb.vm_hash == VmBaseDb.item_hash, + isouter=True, + ) + .join( + CodeVolumeDb, CodeVolumeDb.program_hash == VmBaseDb.item_hash, isouter=True + ) + .join( + DataVolumeDb, DataVolumeDb.program_hash == VmBaseDb.item_hash, isouter=True + ) + .join(RuntimeDb, RuntimeDb.program_hash == VmBaseDb.item_hash, isouter=True) + .join( + RootfsVolumeDb, + RootfsVolumeDb.instance_hash == VmBaseDb.item_hash, + isouter=True, + ) + .where( + or_( + ImmutableVolumeDb.ref == volume_hash, + CodeVolumeDb.ref == volume_hash, + DataVolumeDb.ref == volume_hash, + RuntimeDb.ref == volume_hash, + RootfsVolumeDb.parent_ref == volume_hash, + ) + ) + ) + return session.execute(statement).scalar_one_or_none() def upsert_vm_version( diff --git a/src/aleph/handlers/content/forget.py b/src/aleph/handlers/content/forget.py index 40edbc7c..99dfefa8 100644 --- a/src/aleph/handlers/content/forget.py +++ b/src/aleph/handlers/content/forget.py @@ -14,11 +14,13 @@ get_message_status, message_exists, ) +from aleph.db.accessors.vms import get_vms_dependent_volumes from aleph.db.models import AggregateElementDb, MessageDb from aleph.handlers.content.content_handler import ContentHandler from aleph.types.db_session import DbSession from aleph.types.message_status import ( CannotForgetForgetMessage, + ForgetNotAllowed, ForgetTargetNotFound, InternalError, MessageStatus, @@ -58,6 +60,17 @@ async def check_dependencies(self, session: DbSession, message: MessageDb) -> No if not message_exists(session=session, item_hash=item_hash): raise ForgetTargetNotFound(item_hash) + # Check file references, on VM volumes, as data volume and as code volume + # to block the deletion if we found ones + dependent_volumes = get_vms_dependent_volumes( + session=session, volume_hash=item_hash + ) + print(dependent_volumes, item_hash) + if dependent_volumes is not None: + raise ForgetNotAllowed( + file_hash=item_hash, vm_hash=dependent_volumes.item_hash + ) + for aggregate_key in content.aggregates: if not aggregate_exists( session=session, key=aggregate_key, owner=content.address diff --git a/src/aleph/handlers/content/store.py b/src/aleph/handlers/content/store.py index b3a00f6e..e22b71a8 100644 --- a/src/aleph/handlers/content/store.py +++ b/src/aleph/handlers/content/store.py @@ -25,7 +25,6 @@ upsert_file, upsert_file_tag, ) -from aleph.db.accessors.vms import get_machine_volumes from aleph.db.models import MessageDb from aleph.exceptions import AlephStorageException, UnknownHashError from aleph.handlers.content.content_handler import ContentHandler @@ -38,7 +37,6 @@ InvalidMessageFormat, PermissionDenied, StoreCannotUpdateStoreWithRef, - StoreForgetNotAllowed, StoreRefNotFound, ) from aleph.utils import item_type_from_hash @@ -229,11 +227,6 @@ async def check_dependencies(self, session: DbSession, message: MessageDb) -> No if ref_file_pin_db.ref is not None: raise StoreCannotUpdateStoreWithRef() - # Check file references, like on VMs to block the deletion if we found ones - dependent_vms = get_machine_volumes(session=session, volume_hash=message.item_hash) - if dependent_vms is not None: - raise StoreForgetNotAllowed() - async def check_permissions(self, session: DbSession, message: MessageDb): await super().check_permissions(session=session, message=message) content = _get_store_content(message) @@ -325,8 +318,6 @@ async def _check_remaining_pins( async def forget_message(self, session: DbSession, message: MessageDb) -> Set[str]: content = _get_store_content(message) - - delete_file_pin(session=session, item_hash=message.item_hash) refresh_file_tag( session=session, diff --git a/src/aleph/types/message_status.py b/src/aleph/types/message_status.py index 0932e2e7..6c1efa32 100644 --- a/src/aleph/types/message_status.py +++ b/src/aleph/types/message_status.py @@ -203,11 +203,14 @@ class StoreCannotUpdateStoreWithRef(InvalidMessageException): error_code = ErrorCode.STORE_UPDATE_UPDATE -class StoreForgetNotAllowed(InvalidMessageException): +class ForgetNotAllowed(InvalidMessageException): """ - The original store message hash specified in the `ref` field could not be found. + The store message targeted by the `ref` field has a value in the `ref` field of a dependent volume. """ + def __init__(self, file_hash: str, vm_hash: str): + super().__init__(f"File {file_hash} used on vm {vm_hash}") + error_code = ErrorCode.FORGET_NOT_ALLOWED diff --git a/tests/message_processing/test_process_forgets.py b/tests/message_processing/test_process_forgets.py index 1275c95a..151700aa 100644 --- a/tests/message_processing/test_process_forgets.py +++ b/tests/message_processing/test_process_forgets.py @@ -123,6 +123,7 @@ async def test_forget_post_message( session=session, ) ) + assert isinstance(forget_message_result, ProcessedMessage) forget_message = forget_message_result.message @@ -468,3 +469,138 @@ async def test_forget_store_multi_users( # Check that the file is still there content = await storage_engine.read(filename=file_hash) assert content == file_content + + +@pytest.mark.asyncio +async def test_forget_store_message_dependent( + session_factory: DbSessionFactory, + message_processor: PendingMessageProcessor, + mock_config: Config, +): + file_hash = "5ccdd7bccfbc5955e2e40166dd0cdea0b093154fd87bc2bea57e7c768cde2f21" + + code_message = PendingMessageDb( + item_hash="f6fc4884e3ec3624bd3f60a3c37abf83a130777086061b1a373e659f2bab4d06", + chain=Chain.ETH, + sender="0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106", + signature="0x7b87c29388a7a452353f9cae8718b66158fb5bdc93f032964226745ee04919092550791b93f79e5ee1981f2d9d6e5ac0cae0d28b68bb63fe0fcbd79015a6f3ea1b", + type=MessageType.store, + time=timestamp_to_datetime(1652794362.573859), + item_type=ItemType.inline, + item_content='{"address":"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106","time":1652794362.5736332,"item_type":"storage","item_hash":"5ccdd7bccfbc5955e2e40166dd0cdea0b093154fd87bc2bea57e7c768cde2f21","mime_type":"text/plain"}', + channel=Channel("TEST"), + retries=0, + next_attempt=dt.datetime(2023, 1, 1), + check_message=True, + fetched=True, + reception_time=dt.datetime(2022, 1, 1), + ) + + runtime_hash = "QmXb4khKJJazpEuGVzchSy6yeJubGf8gy9Qjd4ZGSY6hXZ" + runtime_message = PendingMessageDb( + item_hash="63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696", + chain=Chain.ETH, + sender="0x101d8D16372dBf5f1614adaE95Ee5CCE61998Fc9", + signature="0xb23a41e693ed1c8df444f22b711c5ff2f15875b3f67656910bfa8275ed34fd014855305602309ad4b060be255b7f4f926c543273e5e37f54d7d2567175d2b1921c", + type=MessageType.store, + time=timestamp_to_datetime(1713798108.26326), + item_type=ItemType.inline, + item_content='{"address":"0x101d8D16372dBf5f1614adaE95Ee5CCE61998Fc9","time":1713798108.26326,"item_type":"storage","item_hash":"QmXb4khKJJazpEuGVzchSy6yeJubGf8gy9Qjd4ZGSY6hXZ"}', + channel=None, + retries=0, + next_attempt=dt.datetime(2023, 1, 1), + check_message=True, + fetched=True, + reception_time=dt.datetime(2022, 1, 1), + ) + + program_message = PendingMessageDb( + item_hash="81fe71c052eb17031d3273336e5bfc10b63eb5a3688e886d23d7a71940c8af1c", + type=MessageType.program, + chain=Chain.ETH, + sender="0x49572dA92e54824ba64D8EAa9b3f9Ef8d0A8183c", + signature="0x172f665a4a8fb31e1cdba49302278051a82185096bc941446db4fd21f83c0fa00bfeafaeb4d2e949c3f48cfc4a2d22f33b4312187aac5ea056a0bb42595d1a0b1b", + item_type=ItemType.inline, + time=timestamp_to_datetime(1736329132.262), + item_content='{"address":"0x49572dA92e54824ba64D8EAa9b3f9Ef8d0A8183c","time":1736329132.262,"type":"vm-function","allow_amend":false,"code":{"encoding":"zip","entrypoint":"main:app","ref":"f6fc4884e3ec3624bd3f60a3c37abf83a130777086061b1a373e659f2bab4d06","use_latest":true},"metadata":{"name":"My program","description":"My program description"},"on":{"http":true,"persistent":false},"environment":{"reproducible":false,"internet":true,"aleph_api":true,"shared_cache":false},"resources":{"vcpus":1,"memory":128,"seconds":30},"runtime":{"ref":"63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696","use_latest":true,"comment":"Aleph Alpine Linux with Python 3.12"},"volumes":[],"variables":{},"payment":{"chain":"ETH","type":"hold"}}', + channel=None, + reception_time=timestamp_to_datetime(1736329150), + fetched=True, + check_message=True, + retries=0, + next_attempt=dt.datetime(2023, 1, 1), + ) + + pending_forget_message = PendingMessageDb( + item_hash="5e40c8e2197e0678b5fba9cb1679e3a80fa6aeaa1a440d94f059525295fa32d3", + chain=Chain.ETH, + sender="0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106", + signature="0xc342e671be10894bf707b86c3f7538cdb7e4bb5760e234f8d07f8b3dfde015492337bd8756f169e37ac691b74c765415e96b6e1813238912e10ea54cc003887d1b", + type=MessageType.forget, + time=timestamp_to_datetime(1652794384.3102906), + item_type=ItemType.inline, + item_content='{"address":"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106","time":1652794384.3101473,"hashes":["f6fc4884e3ec3624bd3f60a3c37abf83a130777086061b1a373e659f2bab4d06"]}', + channel=Channel("TEST"), + retries=0, + next_attempt=dt.datetime(2023, 1, 2), + check_message=True, + fetched=True, + reception_time=dt.datetime(2022, 1, 2), + ) + + storage_engine = message_processor.message_handler.storage_service.storage_engine + await storage_engine.write( + filename=file_hash, + content=b"Test", + ) + await storage_engine.write( + filename=runtime_hash, + content=b"Runtime", + ) + + with session_factory() as session: + target_message_result = one( + await process_pending_messages( + message_processor=message_processor, + pending_messages=[code_message], + session=session, + ) + ) + assert isinstance(target_message_result, ProcessedMessage) + + target_message_result1 = one( + await process_pending_messages( + message_processor=message_processor, + pending_messages=[runtime_message], + session=session, + ) + ) + assert isinstance(target_message_result1, ProcessedMessage) + + target_message_result2 = one( + await process_pending_messages( + message_processor=message_processor, + pending_messages=[program_message], + session=session, + ) + ) + + assert isinstance(target_message_result2, ProcessedMessage) + + # Sanity check + nb_references = count_file_pins(session=session, file_hash=file_hash) + assert nb_references == 1 + + forget_message_result = one( + await process_pending_messages( + message_processor=message_processor, + pending_messages=[pending_forget_message], + session=session, + ) + ) + assert isinstance(forget_message_result, RejectedMessage) + assert forget_message_result.error_code == 503 + + # Check that the file continues pinned with a grace period + nb_references = count_file_pins(session=session, file_hash=file_hash) + assert nb_references == 1