diff --git a/changelog.d/20241224_150942_maria_clear_cache_cron_job.md b/changelog.d/20241224_150942_maria_clear_cache_cron_job.md
new file mode 100644
index 000000000000..e56e1debfe9e
--- /dev/null
+++ b/changelog.d/20241224_150942_maria_clear_cache_cron_job.md
@@ -0,0 +1,12 @@
+### Added
+
+- Setting `TMP_FILE_OR_DIR_RETENTION_DAYS`, which defines the maximum retention period
+  of a file or directory in the temporary directory
+  ()
+- Cron job to remove outdated files and directories from the CVAT tmp directory
+  ()
+
+### Changed
+
+- Export cache cleaning moved to a separate cron job
+  ()
diff --git a/cvat/apps/dataset_manager/cron.py b/cvat/apps/dataset_manager/cron.py
new file mode 100644
index 000000000000..278bbe945d51
--- /dev/null
+++ b/cvat/apps/dataset_manager/cron.py
@@ -0,0 +1,143 @@
+# Copyright (C) 2025 CVAT.ai Corporation
+#
+# SPDX-License-Identifier: MIT
+
+from __future__ import annotations
+
+import os
+import shutil
+from abc import ABCMeta, abstractmethod
+from datetime import timedelta
+from pathlib import Path
+from typing import ClassVar, Type
+
+from django.conf import settings
+from django.utils import timezone
+
+from cvat.apps.dataset_manager.util import (
+    CacheFileOrDirPathParseError,
+    ExportCacheManager,
+    TmpDirManager,
+    get_export_cache_lock,
+)
+from cvat.apps.dataset_manager.views import (
+    EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
+    EXPORT_CACHE_LOCK_TTL,
+    get_export_cache_ttl,
+    log_exception,
+)
+from cvat.apps.engine.log import ServerLogManager
+
+logger = ServerLogManager(__name__).glob
+
+
+def clear_export_cache(file_path: Path) -> bool:
+    with get_export_cache_lock(
+        file_path,
+        block=True,
+        acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
+        ttl=EXPORT_CACHE_LOCK_TTL,
+    ):
+        parsed_filename = ExportCacheManager.parse_filename(file_path.name)
+        cache_ttl = get_export_cache_ttl(parsed_filename.instance_type)
+
+        if timezone.now().timestamp() <= file_path.stat().st_mtime + cache_ttl.total_seconds():
+            logger.debug(f"Export cache file {file_path.name!r} was recently accessed")
+            return False
+
+        os.remove(file_path)
+        logger.debug(f"Export cache file {file_path.name!r} was successfully removed")
+        return True
+
+
+class BaseCleaner(metaclass=ABCMeta):
+    task_description: ClassVar[str]
+
+    def __init__(self) -> None:
+        self._number_of_removed_objects = 0
+
+    @property
+    def number_of_removed_objects(self) -> int:
+        return self._number_of_removed_objects
+
+    @abstractmethod
+    def do_cleanup(self):
+        pass
+
+
+class TmpDirectoryCleaner(BaseCleaner):
+    task_description: ClassVar[str] = "common temporary directory cleanup"
+
+    def do_cleanup(self) -> None:
+        # we do not use locks here when handling objects from the tmp directory,
+        # because undesired race conditions are not possible here:
+        # 1. A temporary file/directory can be removed while its access time is being checked.
+        #    In that case an exception is expected and is handled by the cron process.
+        # 2. A temporary file/directory can be removed by the cron job only when it is outdated.
+        # 3. Each temporary file/directory has a unique name, so a race condition in which
+        #    one process creates an object while another removes it is impossible.
+ for child in os.scandir(TmpDirManager.TMP_ROOT): + try: + if ( + child.stat().st_atime + + timedelta(days=TmpDirManager.TMP_FILE_OR_DIR_RETENTION_DAYS).total_seconds() + < timezone.now().timestamp() + ): + if child.is_dir(): + shutil.rmtree(child.path) + else: + os.remove(child.path) + logger.debug(f"The {child.name} was successfully removed") + self._number_of_removed_objects += 1 + except FileNotFoundError: + # file or directory has been removed by another process + continue + except Exception: + log_exception(logger) + + +class ExportCacheDirectoryCleaner(BaseCleaner): + task_description: ClassVar[str] = "export cache directory cleanup" + + def do_cleanup(self) -> None: + export_cache_dir_path = settings.EXPORT_CACHE_ROOT + assert os.path.exists(export_cache_dir_path) + + for child in os.scandir(export_cache_dir_path): + # export cache directory is expected to contain only files + if not child.is_file(): + logger.warning(f"The {child.name} is not a file, skipping...") + continue + + try: + if clear_export_cache(child): + self._number_of_removed_objects += 1 + except CacheFileOrDirPathParseError: + logger.warning(f"Cannot parse {child.name}, skipping...") + continue + + except Exception: + log_exception(logger) + + +def cleanup(CleanerClass: Type[ExportCacheDirectoryCleaner | TmpDirectoryCleaner]) -> None: + assert issubclass(CleanerClass, BaseCleaner) + started_at = timezone.now() + + cleaner = CleanerClass() + cleaner.do_cleanup() + + finished_at = timezone.now() + logger.info( + f"The {cleaner.task_description!r} process has been successfully " + f"completed after {int((finished_at - started_at).total_seconds())} seconds. " + f"{cleaner.number_of_removed_objects} elements have been removed" + ) + + +def cleanup_export_cache_directory() -> None: + cleanup(ExportCacheDirectoryCleaner) + + +def cleanup_tmp_directory() -> None: + cleanup(TmpDirectoryCleaner) diff --git a/cvat/apps/dataset_manager/management/__init__.py b/cvat/apps/dataset_manager/management/__init__.py new file mode 100644 index 000000000000..71cfbec65515 --- /dev/null +++ b/cvat/apps/dataset_manager/management/__init__.py @@ -0,0 +1,4 @@ +# Copyright (C) 2025 CVAT.ai Corporation +# +# SPDX-License-Identifier: MIT + diff --git a/cvat/apps/dataset_manager/management/commands/__init__.py b/cvat/apps/dataset_manager/management/commands/__init__.py new file mode 100644 index 000000000000..71cfbec65515 --- /dev/null +++ b/cvat/apps/dataset_manager/management/commands/__init__.py @@ -0,0 +1,4 @@ +# Copyright (C) 2025 CVAT.ai Corporation +# +# SPDX-License-Identifier: MIT + diff --git a/cvat/apps/dataset_manager/management/commands/cleanuplegacyexportcache.py b/cvat/apps/dataset_manager/management/commands/cleanuplegacyexportcache.py new file mode 100644 index 000000000000..6775b8db7c91 --- /dev/null +++ b/cvat/apps/dataset_manager/management/commands/cleanuplegacyexportcache.py @@ -0,0 +1,46 @@ +# Copyright (C) 2025 CVAT.ai Corporation +# +# SPDX-License-Identifier: MIT + +import shutil +from contextlib import suppress +from pathlib import Path + +from django.core.management.base import BaseCommand +from django.utils import timezone + +from cvat.apps.engine.models import Job, Project, Task + + +class Command(BaseCommand): + help = "Cleanup outdated export cache" + + def handle(self, *args, **options): + def update_progress(): + progress = (i + 1) / objects_count + done = int(progress_bar_len * progress) + progress_bar = "#" * done + "-" * (progress_bar_len - done) + self.stdout.write(f"\rProgress: |{progress_bar}| 
{progress:.0%}", ending="") + + now = timezone.now() + progress_bar_len = shutil.get_terminal_size().columns // 2 + + for Model in (Project, Task, Job): + self.stdout.write(f"\nDeleting the export cache for {Model.__name__.lower()}s...") + queryset = Model.objects.filter(created_date__lt=now) + objects_count = queryset.count() + if objects_count < 1: + continue + + msg = ( + f"{objects_count} folders are going to be checked" + if objects_count > 1 + else "1 folder is going to be checked" + ) + self.stdout.write(msg) + + for i, obj in enumerate(queryset.iterator()): + update_progress() + export_cache_dir = Path(obj.get_dirname()) / "export_cache" + with suppress(FileNotFoundError): + shutil.rmtree(export_cache_dir) diff --git a/cvat/apps/dataset_manager/project.py b/cvat/apps/dataset_manager/project.py index ad51370b04e1..8f91e4f12651 100644 --- a/cvat/apps/dataset_manager/project.py +++ b/cvat/apps/dataset_manager/project.py @@ -3,9 +3,9 @@ # # SPDX-License-Identifier: MIT -import os +import io from collections.abc import Mapping -from tempfile import TemporaryDirectory +from contextlib import nullcontext from typing import Any, Callable import rq @@ -14,6 +14,7 @@ from django.db import transaction from cvat.apps.dataset_manager.task import TaskAnnotation +from cvat.apps.dataset_manager.util import TmpDirManager from cvat.apps.engine import models from cvat.apps.engine.log import DatasetLogManager from cvat.apps.engine.rq_job_handler import RQJobMetaField @@ -26,8 +27,15 @@ dlogger = DatasetLogManager() -def export_project(project_id, dst_file, format_name, - server_url=None, save_images=False): +def export_project( + project_id: int, + dst_file: str, + *, + format_name: str, + server_url: str | None = None, + save_images: bool = False, + temp_dir: str | None = None, +): # For big tasks dump function may run for a long time and # we dont need to acquire lock after the task has been initialized from DB. 
# But there is the bug with corrupted dump file in case 2 or @@ -39,7 +47,7 @@ def export_project(project_id, dst_file, format_name, exporter = make_exporter(format_name) with open(dst_file, 'wb') as f: - project.export(f, exporter, host=server_url, save_images=save_images) + project.export(f, exporter, host=server_url, save_images=save_images, temp_dir=temp_dir) class ProjectAnnotationAndData: def __init__(self, pk: int): @@ -131,16 +139,26 @@ def init_from_db(self): self.task_annotations[task.id] = annotation self.annotation_irs[task.id] = annotation.ir_data - def export(self, dst_file: str, exporter: Callable, host: str='', **options): + def export( + self, + dst_file: io.BufferedWriter, + exporter: Callable[..., None], + *, + host: str = '', + temp_dir: str | None = None, + **options + ): project_data = ProjectData( annotation_irs=self.annotation_irs, db_project=self.db_project, host=host ) - temp_dir_base = self.db_project.get_tmp_dirname() - os.makedirs(temp_dir_base, exist_ok=True) - with TemporaryDirectory(dir=temp_dir_base) as temp_dir: + with ( + TmpDirManager.get_tmp_directory_for_export( + instance_type=self.db_project.__class__.__name__, + ) if not temp_dir else nullcontext(temp_dir) + ) as temp_dir: exporter(dst_file, temp_dir, project_data, **options) def load_dataset_data(self, *args, **kwargs): @@ -155,9 +173,7 @@ def import_dataset(self, dataset_file, importer, **options): ) project_data.soft_attribute_import = True - temp_dir_base = self.db_project.get_tmp_dirname() - os.makedirs(temp_dir_base, exist_ok=True) - with TemporaryDirectory(dir=temp_dir_base) as temp_dir: + with TmpDirManager.get_tmp_directory() as temp_dir: try: importer(dataset_file, temp_dir, project_data, load_data_callback=self.load_dataset_data, **options) except (DatasetNotFoundError, CvatDatasetNotFoundError) as not_found: diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 74f035d40787..ebc007dbb898 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -1,15 +1,15 @@ # Copyright (C) 2019-2022 Intel Corporation -# Copyright (C) 2022-2024 CVAT.ai Corporation +# Copyright (C) 2022-2025 CVAT.ai Corporation # # SPDX-License-Identifier: MIT +import io import itertools -import os from collections import OrderedDict +from contextlib import nullcontext from copy import deepcopy from enum import Enum -from tempfile import TemporaryDirectory -from typing import Optional, Union +from typing import Callable, Optional, Union from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError from django.conf import settings @@ -26,6 +26,7 @@ ) from cvat.apps.dataset_manager.formats.registry import make_exporter, make_importer from cvat.apps.dataset_manager.util import ( + TmpDirManager, add_prefetch_fields, bulk_create, faster_deepcopy, @@ -768,16 +769,26 @@ def init_from_db(self): def data(self): return self.ir_data.data - def export(self, dst_file, exporter, host='', **options): + def export( + self, + dst_file: io.BufferedWriter, + exporter: Callable[..., None], + *, + host: str = '', + temp_dir: str | None = None, + **options + ): job_data = JobData( annotation_ir=self.ir_data, db_job=self.db_job, host=host, ) - temp_dir_base = self.db_job.get_tmp_dirname() - os.makedirs(temp_dir_base, exist_ok=True) - with TemporaryDirectory(dir=temp_dir_base) as temp_dir: + with ( + TmpDirManager.get_tmp_directory_for_export( + instance_type=self.db_job.__class__.__name__, + ) if not temp_dir else nullcontext(temp_dir) + ) as 
temp_dir: exporter(dst_file, temp_dir, job_data, **options) def import_annotations(self, src_file, importer, **options): @@ -788,9 +799,7 @@ def import_annotations(self, src_file, importer, **options): ) self.delete() - temp_dir_base = self.db_job.get_tmp_dirname() - os.makedirs(temp_dir_base, exist_ok=True) - with TemporaryDirectory(dir=temp_dir_base) as temp_dir: + with TmpDirManager.get_tmp_directory() as temp_dir: try: importer(src_file, temp_dir, job_data, **options) except (DatasetNotFoundError, CvatDatasetNotFoundError) as not_found: @@ -975,16 +984,26 @@ def init_from_db(self): self._merge_data(gt_annotation.ir_data, start_frame=db_job.segment.start_frame) - def export(self, dst_file, exporter, host='', **options): + def export( + self, + dst_file: io.BufferedWriter, + exporter: Callable[..., None], + *, + host: str = '', + temp_dir: str | None = None, + **options + ): task_data = TaskData( annotation_ir=self.ir_data, db_task=self.db_task, host=host, ) - temp_dir_base = self.db_task.get_tmp_dirname() - os.makedirs(temp_dir_base, exist_ok=True) - with TemporaryDirectory(dir=temp_dir_base) as temp_dir: + with ( + TmpDirManager.get_tmp_directory_for_export( + instance_type=self.db_task.__class__.__name__, + ) if not temp_dir else nullcontext(temp_dir) + ) as temp_dir: exporter(dst_file, temp_dir, task_data, **options) def import_annotations(self, src_file, importer, **options): @@ -995,9 +1014,7 @@ def import_annotations(self, src_file, importer, **options): ) self.delete() - temp_dir_base = self.db_task.get_tmp_dirname() - os.makedirs(temp_dir_base, exist_ok=True) - with TemporaryDirectory(dir=temp_dir_base) as temp_dir: + with TmpDirManager.get_tmp_directory() as temp_dir: try: importer(src_file, temp_dir, task_data, **options) except (DatasetNotFoundError, CvatDatasetNotFoundError) as not_found: @@ -1059,7 +1076,15 @@ def delete_job_data(pk, *, db_job: models.Job | None = None): annotation.delete() -def export_job(job_id, dst_file, format_name, server_url=None, save_images=False): +def export_job( + job_id: int, + dst_file: str, + *, + format_name: str, + server_url: str | None = None, + save_images=False, + temp_dir: str | None = None, +): # For big tasks dump function may run for a long time and # we dont need to acquire lock after the task has been initialized from DB. # But there is the bug with corrupted dump file in case 2 or @@ -1071,7 +1096,7 @@ def export_job(job_id, dst_file, format_name, server_url=None, save_images=False exporter = make_exporter(format_name) with open(dst_file, 'wb') as f: - job.export(f, exporter, host=server_url, save_images=save_images) + job.export(f, exporter, host=server_url, save_images=save_images, temp_dir=temp_dir) @silk_profile(name="GET task data") @@ -1113,7 +1138,15 @@ def delete_task_data(pk): annotation.delete() -def export_task(task_id, dst_file, format_name, server_url=None, save_images=False): +def export_task( + task_id: int, + dst_file: str, + *, + format_name: str, + server_url: str | None = None, + save_images: bool = False, + temp_dir: str | None = None, + ): # For big tasks dump function may run for a long time and # we dont need to acquire lock after the task has been initialized from DB. 
# But there is the bug with corrupted dump file in case 2 or @@ -1125,7 +1158,7 @@ def export_task(task_id, dst_file, format_name, server_url=None, save_images=Fal exporter = make_exporter(format_name) with open(dst_file, 'wb') as f: - task.export(f, exporter, host=server_url, save_images=save_images) + task.export(f, exporter, host=server_url, save_images=save_images, temp_dir=temp_dir) @transaction.atomic diff --git a/cvat/apps/dataset_manager/tests/test_formats.py b/cvat/apps/dataset_manager/tests/test_formats.py index bdf20d2f8e5c..bf9e868743cf 100644 --- a/cvat/apps/dataset_manager/tests/test_formats.py +++ b/cvat/apps/dataset_manager/tests/test_formats.py @@ -262,7 +262,7 @@ def _test_export(check, task, format_name, **export_args): with tempfile.TemporaryDirectory() as temp_dir: file_path = osp.join(temp_dir, format_name) dm.task.export_task(task["id"], file_path, - format_name, **export_args) + format_name=format_name, **export_args) check(file_path) @@ -986,7 +986,7 @@ def _test_can_import_annotations(self, task, import_format): if import_format == "CVAT 1.1": export_format = "CVAT for images 1.1" - dm.task.export_task(task["id"], file_path, export_format) + dm.task.export_task(task["id"], file_path, format_name=export_format) expected_ann = TaskAnnotation(task["id"]) expected_ann.init_from_db() diff --git a/cvat/apps/dataset_manager/tests/test_rest_api_formats.py b/cvat/apps/dataset_manager/tests/test_rest_api_formats.py index fe1addd2cbc5..0e0205bf69df 100644 --- a/cvat/apps/dataset_manager/tests/test_rest_api_formats.py +++ b/cvat/apps/dataset_manager/tests/test_rest_api_formats.py @@ -10,13 +10,13 @@ import os import os.path as osp import random -import shutil import xml.etree.ElementTree as ET import zipfile from contextlib import ExitStack, contextmanager from datetime import timedelta from functools import partial from io import BytesIO +from pathlib import Path from tempfile import TemporaryDirectory from time import sleep from typing import Any, Callable, ClassVar, Optional, overload @@ -34,10 +34,11 @@ import cvat.apps.dataset_manager as dm from cvat.apps.dataset_manager.bindings import CvatTaskOrJobDataExtractor, TaskData +from cvat.apps.dataset_manager.cron import clear_export_cache from cvat.apps.dataset_manager.task import TaskAnnotation from cvat.apps.dataset_manager.tests.utils import TestDir from cvat.apps.dataset_manager.util import get_export_cache_lock -from cvat.apps.dataset_manager.views import clear_export_cache, export, parse_export_file_path +from cvat.apps.dataset_manager.views import export from cvat.apps.engine.models import Task from cvat.apps.engine.tests.utils import ApiTestBase, ForceLogin, get_paginated_collection @@ -1448,7 +1449,7 @@ def test_concurrent_export_and_cleanup(self): export_checked_the_file = self.SharedBool() clear_has_been_finished = self.SharedBool() clear_removed_the_file = self.SharedBool() - export_outdated_after = timedelta(seconds=1) + export_outdated_after = timedelta(seconds=4) EXPORT_CACHE_LOCK_TTL = 4 EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT = EXPORT_CACHE_LOCK_TTL * 2 @@ -1485,7 +1486,7 @@ def patched_log_exception(logger=None, exc_info=True): # only after checking whether a file exists inside an acquired lock patch("cvat.apps.dataset_manager.views.osp_exists") as mock_osp_exists, patch( - "cvat.apps.dataset_manager.views.os.replace", side_effect=original_replace + "cvat.apps.dataset_manager.views.shutil.move", side_effect=original_replace ) as mock_os_replace, patch("cvat.apps.dataset_manager.views.log_exception", 
new=patched_log_exception), patch("cvat.apps.dataset_manager.views.task.export_task") as mock_export_fn, @@ -1502,42 +1503,31 @@ def patched_log_exception(logger=None, exc_info=True): set_condition(export_file_path, result_file) mock_os_replace.assert_not_called() - def _clear(*_, file_path: str, file_ctime: str): + def _clear(*_, file_path: str): from os import remove as original_remove - from cvat.apps.dataset_manager.views import FileIsBeingUsedError - with ( - patch("cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_TTL", new=EXPORT_CACHE_LOCK_TTL), - patch("cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT", new=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT), + patch("cvat.apps.dataset_manager.cron.EXPORT_CACHE_LOCK_TTL", new=EXPORT_CACHE_LOCK_TTL), + patch("cvat.apps.dataset_manager.cron.EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT", new=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT), patch( - "cvat.apps.dataset_manager.views.get_export_cache_lock", + "cvat.apps.dataset_manager.cron.get_export_cache_lock", new=self.patched_get_export_cache_lock, ), patch( "cvat.apps.dataset_manager.views.os.remove" ) as mock_os_remove, - patch( - "cvat.apps.dataset_manager.views.rq.get_current_job" - ) as mock_rq_get_current_job, - patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), patch( "cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": export_outdated_after}, ), ): - mock_rq_get_current_job.return_value = MagicMock(timeout=5) mock_os_remove.side_effect = chain_side_effects( original_remove, side_effect(set_condition, clear_removed_the_file), ) - try: - clear_export_cache( - file_path=file_path, file_ctime=file_ctime, logger=MagicMock() - ) - except FileIsBeingUsedError: - set_condition(clear_has_been_finished) + clear_export_cache(file_path=Path(file_path)) + set_condition(clear_has_been_finished) mock_os_remove.assert_not_called() @@ -1567,22 +1557,13 @@ def _clear(*_, file_path: str, file_ctime: str): task = self._setup_task_with_annotations(format_name=format_name) task_id = task["id"] - with ( - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, - patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), - ): - mock_rq_job = MagicMock(timeout=5) - mock_rq_get_current_job.return_value = mock_rq_job - - # create a file in the export cache - first_export_path = export(dst_format=format_name, task_id=task_id) - - initial_file_modfication_time = os.path.getmtime(first_export_path) - # make sure that a file in the export cache is outdated by timeout - # and a file would have to be deleted if the export was not running in parallel - sleep(export_outdated_after.seconds + 1) + # create a file in the export cache + first_export_path = export(dst_format=format_name, task_id=task_id) - export_instance_timestamp = parse_export_file_path(first_export_path).instance_timestamp + initial_file_modfication_time = os.path.getmtime(first_export_path) + # make sure that a file in the export cache is outdated by timeout + # and a file would have to be deleted if the export was not running in parallel + sleep(export_outdated_after.seconds + 1) processes_finished_correctly = False with ExitStack() as es: @@ -1609,7 +1590,7 @@ def _clear(*_, file_path: str, file_ctime: str): export_checked_the_file, ), kwargs=dict( - file_path=first_export_path, file_ctime=export_instance_timestamp + file_path=first_export_path ), ) ) @@ -1701,22 +1682,18 @@ def patched_osp_exists(path: str): mock_osp_exists.assert_called() - def _clear(*_, file_path: str, file_ctime: 
str): + def _clear(*_, file_path: str): from os import remove as original_remove from cvat.apps.dataset_manager.util import LockNotAvailableError with ( - patch("cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT", new=3), + patch("cvat.apps.dataset_manager.cron.EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT", new=3), patch( - "cvat.apps.dataset_manager.views.get_export_cache_lock", + "cvat.apps.dataset_manager.cron.get_export_cache_lock", new=self.patched_get_export_cache_lock, ), - patch("cvat.apps.dataset_manager.views.os.remove") as mock_os_remove, - patch( - "cvat.apps.dataset_manager.views.rq.get_current_job" - ) as mock_rq_get_current_job, - patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), + patch("cvat.apps.dataset_manager.cron.os.remove") as mock_os_remove, patch( "cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": timedelta(seconds=0)} ), @@ -1726,13 +1703,9 @@ def _clear(*_, file_path: str, file_ctime: str): side_effect(set_condition, clear_removed_the_file), ) - mock_rq_get_current_job.return_value = MagicMock(timeout=5) - exited_by_timeout = False try: - clear_export_cache( - file_path=file_path, file_ctime=file_ctime, logger=MagicMock() - ) + clear_export_cache(file_path=Path(file_path)) except LockNotAvailableError: # should come from waiting for get_export_cache_lock exited_by_timeout = True @@ -1771,8 +1744,6 @@ def patched_export(*args, **kwargs): response = self._get_request_with_data(download_url, download_params, self.admin) self.assertEqual(response.status_code, status.HTTP_201_CREATED) - export_instance_time = parse_export_file_path(export_path).instance_timestamp - download_params["action"] = "download" processes_finished_correctly = False @@ -1793,7 +1764,7 @@ def patched_export(*args, **kwargs): multiprocessing.Process( target=_clear, args=(download_checked_the_file, clear_removed_the_file), - kwargs=dict(file_path=export_path, file_ctime=export_instance_time), + kwargs=dict(file_path=export_path), ) ) ) @@ -1833,28 +1804,17 @@ def patched_export(*args, **kwargs): self.assertFalse(clear_removed_the_file.get()) - def test_export_can_create_file_and_cleanup_job(self): + def test_export_can_create_file(self): format_name = "CVAT for images 1.1" task = self._setup_task_with_annotations(format_name=format_name) task_id = task["id"] with ( - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, - patch( - "cvat.apps.dataset_manager.views.django_rq.get_scheduler" - ) as mock_rq_get_scheduler, patch("cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": timedelta(seconds=0)}), ): - mock_rq_job = MagicMock(timeout=5) - mock_rq_get_current_job.return_value = mock_rq_job - - mock_rq_scheduler = MagicMock() - mock_rq_get_scheduler.return_value = mock_rq_scheduler - export_path = export(dst_format=format_name, task_id=task_id) self.assertTrue(osp.isfile(export_path)) - mock_rq_scheduler.enqueue_in.assert_called_once() def test_export_cache_lock_can_raise_on_releasing_expired_lock(self): from pottery import ReleaseUnlockedLock @@ -1893,26 +1853,16 @@ def test_export_can_reuse_older_file_if_still_relevant(self): task = self._setup_task_with_annotations(format_name=format_name) task_id = task["id"] - with ( - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, - patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), - ): - mock_rq_get_current_job.return_value = MagicMock(timeout=5) - - first_export_path = export(dst_format=format_name, task_id=task_id) + 
first_export_path = export(dst_format=format_name, task_id=task_id) from os.path import exists as original_exists with ( - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, - patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), patch( "cvat.apps.dataset_manager.views.osp_exists", side_effect=original_exists ) as mock_osp_exists, - patch("cvat.apps.dataset_manager.views.os.replace") as mock_os_replace, + patch("cvat.apps.dataset_manager.views.shutil.move") as mock_os_replace, ): - mock_rq_get_current_job.return_value = MagicMock(timeout=5) - second_export_path = export(dst_format=format_name, task_id=task_id) self.assertEqual(first_export_path, second_export_path) @@ -1958,7 +1908,7 @@ def _export_1( "cvat.apps.dataset_manager.views.get_export_cache_lock", new=self.patched_get_export_cache_lock, ), - patch("cvat.apps.dataset_manager.views.os.replace") as mock_os_replace, + patch("cvat.apps.dataset_manager.views.shutil.move") as mock_os_replace, patch("cvat.apps.dataset_manager.views.task.export_task") as mock_export_fn, patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), ): @@ -1998,7 +1948,7 @@ def _export_2( "cvat.apps.dataset_manager.views.get_export_cache_lock", new=self.patched_get_export_cache_lock, ), - patch("cvat.apps.dataset_manager.views.os.replace") as mock_os_replace, + patch("cvat.apps.dataset_manager.views.shutil.move") as mock_os_replace, patch("cvat.apps.dataset_manager.views.task.export_task") as mock_export_fn, patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), ): @@ -2077,141 +2027,84 @@ def test_cleanup_can_remove_file(self): task = self._setup_task_with_annotations(format_name=format_name) task_id = task["id"] - with ( - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, - patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), - ): - mock_rq_get_current_job.return_value = MagicMock(timeout=5) - - export_path = export(dst_format=format_name, task_id=task_id) + export_path = export(dst_format=format_name, task_id=task_id) with ( - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, - patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), patch("cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": timedelta(seconds=0)}), ): - mock_rq_get_current_job.return_value = MagicMock(timeout=5) - export_path = export(dst_format=format_name, task_id=task_id) - file_ctime = parse_export_file_path(export_path).instance_timestamp - clear_export_cache(file_path=export_path, file_ctime=file_ctime, logger=MagicMock()) + clear_export_cache(file_path=Path(export_path)) self.assertFalse(osp.isfile(export_path)) - def test_cleanup_can_request_retry_on_locking_failure(self): - format_name = "CVAT for images 1.1" - task = self._setup_task_with_annotations(format_name=format_name) - task_id = task["id"] - - from cvat.apps.dataset_manager.util import LockNotAvailableError - - with ( - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, - patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), - ): - mock_rq_get_current_job.return_value = MagicMock(timeout=5) - - export_path = export(dst_format=format_name, task_id=task_id) - - with ( - patch( - "cvat.apps.dataset_manager.views.get_export_cache_lock", - side_effect=LockNotAvailableError, - ) as mock_get_export_cache_lock, - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, - 
patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), - self.assertRaises(LockNotAvailableError), - ): - mock_rq_job = MagicMock(timeout=5) - mock_rq_get_current_job.return_value = mock_rq_job - - file_ctime = parse_export_file_path(export_path).instance_timestamp - clear_export_cache(file_path=export_path, file_ctime=file_ctime, logger=MagicMock()) - - mock_get_export_cache_lock.assert_called() - self.assertEqual(mock_rq_job.retries_left, 1) - self.assertTrue(osp.isfile(export_path)) def test_cleanup_can_fail_if_no_file(self): - with ( - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, - patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), - self.assertRaises(FileNotFoundError), - ): - mock_rq_job = MagicMock(timeout=5) - mock_rq_get_current_job.return_value = mock_rq_job - - clear_export_cache(file_path="non existent file path", file_ctime=0, logger=MagicMock()) + from cvat.apps.dataset_manager.util import CacheFileOrDirPathParseError + with self.assertRaises(CacheFileOrDirPathParseError): + clear_export_cache(file_path=Path("non existent file path")) def test_cleanup_can_defer_removal_if_file_is_used_recently(self): + from os import remove as original_remove format_name = "CVAT for images 1.1" task = self._setup_task_with_annotations(format_name=format_name) task_id = task["id"] - with ( - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, - patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), - ): - mock_rq_get_current_job.return_value = MagicMock(timeout=5) - - export_path = export(dst_format=format_name, task_id=task_id) - - from cvat.apps.dataset_manager.views import FileIsBeingUsedError + export_path = export(dst_format=format_name, task_id=task_id) with ( - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, patch("cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": timedelta(hours=1)}), - self.assertRaises(FileIsBeingUsedError), + patch("cvat.apps.dataset_manager.cron.os.remove", side_effect=original_remove) as mock_os_remove, ): - mock_rq_job = MagicMock(timeout=5) - mock_rq_get_current_job.return_value = mock_rq_job - export_path = export(dst_format=format_name, task_id=task_id) - file_ctime = parse_export_file_path(export_path).instance_timestamp - clear_export_cache(file_path=export_path, file_ctime=file_ctime, logger=MagicMock()) + clear_export_cache(file_path=Path(export_path)) + mock_os_remove.assert_not_called() - self.assertEqual(mock_rq_job.retries_left, 1) self.assertTrue(osp.isfile(export_path)) - def test_cleanup_can_be_called_with_old_signature_and_values(self): - # Test RQ jobs for backward compatibility of API prior to the PR - # https://github.com/cvat-ai/cvat/pull/7864 - # Jobs referring to the old API can exist in the redis queues after the server is updated - - format_name = "CVAT for images 1.1" - task = self._setup_task_with_annotations(format_name=format_name) - task_id = task["id"] - - with ( - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, - patch("cvat.apps.dataset_manager.views.django_rq.get_scheduler"), - ): - mock_rq_get_current_job.return_value = MagicMock(timeout=5) - - new_export_path = export(dst_format=format_name, task_id=task_id) - - file_ctime = parse_export_file_path(new_export_path).instance_timestamp - - old_export_path = osp.join( - osp.dirname(new_export_path), "annotations_cvat-for-images-11.ZIP" - ) - shutil.move(new_export_path, 
old_export_path) + def test_cleanup_cron_job_can_delete_cached_files(self): + from cvat.apps.dataset_manager.cron import cleanup_export_cache_directory - old_kwargs = { - "file_path": old_export_path, - "file_ctime": file_ctime, - "logger": MagicMock(), - } - - with ( - patch("cvat.apps.dataset_manager.views.rq.get_current_job") as mock_rq_get_current_job, - patch("cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": timedelta(seconds=0)}), - ): - mock_rq_get_current_job.return_value = MagicMock(timeout=5) + def _get_project_task_job_ids(): + project = self._create_project(projects["main"]) + project_id = project["id"] - clear_export_cache(**old_kwargs) + images = self._generate_task_images(3) + task = self._create_task( + data=tasks["task in project #1"], + image_data=images, + ) + task_id = task["id"] + job_id = self._get_jobs(task_id)[0]["id"] + return project_id, task_id, job_id + + # remove chunks from the cache + self._clear_temp_data() + project_id, task_id, job_id = _get_project_task_job_ids() + + for resource, rid in zip(("project", "task", "job"), (project_id, task_id, job_id)): + for save_images in (True, False): + export_path = export( + dst_format="CVAT for images 1.1", + save_images=save_images, + **{resource + "_id": rid}, + ) + self.assertTrue(osp.isfile(export_path)) + self.assertTrue(resource in export_path) + + with ( + patch( + "cvat.apps.dataset_manager.views.TTL_CONSTS", + new={resource: timedelta(seconds=0)}, + ), + patch( + "cvat.apps.dataset_manager.cron.clear_export_cache", + side_effect=clear_export_cache, + ) as mock_clear_export_cache, + ): + cleanup_export_cache_directory() + mock_clear_export_cache.assert_called_once() - self.assertFalse(osp.isfile(old_export_path)) + self.assertFalse(osp.exists(export_path)) class ProjectDumpUpload(_DbTestBase): diff --git a/cvat/apps/dataset_manager/util.py b/cvat/apps/dataset_manager/util.py index 6d814ec2c679..f2d2e05001d2 100644 --- a/cvat/apps/dataset_manager/util.py +++ b/cvat/apps/dataset_manager/util.py @@ -1,5 +1,5 @@ # Copyright (C) 2019-2022 Intel Corporation -# Copyright (C) 2023-2024 CVAT.ai Corporation +# Copyright (C) 2023-2025 CVAT.ai Corporation # # SPDX-License-Identifier: MIT @@ -7,13 +7,15 @@ import os import os.path as osp import re +import tempfile import zipfile from collections.abc import Generator, Sequence from contextlib import contextmanager from copy import deepcopy from datetime import timedelta +from enum import Enum from threading import Lock -from typing import Any, Optional +from typing import Any import attrs import django_rq @@ -23,8 +25,6 @@ from django.db import models from pottery import Redlock -from cvat.apps.engine.models import Job, Project, Task - def current_function_name(depth=1): return inspect.getouterframes(inspect.currentframe())[depth].function @@ -41,7 +41,7 @@ def make_zip_archive(src_path, dst_path): def bulk_create(db_model, objects, flt_param): if objects: if flt_param: - if 'postgresql' in settings.DATABASES["default"]["ENGINE"]: + if "postgresql" in settings.DATABASES["default"]["ENGINE"]: return db_model.objects.bulk_create(objects) else: ids = list(db_model.objects.filter(**flt_param).values_list('id', flat=True)) @@ -53,9 +53,11 @@ def bulk_create(db_model, objects, flt_param): return [] + def is_prefetched(queryset: models.QuerySet, field: str) -> bool: return field in queryset._prefetch_related_lookups + def add_prefetch_fields(queryset: models.QuerySet, fields: Sequence[str]) -> models.QuerySet: for field in fields: if not is_prefetched(queryset, field): @@ 
-63,6 +65,7 @@ def add_prefetch_fields(queryset: models.QuerySet, fields: Sequence[str]) -> mod return queryset + def get_cached(queryset: models.QuerySet, pk: int) -> models.Model: """ Like regular queryset.get(), but checks for the cached values first @@ -82,6 +85,7 @@ def get_cached(queryset: models.QuerySet, pk: int) -> models.Model: return result + def faster_deepcopy(v): "A slightly optimized version of the default deepcopy, can be used as a drop-in replacement." # Default deepcopy is very slow, here we do shallow copy for primitive types and containers @@ -100,6 +104,9 @@ def faster_deepcopy(v): class LockNotAvailableError(Exception): pass +class CacheFileOrDirPathParseError(Exception): + pass + def make_export_cache_lock_key(filename: os.PathLike[str]) -> str: return f"export_lock:{os.fspath(filename)}" @@ -147,94 +154,203 @@ def get_export_cache_lock( if acquired: lock.release() +class OperationType(str, Enum): + EXPORT = "export" -EXPORT_CACHE_DIR_NAME = 'export_cache' +class ExportFileType(str, Enum): + ANNOTATIONS = "annotations" + BACKUP = "backup" + DATASET = "dataset" -def get_export_cache_dir(db_instance: Project | Task | Job) -> str: - base_dir = osp.abspath(db_instance.get_dirname()) + @classmethod + def values(cls) -> list[str]: + return list(map(lambda x: x.value, cls)) - if osp.isdir(base_dir): - return osp.join(base_dir, EXPORT_CACHE_DIR_NAME) - else: - raise FileNotFoundError( - '{} dir {} does not exist'.format(db_instance.__class__.__name__, base_dir) - ) +class InstanceType(str, Enum): + PROJECT = "project" + TASK = "task" + JOB = "job" + + @classmethod + def values(cls) -> list[str]: + return list(map(lambda x: x.value, cls)) + +@attrs.frozen +class _ParsedExportFilename: + file_type: ExportFileType + file_ext: str + instance_type: InstanceType = attrs.field(converter=InstanceType) + instance_id: int + instance_timestamp: float = attrs.field(converter=float) -def make_export_filename( - dst_dir: str, - save_images: bool, - instance_timestamp: float, - format_name: str, -) -> str: - from .formats.registry import EXPORT_FORMATS - file_ext = EXPORT_FORMATS[format_name].EXT +@attrs.frozen +class ParsedDatasetFilename(_ParsedExportFilename): + format_repr: str + + +@attrs.frozen +class ParsedBackupFilename(_ParsedExportFilename): + pass - filename = '%s-instance%f-%s.%s' % ( - 'dataset' if save_images else 'annotations', + +class TmpDirManager: + SPLITTER = "-" + TMP_ROOT = settings.TMP_FILES_ROOT + TMP_FILE_OR_DIR_RETENTION_DAYS = settings.TMP_FILE_OR_DIR_RETENTION_DAYS + + @classmethod + @contextmanager + def get_tmp_directory( + cls, + *, + prefix: str | None = None, + suffix: str | None = None, + ignore_cleanup_errors: bool | None = None, + ) -> Generator[str, Any, Any]: + """ + The method allows to create a temporary directory and + ensures that the parent directory uses the CVAT tmp directory + """ + params = {} + for k, v in { + "prefix": prefix, + "suffix": suffix, + "ignore_cleanup_errors": ignore_cleanup_errors, + }.items(): + if v is not None: + params[k] = v + + with tempfile.TemporaryDirectory(**params, dir=cls.TMP_ROOT) as tmp_dir: + yield tmp_dir + + @classmethod + @contextmanager + def get_tmp_directory_for_export( + cls, + *, + instance_type: str, + ) -> Generator[str, Any, Any]: + instance_type = InstanceType(instance_type.lower()) + with cls.get_tmp_directory( + prefix=cls.SPLITTER.join([OperationType.EXPORT, instance_type]) + cls.SPLITTER + ) as tmp_dir: + yield tmp_dir + + +class ExportCacheManager: + SPLITTER = "-" + INSTANCE_PREFIX = "instance" 
+    FILE_NAME_TEMPLATE = SPLITTER.join([
+        "{instance_type}", "{instance_id}", "{file_type}", INSTANCE_PREFIX +
         # store the instance timestamp in the file name to reliably get this information
         # ctime / mtime do not return file creation time on linux
         # mtime is used for file usage checks
-        instance_timestamp,
-        make_file_name(to_snake_case(format_name)),
-        file_ext,
-    )
-    return osp.join(dst_dir, filename)
+        "{instance_timestamp}{optional_suffix}.{file_ext}"
+    ])
+
+    @classmethod
+    def make_dataset_file_path(
+        cls,
+        *,
+        instance_type: str,
+        instance_id: int,
+        instance_timestamp: float,
+        save_images: bool,
+        format_name: str,
+    ) -> str:
+        from .formats.registry import EXPORT_FORMATS
+
+        file_ext = (EXPORT_FORMATS[format_name].EXT).lower()
+
+        instance_type = InstanceType(instance_type.lower())
+        file_type = ExportFileType.DATASET if save_images else ExportFileType.ANNOTATIONS
+
+        normalized_format_name = make_file_name(to_snake_case(format_name))
+        filename = cls.FILE_NAME_TEMPLATE.format_map(
+            {
+                "instance_type": instance_type,
+                "instance_id": instance_id,
+                "file_type": file_type,
+                "instance_timestamp": instance_timestamp,
+                "optional_suffix": cls.SPLITTER + normalized_format_name,
+                "file_ext": file_ext,
+            }
+        )
+        return osp.join(settings.EXPORT_CACHE_ROOT, filename)
+
+    @classmethod
+    def make_backup_file_path(
+        cls,
+        *,
+        instance_type: str,
+        instance_id: int,
+        instance_timestamp: float,
+    ) -> str:
+        instance_type = InstanceType(instance_type.lower())
+        filename = cls.FILE_NAME_TEMPLATE.format_map(
+            {
+                "instance_type": instance_type,
+                "instance_id": instance_id,
+                "file_type": ExportFileType.BACKUP,
+                "instance_timestamp": instance_timestamp,
+                "optional_suffix": "",
+                "file_ext": "zip",
+            }
+        )
+        return osp.join(settings.EXPORT_CACHE_ROOT, filename)
+
+    @classmethod
+    def parse_filename(
+        cls, filename: str,
+    ) -> ParsedDatasetFilename | ParsedBackupFilename:
+        basename, file_ext = osp.splitext(filename)
+        file_ext = file_ext.strip(".").lower()
+        basename_match = re.fullmatch(
+            (
+                rf"^(?P<instance_type>{'|'.join(InstanceType.values())})"
+                rf"{cls.SPLITTER}(?P<instance_id>\d+)"
+                rf"{cls.SPLITTER}(?P<file_type>{'|'.join(ExportFileType.values())})"
+                rf"{cls.SPLITTER}(?P<unparsed>.+)$"
+            ),
+            basename,
+        )
-@attrs.define
-class ParsedExportFilename:
-    instance_type: str
-    has_images: bool
-    instance_timestamp: Optional[float]
-    format_repr: str
-    file_ext: str
+        if not basename_match:
+            raise CacheFileOrDirPathParseError(f"Couldn't parse file name: {basename!r}")
+        fragments = basename_match.groupdict()
+        fragments["instance_id"] = int(fragments["instance_id"])
-def parse_export_file_path(file_path: os.PathLike[str]) -> ParsedExportFilename:
-    file_path = osp.normpath(file_path)
-    dirname, basename = osp.split(file_path)
+        unparsed = fragments.pop("unparsed")[len(cls.INSTANCE_PREFIX):]
+        specific_params = {}
+
+        if fragments["file_type"] in (ExportFileType.DATASET, ExportFileType.ANNOTATIONS):
+            try:
+                instance_timestamp, format_repr = unparsed.split(cls.SPLITTER, maxsplit=1)
+            except ValueError:
+                raise CacheFileOrDirPathParseError(f"Couldn't parse file name: {basename!r}")
+
+            specific_params["format_repr"] = format_repr
+            ParsedFileNameClass = ParsedDatasetFilename
+        else:
+            instance_timestamp = unparsed
+            ParsedFileNameClass = ParsedBackupFilename
+
+        try:
+            parsed_file_name = ParsedFileNameClass(
+                file_ext=file_ext,
+                instance_timestamp=instance_timestamp,
+                **fragments,
+                **specific_params,
+            )
+        except ValueError as ex:
+            raise CacheFileOrDirPathParseError(f"Couldn't parse file name: {basename!r}") from ex
+
+        return parsed_file_name
-    basename_match = re.fullmatch(
-        (
-            r'(?P<export_mode>dataset|annotations)'
-            # optional for backward compatibility
-            r'(?:-instance(?P<instance_timestamp>\d+\.\d+)-|_)'
-            r'(?P<format_tag>.+)'
-            r'\.(?P<file_ext>.+)'
-        ),
-        basename
-    )
-    if not basename_match:
-        raise ValueError(f"Couldn't parse filename components in '{basename}'")
-
-    dirname_match = re.search(rf'/(jobs|tasks|projects)/\d+/{EXPORT_CACHE_DIR_NAME}$', dirname)
-    if not dirname_match:
-        raise ValueError(f"Couldn't parse instance type in '{dirname}'")
-
-    match dirname_match.group(1):
-        case 'jobs':
-            instance_type_name = 'job'
-        case 'tasks':
-            instance_type_name = 'task'
-        case 'projects':
-            instance_type_name = 'project'
-        case _:
-            assert False
-
-    if instance_timestamp_str := basename_match.groupdict().get('instance_timestamp'):
-        instance_timestamp = float(instance_timestamp_str)
-    else:
-        instance_timestamp = None
-
-    return ParsedExportFilename(
-        instance_type=instance_type_name,
-        has_images=basename_match.group('export_mode') == 'dataset',
-        instance_timestamp=instance_timestamp,
-        format_repr=basename_match.group('format_tag'),
-        file_ext=basename_match.group('file_ext'),
-    )

 def extend_export_file_lifetime(file_path: str):
     # Update the last modification time to extend the export's lifetime,
diff --git a/cvat/apps/dataset_manager/views.py b/cvat/apps/dataset_manager/views.py
index 4dcd8304e43d..351bd4c76491 100644
--- a/cvat/apps/dataset_manager/views.py
+++ b/cvat/apps/dataset_manager/views.py
@@ -6,7 +6,7 @@
 import logging
 import os
 import os.path as osp
-import tempfile
+import shutil
 from datetime import timedelta
 from os.path import exists as osp_exists
@@ -24,15 +24,13 @@
 from cvat.apps.engine.utils import get_rq_lock_by_user

 from .formats.registry import EXPORT_FORMATS, IMPORT_FORMATS
-from .util import EXPORT_CACHE_DIR_NAME  # pylint: disable=unused-import
 from .util import (
+    ExportCacheManager,
     LockNotAvailableError,
+    TmpDirManager,
     current_function_name,
     extend_export_file_lifetime,
-    get_export_cache_dir,
     get_export_cache_lock,
-    make_export_filename,
-    parse_export_file_path,
 )

 slogger = ServerLogManager(__name__)
@@ -75,7 +73,7 @@ def _patch_scheduled_job_status(job: rq.job.Job):
     if job.get_status(refresh=False) != rq.job.JobStatus.SCHEDULED:
         job.set_status(rq.job.JobStatus.SCHEDULED)

-def _retry_current_rq_job(time_delta: timedelta) -> rq.job.Job:
+def retry_current_rq_job(time_delta: timedelta) -> rq.job.Job:
     # TODO: implement using retries once we move from rq_scheduler to builtin RQ scheduler
     # for better reliability and error reporting
@@ -136,8 +134,7 @@ def export(
         db_instance = Job.objects.get(pk=job_id)

     cache_ttl = get_export_cache_ttl(db_instance)
-
-    cache_dir = get_export_cache_dir(db_instance)
+    instance_type = db_instance.__class__.__name__

     # As we're not locking the db object here, it can be updated by the time of actual export.
     # The file will be saved with the older timestamp.
@@ -151,12 +148,14 @@ def export(
         ))
         instance_update_time = max(tasks_update + [instance_update_time])

-    output_path = make_export_filename(
-        cache_dir, save_images, instance_update_time.timestamp(), dst_format
+    output_path = ExportCacheManager.make_dataset_file_path(
+        instance_id=db_instance.id,
+        instance_type=instance_type,
+        instance_timestamp=instance_update_time.timestamp(),
+        save_images=save_images,
+        format_name=dst_format
     )
-    os.makedirs(cache_dir, exist_ok=True)
-
     # acquire a lock 2 times instead of using one long lock:
     # 1. to check whether the file exists or not
     # 2.
to create a file when it doesn't exist @@ -169,43 +168,33 @@ def export( extend_export_file_lifetime(output_path) return output_path - with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir: + with TmpDirManager.get_tmp_directory_for_export(instance_type=instance_type) as temp_dir: temp_file = osp.join(temp_dir, 'result') - export_fn(db_instance.id, temp_file, dst_format, - server_url=server_url, save_images=save_images) + # create a subdirectory to store export-related files, + # which will be fully included in the resulting archive + temp_subdir = osp.join(temp_dir, 'subdir') + os.makedirs(temp_subdir, exist_ok=True) + + export_fn(db_instance.id, temp_file, format_name=dst_format, + server_url=server_url, save_images=save_images, temp_dir=temp_subdir) + with get_export_cache_lock( output_path, ttl=EXPORT_CACHE_LOCK_TTL, acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT, ): - os.replace(temp_file, output_path) - - scheduler: Scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value) - cleaning_job = scheduler.enqueue_in( - time_delta=cache_ttl, - func=clear_export_cache, - file_path=output_path, - file_ctime=instance_update_time.timestamp(), - logger=logger, - ) - _patch_scheduled_job_status(cleaning_job) + shutil.move(temp_file, output_path) + logger.info( - "The {} '{}' is exported as '{}' at '{}' " - "and available for downloading for the next {}. " - "Export cache cleaning job is enqueued, id '{}'".format( - db_instance.__class__.__name__.lower(), - db_instance.id, - dst_format, - output_path, - cache_ttl, - cleaning_job.id, - ) + f"The {db_instance.__class__.__name__.lower()} '{db_instance.id}' is exported " + f"as {dst_format!r} at {output_path!r} and available for downloading for the next " + f"{cache_ttl.total_seconds()} seconds. " ) return output_path except LockNotAvailableError: # Need to retry later if the lock was not available - _retry_current_rq_job(EXPORT_LOCKED_RETRY_INTERVAL) + retry_current_rq_job(EXPORT_LOCKED_RETRY_INTERVAL) logger.info( "Failed to acquire export cache lock. 
Retrying in {}".format( EXPORT_LOCKED_RETRY_INTERVAL @@ -234,52 +223,6 @@ def export_project_as_dataset(project_id: int, dst_format: str, *, server_url: s def export_project_annotations(project_id: int, dst_format: str, *, server_url: str | None = None): return export(dst_format=dst_format, project_id=project_id, server_url=server_url, save_images=False) - -class FileIsBeingUsedError(Exception): - pass - -def clear_export_cache(file_path: str, file_ctime: float, logger: logging.Logger) -> None: - # file_ctime is for backward compatibility with older RQ jobs, not needed now - - try: - with get_export_cache_lock( - file_path, - block=True, - acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT, - ttl=EXPORT_CACHE_LOCK_TTL, - ): - if not osp.exists(file_path): - raise FileNotFoundError("Export cache file '{}' doesn't exist".format(file_path)) - - parsed_filename = parse_export_file_path(file_path) - cache_ttl = get_export_cache_ttl(parsed_filename.instance_type) - - if timezone.now().timestamp() <= osp.getmtime(file_path) + cache_ttl.total_seconds(): - # Need to retry later, the export is in use - _retry_current_rq_job(cache_ttl) - logger.info( - "Export cache file '{}' is recently accessed, will retry in {}".format( - file_path, cache_ttl - ) - ) - raise FileIsBeingUsedError # should be handled by the worker - - # TODO: maybe remove all outdated exports - os.remove(file_path) - logger.info("Export cache file '{}' successfully removed".format(file_path)) - except LockNotAvailableError: - # Need to retry later if the lock was not available - _retry_current_rq_job(EXPORT_LOCKED_RETRY_INTERVAL) - logger.info( - "Failed to acquire export cache lock. Retrying in {}".format( - EXPORT_LOCKED_RETRY_INTERVAL - ) - ) - raise - except Exception: - log_exception(logger) - raise - def get_export_formats(): return list(EXPORT_FORMATS.values()) diff --git a/cvat/apps/engine/background.py b/cvat/apps/engine/background.py index a3a2d34326b9..f2b5d0e89b6d 100644 --- a/cvat/apps/engine/background.py +++ b/cvat/apps/engine/background.py @@ -23,6 +23,7 @@ from rq.job import JobStatus as RQJobStatus import cvat.apps.dataset_manager as dm +from cvat.apps.dataset_manager.util import extend_export_file_lifetime from cvat.apps.engine import models from cvat.apps.engine.backup import ProjectExporter, TaskExporter, create_backup from cvat.apps.engine.cloud_provider import export_resource_to_cloud_storage @@ -330,7 +331,7 @@ def handle_local_download() -> Response: acquire_timeout=LOCK_ACQUIRE_TIMEOUT, ): if osp.exists(file_path) and not is_result_outdated(): - dm.util.extend_export_file_lifetime(file_path) + extend_export_file_lifetime(file_path) return Response(status=status.HTTP_201_CREATED) @@ -545,6 +546,10 @@ def _handle_rq_job_v1( rq_job: Optional[RQJob], queue: DjangoRQ, ) -> Optional[Response]: + + def is_result_outdated() -> bool: + return rq_job.meta[RQJobMetaField.REQUEST]["timestamp"] < last_instance_update_time + last_instance_update_time = timezone.localtime(self.db_instance.updated_date) timestamp = self.get_timestamp(last_instance_update_time) @@ -604,25 +609,36 @@ def _handle_rq_job_v1( status=status.HTTP_500_INTERNAL_SERVER_ERROR, ) - elif not os.path.exists(file_path): - return Response( - "The export result is not found", - status=status.HTTP_500_INTERNAL_SERVER_ERROR, - ) if action == "download": - filename = self.export_args.filename or build_backup_file_name( - class_name=self.resource, - identifier=self.db_instance.name, - timestamp=timestamp, - extension=os.path.splitext(file_path)[1], - ) - - 
rq_job.delete() - return sendfile( - self.request, file_path, attachment=True, attachment_filename=filename - ) - - return Response(status=status.HTTP_201_CREATED) + with dm.util.get_export_cache_lock( + file_path, ttl=LOCK_TTL, acquire_timeout=LOCK_ACQUIRE_TIMEOUT + ): + if not os.path.exists(file_path): + return Response( + "The backup file has been expired, please retry backing up", + status=status.HTTP_404_NOT_FOUND, + ) + + filename = self.export_args.filename or build_backup_file_name( + class_name=self.resource, + identifier=self.db_instance.name, + timestamp=timestamp, + extension=os.path.splitext(file_path)[1], + ) + + rq_job.delete() + return sendfile( + self.request, file_path, attachment=True, attachment_filename=filename + ) + with dm.util.get_export_cache_lock( + file_path, ttl=LOCK_TTL, acquire_timeout=LOCK_ACQUIRE_TIMEOUT + ): + if osp.exists(file_path) and not is_result_outdated(): + extend_export_file_lifetime(file_path) + return Response(status=status.HTTP_201_CREATED) + + cancel_and_delete(rq_job) + return None else: raise NotImplementedError( f"Export to {self.export_args.location} location is not implemented yet" @@ -697,9 +713,8 @@ def setup_background_job( func = self.export_callback func_args = ( - self.db_instance, + self.db_instance.id, Exporter, - "{}_backup.zip".format(self.resource), logger, cache_ttl, ) diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index f3790427f5ba..3d9e420c6a03 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -8,17 +8,19 @@ import os import re import shutil -import tempfile import uuid +from abc import ABCMeta, abstractmethod from collections.abc import Collection, Iterable +from datetime import timedelta from enum import Enum from logging import Logger from tempfile import NamedTemporaryFile -from typing import Any, Optional, Union +from typing import Any, ClassVar, Optional, Type, Union from zipfile import ZipFile import django_rq from django.conf import settings +from django.core.exceptions import ObjectDoesNotExist from django.db import transaction from django.utils import timezone from rest_framework import serializers, status @@ -29,7 +31,20 @@ import cvat.apps.dataset_manager as dm from cvat.apps.dataset_manager.bindings import CvatImportError -from cvat.apps.dataset_manager.views import get_export_cache_dir, log_exception +from cvat.apps.dataset_manager.util import ( + ExportCacheManager, + TmpDirManager, + extend_export_file_lifetime, + get_export_cache_lock, +) +from cvat.apps.dataset_manager.views import ( + EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT, + EXPORT_CACHE_LOCK_TTL, + EXPORT_LOCKED_RETRY_INTERVAL, + LockNotAvailableError, + log_exception, + retry_current_rq_job, +) from cvat.apps.engine import models from cvat.apps.engine.cloud_provider import import_resource_from_cloud_storage from cvat.apps.engine.location import StorageType, get_location_configuration @@ -37,7 +52,6 @@ from cvat.apps.engine.models import ( DataChoice, Location, - Project, RequestAction, RequestSubresource, RequestTarget, @@ -328,7 +342,9 @@ def _get_db_jobs(self): return db_jobs return () -class _ExporterBase(): +class _ExporterBase(metaclass=ABCMeta): + ModelClass: ClassVar[models.Project | models.Task] + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -356,7 +372,22 @@ def _write_directory(self, source_dir, zip_object, target_dir, recursive=True, e target_dir=target_dir, ) + @abstractmethod + def export_to(self, file: str | ZipFile, target_dir: str | None = None): + ... 
+ + @classmethod + def get_object(cls, pk: int) -> models.Project | models.Task: + # FUTURE-FIXME: need to check permissions one more time when background task is called + try: + return cls.ModelClass.objects.get(pk=pk) + except ObjectDoesNotExist: + raise ValidationError(f'Such a {cls.ModelClass.__name__.lower()} does not exist') + + class TaskExporter(_ExporterBase, _TaskBackupBase): + ModelClass: ClassVar[models.Task] = models.Task + def __init__(self, pk, version=Version.V1): super().__init__(logger=slogger.task[pk]) @@ -542,7 +573,7 @@ def _export_task(self, zip_obj, target_dir=None): self._write_annotations(zip_obj, target_dir) self._write_annotation_guide(zip_obj, target_dir) - def export_to(self, file, target_dir=None): + def export_to(self, file: str | ZipFile, target_dir: str | None = None): if self._db_task.data.storage_method == StorageMethodChoice.FILE_SYSTEM and \ self._db_task.data.storage == StorageChoice.SHARE: raise Exception('The task cannot be exported because it does not contain any raw data') @@ -917,9 +948,11 @@ def _prepare_project_meta(self, project): return self._prepare_meta(allowed_fields, project) class ProjectExporter(_ExporterBase, _ProjectBackupBase): + ModelClass: ClassVar[models.Project] = models.Project + def __init__(self, pk, version=Version.V1): super().__init__(logger=slogger.project[pk]) - self._db_project = models.Project.objects.prefetch_related('tasks', 'annotation_guide__assets').select_related('annotation_guide').get(pk=pk) + self._db_project = self.ModelClass.objects.prefetch_related('tasks', 'annotation_guide__assets').select_related('annotation_guide').get(pk=pk) self._version = version db_labels = self._db_project.label_set.all().prefetch_related('attributespec_set') @@ -954,8 +987,8 @@ def serialize_project(): zip_object.writestr(self.MANIFEST_FILENAME, data=JSONRenderer().render(project)) - def export_to(self, filename): - with ZipFile(filename, 'w') as output_file: + def export_to(self, file: str, target_dir: str | None = None): + with ZipFile(file, 'w') as output_file: self._write_annotation_guide(output_file) self._write_manifest(output_file) self._write_tasks(output_file) @@ -1036,37 +1069,64 @@ def _import_project(filename, user, org_id): db_project = project_importer.import_project() return db_project.id -def create_backup(db_instance, Exporter, output_path, logger, cache_ttl): + +def create_backup( + instance_id: int, + Exporter: Type[ProjectExporter | TaskExporter], + logger: Logger, + cache_ttl: timedelta, +): + db_instance = Exporter.get_object(instance_id) + instance_type = db_instance.__class__.__name__ + instance_timestamp = timezone.localtime(db_instance.updated_date).timestamp() + + output_path = ExportCacheManager.make_backup_file_path( + instance_id=db_instance.id, + instance_type=instance_type, + instance_timestamp=instance_timestamp + ) + try: - cache_dir = get_export_cache_dir(db_instance) - output_path = os.path.join(cache_dir, output_path) - - instance_time = timezone.localtime(db_instance.updated_date).timestamp() - if not (os.path.exists(output_path) and \ - instance_time <= os.path.getmtime(output_path)): - os.makedirs(cache_dir, exist_ok=True) - with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir: - temp_file = os.path.join(temp_dir, 'dump') - exporter = Exporter(db_instance.id) - exporter.export_to(temp_file) - os.replace(temp_file, output_path) - - archive_ctime = os.path.getctime(output_path) - scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.IMPORT_DATA.value) - cleaning_job = 
scheduler.enqueue_in(time_delta=cache_ttl, - func=_clear_export_cache, - file_path=output_path, - file_ctime=archive_ctime, - logger=logger) + with get_export_cache_lock( + output_path, + block=True, + acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT, + ttl=EXPORT_CACHE_LOCK_TTL, + ): + # output_path includes timestamp of the last update + if os.path.exists(output_path): + extend_export_file_lifetime(output_path) + return output_path + + with TmpDirManager.get_tmp_directory_for_export(instance_type=instance_type) as tmp_dir: + temp_file = os.path.join(tmp_dir, 'dump') + exporter = Exporter(db_instance.id) + exporter.export_to(temp_file) + + with get_export_cache_lock( + output_path, + block=True, + acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT, + ttl=EXPORT_CACHE_LOCK_TTL, + ): + shutil.move(temp_file, output_path) + logger.info( - "The {} '{}' is backuped at '{}' " - "and available for downloading for the next {}. " - "Export cache cleaning job is enqueued, id '{}'".format( - "project" if isinstance(db_instance, Project) else 'task', - db_instance.name, output_path, cache_ttl, - cleaning_job.id)) + f"The {db_instance.__class__.__name__.lower()} '{db_instance.id}' is backed up at {output_path!r} " + f"and available for downloading for the next {cache_ttl}." + ) return output_path + except LockNotAvailableError: + # Need to retry later if the lock was not available + retry_current_rq_job(EXPORT_LOCKED_RETRY_INTERVAL) + logger.info( + "Failed to acquire export cache lock. Retrying in {}".format( + EXPORT_LOCKED_RETRY_INTERVAL + ) + ) + raise + except Exception: log_exception(logger) raise @@ -1157,7 +1217,7 @@ def _import(importer, request, queue, rq_id, Serializer, file_field_name, locati return Response(serializer.data, status=status.HTTP_202_ACCEPTED) def get_backup_dirname(): - return settings.TMP_FILES_ROOT + return TmpDirManager.TMP_ROOT def import_project(request, queue_name, filename=None): if 'rq_id' in request.data: @@ -1213,15 +1273,3 @@ def import_task(request, queue_name, filename=None): location_conf=location_conf, filename=filename ) - -def _clear_export_cache(file_path: str, file_ctime: float, logger: Logger) -> None: - try: - if os.path.exists(file_path) and os.path.getctime(file_path) == file_ctime: - os.remove(file_path) - - logger.info( - "Export cache file '{}' successfully removed" \ - .format(file_path)) - except Exception: - log_exception(logger) - raise diff --git a/cvat/apps/engine/management/commands/runperiodicjob.py b/cvat/apps/engine/management/commands/runperiodicjob.py index 765f16541cfd..29745a63e8c3 100644 --- a/cvat/apps/engine/management/commands/runperiodicjob.py +++ b/cvat/apps/engine/management/commands/runperiodicjob.py @@ -17,7 +17,10 @@ def handle(self, *args, **options): for job_definition in settings.PERIODIC_RQ_JOBS: if job_definition["id"] == job_id: job_func = import_string(job_definition["func"]) - job_func() + job_func( + *(job_definition.get("args", [])), + **(job_definition.get("kwargs", {})), + ) return raise CommandError(f"Job with ID {job_id} not found") diff --git a/cvat/apps/engine/management/commands/syncperiodicjobs.py b/cvat/apps/engine/management/commands/syncperiodicjobs.py index d78d3f247179..3463b71c5359 100644 --- a/cvat/apps/engine/management/commands/syncperiodicjobs.py +++ b/cvat/apps/engine/management/commands/syncperiodicjobs.py @@ -8,22 +8,25 @@ import django_rq from django.conf import settings from django.core.management.base import BaseCommand +from rq.job import Job as RQJob class Command(BaseCommand): help 
= "Synchronize periodic jobs in Redis with the project configuration" - _PERIODIC_JOBS_KEY_PREFIX = 'cvat:utils:periodic-jobs:' + _PERIODIC_JOBS_KEY_PREFIX = "cvat:utils:periodic-jobs:" def add_arguments(self, parser: ArgumentParser) -> None: - parser.add_argument('--clear', action='store_true', help='Remove jobs from Redis instead of updating them') + parser.add_argument( + "--clear", action="store_true", help="Remove jobs from Redis instead of updating them" + ) def handle(self, *args, **options): configured_jobs = defaultdict(dict) if not options["clear"]: for job in settings.PERIODIC_RQ_JOBS: - configured_jobs[job['queue']][job['id']] = job + configured_jobs[job["queue"]][job["id"]] = job for queue_name in settings.RQ_QUEUES: self.stdout.write(f"Processing queue {queue_name}...") @@ -34,7 +37,7 @@ def handle(self, *args, **options): scheduler = django_rq.get_scheduler(queue_name, queue=queue) stored_jobs_for_queue = { - member.decode('UTF-8') for member in queue.connection.smembers(periodic_jobs_key) + member.decode("UTF-8") for member in queue.connection.smembers(periodic_jobs_key) } configured_jobs_for_queue = configured_jobs[queue_name] @@ -49,15 +52,26 @@ def handle(self, *args, **options): queue.connection.srem(periodic_jobs_key, job_id) + def is_job_actual(job: RQJob, job_definition: dict): + return ( + job.func_name == job_definition["func"] + and job.meta.get("cron_string") == job_definition["cron_string"] + and ( + not (job.args or job_definition.get("args")) + or job.args == job_definition.get("args") + ) + and ( + not (job.kwargs or job_definition.get("kwargs")) + or job.kwargs == job_definition.get("kwargs") + ) + ) + # Add/update jobs from the configuration for job_definition in configured_jobs_for_queue.values(): - job_id = job_definition['id'] + job_id = job_definition["id"] if job := queue.fetch_job(job_id): - if ( - job.func_name == job_definition['func'] - and job.meta.get('cron_string') == job_definition['cron_string'] - ): + if is_job_actual(job, job_definition): self.stdout.write(f"Job {job_id} is unchanged") queue.connection.sadd(periodic_jobs_key, job_id) continue @@ -68,9 +82,11 @@ def handle(self, *args, **options): self.stdout.write(f"Creating job {job_id}...") scheduler.cron( - cron_string=job_definition['cron_string'], - func=job_definition['func'], + cron_string=job_definition["cron_string"], + func=job_definition["func"], id=job_id, + args=job_definition.get("args"), + kwargs=job_definition.get("kwargs"), ) queue.connection.sadd(periodic_jobs_key, job_id) diff --git a/cvat/apps/engine/models.py b/cvat/apps/engine/models.py index c25c75404eaf..e012c75bef1c 100644 --- a/cvat/apps/engine/models.py +++ b/cvat/apps/engine/models.py @@ -10,6 +10,7 @@ import re import shutil import uuid +from abc import ABCMeta, abstractmethod from collections.abc import Collection, Sequence from enum import Enum from functools import cached_property @@ -21,6 +22,7 @@ from django.core.files.storage import FileSystemStorage from django.db import IntegrityError, models, transaction from django.db.models import Q, TextChoices +from django.db.models.base import ModelBase from django.db.models.fields import FloatField from django.utils.translation import gettext_lazy as _ from drf_spectacular.types import OpenApiTypes @@ -430,6 +432,25 @@ class Meta: def touch(self) -> None: self.save(update_fields=["updated_date"]) +class ABCModelMeta(ABCMeta, ModelBase): + pass + +class FileSystemRelatedModel(metaclass=ABCModelMeta): + @abstractmethod + def get_dirname(self) -> str: + ... 
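The ABCModelMeta / FileSystemRelatedModel pair introduced above works around a metaclass conflict: a mixin declared with metaclass=ABCMeta cannot be combined with models.Model, whose metaclass is ModelBase, unless a common derived metaclass is provided. A short sketch of the pattern under assumed names (only the ABCMeta + ModelBase combination itself comes from this change; the mixin and model below are hypothetical):

from abc import ABCMeta, abstractmethod

from django.db import models
from django.db.models.base import ModelBase


class ABCModelMeta(ABCMeta, ModelBase):
    """Metaclass compatible with both the ABC machinery and Django models."""


class PathAwareMixin(metaclass=ABCModelMeta):
    # Not a model itself: ModelBase leaves classes without model bases untouched,
    # so this behaves like a plain ABC mixin.
    @abstractmethod
    def get_dirname(self) -> str: ...


class ExampleModel(models.Model, PathAwareMixin):
    # Hypothetical model; Python selects ABCModelMeta as the most derived metaclass
    # of the bases, so no "metaclass conflict" error is raised.
    class Meta:
        app_label = "engine"  # assumed app label, needed only for this standalone sketch

    def get_dirname(self) -> str:
        return f"/data/examples/{self.pk}"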
+ + def get_tmp_dirname(self) -> str: + """ + The method returns a directory that is only used + to store temporary files or folders related to the object + """ + dir_path = os.path.join(self.get_dirname(), "tmp") + os.makedirs(dir_path, exist_ok=True) + + return dir_path + + @transaction.atomic(savepoint=False) def clear_annotations_in_jobs(job_ids): for job_ids_chunk in take_by(job_ids, chunk_size=1000): @@ -442,6 +463,7 @@ def clear_annotations_in_jobs(job_ids): LabeledImageAttributeVal.objects.filter(image__job_id__in=job_ids_chunk).delete() LabeledImage.objects.filter(job_id__in=job_ids_chunk).delete() + @transaction.atomic(savepoint=False) def clear_annotations_on_frames_in_honeypot_task(db_task: Task, frames: Sequence[int]): if db_task.data.validation_mode != ValidationMode.GT_POOL: @@ -466,7 +488,7 @@ def clear_annotations_on_frames_in_honeypot_task(db_task: Task, frames: Sequence frame__in=frames_batch, ).delete() -class Project(TimestampedModel): +class Project(TimestampedModel, FileSystemRelatedModel): name = SafeCharField(max_length=256) owner = models.ForeignKey(User, null=True, blank=True, on_delete=models.SET_NULL, related_name="+") @@ -490,12 +512,9 @@ def get_labels(self, prefetch=False): 'attributespec_set', 'sublabels__attributespec_set', ) if prefetch else queryset - def get_dirname(self): + def get_dirname(self) -> str: return os.path.join(settings.PROJECTS_ROOT, str(self.id)) - def get_tmp_dirname(self): - return os.path.join(self.get_dirname(), "tmp") - def is_job_staff(self, user_id): if self.owner == user_id: return True @@ -544,7 +563,7 @@ def with_job_summary(self): ) ) -class Task(TimestampedModel): +class Task(TimestampedModel, FileSystemRelatedModel): objects = TaskQuerySet.as_manager() project = models.ForeignKey(Project, on_delete=models.CASCADE, @@ -590,12 +609,9 @@ def get_labels(self, prefetch=False): 'attributespec_set', 'sublabels__attributespec_set', ) if prefetch else queryset - def get_dirname(self): + def get_dirname(self) -> str: return os.path.join(settings.TASKS_ROOT, str(self.id)) - def get_tmp_dirname(self): - return os.path.join(self.get_dirname(), "tmp") - def is_job_staff(self, user_id): if self.owner == user_id: return True @@ -838,7 +854,7 @@ def _validate_constraints(self, obj: dict[str, Any]): -class Job(TimestampedModel): +class Job(TimestampedModel, FileSystemRelatedModel): objects = JobQuerySet.as_manager() segment = models.ForeignKey(Segment, on_delete=models.CASCADE) @@ -856,7 +872,6 @@ class Job(TimestampedModel): default=StageChoice.ANNOTATION) state = models.CharField(max_length=32, choices=StateChoice.choices(), default=StateChoice.NEW) - type = models.CharField(max_length=32, choices=JobType.choices(), default=JobType.ANNOTATION) @@ -866,12 +881,9 @@ def get_target_storage(self) -> Optional[Storage]: def get_source_storage(self) -> Optional[Storage]: return self.segment.task.source_storage - def get_dirname(self): + def get_dirname(self) -> str: return os.path.join(settings.JOBS_ROOT, str(self.id)) - def get_tmp_dirname(self): - return os.path.join(self.get_dirname(), 'tmp') - @extend_schema_field(OpenApiTypes.INT) def get_project_id(self): project = self.segment.task.project diff --git a/cvat/apps/engine/tests/test_rest_api.py b/cvat/apps/engine/tests/test_rest_api.py index d59c310e5a3c..4cd7ad254854 100644 --- a/cvat/apps/engine/tests/test_rest_api.py +++ b/cvat/apps/engine/tests/test_rest_api.py @@ -3,6 +3,7 @@ # # SPDX-License-Identifier: MIT + import copy import io import json @@ -36,6 +37,8 @@ from pyunpack import Archive 
from rest_framework import status from rest_framework.test import APIClient +from rq.job import Job as RQJob +from rq.queue import Queue as RQQueue from cvat.apps.dataset_manager.tests.utils import TestDir from cvat.apps.dataset_manager.util import current_function_name @@ -3105,30 +3108,47 @@ def test_api_v2_tasks_id_export_no_auth(self): self._run_api_v2_tasks_id_export_import(None) def test_can_remove_export_cache_automatically_after_successful_export(self): + from cvat.apps.dataset_manager.cron import ( + cleanup_export_cache_directory, + clear_export_cache, + ) self._create_tasks() task_id = self.tasks[0]["id"] user = self.admin - with mock.patch('cvat.apps.dataset_manager.views.TASK_CACHE_TTL', new=timedelta(hours=10)): + TASK_CACHE_TTL = timedelta(hours=1) + with ( + mock.patch('cvat.apps.dataset_manager.views.TASK_CACHE_TTL', new=TASK_CACHE_TTL), + mock.patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': TASK_CACHE_TTL}), + mock.patch( + "cvat.apps.dataset_manager.cron.clear_export_cache", + side_effect=clear_export_cache, + ) as mock_clear_export_cache, + ): + cleanup_export_cache_directory() + mock_clear_export_cache.assert_not_called() + response = self._run_api_v2_tasks_id_export(task_id, user) self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) response = self._run_api_v2_tasks_id_export(task_id, user) self.assertEqual(response.status_code, status.HTTP_201_CREATED) - scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.IMPORT_DATA.value) - scheduled_jobs = list(scheduler.get_jobs()) - cleanup_job = next( - j for j in scheduled_jobs if j.func_name.endswith('.engine.backup._clear_export_cache') - ) - - export_path = cleanup_job.kwargs['file_path'] - self.assertTrue(os.path.isfile(export_path)) - - from cvat.apps.engine.backup import _clear_export_cache - _clear_export_cache(**cleanup_job.kwargs) - - self.assertFalse(os.path.isfile(export_path)) + queue: RQQueue = django_rq.get_queue(settings.CVAT_QUEUES.EXPORT_DATA.value) + rq_job_ids = queue.finished_job_registry.get_job_ids() + self.assertEqual(len(rq_job_ids), 1) + job: RQJob | None = queue.fetch_job(rq_job_ids[0]) + self.assertFalse(job is None) + file_path = job.return_value() + self.assertTrue(os.path.isfile(file_path)) + + with ( + mock.patch('cvat.apps.dataset_manager.views.TASK_CACHE_TTL', new=timedelta(seconds=0)), + mock.patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(seconds=0)}), + ): + cleanup_export_cache_directory() + mock_clear_export_cache.assert_called_once() + self.assertFalse(os.path.exists(file_path)) def generate_random_image_file(filename): diff --git a/cvat/apps/engine/tests/utils.py b/cvat/apps/engine/tests/utils.py index 09fd850b2c19..15d2adb65451 100644 --- a/cvat/apps/engine/tests/utils.py +++ b/cvat/apps/engine/tests/utils.py @@ -5,9 +5,11 @@ import itertools import logging import os +import shutil from collections.abc import Iterator, Sequence from contextlib import contextmanager from io import BytesIO +from pathlib import Path from typing import Any, Callable, TypeVar import av @@ -106,6 +108,14 @@ def _clear_temp_data(self): # Clear any remaining RQ jobs produced by the tests executed self._clear_rq_jobs() + # clear cache files created after previous exports + export_cache_dir = Path(settings.EXPORT_CACHE_ROOT) + for child in export_cache_dir.iterdir(): + if child.is_dir(): + shutil.rmtree(child) + else: + os.remove(child) + def _clear_rq_jobs(self): clear_rq_jobs() diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index 
6b70836d53a1..6a5691dc1182 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -531,7 +531,7 @@ def get_upload_dir(self): return self._object.get_tmp_dirname() elif 'backup' in self.action: return backup.get_backup_dirname() - return "" + assert False def upload_finished(self, request): if self.action == 'dataset': @@ -539,9 +539,10 @@ def upload_finished(self, request): filename = request.query_params.get("filename", "") conv_mask_to_poly = to_bool(request.query_params.get('conv_mask_to_poly', True)) tmp_dir = self._object.get_tmp_dirname() - uploaded_file = None - if os.path.isfile(os.path.join(tmp_dir, filename)): - uploaded_file = os.path.join(tmp_dir, filename) + uploaded_file = os.path.join(tmp_dir, filename) + if not os.path.isfile(uploaded_file): + uploaded_file = None + return _import_project_dataset( request=request, filename=uploaded_file, @@ -1132,7 +1133,8 @@ def get_upload_dir(self): return self._object.data.get_upload_dirname() elif 'backup' in self.action: return backup.get_backup_dirname() - return "" + + assert False def _prepare_upload_info_entry(self, filename: str) -> str: filename = osp.normpath(filename) @@ -1210,8 +1212,8 @@ def _handle_upload_annotations(request): filename = request.query_params.get("filename", "") conv_mask_to_poly = to_bool(request.query_params.get('conv_mask_to_poly', True)) tmp_dir = self._object.get_tmp_dirname() - if os.path.isfile(os.path.join(tmp_dir, filename)): - annotation_file = os.path.join(tmp_dir, filename) + annotation_file = os.path.join(tmp_dir, filename) + if os.path.isfile(annotation_file): return _import_annotations( request=request, filename=annotation_file, @@ -2019,8 +2021,8 @@ def upload_finished(self, request): filename = request.query_params.get("filename", "") conv_mask_to_poly = to_bool(request.query_params.get('conv_mask_to_poly', True)) tmp_dir = self.get_upload_dir() - if os.path.isfile(os.path.join(tmp_dir, filename)): - annotation_file = os.path.join(tmp_dir, filename) + annotation_file = os.path.join(tmp_dir, filename) + if os.path.isfile(annotation_file): return _import_annotations( request=request, filename=annotation_file, @@ -2159,7 +2161,7 @@ def upload_finished(self, request): serializer_class=LabeledDataSerializer, parser_classes=_UPLOAD_PARSER_CLASSES, csrf_workaround_is_needed=csrf_workaround_is_needed_for_export) def annotations(self, request, pk): - self._object = self.get_object() # force call of check_object_permissions() + self._object: models.Job = self.get_object() # force call of check_object_permissions() if request.method == 'GET': # FUTURE-TODO: mark as deprecated using this endpoint to export annotations when new API for result file downloading will be implemented return self.export_dataset_v1( diff --git a/cvat/settings/base.py b/cvat/settings/base.py index c73cb31eafa2..ae2952c6849f 100644 --- a/cvat/settings/base.py +++ b/cvat/settings/base.py @@ -320,7 +320,7 @@ class CVAT_QUEUES(Enum): }, CVAT_QUEUES.CLEANING.value: { **shared_queue_settings, - 'DEFAULT_TIMEOUT': '1h', + 'DEFAULT_TIMEOUT': '2h', }, CVAT_QUEUES.CHUNKS.value: { **shared_queue_settings, @@ -353,6 +353,20 @@ class CVAT_QUEUES(Enum): 'func': 'cvat.apps.iam.utils.clean_up_sessions', 'cron_string': '0 0 * * *', }, + { + 'queue': CVAT_QUEUES.CLEANING.value, + 'id': 'cron_export_cache_directory_cleanup', + 'func': 'cvat.apps.dataset_manager.cron.cleanup_export_cache_directory', + # Run twice a day (at midnight and at noon) + 'cron_string': '0 0,12 * * *', + }, + { + 'queue': CVAT_QUEUES.CLEANING.value, + 
'id': 'cron_tmp_directory_cleanup', + 'func': 'cvat.apps.dataset_manager.cron.cleanup_tmp_directory', + # Run once a day + 'cron_string': '0 18 * * *', + } ] # JavaScript and CSS compression @@ -413,6 +427,9 @@ class CVAT_QUEUES(Enum): CACHE_ROOT = os.path.join(DATA_ROOT, 'cache') os.makedirs(CACHE_ROOT, exist_ok=True) +EXPORT_CACHE_ROOT = os.path.join(CACHE_ROOT, 'export') +os.makedirs(EXPORT_CACHE_ROOT, exist_ok=True) + EVENTS_LOCAL_DB_ROOT = os.path.join(CACHE_ROOT, 'events') os.makedirs(EVENTS_LOCAL_DB_ROOT, exist_ok=True) EVENTS_LOCAL_DB_FILE = os.path.join( @@ -745,3 +762,6 @@ class CVAT_QUEUES(Enum): CLOUD_DATA_DOWNLOADING_MAX_THREADS_NUMBER = 4 CLOUD_DATA_DOWNLOADING_NUMBER_OF_FILES_PER_THREAD = 1000 + +# Indicates the maximum number of days a file or directory is retained in the temporary directory +TMP_FILE_OR_DIR_RETENTION_DAYS = 3 diff --git a/cvat/settings/testing.py b/cvat/settings/testing.py index e0391e4c3b40..ee75f055dbc0 100644 --- a/cvat/settings/testing.py +++ b/cvat/settings/testing.py @@ -25,6 +25,9 @@ CACHE_ROOT = os.path.join(DATA_ROOT, 'cache') os.makedirs(CACHE_ROOT, exist_ok=True) +EXPORT_CACHE_ROOT = os.path.join(CACHE_ROOT, 'export') +os.makedirs(EXPORT_CACHE_ROOT, exist_ok=True) + JOBS_ROOT = os.path.join(DATA_ROOT, 'jobs') os.makedirs(JOBS_ROOT, exist_ok=True) diff --git a/dev/format_python_code.sh b/dev/format_python_code.sh index db18ce328dc4..e18bb2b3e1eb 100755 --- a/dev/format_python_code.sh +++ b/dev/format_python_code.sh @@ -31,9 +31,12 @@ for paths in \ "cvat/apps/engine/field_validation.py" \ "cvat/apps/engine/model_utils.py" \ "cvat/apps/engine/task_validation.py" \ + "cvat/apps/dataset_manager/cron.py" \ "cvat/apps/dataset_manager/tests/test_annotation.py" \ "cvat/apps/dataset_manager/tests/utils.py" \ "cvat/apps/events/signals.py" \ + "cvat/apps/engine/management/commands/syncperiodicjobs.py" \ + "cvat/apps/dataset_manager/management/commands/cleanuplegacyexportcache.py" \ ; do ${BLACK} -- ${paths} ${ISORT} -- ${paths}
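Since runperiodicjob now forwards args/kwargs and syncperiodicjobs compares them when deciding whether a scheduled job is still up to date, a periodic job definition can carry parameters. A hypothetical example of such an entry in a settings override (the module path, job id, function path, and argument values are illustrative only, not part of this change):

# Hypothetical local settings override, e.g. cvat/settings/local_example.py
from cvat.settings.base import *  # noqa: F401,F403 - brings in PERIODIC_RQ_JOBS, CVAT_QUEUES

PERIODIC_RQ_JOBS += [
    {
        'queue': CVAT_QUEUES.CLEANING.value,
        'id': 'cron_example_cleanup',                         # hypothetical job id
        'func': 'cvat.apps.example.cron.cleanup_something',   # hypothetical dotted path
        'cron_string': '30 */6 * * *',  # every 6 hours at minute 30
        # Supported by this change: forwarded to the function by runperiodicjob
        # and taken into account by syncperiodicjobs when comparing jobs
        'args': ['some-positional-arg'],
        'kwargs': {'dry_run': False},
    },
]

syncperiodicjobs recreates a scheduled job whenever its func, cron_string, args, or kwargs differ from the stored definition, and such a job can also be triggered on demand by id, e.g. python manage.py runperiodicjob cron_example_cleanup.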