Skip to content

Commit

Permalink
Fix: Added use case test for removing an already used volume.
Browse files Browse the repository at this point in the history
  • Loading branch information
Andres D. Molins committed Jan 16, 2025
1 parent 962e951 commit 0ce7e0d
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 17 deletions.
51 changes: 45 additions & 6 deletions src/aleph/db/accessors/vms.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
import datetime as dt
from typing import Iterable, Optional

from sqlalchemy import delete, func, select
from sqlalchemy import delete, func, or_, select
from sqlalchemy.dialects.postgresql import insert

from aleph.db.models.vms import ProgramDb, VmBaseDb, VmInstanceDb, VmVersionDb, MachineVolumeBaseDb
from aleph.db.models.vms import (
CodeVolumeDb,
DataVolumeDb,
ImmutableVolumeDb,
ProgramDb,
RootfsVolumeDb,
RuntimeDb,
VmBaseDb,
VmInstanceDb,
VmVersionDb,
)
from aleph.types.db_session import DbSession
from aleph.types.vms import VmVersion

Expand Down Expand Up @@ -51,10 +61,39 @@ def get_vm_version(session: DbSession, vm_hash: str) -> Optional[VmVersionDb]:
).scalar_one_or_none()


def get_machine_volumes(session: DbSession, volume_hash: str) -> Optional[VmBaseDb]:
return session.execute(
select(MachineVolumeBaseDb).where(MachineVolumeBaseDb.ref == volume_hash)
).scalar_one_or_none()
def get_vms_dependent_volumes(
session: DbSession, volume_hash: str
) -> Optional[VmBaseDb]:
statement = (
select(VmBaseDb)
.join(
ImmutableVolumeDb,
ImmutableVolumeDb.vm_hash == VmBaseDb.item_hash,
isouter=True,
)
.join(
CodeVolumeDb, CodeVolumeDb.program_hash == VmBaseDb.item_hash, isouter=True
)
.join(
DataVolumeDb, DataVolumeDb.program_hash == VmBaseDb.item_hash, isouter=True
)
.join(RuntimeDb, RuntimeDb.program_hash == VmBaseDb.item_hash, isouter=True)
.join(
RootfsVolumeDb,
RootfsVolumeDb.instance_hash == VmBaseDb.item_hash,
isouter=True,
)
.where(
or_(
ImmutableVolumeDb.ref == volume_hash,
CodeVolumeDb.ref == volume_hash,
DataVolumeDb.ref == volume_hash,
RuntimeDb.ref == volume_hash,
RootfsVolumeDb.parent_ref == volume_hash,
)
)
)
return session.execute(statement).scalar_one_or_none()


def upsert_vm_version(
Expand Down
13 changes: 13 additions & 0 deletions src/aleph/handlers/content/forget.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
get_message_status,
message_exists,
)
from aleph.db.accessors.vms import get_vms_dependent_volumes
from aleph.db.models import AggregateElementDb, MessageDb
from aleph.handlers.content.content_handler import ContentHandler
from aleph.types.db_session import DbSession
from aleph.types.message_status import (
CannotForgetForgetMessage,
ForgetNotAllowed,
ForgetTargetNotFound,
InternalError,
MessageStatus,
Expand Down Expand Up @@ -58,6 +60,17 @@ async def check_dependencies(self, session: DbSession, message: MessageDb) -> No
if not message_exists(session=session, item_hash=item_hash):
raise ForgetTargetNotFound(item_hash)

# Check file references, on VM volumes, as data volume and as code volume
# to block the deletion if we found ones
dependent_volumes = get_vms_dependent_volumes(
session=session, volume_hash=item_hash
)
print(dependent_volumes, item_hash)
if dependent_volumes is not None:
raise ForgetNotAllowed(
file_hash=item_hash, vm_hash=dependent_volumes.item_hash
)

for aggregate_key in content.aggregates:
if not aggregate_exists(
session=session, key=aggregate_key, owner=content.address
Expand Down
9 changes: 0 additions & 9 deletions src/aleph/handlers/content/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
upsert_file,
upsert_file_tag,
)
from aleph.db.accessors.vms import get_machine_volumes
from aleph.db.models import MessageDb
from aleph.exceptions import AlephStorageException, UnknownHashError
from aleph.handlers.content.content_handler import ContentHandler
Expand All @@ -38,7 +37,6 @@
InvalidMessageFormat,
PermissionDenied,
StoreCannotUpdateStoreWithRef,
StoreForgetNotAllowed,
StoreRefNotFound,
)
from aleph.utils import item_type_from_hash
Expand Down Expand Up @@ -229,11 +227,6 @@ async def check_dependencies(self, session: DbSession, message: MessageDb) -> No
if ref_file_pin_db.ref is not None:
raise StoreCannotUpdateStoreWithRef()

# Check file references, like on VMs to block the deletion if we found ones
dependent_vms = get_machine_volumes(session=session, volume_hash=message.item_hash)
if dependent_vms is not None:
raise StoreForgetNotAllowed()

async def check_permissions(self, session: DbSession, message: MessageDb):
await super().check_permissions(session=session, message=message)
content = _get_store_content(message)
Expand Down Expand Up @@ -325,8 +318,6 @@ async def _check_remaining_pins(
async def forget_message(self, session: DbSession, message: MessageDb) -> Set[str]:
content = _get_store_content(message)



delete_file_pin(session=session, item_hash=message.item_hash)
refresh_file_tag(
session=session,
Expand Down
7 changes: 5 additions & 2 deletions src/aleph/types/message_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,14 @@ class StoreCannotUpdateStoreWithRef(InvalidMessageException):
error_code = ErrorCode.STORE_UPDATE_UPDATE


class StoreForgetNotAllowed(InvalidMessageException):
class ForgetNotAllowed(InvalidMessageException):
"""
The original store message hash specified in the `ref` field could not be found.
The store message targeted by the `ref` field has a value in the `ref` field of a dependent volume.
"""

def __init__(self, file_hash: str, vm_hash: str):
super().__init__(f"File {file_hash} used on vm {vm_hash}")

error_code = ErrorCode.FORGET_NOT_ALLOWED


Expand Down
136 changes: 136 additions & 0 deletions tests/message_processing/test_process_forgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async def test_forget_post_message(
session=session,
)
)

assert isinstance(forget_message_result, ProcessedMessage)
forget_message = forget_message_result.message

Expand Down Expand Up @@ -468,3 +469,138 @@ async def test_forget_store_multi_users(
# Check that the file is still there
content = await storage_engine.read(filename=file_hash)
assert content == file_content


@pytest.mark.asyncio
async def test_forget_store_message_dependent(
session_factory: DbSessionFactory,
message_processor: PendingMessageProcessor,
mock_config: Config,
):
file_hash = "5ccdd7bccfbc5955e2e40166dd0cdea0b093154fd87bc2bea57e7c768cde2f21"

code_message = PendingMessageDb(
item_hash="f6fc4884e3ec3624bd3f60a3c37abf83a130777086061b1a373e659f2bab4d06",
chain=Chain.ETH,
sender="0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106",
signature="0x7b87c29388a7a452353f9cae8718b66158fb5bdc93f032964226745ee04919092550791b93f79e5ee1981f2d9d6e5ac0cae0d28b68bb63fe0fcbd79015a6f3ea1b",
type=MessageType.store,
time=timestamp_to_datetime(1652794362.573859),
item_type=ItemType.inline,
item_content='{"address":"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106","time":1652794362.5736332,"item_type":"storage","item_hash":"5ccdd7bccfbc5955e2e40166dd0cdea0b093154fd87bc2bea57e7c768cde2f21","mime_type":"text/plain"}',
channel=Channel("TEST"),
retries=0,
next_attempt=dt.datetime(2023, 1, 1),
check_message=True,
fetched=True,
reception_time=dt.datetime(2022, 1, 1),
)

runtime_hash = "QmXb4khKJJazpEuGVzchSy6yeJubGf8gy9Qjd4ZGSY6hXZ"
runtime_message = PendingMessageDb(
item_hash="63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696",
chain=Chain.ETH,
sender="0x101d8D16372dBf5f1614adaE95Ee5CCE61998Fc9",
signature="0xb23a41e693ed1c8df444f22b711c5ff2f15875b3f67656910bfa8275ed34fd014855305602309ad4b060be255b7f4f926c543273e5e37f54d7d2567175d2b1921c",
type=MessageType.store,
time=timestamp_to_datetime(1713798108.26326),
item_type=ItemType.inline,
item_content='{"address":"0x101d8D16372dBf5f1614adaE95Ee5CCE61998Fc9","time":1713798108.26326,"item_type":"storage","item_hash":"QmXb4khKJJazpEuGVzchSy6yeJubGf8gy9Qjd4ZGSY6hXZ"}',
channel=None,
retries=0,
next_attempt=dt.datetime(2023, 1, 1),
check_message=True,
fetched=True,
reception_time=dt.datetime(2022, 1, 1),
)

program_message = PendingMessageDb(
item_hash="81fe71c052eb17031d3273336e5bfc10b63eb5a3688e886d23d7a71940c8af1c",
type=MessageType.program,
chain=Chain.ETH,
sender="0x49572dA92e54824ba64D8EAa9b3f9Ef8d0A8183c",
signature="0x172f665a4a8fb31e1cdba49302278051a82185096bc941446db4fd21f83c0fa00bfeafaeb4d2e949c3f48cfc4a2d22f33b4312187aac5ea056a0bb42595d1a0b1b",
item_type=ItemType.inline,
time=timestamp_to_datetime(1736329132.262),
item_content='{"address":"0x49572dA92e54824ba64D8EAa9b3f9Ef8d0A8183c","time":1736329132.262,"type":"vm-function","allow_amend":false,"code":{"encoding":"zip","entrypoint":"main:app","ref":"f6fc4884e3ec3624bd3f60a3c37abf83a130777086061b1a373e659f2bab4d06","use_latest":true},"metadata":{"name":"My program","description":"My program description"},"on":{"http":true,"persistent":false},"environment":{"reproducible":false,"internet":true,"aleph_api":true,"shared_cache":false},"resources":{"vcpus":1,"memory":128,"seconds":30},"runtime":{"ref":"63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696","use_latest":true,"comment":"Aleph Alpine Linux with Python 3.12"},"volumes":[],"variables":{},"payment":{"chain":"ETH","type":"hold"}}',
channel=None,
reception_time=timestamp_to_datetime(1736329150),
fetched=True,
check_message=True,
retries=0,
next_attempt=dt.datetime(2023, 1, 1),
)

pending_forget_message = PendingMessageDb(
item_hash="5e40c8e2197e0678b5fba9cb1679e3a80fa6aeaa1a440d94f059525295fa32d3",
chain=Chain.ETH,
sender="0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106",
signature="0xc342e671be10894bf707b86c3f7538cdb7e4bb5760e234f8d07f8b3dfde015492337bd8756f169e37ac691b74c765415e96b6e1813238912e10ea54cc003887d1b",
type=MessageType.forget,
time=timestamp_to_datetime(1652794384.3102906),
item_type=ItemType.inline,
item_content='{"address":"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106","time":1652794384.3101473,"hashes":["f6fc4884e3ec3624bd3f60a3c37abf83a130777086061b1a373e659f2bab4d06"]}',
channel=Channel("TEST"),
retries=0,
next_attempt=dt.datetime(2023, 1, 2),
check_message=True,
fetched=True,
reception_time=dt.datetime(2022, 1, 2),
)

storage_engine = message_processor.message_handler.storage_service.storage_engine
await storage_engine.write(
filename=file_hash,
content=b"Test",
)
await storage_engine.write(
filename=runtime_hash,
content=b"Runtime",
)

with session_factory() as session:
target_message_result = one(
await process_pending_messages(
message_processor=message_processor,
pending_messages=[code_message],
session=session,
)
)
assert isinstance(target_message_result, ProcessedMessage)

target_message_result1 = one(
await process_pending_messages(
message_processor=message_processor,
pending_messages=[runtime_message],
session=session,
)
)
assert isinstance(target_message_result1, ProcessedMessage)

target_message_result2 = one(
await process_pending_messages(
message_processor=message_processor,
pending_messages=[program_message],
session=session,
)
)

assert isinstance(target_message_result2, ProcessedMessage)

# Sanity check
nb_references = count_file_pins(session=session, file_hash=file_hash)
assert nb_references == 1

forget_message_result = one(
await process_pending_messages(
message_processor=message_processor,
pending_messages=[pending_forget_message],
session=session,
)
)
assert isinstance(forget_message_result, RejectedMessage)
assert forget_message_result.error_code == 503

# Check that the file continues pinned with a grace period
nb_references = count_file_pins(session=session, file_hash=file_hash)
assert nb_references == 1

0 comments on commit 0ce7e0d

Please sign in to comment.