Merge pull request #1634 from dandi/stuck-assets
danlamanna authored Jul 18, 2023
2 parents 6d94648 + 1e681c3 commit 0961f55
Showing 12 changed files with 70 additions and 101 deletions.
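In short: this commit stops dispatching asset-metadata validation from the sha256-calculation and zarr-ingest tasks via transaction.on_commit, and instead adds a periodic validate_pending_asset_metadata task that sweeps for PENDING assets whose blob sha256 (or zarr checksum) is already computed and fans each one out to validate_asset_metadata_task. To make that safe under concurrency, validate_asset_metadata now persists its verdict with an optimistic-locking conditional UPDATE and returns a truthy value only when the write actually applied.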
3 changes: 2 additions & 1 deletion dandiapi/api/management/commands/revalidate.py
@@ -22,7 +22,8 @@ def revalidate(assets: bool, versions: bool, revalidate_all: bool, dry_run: bool
         click.echo(f'Revalidating {asset_qs.count()} assets')
         if not dry_run:
             for asset in asset_qs.iterator():
-                validate_asset_metadata(asset=asset)
+                if not validate_asset_metadata(asset=asset):
+                    click.echo(f'Unable to validate asset {asset.id}', err=True, fg='yellow')

     if versions:
         # Only revalidate draft versions
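One flag on the new error branch: click.echo() does not accept a fg keyword (styling arguments such as fg='yellow' belong to click.secho() or click.style()), so this line would raise a TypeError the first time an asset fails validation.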
8 changes: 0 additions & 8 deletions dandiapi/api/services/asset/__init__.py
@@ -10,7 +10,6 @@
     DraftDandisetNotModifiable,
     ZarrArchiveBelongsToDifferentDandiset,
 )
-from dandiapi.api.services.asset.metadata import _maybe_validate_asset_metadata
 from dandiapi.zarr.models import ZarrArchive


@@ -148,13 +147,6 @@ def add_asset_to_version(
     # Save the version so that the modified field is updated
     version.save()

-    # The sha256 may have been newly calculated in the time since the asset was created. If so, we
-    # need to fetch the latest record from the DB so that _maybe_validate_asset_metadata sees the
-    # new sha256 value and includes it in the asset metadata properly.
-    asset.refresh_from_db()
-
-    _maybe_validate_asset_metadata(asset)
-
     return asset


24 changes: 0 additions & 24 deletions dandiapi/api/services/asset/metadata.py
@@ -2,30 +2,6 @@
 from django.db.models.query import QuerySet

 from dandiapi.api.models.asset import Asset
-from dandiapi.api.services.metadata import validate_asset_metadata
-
-
-def _maybe_validate_asset_metadata(asset: Asset):
-    """
-    Validate asset metadata if a checksum for its blob has already been computed.
-
-    If the checksum isn't there yet, it's the responsibility of the checksum code
-    to trigger validation for all assets pointing to its blob.
-    """
-    # Checksums are necessary for the asset metadata to be validated. For asset blobs, the sha256
-    # checksum is required. For zarrs, the zarr checksum is required. In both of these cases, if
-    # the checksum is not present, asset metadata is not yet validated, and that validation should
-    # be kicked off at the end of their respective checksum calculation tasks.
-    if asset.is_zarr:
-        if asset.zarr.checksum is None:
-            return
-    else:
-        blob = asset.blob or asset.embargoed_blob
-        if blob.sha256 is None:
-            return
-
-    # We do not bother to delay this because it should run very quickly.
-    validate_asset_metadata(asset=asset)


 def bulk_recalculate_asset_metadata(*, assets: QuerySet[Asset]):
30 changes: 19 additions & 11 deletions dandiapi/api/services/metadata/__init__.py
@@ -33,17 +33,17 @@ def _collect_validation_errors(
     return [encoder(error) for error in error.errors]


-def validate_asset_metadata(*, asset: Asset) -> None:
+def validate_asset_metadata(*, asset: Asset) -> bool:
     logger.info('Validating asset metadata for asset %s', asset.id)

     # Published assets are immutable
     if asset.published:
         raise AssetHasBeenPublished()

-    with transaction.atomic():
-        asset.status = Asset.Status.VALIDATING
-        asset.save()
+    # track the state of the asset before to use optimistic locking
+    asset_state = asset.status

+    with transaction.atomic():
         try:
             metadata = asset.published_metadata()
             validate(metadata, schema_key='PublishedAsset', json_validation=True)
@@ -62,14 +62,24 @@ def validate_asset_metadata(*, asset: Asset) -> None:
             asset.status = Asset.Status.INVALID
             asset.validation_errors = [{'field': '', 'message': str(e)}]

-        # Save asset
-        asset.save()
+        updated_asset = Asset.objects.filter(
+            id=asset.id, status=asset_state, metadata=asset.metadata, published=False
+        ).update(
+            status=asset.status,
+            validation_errors=asset.validation_errors,
+            # include metadata in update since we're bypassing .save()
+            metadata=asset._populate_metadata(),
+        )
+        if updated_asset:
+            # Update modified timestamps on all draft versions this asset belongs to
+            asset.versions.filter(version='draft').update(modified=timezone.now())
+        else:
+            logger.info('Asset %s was modified while validating', asset.id)

-    # Update modified timestamps on all draft versions this asset belongs to
-    asset.versions.filter(version='draft').update(modified=timezone.now())
+    return updated_asset


-def version_aggregate_assets_summary(version: Version):
+def version_aggregate_assets_summary(version: Version) -> None:
     if version.version != 'draft':
         raise VersionHasBeenPublished()
@@ -82,8 +92,6 @@ def version_aggregate_assets_summary(version: Version):
     Version.objects.filter(id=version.id, version='draft').update(
         modified=timezone.now(), metadata=version.metadata
     )
-    version.refresh_from_db()
-    return version


 def validate_version_metadata(*, version: Version) -> None:
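The conditional UPDATE above is the optimistic-locking pattern: the filter re-checks the status and metadata that were read before validation started, so a concurrent writer causes the UPDATE to match zero rows instead of silently overwriting newer state. A minimal sketch of the pattern, using a hypothetical Thing model (not part of this codebase) and assuming a configured Django project:

from django.db import models


class Thing(models.Model):
    status = models.CharField(max_length=16, default='PENDING')

    class Meta:
        app_label = 'example'  # hypothetical app


def transition(thing: Thing, new_status: str) -> bool:
    observed = thing.status  # state captured before doing any work
    # ... expensive computation that depends on `observed` goes here ...
    updated = Thing.objects.filter(id=thing.id, status=observed).update(status=new_status)
    # QuerySet.update() returns the number of matched rows, so 0 means another
    # writer changed the row after we read it and our stale result is dropped.
    return bool(updated)

The same zero-rows signal is what lets validate_asset_metadata log "was modified while validating" and report failure to its caller.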
3 changes: 0 additions & 3 deletions dandiapi/api/services/publish/__init__.py
@@ -34,9 +34,6 @@ def publish_asset(*, asset: Asset) -> None:
         locked_asset.published = True
         locked_asset.save()

-    # Original asset is now stale, so we need to refresh from DB
-    asset.refresh_from_db()
-

 def _lock_dandiset_for_publishing(*, user: User, dandiset: Dandiset) -> None:
     """
22 changes: 4 additions & 18 deletions dandiapi/api/tasks/__init__.py
@@ -1,6 +1,5 @@
 from celery import shared_task
 from celery.utils.log import get_task_logger
-from django.db import transaction
 from django.db.transaction import atomic

 from dandiapi.api.doi import delete_doi
@@ -17,8 +16,7 @@
 @shared_task(queue='calculate_sha256', soft_time_limit=86_400)
-@atomic
-def calculate_sha256(blob_id: int) -> None:
+def calculate_sha256(blob_id: str) -> None:
     try:
         asset_blob = AssetBlob.objects.get(blob_id=blob_id)
         logger.info(f'Found AssetBlob {blob_id}')
@@ -33,19 +31,6 @@ def calculate_sha256(blob_id: int) -> None:
     asset_blob.sha256 = sha256
     asset_blob.save()

-    # The newly calculated sha256 digest will be included in the metadata, so we need to revalidate
-    # Note, we use `.iterator` here and delay each validation as a new task in order to keep memory
-    # usage down.
-    def dispatch_validation():
-        for asset_id in asset_blob.assets.values_list('id', flat=True).iterator():
-            # Note: while asset metadata is fairly lightweight compute-wise, memory-wise it can
-            # become an issue during serialization/deserialization of the JSON blob by pydantic.
-            # Therefore, we delay each validation to its own task.
-            validate_asset_metadata_task.delay(asset_id)
-
-    # Run on transaction commit
-    transaction.on_commit(dispatch_validation)
-

 @shared_task(soft_time_limit=60)
 @atomic
Expand All @@ -65,8 +50,9 @@ def write_manifest_files(version_id: int) -> None:
def validate_asset_metadata_task(asset_id: int) -> None:
from dandiapi.api.services.metadata import validate_asset_metadata

asset: Asset = Asset.objects.get(id=asset_id)
validate_asset_metadata(asset=asset)
asset: Asset = Asset.objects.filter(id=asset_id, status=Asset.Status.PENDING).first()
if asset:
validate_asset_metadata(asset=asset)


@shared_task(soft_time_limit=30)
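Together with the scheduled task added below, this flips validation from push to poll: instead of each checksum or ingest task fanning out validation on transaction commit (a hand-off that could be missed and leave assets stuck in PENDING, presumably the failure mode behind the stuck-assets branch name), a beat task periodically sweeps for PENDING assets whose digests are ready. The status=Asset.Status.PENDING guard in validate_asset_metadata_task also makes duplicate or redelivered task messages harmless no-ops.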
37 changes: 36 additions & 1 deletion dandiapi/api/tasks/scheduled.py
@@ -12,12 +12,19 @@
 from django.conf import settings
 from django.contrib.auth.models import User
 from django.db import connection
+from django.db.models.query_utils import Q

 from dandiapi.analytics.tasks import collect_s3_log_records_task
 from dandiapi.api.mail import send_pending_users_message
 from dandiapi.api.models import UserMetadata, Version
+from dandiapi.api.models.asset import Asset
 from dandiapi.api.services.metadata import version_aggregate_assets_summary
-from dandiapi.api.tasks import validate_version_metadata_task, write_manifest_files
+from dandiapi.api.tasks import (
+    validate_asset_metadata_task,
+    validate_version_metadata_task,
+    write_manifest_files,
+)
+from dandiapi.zarr.models import ZarrArchiveStatus

 logger = get_task_logger(__name__)

@@ -28,6 +35,29 @@ def aggregate_assets_summary_task(version_id: int):
     version_aggregate_assets_summary(version)


+@shared_task(soft_time_limit=30)
+def validate_pending_asset_metadata():
+    validatable_assets = (
+        Asset.objects.filter(status=Asset.Status.PENDING)
+        .filter(
+            (Q(blob__isnull=False) & Q(blob__sha256__isnull=False))
+            | (Q(embargoed_blob__isnull=False) & Q(embargoed_blob__sha256__isnull=False))
+            | (
+                Q(zarr__isnull=False)
+                & Q(zarr__checksum__isnull=False)
+                & Q(zarr__status=ZarrArchiveStatus.COMPLETE)
+            )
+        )
+        .values_list('id', flat=True)
+    )
+    validatable_assets_count = validatable_assets.count()
+    if validatable_assets_count > 0:
+        logger.info('Found %s assets to validate', validatable_assets_count)
+        for asset_id in validatable_assets.iterator():
+            # TODO: throttle?
+            validate_asset_metadata_task.delay(asset_id)
+
+
 @shared_task(soft_time_limit=20)
 def validate_draft_version_metadata():
     # Select only the id of draft versions that have status PENDING
@@ -75,6 +105,11 @@ def register_scheduled_tasks(sender: Celery, **kwargs):
         timedelta(seconds=settings.DANDI_VALIDATION_JOB_INTERVAL),
         validate_draft_version_metadata.s(),
     )
+    # Check for any assets that need validation every minute
+    sender.add_periodic_task(
+        timedelta(seconds=settings.DANDI_VALIDATION_JOB_INTERVAL),
+        validate_pending_asset_metadata.s(),
+    )

     # Send daily email to admins containing a list of users awaiting approval
     sender.add_periodic_task(crontab(hour=0, minute=0), send_pending_users_email.s())
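The eligibility query in validate_pending_asset_metadata is built from Q objects combined with & and |. A reduced sketch of the same shape, against a hypothetical Record model rather than Asset, assuming a configured Django project:

from django.db import models
from django.db.models import Q


class Record(models.Model):
    # Hypothetical stand-in for Asset: two optional digest fields.
    sha256 = models.CharField(max_length=64, null=True)
    zarr_checksum = models.CharField(max_length=64, null=True)
    status = models.CharField(max_length=16, default='PENDING')

    class Meta:
        app_label = 'example'  # hypothetical app


# "PENDING and (blob digest ready OR zarr checksum ready)": & binds tighter
# than |, so explicit parentheses mirror the grouping in the real query.
ready_ids = (
    Record.objects.filter(status='PENDING')
    .filter(Q(sha256__isnull=False) | Q(zarr_checksum__isnull=False))
    .values_list('id', flat=True)
)

Keeping the sweep to values_list('id', flat=True) plus .iterator(), with one Celery task per id, bounds memory the same way the removed dispatch_validation helpers did.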
1 change: 1 addition & 0 deletions dandiapi/api/tests/factories.py
@@ -208,6 +208,7 @@ def _create(cls, *args, **kwargs):
         asset.status = Asset.Status.VALID  # published assets are always valid
         asset.save()
         publish_asset(asset=asset)
+        asset.refresh_from_db()
         return asset
5 changes: 4 additions & 1 deletion dandiapi/api/tests/test_asset.py
@@ -16,6 +16,7 @@
 from dandiapi.api.services.asset import add_asset_to_version
 from dandiapi.api.services.asset.exceptions import AssetPathConflict
 from dandiapi.api.services.publish import publish_asset
+from dandiapi.api.tasks.scheduled import validate_pending_asset_metadata
 from dandiapi.zarr.models import ZarrArchive
 from dandiapi.zarr.tasks import ingest_zarr_archive
@@ -101,6 +102,7 @@ def test_publish_asset(draft_asset: Asset):

     # draft_asset has been published, so it is now published_asset
     published_asset = draft_asset
+    published_asset.refresh_from_db()

     assert published_asset.blob == draft_blob
     assert published_asset.metadata == {
@@ -707,7 +709,8 @@ def test_asset_create_zarr_validated(
     zarr_file_factory(zarr_archive=zarr_archive)
     api_client.post(f'/api/zarr/{zarr_archive.zarr_id}/finalize/')

-    # Since tasks run synchronously in tests, the assets should be validated now
+    validate_pending_asset_metadata()
+
     asset1.refresh_from_db()
     asset2.refresh_from_db()
     assert asset1.status == Asset.Status.VALID
25 changes: 3 additions & 22 deletions dandiapi/api/tests/test_tasks.py
@@ -50,28 +50,6 @@ def test_calculate_checksum_task_embargo(storage: Storage, embargoed_asset_blob_
     assert asset_blob.sha256 == sha256


-@pytest.mark.django_db
-def test_checksum_task_invokes_asset_validation(
-    storage: Storage, asset_blob_factory, draft_asset_factory, django_capture_on_commit_callbacks
-):
-    # Pretend like AssetBlob was defined with the given storage
-    AssetBlob.blob.field.storage = storage
-    asset_blob = asset_blob_factory(sha256=None)
-    assets: list[Asset] = [draft_asset_factory(blob=asset_blob) for _ in range(3)]
-
-    # Assert status before checksum
-    assert all([asset.status == Asset.Status.PENDING for asset in assets])
-
-    # Dispatch task, capture any transaction callbacks
-    with django_capture_on_commit_callbacks(execute=True):
-        tasks.calculate_sha256(asset_blob.blob_id)
-
-    # Assert metadata validation ran
-    for asset in assets:
-        asset.refresh_from_db()
-        assert asset.status != Asset.Status.PENDING
-
-
 @pytest.mark.django_db
 def test_write_manifest_files(storage: Storage, version: Version, asset_factory):
     # Pretend like AssetBlob was defined with the given storage
@@ -159,6 +137,9 @@ def test_validate_asset_metadata_no_digest(draft_asset: Asset):
     draft_asset.blob.sha256 = None
     draft_asset.blob.save()

+    del draft_asset.metadata['digest']
+    draft_asset.save()
+
     tasks.validate_asset_metadata_task(draft_asset.id)

     draft_asset.refresh_from_db()
1 change: 1 addition & 0 deletions dandiapi/api/tests/test_version.py
@@ -651,6 +651,7 @@ def test_version_rest_publish_zarr(
     zarr_archive = zarr_archive_factory(dandiset=draft_version.dandiset)
     zarr_file_factory(zarr_archive=zarr_archive)
     ingest_zarr_archive(zarr_archive.zarr_id)
+    zarr_archive.refresh_from_db()

     zarr_asset: Asset = draft_asset_factory(zarr=zarr_archive, blob=None)
     normal_asset: Asset = draft_asset_factory()
12 changes: 0 additions & 12 deletions dandiapi/zarr/tasks/__init__.py
@@ -8,7 +8,6 @@

 from dandiapi.api.asset_paths import add_zarr_paths, delete_zarr_paths
 from dandiapi.api.storage import get_storage_params
-from dandiapi.api.tasks import validate_asset_metadata_task
 from dandiapi.zarr.models import ZarrArchive, ZarrArchiveStatus

 logger = get_task_logger(__name__)
@@ -67,17 +66,6 @@ def ingest_zarr_archive(zarr_id: str, force: bool = False):
     # Add asset paths after ingest is finished
     add_zarr_paths(zarr)

-    # Use function instead of lambda for correct closure
-    def dispatch_validation():
-        for asset_id in zarr.assets.values_list('id', flat=True).iterator():
-            # Note: while asset metadata is fairly lightweight compute-wise, memory-wise it can
-            # become an issue during serialization/deserialization of the JSON blob by pydantic.
-            # Therefore, we delay each validation to its own task.
-            validate_asset_metadata_task.delay(asset_id)
-
-    # Kickoff metadata validation for all zarr assets after the transaction completes
-    transaction.on_commit(dispatch_validation)
-

 def ingest_dandiset_zarrs(dandiset_id: int, **kwargs):
     for zarr in ZarrArchive.objects.filter(dandiset__id=dandiset_id):
