diff --git a/dandiapi/api/management/commands/revalidate.py b/dandiapi/api/management/commands/revalidate.py
index 14a4b03d7..13b139f42 100644
--- a/dandiapi/api/management/commands/revalidate.py
+++ b/dandiapi/api/management/commands/revalidate.py
@@ -22,7 +22,8 @@ def revalidate(assets: bool, versions: bool, revalidate_all: bool, dry_run: bool
         click.echo(f'Revalidating {asset_qs.count()} assets')
         if not dry_run:
             for asset in asset_qs.iterator():
-                validate_asset_metadata(asset=asset)
+                if not validate_asset_metadata(asset=asset):
+                    click.secho(f'Unable to validate asset {asset.id}', err=True, fg='yellow')
 
     if versions:
         # Only revalidate draft versions
diff --git a/dandiapi/api/services/asset/__init__.py b/dandiapi/api/services/asset/__init__.py
index 7570993cb..219579e00 100644
--- a/dandiapi/api/services/asset/__init__.py
+++ b/dandiapi/api/services/asset/__init__.py
@@ -10,7 +10,6 @@
     DraftDandisetNotModifiable,
     ZarrArchiveBelongsToDifferentDandiset,
 )
-from dandiapi.api.services.asset.metadata import _maybe_validate_asset_metadata
 from dandiapi.zarr.models import ZarrArchive
 
 
@@ -148,13 +147,6 @@ def add_asset_to_version(
     # Save the version so that the modified field is updated
     version.save()
 
-    # The sha256 may have been newly calculated in the time since the asset was created. If so, we
-    # need to fetch the latest record from the DB so that _maybe_validate_asset_metadata sees the
-    # new sha256 value and includes it in the asset metadata properly.
-    asset.refresh_from_db()
-
-    _maybe_validate_asset_metadata(asset)
-
     return asset
 
 
diff --git a/dandiapi/api/services/asset/metadata.py b/dandiapi/api/services/asset/metadata.py
index 41fa55592..315b30cb8 100644
--- a/dandiapi/api/services/asset/metadata.py
+++ b/dandiapi/api/services/asset/metadata.py
@@ -2,30 +2,6 @@
 from django.db.models.query import QuerySet
 
 from dandiapi.api.models.asset import Asset
-from dandiapi.api.services.metadata import validate_asset_metadata
-
-
-def _maybe_validate_asset_metadata(asset: Asset):
-    """
-    Validate asset metadata if a checksum for its blob has already been computed.
-
-    If the checksum isn't there yet, it's the responsibility of the checksum code
-    to trigger validation for all assets pointing to its blob.
-    """
-    # Checksums are necessary for the asset metadata to be validated. For asset blobs, the sha256
-    # checksum is required. For zarrs, the zarr checksum is required. In both of these cases, if
-    # the checksum is not present, asset metadata is not yet validated, and that validation should
-    # be kicked off at the end of their respective checksum calculation tasks.
-    if asset.is_zarr:
-        if asset.zarr.checksum is None:
-            return
-    else:
-        blob = asset.blob or asset.embargoed_blob
-        if blob.sha256 is None:
-            return
-
-    # We do not bother to delay this because it should run very quickly.
-    validate_asset_metadata(asset=asset)
 
 
 def bulk_recalculate_asset_metadata(*, assets: QuerySet[Asset]):
diff --git a/dandiapi/api/services/metadata/__init__.py b/dandiapi/api/services/metadata/__init__.py
index ed5efa3a0..96708e6cc 100644
--- a/dandiapi/api/services/metadata/__init__.py
+++ b/dandiapi/api/services/metadata/__init__.py
@@ -33,17 +33,17 @@ def _collect_validation_errors(
     return [encoder(error) for error in error.errors]
 
 
-def validate_asset_metadata(*, asset: Asset) -> None:
+def validate_asset_metadata(*, asset: Asset) -> bool:
     logger.info('Validating asset metadata for asset %s', asset.id)
 
     # Published assets are immutable
     if asset.published:
         raise AssetHasBeenPublished()
 
-    with transaction.atomic():
-        asset.status = Asset.Status.VALIDATING
-        asset.save()
+    # Track the asset's status beforehand so we can use optimistic locking
+    asset_state = asset.status
 
+    with transaction.atomic():
         try:
             metadata = asset.published_metadata()
             validate(metadata, schema_key='PublishedAsset', json_validation=True)
@@ -62,14 +62,24 @@ def validate_asset_metadata(*, asset: Asset) -> None:
             asset.status = Asset.Status.INVALID
             asset.validation_errors = [{'field': '', 'message': str(e)}]
 
-        # Save asset
-        asset.save()
+        updated_asset = Asset.objects.filter(
+            id=asset.id, status=asset_state, metadata=asset.metadata, published=False
+        ).update(
+            status=asset.status,
+            validation_errors=asset.validation_errors,
+            # include metadata in update since we're bypassing .save()
+            metadata=asset._populate_metadata(),
+        )
+        if updated_asset:
+            # Update modified timestamps on all draft versions this asset belongs to
+            asset.versions.filter(version='draft').update(modified=timezone.now())
+        else:
+            logger.info('Asset %s was modified while validating', asset.id)
 
-        # Update modified timestamps on all draft versions this asset belongs to
-        asset.versions.filter(version='draft').update(modified=timezone.now())
+    return bool(updated_asset)
 
 
-def version_aggregate_assets_summary(version: Version):
+def version_aggregate_assets_summary(version: Version) -> None:
     if version.version != 'draft':
         raise VersionHasBeenPublished()
 
@@ -82,8 +92,6 @@ def version_aggregate_assets_summary(version: Version):
     Version.objects.filter(id=version.id, version='draft').update(
         modified=timezone.now(), metadata=version.metadata
     )
-    version.refresh_from_db()
-    return version
 
 
 def validate_version_metadata(*, version: Version) -> None:
diff --git a/dandiapi/api/services/publish/__init__.py b/dandiapi/api/services/publish/__init__.py
index 33b766da2..8ffeaba32 100644
--- a/dandiapi/api/services/publish/__init__.py
+++ b/dandiapi/api/services/publish/__init__.py
@@ -34,9 +34,6 @@ def publish_asset(*, asset: Asset) -> None:
         locked_asset.published = True
         locked_asset.save()
 
-    # Original asset is now stale, so we need to refresh from DB
-    asset.refresh_from_db()
-
 
 def _lock_dandiset_for_publishing(*, user: User, dandiset: Dandiset) -> None:
     """
diff --git a/dandiapi/api/tasks/__init__.py b/dandiapi/api/tasks/__init__.py
index ab8e84238..6c2449024 100644
--- a/dandiapi/api/tasks/__init__.py
+++ b/dandiapi/api/tasks/__init__.py
@@ -1,6 +1,5 @@
 from celery import shared_task
 from celery.utils.log import get_task_logger
-from django.db import transaction
 from django.db.transaction import atomic
 
 from dandiapi.api.doi import delete_doi
@@ -17,8 +16,7 @@
 
 
 @shared_task(queue='calculate_sha256', soft_time_limit=86_400)
-@atomic
-def calculate_sha256(blob_id: int) -> None:
+def calculate_sha256(blob_id: str) -> None:
     try:
         asset_blob = AssetBlob.objects.get(blob_id=blob_id)
         logger.info(f'Found AssetBlob {blob_id}')
@@ -33,19 +31,6 @@ def calculate_sha256(blob_id: str) -> None:
     asset_blob.sha256 = sha256
     asset_blob.save()
 
-    # The newly calculated sha256 digest will be included in the metadata, so we need to revalidate
-    # Note, we use `.iterator` here and delay each validation as a new task in order to keep memory
-    # usage down.
-    def dispatch_validation():
-        for asset_id in asset_blob.assets.values_list('id', flat=True).iterator():
-            # Note: while asset metadata is fairly lightweight compute-wise, memory-wise it can
-            # become an issue during serialization/deserialization of the JSON blob by pydantic.
-            # Therefore, we delay each validation to its own task.
-            validate_asset_metadata_task.delay(asset_id)
-
-    # Run on transaction commit
-    transaction.on_commit(dispatch_validation)
-
 
 @shared_task(soft_time_limit=60)
 @atomic
@@ -65,8 +50,9 @@ def write_manifest_files(version_id: int) -> None:
 def validate_asset_metadata_task(asset_id: int) -> None:
     from dandiapi.api.services.metadata import validate_asset_metadata
 
-    asset: Asset = Asset.objects.get(id=asset_id)
-    validate_asset_metadata(asset=asset)
+    asset: Asset = Asset.objects.filter(id=asset_id, status=Asset.Status.PENDING).first()
+    if asset:
+        validate_asset_metadata(asset=asset)
 
 
 @shared_task(soft_time_limit=30)
diff --git a/dandiapi/api/tasks/scheduled.py b/dandiapi/api/tasks/scheduled.py
index 5608b7d4e..445a6e2e9 100644
--- a/dandiapi/api/tasks/scheduled.py
+++ b/dandiapi/api/tasks/scheduled.py
@@ -12,12 +12,19 @@
 from django.conf import settings
 from django.contrib.auth.models import User
 from django.db import connection
+from django.db.models.query_utils import Q
 
 from dandiapi.analytics.tasks import collect_s3_log_records_task
 from dandiapi.api.mail import send_pending_users_message
 from dandiapi.api.models import UserMetadata, Version
+from dandiapi.api.models.asset import Asset
 from dandiapi.api.services.metadata import version_aggregate_assets_summary
-from dandiapi.api.tasks import validate_version_metadata_task, write_manifest_files
+from dandiapi.api.tasks import (
+    validate_asset_metadata_task,
+    validate_version_metadata_task,
+    write_manifest_files,
+)
+from dandiapi.zarr.models import ZarrArchiveStatus
 
 logger = get_task_logger(__name__)
 
@@ -28,6 +35,29 @@ def aggregate_assets_summary_task(version_id: int):
     version_aggregate_assets_summary(version)
 
 
+@shared_task(soft_time_limit=30)
+def validate_pending_asset_metadata():
+    validatable_assets = (
+        Asset.objects.filter(status=Asset.Status.PENDING)
+        .filter(
+            (Q(blob__isnull=False) & Q(blob__sha256__isnull=False))
+            | (Q(embargoed_blob__isnull=False) & Q(embargoed_blob__sha256__isnull=False))
+            | (
+                Q(zarr__isnull=False)
+                & Q(zarr__checksum__isnull=False)
+                & Q(zarr__status=ZarrArchiveStatus.COMPLETE)
+            )
+        )
+        .values_list('id', flat=True)
+    )
+    validatable_assets_count = validatable_assets.count()
+    if validatable_assets_count > 0:
+        logger.info('Found %s assets to validate', validatable_assets_count)
+        for asset_id in validatable_assets.iterator():
+            # TODO: throttle?
+            validate_asset_metadata_task.delay(asset_id)
+
+
 @shared_task(soft_time_limit=20)
 def validate_draft_version_metadata():
     # Select only the id of draft versions that have status PENDING
@@ -75,6 +105,11 @@ def register_scheduled_tasks(sender: Celery, **kwargs):
         timedelta(seconds=settings.DANDI_VALIDATION_JOB_INTERVAL),
         validate_draft_version_metadata.s(),
     )
+    # Check for any assets that need validation on the configured validation interval
+    sender.add_periodic_task(
+        timedelta(seconds=settings.DANDI_VALIDATION_JOB_INTERVAL),
+        validate_pending_asset_metadata.s(),
+    )
 
     # Send daily email to admins containing a list of users awaiting approval
     sender.add_periodic_task(crontab(hour=0, minute=0), send_pending_users_email.s())
diff --git a/dandiapi/api/tests/factories.py b/dandiapi/api/tests/factories.py
index d98bbeda2..6b4210687 100644
--- a/dandiapi/api/tests/factories.py
+++ b/dandiapi/api/tests/factories.py
@@ -208,6 +208,7 @@ def _create(cls, *args, **kwargs):
         asset.status = Asset.Status.VALID  # published assets are always valid
         asset.save()
         publish_asset(asset=asset)
+        asset.refresh_from_db()
         return asset
 
 
diff --git a/dandiapi/api/tests/test_asset.py b/dandiapi/api/tests/test_asset.py
index 33b3d75eb..dc9154d12 100644
--- a/dandiapi/api/tests/test_asset.py
+++ b/dandiapi/api/tests/test_asset.py
@@ -16,6 +16,7 @@
 from dandiapi.api.services.asset import add_asset_to_version
 from dandiapi.api.services.asset.exceptions import AssetPathConflict
 from dandiapi.api.services.publish import publish_asset
+from dandiapi.api.tasks.scheduled import validate_pending_asset_metadata
 from dandiapi.zarr.models import ZarrArchive
 from dandiapi.zarr.tasks import ingest_zarr_archive
 
@@ -101,6 +102,7 @@ def test_publish_asset(draft_asset: Asset):
 
     # draft_asset has been published, so it is now published_asset
     published_asset = draft_asset
+    published_asset.refresh_from_db()
 
     assert published_asset.blob == draft_blob
     assert published_asset.metadata == {
@@ -707,7 +709,8 @@ def test_asset_create_zarr_validated(
     zarr_file_factory(zarr_archive=zarr_archive)
     api_client.post(f'/api/zarr/{zarr_archive.zarr_id}/finalize/')
 
-    # Since tasks run synchronously in tests, the assets should be validated now
+    validate_pending_asset_metadata()
+
     asset1.refresh_from_db()
     asset2.refresh_from_db()
     assert asset1.status == Asset.Status.VALID
diff --git a/dandiapi/api/tests/test_tasks.py b/dandiapi/api/tests/test_tasks.py
index a25937945..f8c8e7c51 100644
--- a/dandiapi/api/tests/test_tasks.py
+++ b/dandiapi/api/tests/test_tasks.py
@@ -50,28 +50,6 @@ def test_calculate_checksum_task_embargo(storage: Storage, embargoed_asset_blob_
     assert asset_blob.sha256 == sha256
 
 
-@pytest.mark.django_db
-def test_checksum_task_invokes_asset_validation(
-    storage: Storage, asset_blob_factory, draft_asset_factory, django_capture_on_commit_callbacks
-):
-    # Pretend like AssetBlob was defined with the given storage
-    AssetBlob.blob.field.storage = storage
-    asset_blob = asset_blob_factory(sha256=None)
-    assets: list[Asset] = [draft_asset_factory(blob=asset_blob) for _ in range(3)]
-
-    # Assert status before checksum
-    assert all([asset.status == Asset.Status.PENDING for asset in assets])
-
-    # Dispatch task, capture any transaction callbacks
-    with django_capture_on_commit_callbacks(execute=True):
-        tasks.calculate_sha256(asset_blob.blob_id)
-
-    # Assert metadata validation ran
-    for asset in assets:
-        asset.refresh_from_db()
-        assert asset.status != Asset.Status.PENDING
-
-
 @pytest.mark.django_db
 def test_write_manifest_files(storage: Storage, version: Version, asset_factory):
     # Pretend like AssetBlob was defined with the given storage
@@ -159,6 +137,9 @@ def test_validate_asset_metadata_no_digest(draft_asset: Asset):
     draft_asset.blob.sha256 = None
     draft_asset.blob.save()
 
+    del draft_asset.metadata['digest']
+    draft_asset.save()
+
     tasks.validate_asset_metadata_task(draft_asset.id)
 
     draft_asset.refresh_from_db()
diff --git a/dandiapi/api/tests/test_version.py b/dandiapi/api/tests/test_version.py
index edd672c35..c7e8a5fb5 100644
--- a/dandiapi/api/tests/test_version.py
+++ b/dandiapi/api/tests/test_version.py
@@ -651,6 +651,7 @@ def test_version_rest_publish_zarr(
     zarr_archive = zarr_archive_factory(dandiset=draft_version.dandiset)
     zarr_file_factory(zarr_archive=zarr_archive)
     ingest_zarr_archive(zarr_archive.zarr_id)
+    zarr_archive.refresh_from_db()
 
     zarr_asset: Asset = draft_asset_factory(zarr=zarr_archive, blob=None)
     normal_asset: Asset = draft_asset_factory()
diff --git a/dandiapi/zarr/tasks/__init__.py b/dandiapi/zarr/tasks/__init__.py
index 6edf96964..2cffeab22 100644
--- a/dandiapi/zarr/tasks/__init__.py
+++ b/dandiapi/zarr/tasks/__init__.py
@@ -8,7 +8,6 @@
 from dandiapi.api.asset_paths import add_zarr_paths, delete_zarr_paths
 from dandiapi.api.storage import get_storage_params
-from dandiapi.api.tasks import validate_asset_metadata_task
 from dandiapi.zarr.models import ZarrArchive, ZarrArchiveStatus
 
 logger = get_task_logger(__name__)
 
@@ -67,17 +66,6 @@ def ingest_zarr_archive(zarr_id: str, force: bool = False):
     # Add asset paths after ingest is finished
     add_zarr_paths(zarr)
 
-    # Use function instead of lambda for correct closure
-    def dispatch_validation():
-        for asset_id in zarr.assets.values_list('id', flat=True).iterator():
-            # Note: while asset metadata is fairly lightweight compute-wise, memory-wise it can
-            # become an issue during serialization/deserialization of the JSON blob by pydantic.
-            # Therefore, we delay each validation to its own task.
-            validate_asset_metadata_task.delay(asset_id)
-
-    # Kickoff metadata validation for all zarr assets after the transaction completes
-    transaction.on_commit(dispatch_validation)
-
 
 def ingest_dandiset_zarrs(dandiset_id: int, **kwargs):
     for zarr in ZarrArchive.objects.filter(dandiset__id=dandiset_id):