Use ParallelDownloader for small files instead of SimpleDownloader #482

Merged
42 changes: 35 additions & 7 deletions b2sdk/_internal/raw_simulator.py
@@ -10,6 +10,7 @@
from __future__ import annotations

import collections
import dataclasses
import io
import logging
import random
@@ -447,25 +448,52 @@ def _get_encryption_mode_and_secret(self, encryption: EncryptionSetting | None):
return mode, secret


FakeRequest = collections.namedtuple('FakeRequest', 'url headers')
@dataclasses.dataclass
class FakeRequest:
url: str
headers: CaseInsensitiveDict


@dataclasses.dataclass
class FakeRaw:
data_bytes: bytes
_position: int = 0

def tell(self):
return self._position

def read(self, size):
data = self.data_bytes[self._position:self._position + size]
self._position += len(data)
return data


class FakeResponse:
def __init__(self, account_auth_token_or_none, file_sim, url, range_=None):
self.data_bytes = file_sim.data_bytes
self.raw = FakeRaw(file_sim.data_bytes)
self.headers = file_sim.as_download_headers(account_auth_token_or_none, range_)
self.url = url
self.range_ = range_
if range_ is not None:
self.data_bytes = self.data_bytes[range_[0]:range_[1] + 1]

@property
def data_bytes(self):
return self.raw.data_bytes

@data_bytes.setter
def data_bytes(self, value):
self.raw = FakeRaw(value)

def iter_content(self, chunk_size=1):
start = 0
rnd = random.Random(self.url)
while start <= len(self.data_bytes):
time.sleep(rnd.random() * 0.01)
yield self.data_bytes[start:start + chunk_size]
start += chunk_size
while True:
chunk = self.raw.read(chunk_size)
if chunk:
time.sleep(rnd.random() * 0.01)
yield chunk
else:
break

@property
def request(self):
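For orientation, here is a quick standalone sketch of how the new `FakeRaw` behaves: `read()` returns at most `size` bytes and advances the position, while `tell()` reports raw bytes consumed — mirroring what the downloaders now query via `response.raw.tell()`.

```python
# Standalone usage sketch of the FakeRaw helper defined above.
raw = FakeRaw(b'0123456789')
assert raw.read(4) == b'0123'      # returns at most `size` bytes
assert raw.tell() == 4             # position advanced by bytes actually read
assert raw.read(100) == b'456789'  # short read near the end of the data
assert raw.tell() == 10
assert raw.read(4) == b''          # exhausted stream yields empty bytes
```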
9 changes: 9 additions & 0 deletions b2sdk/_internal/transfer/inbound/downloaded_file.py
@@ -204,6 +204,15 @@ def save(self, file: BinaryIO, allow_seeking: bool | None = None) -> None:
logger.warning('File is not seekable, disabling strategies that require seeking')
allow_seeking = False

if allow_seeking: # check if file allows reading from arbitrary position
try:
file.read(0)
except io.UnsupportedOperation:
logger.warning(
'File is seekable, but does not allow reads, disabling strategies that require seeking'
)
allow_seeking = False

if self.progress_listener:
file = WritingStreamWithProgress(file, self.progress_listener)
if self.range_ is not None:
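The situation this new guard handles is easy to reproduce with the standard library alone — the PR's own unit test (see `test_bucket.py` below) uses exactly this construction. A seekable-but-unreadable object looks like this:

```python
import io

# io.BufferedWriter is seekable, yet any read attempt raises UnsupportedOperation.
output = io.BufferedWriter(io.BytesIO())
assert output.seekable()  # seeking works...
try:
    output.read(0)        # ...but reading does not
except io.UnsupportedOperation:
    print('seekable, but not readable -> seeking strategies are disabled')
```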
24 changes: 22 additions & 2 deletions b2sdk/_internal/transfer/inbound/downloader/abstract.py
@@ -41,8 +41,18 @@ def copy(self):


class AbstractDownloader(metaclass=B2TraceMetaAbstract):
"""
Abstract class for downloaders.

:var REQUIRES_SEEKING: if True, the downloader requires the ability to seek in the file object.
:var SUPPORTS_DECODE_CONTENT: if True, the downloader supports decoded HTTP streams.
In practice, this means that the downloader can handle HTTP responses which already
        have the content decoded per Content-Encoding and are, more likely than not, of a
        different length than requested.
"""

REQUIRES_SEEKING = True
SUPPORTS_DECODE_CONTENT = True
DEFAULT_THREAD_POOL_CLASS = staticmethod(ThreadPoolExecutor)
DEFAULT_ALIGN_FACTOR = 4096

@@ -103,6 +113,8 @@ def is_suitable(self, download_version: DownloadVersion, allow_seeking: bool):
"""
if self.REQUIRES_SEEKING and not allow_seeking:
return False
if not self.SUPPORTS_DECODE_CONTENT and download_version.content_encoding and download_version.api.api_config.decode_content:
return False
return True

@abstractmethod
@@ -113,8 +125,16 @@ def download(
download_version: DownloadVersion,
session: B2Session,
encryption: EncryptionSetting | None = None,
):
) -> tuple[int, str]:
"""
@returns (bytes_read, actual_sha1)
Download target to a file-like object.

:param file: file-like object to write to
:param response: requests.Response of b2_download_url_by_* endpoint with the target object
:param download_version: DownloadVersion of an object being downloaded
:param session: B2Session to be used for downloading
:param encryption: optional Encryption setting
:return: (bytes_read, actual_sha1)
        please note that bytes_read may differ from the number of bytes written to the file object if decode_content=True
"""
pass
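To make the two class flags and the return contract concrete, here is a hypothetical subclass — purely an illustration, not part of b2sdk — that streams sequentially (so it tolerates non-seekable files) but hashes the bytes as they arrive (so it must opt out of decoded streams, whose sha1 would not match the stored checksum):

```python
class StreamingDownloader(AbstractDownloader):
    """Hypothetical sketch, not part of b2sdk."""

    REQUIRES_SEEKING = False         # writes strictly front-to-back
    SUPPORTS_DECODE_CONTENT = False  # sha1 of decoded data would not match the stored one

    def download(self, file, response, download_version, session, encryption=None):
        digest = self._get_hasher()  # same helper SimpleDownloader uses below
        bytes_read = 0
        for chunk in response.iter_content(chunk_size=64 * 1024):
            file.write(chunk)
            digest.update(chunk)
            bytes_read += len(chunk)
        return bytes_read, digest.hexdigest()
```

With these flags set, the base `is_suitable()` shown above automatically rejects such a downloader whenever the object has a Content-Encoding and the API is configured with `decode_content=True`.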
18 changes: 9 additions & 9 deletions b2sdk/_internal/transfer/inbound/downloader/parallel.py
@@ -30,6 +30,12 @@


class ParallelDownloader(AbstractDownloader):
"""
    Downloader that uses threads to download & write multiple parts of an object in parallel.

    Each part is downloaded by its own thread, while all writes are done by an additional
    dedicated thread. This can increase performance even for small files, as fetching and
    writing can proceed in parallel.
"""
# situations to consider:
#
# local file start local file end
@@ -50,6 +56,7 @@ class ParallelDownloader(AbstractDownloader):
# cloud file start cloud file end
#
FINISH_HASHING_BUFFER_SIZE = 1024**2
SUPPORTS_DECODE_CONTENT = False

def __init__(self, min_part_size: int, max_streams: int | None = None, **kwargs):
"""
@@ -60,22 +67,15 @@ def __init__(self, min_part_size: int, max_streams: int | None = None, **kwargs)
self.max_streams = max_streams
self.min_part_size = min_part_size

def is_suitable(self, download_version: DownloadVersion, allow_seeking: bool):
if not super().is_suitable(download_version, allow_seeking):
return False
return self._get_number_of_streams(
download_version.content_length
) >= 2 and download_version.content_length >= 2 * self.min_part_size

def _get_number_of_streams(self, content_length):
def _get_number_of_streams(self, content_length: int) -> int:
num_streams = content_length // self.min_part_size
if self.max_streams is not None:
num_streams = min(num_streams, self.max_streams)
else:
max_threadpool_workers = getattr(self._thread_pool, '_max_workers', None)
if max_threadpool_workers is not None:
num_streams = min(num_streams, max_threadpool_workers)
return num_streams
return max(num_streams, 1)

def download(
self,
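The rewritten `_get_number_of_streams` is easiest to follow with concrete numbers. A standalone restatement (illustrative values only; the thread-pool worker cap from the method above is omitted here):

```python
def get_number_of_streams(content_length: int, min_part_size: int,
                          max_streams: int | None = None) -> int:
    # Mirrors ParallelDownloader._get_number_of_streams, minus the thread-pool cap.
    num_streams = content_length // min_part_size
    if max_streams is not None:
        num_streams = min(num_streams, max_streams)
    return max(num_streams, 1)

MiB = 1024 ** 2
assert get_number_of_streams(25 * MiB, 5 * MiB) == 5     # one stream per full part
assert get_number_of_streams(25 * MiB, 5 * MiB, 2) == 2  # capped by max_streams
assert get_number_of_streams(1, 5 * MiB) == 1            # tiny file: one stream, not rejection
```

The `max(num_streams, 1)` floor, together with the deleted `is_suitable()` override, is what lets `ParallelDownloader` handle small files — the point of this PR.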
14 changes: 8 additions & 6 deletions b2sdk/_internal/transfer/inbound/downloader/simple.py
@@ -26,6 +26,7 @@
class SimpleDownloader(AbstractDownloader):

REQUIRES_SEEKING = False
SUPPORTS_DECODE_CONTENT = True

def _download(
self,
@@ -39,12 +40,12 @@ def _download(
chunk_size = self._get_chunk_size(actual_size)

digest = self._get_hasher()

bytes_read = 0
decoded_bytes_read = 0
for data in response.iter_content(chunk_size=chunk_size):
file.write(data)
digest.update(data)
bytes_read += len(data)
decoded_bytes_read += len(data)
bytes_read = response.raw.tell()

assert actual_size >= 1 # code below does `actual_size - 1`, but it should never reach that part with an empty file

@@ -62,8 +63,8 @@
# original response is not closed at this point yet, as another layer is responsible for closing it, so a new socket might be allocated,
# but this is a very rare case and so it is not worth the optimization
logger.debug(
're-download attempts remaining: %i, bytes read already: %i. Getting range %s now.',
retries_left, bytes_read, new_range
're-download attempts remaining: %i, bytes read: %i (decoded: %i). Getting range %s now.',
retries_left, bytes_read, decoded_bytes_read, new_range
)
with session.download_file_from_url(
response.request.url,
@@ -75,7 +76,8 @@
):
file.write(data)
digest.update(data)
bytes_read += len(data)
decoded_bytes_read += len(data)
bytes_read += followup_response.raw.tell()
retries_left -= 1
return bytes_read, digest.hexdigest()

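The reason `_download` now tracks `bytes_read` (raw, via `response.raw.tell()`) separately from `decoded_bytes_read` is that with `decode_content=True` the HTTP layer transparently inflates `Content-Encoding: gzip`, so bytes pulled off the wire and bytes written to the file legitimately differ. A standalone illustration of the mismatch (not b2sdk code):

```python
import gzip

# What travels over the wire vs. what gets written to the local file.
payload = b"compress me, then download me\n" * 100
wire_bytes = gzip.compress(payload)
assert len(wire_bytes) < len(payload)  # raw bytes read != decoded bytes written
print(f'{len(wire_bytes)} raw vs {len(payload)} decoded')
```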
1 change: 1 addition & 0 deletions changelog.d/+fix_decode_content.fixed.md
@@ -0,0 +1 @@
Fix `decode_content=True` causing an error when downloading tiny and large files.
1 change: 1 addition & 0 deletions changelog.d/+parallel_download_seekable_no_read.fixed.md
@@ -0,0 +1 @@
Prevent errors due to the use of "seekable" download strategies for seekable, but not readable files.
@@ -0,0 +1 @@
Use ParallelDownloader for small files instead of SimpleDownloader to avoid blocking on I/O.
65 changes: 27 additions & 38 deletions test/integration/test_download.py
@@ -48,15 +48,14 @@ def test_large_file(self):
]
):

# let's check that small file downloads fail with these settings
zero = bucket.upload_bytes(b'0', 'a_single_zero')
with pytest.raises(ValueError) as exc_info:
with io.BytesIO() as io_:
bucket.download_file_by_name('a_single_zero').save(io_)
assert exc_info.value.args == ('no strategy suitable for download was found!',)
# let's check that small file downloads do not fail with these settings
small_file_version = bucket.upload_bytes(b'0', 'a_single_char')
with io.BytesIO() as io_:
bucket.download_file_by_name('a_single_char').save(io_)
assert io_.getvalue() == b'0'

f, sha1 = self._file_helper(bucket)
if zero._type() != 'large':
if small_file_version._type() != 'large':
# if we are here, that's not the production server!
assert f.download_version.content_sha1_verified # large files don't have sha1, lets not check

@@ -98,37 +97,27 @@ def test_small_unverified(self):
pprint(f.download_version._get_args_for_clone())
assert not f.download_version.content_sha1_verified

def test_gzip(self):
bucket = self.create_bucket()
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir = pathlib.Path(temp_dir)
source_file = temp_dir / 'compressed_file.gz'
downloaded_uncompressed_file = temp_dir / 'downloaded_uncompressed_file'
downloaded_compressed_file = temp_dir / 'downloaded_compressed_file'

data_to_write = b"I'm about to be compressed and sent to the cloud, yay!\n" * 100 # too short files failed somehow
with gzip.open(source_file, 'wb') as gzip_file:
gzip_file.write(data_to_write)
file_version = bucket.upload_local_file(
str(source_file), 'gzipped_file', file_info={'b2-content-encoding': 'gzip'}
)
self.b2_api.download_file_by_id(file_id=file_version.id_).save_to(
str(downloaded_compressed_file)
)
with open(downloaded_compressed_file, 'rb') as dcf:
downloaded_data = dcf.read()
with open(source_file, 'rb') as sf:
source_data = sf.read()
assert downloaded_data == source_data

decompressing_api, _ = authorize(
self.b2_auth_data, B2HttpApiConfig(decode_content=True)
)
decompressing_api.download_file_by_id(file_id=file_version.id_).save_to(
str(downloaded_uncompressed_file)
)
with open(downloaded_uncompressed_file, 'rb') as duf:
assert duf.read() == data_to_write

@pytest.mark.parametrize("size_multiplier", [1, 100])
def test_gzip(b2_auth_data, bucket, tmp_path, b2_api, size_multiplier):
"""Test downloading gzipped files of varius sizes with and without content-encoding."""
source_file = tmp_path / 'compressed_file.gz'
downloaded_uncompressed_file = tmp_path / 'downloaded_uncompressed_file'
downloaded_compressed_file = tmp_path / 'downloaded_compressed_file'

data_to_write = b"I'm about to be compressed and sent to the cloud, yay!\n" * size_multiplier
source_file.write_bytes(gzip.compress(data_to_write))
file_version = bucket.upload_local_file(
str(source_file), 'gzipped_file', file_info={'b2-content-encoding': 'gzip'}
)
b2_api.download_file_by_id(file_id=file_version.id_).save_to(str(downloaded_compressed_file))
assert downloaded_compressed_file.read_bytes() == source_file.read_bytes()

decompressing_api, _ = authorize(b2_auth_data, B2HttpApiConfig(decode_content=True))
decompressing_api.download_file_by_id(file_id=file_version.id_).save_to(
str(downloaded_uncompressed_file)
)
assert downloaded_uncompressed_file.read_bytes() == data_to_write


@pytest.fixture
28 changes: 28 additions & 0 deletions test/unit/bucket/test_bucket.py
@@ -2556,6 +2556,34 @@ def test_download_to_non_seekable_file(self):
)
assert output_file.getvalue() == self.DATA.encode()

@pytest.mark.apiver(from_ver=2)
def test_download_to_seekable_but_no_read_file(self):
file_version = self.bucket.upload_bytes(self.DATA.encode(), 'file1')

non_seekable_strategies = [
strat for strat in self.bucket.api.services.download_manager.strategies
if not isinstance(strat, ParallelDownloader)
]
context = contextlib.nullcontext() if non_seekable_strategies else pytest.raises(
ValueError,
match='no strategy suitable for download was found!',
)
output_file = io.BytesIO()
seekable_but_not_readable = io.BufferedWriter(output_file)

# test sanity check
assert seekable_but_not_readable.seekable()
with pytest.raises(io.UnsupportedOperation):
seekable_but_not_readable.read(0)

with context:
self.download_file_by_id(
file_version.id_,
v2_file=seekable_but_not_readable,
)
seekable_but_not_readable.flush()
assert output_file.getvalue() == self.DATA.encode()


# download empty file
