diff --git a/b2sdk/_internal/raw_simulator.py b/b2sdk/_internal/raw_simulator.py index 41f4ce95a..6b7cb77b0 100644 --- a/b2sdk/_internal/raw_simulator.py +++ b/b2sdk/_internal/raw_simulator.py @@ -10,6 +10,7 @@ from __future__ import annotations import collections +import dataclasses import io import logging import random @@ -447,25 +448,52 @@ def _get_encryption_mode_and_secret(self, encryption: EncryptionSetting | None): return mode, secret -FakeRequest = collections.namedtuple('FakeRequest', 'url headers') +@dataclasses.dataclass +class FakeRequest: + url: str + headers: CaseInsensitiveDict + + +@dataclasses.dataclass +class FakeRaw: + data_bytes: bytes + _position: int = 0 + + def tell(self): + return self._position + + def read(self, size): + data = self.data_bytes[self._position:self._position + size] + self._position += len(data) + return data class FakeResponse: def __init__(self, account_auth_token_or_none, file_sim, url, range_=None): - self.data_bytes = file_sim.data_bytes + self.raw = FakeRaw(file_sim.data_bytes) self.headers = file_sim.as_download_headers(account_auth_token_or_none, range_) self.url = url self.range_ = range_ if range_ is not None: self.data_bytes = self.data_bytes[range_[0]:range_[1] + 1] + @property + def data_bytes(self): + return self.raw.data_bytes + + @data_bytes.setter + def data_bytes(self, value): + self.raw = FakeRaw(value) + def iter_content(self, chunk_size=1): - start = 0 rnd = random.Random(self.url) - while start <= len(self.data_bytes): - time.sleep(rnd.random() * 0.01) - yield self.data_bytes[start:start + chunk_size] - start += chunk_size + while True: + chunk = self.raw.read(chunk_size) + if chunk: + time.sleep(rnd.random() * 0.01) + yield chunk + else: + break @property def request(self): diff --git a/b2sdk/_internal/transfer/inbound/downloaded_file.py b/b2sdk/_internal/transfer/inbound/downloaded_file.py index ef7c05910..ec23df13c 100644 --- a/b2sdk/_internal/transfer/inbound/downloaded_file.py +++ b/b2sdk/_internal/transfer/inbound/downloaded_file.py @@ -204,6 +204,15 @@ def save(self, file: BinaryIO, allow_seeking: bool | None = None) -> None: logger.warning('File is not seekable, disabling strategies that require seeking') allow_seeking = False + if allow_seeking: # check if file allows reading from arbitrary position + try: + file.read(0) + except io.UnsupportedOperation: + logger.warning( + 'File is seekable, but does not allow reads, disabling strategies that require seeking' + ) + allow_seeking = False + if self.progress_listener: file = WritingStreamWithProgress(file, self.progress_listener) if self.range_ is not None: diff --git a/b2sdk/_internal/transfer/inbound/downloader/abstract.py b/b2sdk/_internal/transfer/inbound/downloader/abstract.py index 6feb7f6d2..35601b8b8 100644 --- a/b2sdk/_internal/transfer/inbound/downloader/abstract.py +++ b/b2sdk/_internal/transfer/inbound/downloader/abstract.py @@ -41,8 +41,18 @@ def copy(self): class AbstractDownloader(metaclass=B2TraceMetaAbstract): + """ + Abstract class for downloaders. + + :var REQUIRES_SEEKING: if True, the downloader requires the ability to seek in the file object. + :var SUPPORTS_DECODE_CONTENT: if True, the downloader supports decoded HTTP streams. + In practice, this means that the downloader can handle HTTP responses which already + have the content decoded per Content-Encoding and, more likely than not, of a different + length than requested. + """ REQUIRES_SEEKING = True + SUPPORTS_DECODE_CONTENT = True DEFAULT_THREAD_POOL_CLASS = staticmethod(ThreadPoolExecutor) DEFAULT_ALIGN_FACTOR = 4096 @@ -103,6 +113,8 @@ def is_suitable(self, download_version: DownloadVersion, allow_seeking: bool): """ if self.REQUIRES_SEEKING and not allow_seeking: return False + if not self.SUPPORTS_DECODE_CONTENT and download_version.content_encoding and download_version.api.api_config.decode_content: + return False return True @abstractmethod @@ -113,8 +125,16 @@ def download( download_version: DownloadVersion, session: B2Session, encryption: EncryptionSetting | None = None, - ): + ) -> tuple[int, str]: """ - @returns (bytes_read, actual_sha1) + Download target to a file-like object. + + :param file: file-like object to write to + :param response: requests.Response of b2_download_url_by_* endpoint with the target object + :param download_version: DownloadVersion of an object being downloaded + :param session: B2Session to be used for downloading + :param encryption: optional Encryption setting + :return: (bytes_read, actual_sha1) + please note bytes_read may be different from bytes written to a file object if decode_content=True """ pass diff --git a/b2sdk/_internal/transfer/inbound/downloader/parallel.py b/b2sdk/_internal/transfer/inbound/downloader/parallel.py index d94686a7c..bed173c66 100644 --- a/b2sdk/_internal/transfer/inbound/downloader/parallel.py +++ b/b2sdk/_internal/transfer/inbound/downloader/parallel.py @@ -30,6 +30,12 @@ class ParallelDownloader(AbstractDownloader): + """ + Downloader using threads to download&write multiple parts of an object in parallel. + + Each part is downloaded by its own thread, while all writes are done by additional dedicated thread. + This can increase performance even for a small file, as fetching & writing can be done in parallel. + """ # situations to consider: # # local file start local file end @@ -50,6 +56,7 @@ class ParallelDownloader(AbstractDownloader): # cloud file start cloud file end # FINISH_HASHING_BUFFER_SIZE = 1024**2 + SUPPORTS_DECODE_CONTENT = False def __init__(self, min_part_size: int, max_streams: int | None = None, **kwargs): """ @@ -60,14 +67,7 @@ def __init__(self, min_part_size: int, max_streams: int | None = None, **kwargs) self.max_streams = max_streams self.min_part_size = min_part_size - def is_suitable(self, download_version: DownloadVersion, allow_seeking: bool): - if not super().is_suitable(download_version, allow_seeking): - return False - return self._get_number_of_streams( - download_version.content_length - ) >= 2 and download_version.content_length >= 2 * self.min_part_size - - def _get_number_of_streams(self, content_length): + def _get_number_of_streams(self, content_length: int) -> int: num_streams = content_length // self.min_part_size if self.max_streams is not None: num_streams = min(num_streams, self.max_streams) @@ -75,7 +75,7 @@ def _get_number_of_streams(self, content_length): max_threadpool_workers = getattr(self._thread_pool, '_max_workers', None) if max_threadpool_workers is not None: num_streams = min(num_streams, max_threadpool_workers) - return num_streams + return max(num_streams, 1) def download( self, diff --git a/b2sdk/_internal/transfer/inbound/downloader/simple.py b/b2sdk/_internal/transfer/inbound/downloader/simple.py index 8bab6f493..d82ed0fda 100644 --- a/b2sdk/_internal/transfer/inbound/downloader/simple.py +++ b/b2sdk/_internal/transfer/inbound/downloader/simple.py @@ -26,6 +26,7 @@ class SimpleDownloader(AbstractDownloader): REQUIRES_SEEKING = False + SUPPORTS_DECODE_CONTENT = True def _download( self, @@ -39,12 +40,12 @@ def _download( chunk_size = self._get_chunk_size(actual_size) digest = self._get_hasher() - - bytes_read = 0 + decoded_bytes_read = 0 for data in response.iter_content(chunk_size=chunk_size): file.write(data) digest.update(data) - bytes_read += len(data) + decoded_bytes_read += len(data) + bytes_read = response.raw.tell() assert actual_size >= 1 # code below does `actual_size - 1`, but it should never reach that part with an empty file @@ -62,8 +63,8 @@ def _download( # original response is not closed at this point yet, as another layer is responsible for closing it, so a new socket might be allocated, # but this is a very rare case and so it is not worth the optimization logger.debug( - 're-download attempts remaining: %i, bytes read already: %i. Getting range %s now.', - retries_left, bytes_read, new_range + 're-download attempts remaining: %i, bytes read: %i (decoded: %i). Getting range %s now.', + retries_left, bytes_read, decoded_bytes_read, new_range ) with session.download_file_from_url( response.request.url, @@ -75,7 +76,8 @@ def _download( ): file.write(data) digest.update(data) - bytes_read += len(data) + decoded_bytes_read += len(data) + bytes_read += followup_response.raw.tell() retries_left -= 1 return bytes_read, digest.hexdigest() diff --git a/changelog.d/+fix_decode_content.fixed.md b/changelog.d/+fix_decode_content.fixed.md new file mode 100644 index 000000000..827198f79 --- /dev/null +++ b/changelog.d/+fix_decode_content.fixed.md @@ -0,0 +1 @@ +Fix `decode_content=True` causing an error when downloading tiny and large files. diff --git a/changelog.d/+parallel_download_seekable_no_read.fixed.md b/changelog.d/+parallel_download_seekable_no_read.fixed.md new file mode 100644 index 000000000..f355756e3 --- /dev/null +++ b/changelog.d/+parallel_download_seekable_no_read.fixed.md @@ -0,0 +1 @@ +Prevent errors due to the use of "seekable" download strategies for seekable, but not readable files. diff --git a/changelog.d/+use_paraller_download_for_small_files.changed.md b/changelog.d/+use_paraller_download_for_small_files.changed.md new file mode 100644 index 000000000..da0804587 --- /dev/null +++ b/changelog.d/+use_paraller_download_for_small_files.changed.md @@ -0,0 +1 @@ +Use ParallelDownloader for small files instead of SimpleDownloader to avoid blocking on I/O. diff --git a/test/integration/test_download.py b/test/integration/test_download.py index ff1b5779f..ef02ac802 100644 --- a/test/integration/test_download.py +++ b/test/integration/test_download.py @@ -48,15 +48,14 @@ def test_large_file(self): ] ): - # let's check that small file downloads fail with these settings - zero = bucket.upload_bytes(b'0', 'a_single_zero') - with pytest.raises(ValueError) as exc_info: - with io.BytesIO() as io_: - bucket.download_file_by_name('a_single_zero').save(io_) - assert exc_info.value.args == ('no strategy suitable for download was found!',) + # let's check that small file downloads do not fail with these settings + small_file_version = bucket.upload_bytes(b'0', 'a_single_char') + with io.BytesIO() as io_: + bucket.download_file_by_name('a_single_char').save(io_) + assert io_.getvalue() == b'0' f, sha1 = self._file_helper(bucket) - if zero._type() != 'large': + if small_file_version._type() != 'large': # if we are here, that's not the production server! assert f.download_version.content_sha1_verified # large files don't have sha1, lets not check @@ -98,37 +97,27 @@ def test_small_unverified(self): pprint(f.download_version._get_args_for_clone()) assert not f.download_version.content_sha1_verified - def test_gzip(self): - bucket = self.create_bucket() - with tempfile.TemporaryDirectory() as temp_dir: - temp_dir = pathlib.Path(temp_dir) - source_file = temp_dir / 'compressed_file.gz' - downloaded_uncompressed_file = temp_dir / 'downloaded_uncompressed_file' - downloaded_compressed_file = temp_dir / 'downloaded_compressed_file' - - data_to_write = b"I'm about to be compressed and sent to the cloud, yay!\n" * 100 # too short files failed somehow - with gzip.open(source_file, 'wb') as gzip_file: - gzip_file.write(data_to_write) - file_version = bucket.upload_local_file( - str(source_file), 'gzipped_file', file_info={'b2-content-encoding': 'gzip'} - ) - self.b2_api.download_file_by_id(file_id=file_version.id_).save_to( - str(downloaded_compressed_file) - ) - with open(downloaded_compressed_file, 'rb') as dcf: - downloaded_data = dcf.read() - with open(source_file, 'rb') as sf: - source_data = sf.read() - assert downloaded_data == source_data - - decompressing_api, _ = authorize( - self.b2_auth_data, B2HttpApiConfig(decode_content=True) - ) - decompressing_api.download_file_by_id(file_id=file_version.id_).save_to( - str(downloaded_uncompressed_file) - ) - with open(downloaded_uncompressed_file, 'rb') as duf: - assert duf.read() == data_to_write + +@pytest.mark.parametrize("size_multiplier", [1, 100]) +def test_gzip(b2_auth_data, bucket, tmp_path, b2_api, size_multiplier): + """Test downloading gzipped files of varius sizes with and without content-encoding.""" + source_file = tmp_path / 'compressed_file.gz' + downloaded_uncompressed_file = tmp_path / 'downloaded_uncompressed_file' + downloaded_compressed_file = tmp_path / 'downloaded_compressed_file' + + data_to_write = b"I'm about to be compressed and sent to the cloud, yay!\n" * size_multiplier + source_file.write_bytes(gzip.compress(data_to_write)) + file_version = bucket.upload_local_file( + str(source_file), 'gzipped_file', file_info={'b2-content-encoding': 'gzip'} + ) + b2_api.download_file_by_id(file_id=file_version.id_).save_to(str(downloaded_compressed_file)) + assert downloaded_compressed_file.read_bytes() == source_file.read_bytes() + + decompressing_api, _ = authorize(b2_auth_data, B2HttpApiConfig(decode_content=True)) + decompressing_api.download_file_by_id(file_id=file_version.id_).save_to( + str(downloaded_uncompressed_file) + ) + assert downloaded_uncompressed_file.read_bytes() == data_to_write @pytest.fixture diff --git a/test/unit/bucket/test_bucket.py b/test/unit/bucket/test_bucket.py index 6df52191e..ba2ea4bc5 100644 --- a/test/unit/bucket/test_bucket.py +++ b/test/unit/bucket/test_bucket.py @@ -2556,6 +2556,34 @@ def test_download_to_non_seekable_file(self): ) assert output_file.getvalue() == self.DATA.encode() + @pytest.mark.apiver(from_ver=2) + def test_download_to_seekable_but_no_read_file(self): + file_version = self.bucket.upload_bytes(self.DATA.encode(), 'file1') + + non_seekable_strategies = [ + strat for strat in self.bucket.api.services.download_manager.strategies + if not isinstance(strat, ParallelDownloader) + ] + context = contextlib.nullcontext() if non_seekable_strategies else pytest.raises( + ValueError, + match='no strategy suitable for download was found!', + ) + output_file = io.BytesIO() + seekable_but_not_readable = io.BufferedWriter(output_file) + + # test sanity check + assert seekable_but_not_readable.seekable() + with pytest.raises(io.UnsupportedOperation): + seekable_but_not_readable.read(0) + + with context: + self.download_file_by_id( + file_version.id_, + v2_file=seekable_but_not_readable, + ) + seekable_but_not_readable.flush() + assert output_file.getvalue() == self.DATA.encode() + # download empty file