Skip to content

Commit

Permalink
Timeout debugging (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
mephenor authored Aug 20, 2024
1 parent 30f87a7 commit bacad0c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .pyproject_generation/pyproject_custom.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "ghga_datasteward_kit"
version = "4.1.0"
version = "4.1.1"
description = "GHGA Data Steward Kit - A utils package for GHGA data stewards."
dependencies = [
"crypt4gh >=1.6, <2",
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ classifiers = [
"Intended Audience :: Developers",
]
name = "ghga_datasteward_kit"
version = "4.1.0"
version = "4.1.1"
description = "GHGA Data Steward Kit - A utils package for GHGA data stewards."
dependencies = [
"crypt4gh >=1.6, <2",
Expand Down
3 changes: 3 additions & 0 deletions src/ghga_datasteward_kit/s3_upload/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ class LegacyConfig(S3ObjectStoragesConfig):
default=5,
description="Number of times a request should be retried on non critical errors.",
)
debug: bool = Field(
default=False, description="Enable debug functionality for upload."
)

@field_validator("output_dir")
def expand_env_vars_output_dir(cls, output_dir: Path): # noqa: N805
Expand Down
1 change: 1 addition & 0 deletions src/ghga_datasteward_kit/s3_upload/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ async def validate_and_transfer_content(
config=config,
unencrypted_file_size=file_size,
storage_cleaner=storage_cleaner,
debug_mode=config.debug,
)
await uploader.encrypt_and_upload()

Expand Down
56 changes: 48 additions & 8 deletions src/ghga_datasteward_kit/s3_upload/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,35 @@
from uuid import uuid4

import crypt4gh.lib # type: ignore
from httpx import TimeoutException

from ghga_datasteward_kit.s3_upload.config import LegacyConfig
from ghga_datasteward_kit.s3_upload.file_encryption import Encryptor
from ghga_datasteward_kit.s3_upload.utils import (
LOG,
HttpxClientConfig,
StorageCleaner,
get_bucket_id,
get_object_storage,
httpx_client,
)

# Ceiling, in seconds (one hour), for the per-part upload request timeout in
# debug mode: the timeout is raised after each timed-out attempt and the
# timeout error is re-raised once the next value would exceed this limit.
MAX_TIMEOUT_DEBUG = 60 * 60


class ChunkedUploader:
"""Handler class dealing with upload functionality"""

def __init__(
def __init__( # noqa: PLR0913
self,
input_path: Path,
alias: str,
config: LegacyConfig,
unencrypted_file_size: int,
storage_cleaner: StorageCleaner,
debug_mode: bool,
) -> None:
self.alias = alias
self.config = config
Expand All @@ -52,6 +59,7 @@ def __init__(
self.unencrypted_file_size = unencrypted_file_size
self.encrypted_file_size = 0
self._storage_cleaner = storage_cleaner
self.debug_mode = debug_mode

async def encrypt_and_upload(self):
"""Delegate encryption and perform multipart upload"""
Expand All @@ -69,6 +77,7 @@ async def encrypt_and_upload(self):
encrypted_file_size=encrypted_file_size,
part_size=self.config.part_size,
storage_cleaner=self._storage_cleaner,
debug_mode=self.debug_mode,
) as upload:
LOG.info("(1/7) Initialized file upload for %s.", upload.file_id)
for part_number, part in enumerate(
Expand Down Expand Up @@ -96,13 +105,14 @@ async def encrypt_and_upload(self):
class MultipartUpload:
"""Context manager to handle init + complete/abort for S3 multipart upload"""

def __init__(
def __init__( # noqa: PLR0913
self,
config: LegacyConfig,
file_id: str,
encrypted_file_size: int,
part_size: int,
storage_cleaner: StorageCleaner,
debug_mode: bool,
) -> None:
self.config = config
self.storage = get_object_storage(config=self.config)
Expand All @@ -111,6 +121,7 @@ def __init__(
self.part_size = part_size
self.upload_id = ""
self.storage_cleaner = storage_cleaner
self.debug_mode = debug_mode

async def __aenter__(self):
"""Start multipart upload"""
Expand Down Expand Up @@ -146,13 +157,42 @@ async def send_part(self, part: bytes, part_number: int):
object_id=self.file_id,
part_number=part_number,
)
with httpx_client() as client:
response = client.put(url=upload_url, content=part)

status_code = response.status_code
if status_code != 200:
raise ValueError(f"Received unexpected status code {
status_code} when trying to upload file part {part_number}.")
if self.debug_mode:
num_retries = HttpxClientConfig.num_retries
timeout = HttpxClientConfig.timeout

while True:
LOG.info(
f"Attempting upload of part {part_number} ({len(part)} bytes) with a timeout of {
timeout} seconds."
)
HttpxClientConfig.configure(
num_retries=num_retries, timeout=timeout
)
with httpx_client() as client:
try:
response = client.put(url=upload_url, content=part)
break
except TimeoutException as error:
LOG.info(f"Encountered timeout for {
timeout} seconds. Details:\n{str(error)}")

# increase by a minute and reraise if we need to wait more than one hour
timeout += 60
if timeout > MAX_TIMEOUT_DEBUG:
raise error
LOG.info(f"Upload successful for a timeout of {
timeout} seconds")
else:
with httpx_client() as client:
response = client.put(url=upload_url, content=part)

status_code = response.status_code
if status_code != 200:
raise ValueError(f"Received unexpected status code {
status_code} when trying to upload file part {part_number}.")

except (Exception, KeyboardInterrupt, ValueError) as exc:
raise self.storage_cleaner.PartUploadError(
cause=str(exc),
Expand Down

0 comments on commit bacad0c

Please sign in to comment.