Skip to content

Commit

Permalink
Prevent delete volumes used by VMs (#685)
Browse files Browse the repository at this point in the history
* Fix: Prevent a store message forget to be executed if the file is already used on a VM.

* Fix: Added use case test for removing an already used volume.

* Fix: Added new error code to database with a migration

---------

Co-authored-by: Andres D. Molins <[email protected]>
  • Loading branch information
nesitor and Andres D. Molins authored Jan 16, 2025
1 parent ac9e82d commit f3be9c8
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""add_new_error_code_on_db
Revision ID: 9e600f404aa1
Revises: 46f7e55ff55c
Create Date: 2025-01-16 13:51:36.699939
"""
from alembic import op


# revision identifiers, used by Alembic.
revision = '9e600f404aa1'
down_revision = '46f7e55ff55c'
branch_labels = None
depends_on = None


def upgrade() -> None:
op.execute(
"""
INSERT INTO error_codes(code, description) VALUES
(503, 'Cannot forget a used message')
"""
)


def downgrade() -> None:
op.execute("DELETE FROM error_codes WHERE code = 503")
49 changes: 47 additions & 2 deletions src/aleph/db/accessors/vms.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
import datetime as dt
from typing import Iterable, Optional

from sqlalchemy import delete, func, select
from sqlalchemy import delete, func, or_, select
from sqlalchemy.dialects.postgresql import insert

from aleph.db.models.vms import ProgramDb, VmBaseDb, VmInstanceDb, VmVersionDb
from aleph.db.models.vms import (
CodeVolumeDb,
DataVolumeDb,
ImmutableVolumeDb,
ProgramDb,
RootfsVolumeDb,
RuntimeDb,
VmBaseDb,
VmInstanceDb,
VmVersionDb,
)
from aleph.types.db_session import DbSession
from aleph.types.vms import VmVersion

Expand Down Expand Up @@ -51,6 +61,41 @@ def get_vm_version(session: DbSession, vm_hash: str) -> Optional[VmVersionDb]:
).scalar_one_or_none()


def get_vms_dependent_volumes(
session: DbSession, volume_hash: str
) -> Optional[VmBaseDb]:
statement = (
select(VmBaseDb)
.join(
ImmutableVolumeDb,
ImmutableVolumeDb.vm_hash == VmBaseDb.item_hash,
isouter=True,
)
.join(
CodeVolumeDb, CodeVolumeDb.program_hash == VmBaseDb.item_hash, isouter=True
)
.join(
DataVolumeDb, DataVolumeDb.program_hash == VmBaseDb.item_hash, isouter=True
)
.join(RuntimeDb, RuntimeDb.program_hash == VmBaseDb.item_hash, isouter=True)
.join(
RootfsVolumeDb,
RootfsVolumeDb.instance_hash == VmBaseDb.item_hash,
isouter=True,
)
.where(
or_(
ImmutableVolumeDb.ref == volume_hash,
CodeVolumeDb.ref == volume_hash,
DataVolumeDb.ref == volume_hash,
RuntimeDb.ref == volume_hash,
RootfsVolumeDb.parent_ref == volume_hash,
)
)
)
return session.execute(statement).scalar_one_or_none()


def upsert_vm_version(
session: DbSession,
vm_hash: str,
Expand Down
13 changes: 13 additions & 0 deletions src/aleph/handlers/content/forget.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
get_message_status,
message_exists,
)
from aleph.db.accessors.vms import get_vms_dependent_volumes
from aleph.db.models import AggregateElementDb, MessageDb
from aleph.handlers.content.content_handler import ContentHandler
from aleph.types.db_session import DbSession
from aleph.types.message_status import (
CannotForgetForgetMessage,
ForgetNotAllowed,
ForgetTargetNotFound,
InternalError,
MessageStatus,
Expand Down Expand Up @@ -58,6 +60,17 @@ async def check_dependencies(self, session: DbSession, message: MessageDb) -> No
if not message_exists(session=session, item_hash=item_hash):
raise ForgetTargetNotFound(item_hash)

# Check file references, on VM volumes, as data volume and as code volume
# to block the deletion if we found ones
dependent_volumes = get_vms_dependent_volumes(
session=session, volume_hash=item_hash
)
print(dependent_volumes, item_hash)
if dependent_volumes is not None:
raise ForgetNotAllowed(
file_hash=item_hash, vm_hash=dependent_volumes.item_hash
)

for aggregate_key in content.aggregates:
if not aggregate_exists(
session=session, key=aggregate_key, owner=content.address
Expand Down
12 changes: 12 additions & 0 deletions src/aleph/types/message_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class ErrorCode(IntEnum):
FORGET_NO_TARGET = 500
FORGET_TARGET_NOT_FOUND = 501
FORGET_FORGET = 502
FORGET_NOT_ALLOWED = 503


class MessageProcessingException(Exception):
Expand Down Expand Up @@ -202,6 +203,17 @@ class StoreCannotUpdateStoreWithRef(InvalidMessageException):
error_code = ErrorCode.STORE_UPDATE_UPDATE


class ForgetNotAllowed(InvalidMessageException):
"""
The store message targeted by the `ref` field has a value in the `ref` field of a dependent volume.
"""

def __init__(self, file_hash: str, vm_hash: str):
super().__init__(f"File {file_hash} used on vm {vm_hash}")

error_code = ErrorCode.FORGET_NOT_ALLOWED


class VmRefNotFound(RetryMessageException):
"""
The original program specified in the `ref` field could not be found.
Expand Down
136 changes: 136 additions & 0 deletions tests/message_processing/test_process_forgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async def test_forget_post_message(
session=session,
)
)

assert isinstance(forget_message_result, ProcessedMessage)
forget_message = forget_message_result.message

Expand Down Expand Up @@ -468,3 +469,138 @@ async def test_forget_store_multi_users(
# Check that the file is still there
content = await storage_engine.read(filename=file_hash)
assert content == file_content


@pytest.mark.asyncio
async def test_forget_store_message_dependent(
session_factory: DbSessionFactory,
message_processor: PendingMessageProcessor,
mock_config: Config,
):
file_hash = "5ccdd7bccfbc5955e2e40166dd0cdea0b093154fd87bc2bea57e7c768cde2f21"

code_message = PendingMessageDb(
item_hash="f6fc4884e3ec3624bd3f60a3c37abf83a130777086061b1a373e659f2bab4d06",
chain=Chain.ETH,
sender="0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106",
signature="0x7b87c29388a7a452353f9cae8718b66158fb5bdc93f032964226745ee04919092550791b93f79e5ee1981f2d9d6e5ac0cae0d28b68bb63fe0fcbd79015a6f3ea1b",
type=MessageType.store,
time=timestamp_to_datetime(1652794362.573859),
item_type=ItemType.inline,
item_content='{"address":"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106","time":1652794362.5736332,"item_type":"storage","item_hash":"5ccdd7bccfbc5955e2e40166dd0cdea0b093154fd87bc2bea57e7c768cde2f21","mime_type":"text/plain"}',
channel=Channel("TEST"),
retries=0,
next_attempt=dt.datetime(2023, 1, 1),
check_message=True,
fetched=True,
reception_time=dt.datetime(2022, 1, 1),
)

runtime_hash = "QmXb4khKJJazpEuGVzchSy6yeJubGf8gy9Qjd4ZGSY6hXZ"
runtime_message = PendingMessageDb(
item_hash="63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696",
chain=Chain.ETH,
sender="0x101d8D16372dBf5f1614adaE95Ee5CCE61998Fc9",
signature="0xb23a41e693ed1c8df444f22b711c5ff2f15875b3f67656910bfa8275ed34fd014855305602309ad4b060be255b7f4f926c543273e5e37f54d7d2567175d2b1921c",
type=MessageType.store,
time=timestamp_to_datetime(1713798108.26326),
item_type=ItemType.inline,
item_content='{"address":"0x101d8D16372dBf5f1614adaE95Ee5CCE61998Fc9","time":1713798108.26326,"item_type":"storage","item_hash":"QmXb4khKJJazpEuGVzchSy6yeJubGf8gy9Qjd4ZGSY6hXZ"}',
channel=None,
retries=0,
next_attempt=dt.datetime(2023, 1, 1),
check_message=True,
fetched=True,
reception_time=dt.datetime(2022, 1, 1),
)

program_message = PendingMessageDb(
item_hash="81fe71c052eb17031d3273336e5bfc10b63eb5a3688e886d23d7a71940c8af1c",
type=MessageType.program,
chain=Chain.ETH,
sender="0x49572dA92e54824ba64D8EAa9b3f9Ef8d0A8183c",
signature="0x172f665a4a8fb31e1cdba49302278051a82185096bc941446db4fd21f83c0fa00bfeafaeb4d2e949c3f48cfc4a2d22f33b4312187aac5ea056a0bb42595d1a0b1b",
item_type=ItemType.inline,
time=timestamp_to_datetime(1736329132.262),
item_content='{"address":"0x49572dA92e54824ba64D8EAa9b3f9Ef8d0A8183c","time":1736329132.262,"type":"vm-function","allow_amend":false,"code":{"encoding":"zip","entrypoint":"main:app","ref":"f6fc4884e3ec3624bd3f60a3c37abf83a130777086061b1a373e659f2bab4d06","use_latest":true},"metadata":{"name":"My program","description":"My program description"},"on":{"http":true,"persistent":false},"environment":{"reproducible":false,"internet":true,"aleph_api":true,"shared_cache":false},"resources":{"vcpus":1,"memory":128,"seconds":30},"runtime":{"ref":"63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696","use_latest":true,"comment":"Aleph Alpine Linux with Python 3.12"},"volumes":[],"variables":{},"payment":{"chain":"ETH","type":"hold"}}',
channel=None,
reception_time=timestamp_to_datetime(1736329150),
fetched=True,
check_message=True,
retries=0,
next_attempt=dt.datetime(2023, 1, 1),
)

pending_forget_message = PendingMessageDb(
item_hash="5e40c8e2197e0678b5fba9cb1679e3a80fa6aeaa1a440d94f059525295fa32d3",
chain=Chain.ETH,
sender="0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106",
signature="0xc342e671be10894bf707b86c3f7538cdb7e4bb5760e234f8d07f8b3dfde015492337bd8756f169e37ac691b74c765415e96b6e1813238912e10ea54cc003887d1b",
type=MessageType.forget,
time=timestamp_to_datetime(1652794384.3102906),
item_type=ItemType.inline,
item_content='{"address":"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106","time":1652794384.3101473,"hashes":["f6fc4884e3ec3624bd3f60a3c37abf83a130777086061b1a373e659f2bab4d06"]}',
channel=Channel("TEST"),
retries=0,
next_attempt=dt.datetime(2023, 1, 2),
check_message=True,
fetched=True,
reception_time=dt.datetime(2022, 1, 2),
)

storage_engine = message_processor.message_handler.storage_service.storage_engine
await storage_engine.write(
filename=file_hash,
content=b"Test",
)
await storage_engine.write(
filename=runtime_hash,
content=b"Runtime",
)

with session_factory() as session:
target_message_result = one(
await process_pending_messages(
message_processor=message_processor,
pending_messages=[code_message],
session=session,
)
)
assert isinstance(target_message_result, ProcessedMessage)

target_message_result1 = one(
await process_pending_messages(
message_processor=message_processor,
pending_messages=[runtime_message],
session=session,
)
)
assert isinstance(target_message_result1, ProcessedMessage)

target_message_result2 = one(
await process_pending_messages(
message_processor=message_processor,
pending_messages=[program_message],
session=session,
)
)

assert isinstance(target_message_result2, ProcessedMessage)

# Sanity check
nb_references = count_file_pins(session=session, file_hash=file_hash)
assert nb_references == 1

forget_message_result = one(
await process_pending_messages(
message_processor=message_processor,
pending_messages=[pending_forget_message],
session=session,
)
)
assert isinstance(forget_message_result, RejectedMessage)
assert forget_message_result.error_code == 503

# Check that the file continues pinned with a grace period
nb_references = count_file_pins(session=session, file_hash=file_hash)
assert nb_references == 1

0 comments on commit f3be9c8

Please sign in to comment.