Skip to content

Commit

Permalink
on import from cloud, save resource to disk directly (#8930)
Browse files Browse the repository at this point in the history
When importing backups or datasets from cloud storage, they were first loaded
into memory and only then saved to disk.
If a dataset or backup was larger than the available RAM, the worker
terminated.
  • Loading branch information
Eldies authored Jan 14, 2025
1 parent 13fd5a7 commit b63f68a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Fixed

- Optimized importing from cloud storage
(<https://github.com/cvat-ai/cvat/pull/8930>)
55 changes: 25 additions & 30 deletions cvat/apps/engine/cloud_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from enum import Enum
from io import BytesIO
from typing import Any, Callable, Optional, TypeVar
from pathlib import Path
from typing import Any, BinaryIO, Callable, Optional, TypeVar

import boto3
from azure.core.exceptions import HttpResponseError, ResourceExistsError
Expand Down Expand Up @@ -166,17 +167,24 @@ def get_file_last_modified(self, key):
pass

@abstractmethod
def download_fileobj(self, key: str) -> NamedBytesIO:
def _download_fileobj_to_stream(self, key: str, stream: BinaryIO) -> None:
pass

def download_file(self, key, path):
file_obj = self.download_fileobj(key)
if isinstance(file_obj, BytesIO):
os.makedirs(os.path.dirname(path), exist_ok=True)
def download_fileobj(self, key: str) -> NamedBytesIO:
    """Download the object at *key* fully into memory and return it.

    Default implementation built on the provider-specific
    ``_download_fileobj_to_stream`` hook: it streams the object into an
    in-memory buffer, rewinds it, and tags it with the source key so
    callers can treat it like a named file.

    NOTE(review): this loads the whole object into RAM — for large
    resources prefer ``download_file``, which streams straight to disk.
    """
    buf = NamedBytesIO()
    # Provider subclasses (S3 / Azure / GCS) implement the actual transfer.
    self._download_fileobj_to_stream(key=key, stream=buf)
    # Rewind so the caller can read from the start of the buffer.
    buf.seek(0)
    # Preserve the object key as the "filename" of the in-memory file.
    buf.filename = key
    return buf

def download_file(self, key: str, path: str) -> None:
os.makedirs(os.path.dirname(path), exist_ok=True)
try:
with open(path, 'wb') as f:
f.write(file_obj.getvalue())
else:
raise NotImplementedError("Unsupported type {} was found".format(type(file_obj)))
self._download_fileobj_to_stream(key, stream=f)
except Exception:
Path(path).unlink()
raise

def download_range_of_bytes(self, key: str, stop_byte: int, start_byte: int = 0) -> bytes:
"""Method downloads the required bytes range of the file.
Expand Down Expand Up @@ -556,16 +564,12 @@ def _list_raw_content_on_one_page(

@validate_file_status
@validate_bucket_status
def download_fileobj(self, key: str) -> NamedBytesIO:
buf = NamedBytesIO()
def _download_fileobj_to_stream(self, key: str, stream: BinaryIO) -> None:
self.bucket.download_fileobj(
Key=key,
Fileobj=buf,
Fileobj=stream,
Config=TransferConfig(max_io_queue=self.transfer_config['max_io_queue'])
)
buf.seek(0)
buf.filename = key
return buf

@validate_file_status
@validate_bucket_status
Expand Down Expand Up @@ -761,17 +765,14 @@ def _list_raw_content_on_one_page(

@validate_file_status
@validate_bucket_status
def download_fileobj(self, key: str) -> NamedBytesIO:
buf = NamedBytesIO()
def _download_fileobj_to_stream(self, key: str, stream: BinaryIO) -> None:
storage_stream_downloader = self._client.download_blob(
blob=key,
offset=None,
length=None,
max_concurrency=self.MAX_CONCURRENCY,
)
storage_stream_downloader.download_to_stream(buf, max_concurrency=self.MAX_CONCURRENCY)
buf.seek(0)
buf.filename = key
return buf
storage_stream_downloader.readinto(stream)

@validate_file_status
@validate_bucket_status
Expand Down Expand Up @@ -875,13 +876,9 @@ def _list_raw_content_on_one_page(

@validate_file_status
@validate_bucket_status
def download_fileobj(self, key: str) -> NamedBytesIO:
buf = NamedBytesIO()
def _download_fileobj_to_stream(self, key: str, stream: BinaryIO) -> None:
blob = self.bucket.blob(key)
self._client.download_blob_to_file(blob, buf)
buf.seek(0)
buf.filename = key
return buf
self._client.download_blob_to_file(blob, stream)

@validate_file_status
@validate_bucket_status
Expand Down Expand Up @@ -1024,9 +1021,7 @@ def import_resource_from_cloud_storage(
**kwargs,
) -> Any:
storage = db_storage_to_storage_instance(db_storage)

with storage.download_fileobj(key) as data, open(filename, 'wb+') as f:
f.write(data.getbuffer())
storage.download_file(key, filename)

return cleanup_func(import_func, filename, *args, **kwargs)

Expand Down

0 comments on commit b63f68a

Please sign in to comment.