Fix bucket_id propagation #62

Merged: 10 commits, Oct 2, 2024
Changes from 5 commits
2 changes: 1 addition & 1 deletion .pyproject_generation/pyproject_custom.toml
@@ -1,6 +1,6 @@
[project]
name = "ghga_datasteward_kit"
version = "4.3.0"
version = "4.4.0"
description = "GHGA Data Steward Kit - A utils package for GHGA data stewards."
dependencies = [
"crypt4gh >=1.6, <2",
25 changes: 15 additions & 10 deletions README.md
@@ -63,6 +63,8 @@ This is achieved using the data steward kit, using the following steps:
storage is done in one go. This is achieved using either the
`ghga-datasteward-kit files upload` for uploading a single file or the
`ghga-datasteward-kit files batch-upload` for uploading multiple files at once.
There also exist legacy versions of these subcommands for compatibility reasons,
where the command is prefixed with `legacy-`.
Please see [this section](#files-batch-upload) for further details. This will output
one summary JSON per uploaded file. The encryption secret is automatically
transferred to GHGA central.
@@ -120,11 +122,9 @@ content to a (remote) S3-compatible object storage.
This process consists of multiple steps:
1. Generate a unique file id
2. Create unencrypted file checksum
3. Encrypt file
4. Extract file secret and remove Crypt4GH envelope
5. Upload encrypted file content
6. Download encrypted file content, decrypt and verify checksum
7. Write file/upload information to output file
3. Encrypt and upload file in chunks
4. Download encrypted file content, decrypt and verify checksum
5. Write file/upload information to output file

The user needs to provide a config yaml containing information as described
[here](./s3_upload_config.md).
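For illustration, the following is a minimal sketch of the checksum bookkeeping implied by steps 2 to 4; it is not the kit's actual implementation. In particular, `fake_encrypt` stands in for the Crypt4GH-based chunked encryption, and the 16 MiB part size is only an assumption here.

```python
# Illustrative sketch only: checksums over the unencrypted content and over
# each "encrypted" part. fake_encrypt is a placeholder, not real encryption.
import hashlib
from pathlib import Path


def fake_encrypt(part: bytes) -> bytes:
    """Placeholder standing in for Crypt4GH encryption of one part."""
    return part


def checksum_parts(path: Path, part_size: int = 16 * 1024**2):
    """Return the unencrypted SHA256 plus per-part MD5/SHA256 checksums."""
    unencrypted_sha256 = hashlib.sha256()
    encrypted_md5s: list[str] = []
    encrypted_sha256s: list[str] = []

    with path.open("rb") as infile:
        while part := infile.read(part_size):
            unencrypted_sha256.update(part)
            encrypted_part = fake_encrypt(part)
            encrypted_md5s.append(hashlib.md5(encrypted_part).hexdigest())
            encrypted_sha256s.append(hashlib.sha256(encrypted_part).hexdigest())

    return unencrypted_sha256.hexdigest(), encrypted_md5s, encrypted_sha256s
```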
@@ -135,11 +135,13 @@ An overview of important information about each upload is written to a file
It contains the following information:
1. The file alias
2. A unique identifier for the file
3. The local file path
4. A SHA256 checksum over the unencrypted content
5. MD5 checksums over all encrypted file parts
6. SHA256 checksums over all encrypted file parts
7. The file encryption/decryption secret
3. An identifier of the storage bucket the file was uploaded to (Added in v4.4.0)
4. The local file path
5. A SHA256 checksum over the unencrypted content
6. MD5 checksums over all encrypted file parts
7. SHA256 checksums over all encrypted file parts
8. The file encryption/decryption secret ID, or the actual secret if a legacy command was used (Secret ID since v1.0.0)
9. An alias for the storage node the file was uploaded to (Added in v3.0.0)

Attention: Keep this output file in a safe, private location.
If this file is lost, the uploaded file content becomes inaccessible.
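For orientation, a hypothetical and heavily abridged upload summary could look roughly like the sketch below. The checksum key names are guesses, the other keys follow the fields above and the models touched in this PR, and every value is made up.

```python
# Hypothetical, abridged example of the per-file upload summary (all values fake).
example_output = {
    "Alias": "sample_file_001",                       # 1. file alias
    "File UUID": "example-file-uuid",                 # 2. unique identifier
    "Bucket ID": "example-inbox-bucket",              # 3. storage bucket (v4.4.0+)
    "Original filesystem path": "/data/sample_file_001.fastq.gz",  # 4.
    "Unencrypted checksum (SHA256)": "ab12...",       # 5. key name is a guess
    "Encrypted part checksums (MD5)": ["9f8e...", "7d6c..."],      # 6. guessed key
    "Encrypted part checksums (SHA256)": ["11aa...", "22bb..."],   # 7. guessed key
    "Symmetric file encryption secret ID": "example-secret-id",    # 8.
    "Storage alias": "example-node",                  # 9. storage node (v3.0.0+)
}
```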
@@ -154,6 +156,9 @@ running system and make the corresponding files available for download.

This command requires a configuration file as described [here](./ingest_config.md).

#### ingest version compatibility
Currently, v4.4.0 of this tool is compatible with v4.0.0 of the `File Ingest Service`.

### metadata

*To be performed by Central Data Stewards only.*
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -21,7 +21,7 @@ classifiers = [
"Intended Audience :: Developers",
]
name = "ghga_datasteward_kit"
version = "4.3.0"
version = "4.4.0"
description = "GHGA Data Steward Kit - A utils package for GHGA data stewards."
dependencies = [
"crypt4gh >=1.6, <2",
17 changes: 15 additions & 2 deletions src/ghga_datasteward_kit/file_ingest.py
@@ -14,6 +14,7 @@
# limitations under the License.
"""Interaction with file ingest service"""

import logging
from collections.abc import Callable
from pathlib import Path

@@ -26,6 +27,8 @@

from ghga_datasteward_kit import models, utils

LOG = logging.getLogger(__name__)


class IngestConfig(SubmissionStoreConfig):
"""Config options for calling the file ingest endpoint"""
@@ -77,6 +80,10 @@ class IngestConfig(SubmissionStoreConfig):
+ " During the later ingest phase, the alias will be validated by the File Ingest Service."
),
)
selected_bucket_id: str = Field(
default=...,
description="Fallback bucket_id for older output metadata files that don't contain a bucket ID.",
)


def alias_to_accession(
Expand Down Expand Up @@ -137,14 +144,20 @@ def file_ingest(
"""
try:
output_metadata = models.OutputMetadata.load(
input_path=in_path, selected_alias=config.selected_storage_alias
input_path=in_path,
selected_alias=config.selected_storage_alias,
selected_bucket=config.selected_bucket_id,
)
endpoint = config.file_ingest_federated_endpoint
LOG.info("Selected non-legacy endpoint %s for file %s.", endpoint, in_path)
except (KeyError, ValidationError):
output_metadata = models.LegacyOutputMetadata.load(
input_path=in_path, selected_alias=config.selected_storage_alias
input_path=in_path,
selected_alias=config.selected_storage_alias,
selected_bucket=config.selected_bucket_id,
)
endpoint = config.file_ingest_legacy_endpoint
LOG.info("Selected legacy endpoint %s for file %s.", endpoint, in_path)

endpoint_url = utils.path_join(config.file_ingest_baseurl, endpoint)

43 changes: 39 additions & 4 deletions src/ghga_datasteward_kit/models.py
@@ -16,13 +16,16 @@

import hashlib
import json
import logging
import os
from pathlib import Path
from typing import Any

from ghga_service_commons.utils.crypt import encrypt
from pydantic import BaseModel

LOG = logging.getLogger(__name__)


class Checksums:
"""Container for checksum calculation"""
@@ -68,6 +71,7 @@ class MetadataBase(BaseModel):
"""Common base for all output and upload models"""

file_id: str
bucket_id: str
object_id: str
part_size: int
unencrypted_size: int
@@ -81,6 +85,7 @@ def prepare_output(self) -> dict[str, str]:
"""Prepare shared fields for output"""
output: dict[str, Any] = {}

output["Bucket ID"] = self.bucket_id
output["File UUID"] = self.file_id
output["Part Size"] = f"{self.part_size // 1024**2} MiB"
output["Unencrypted file size"] = self.unencrypted_size
@@ -124,24 +129,38 @@ def serialize(self, output_path: Path):
os.chmod(path=output_path, mode=0o400)

@classmethod
def load(cls, input_path: Path, selected_alias: str):
def load(cls, input_path: Path, selected_alias: str, selected_bucket: str):
"""Load metadata from serialized file"""
with input_path.open("r") as infile:
data = json.load(infile)

# Support for older file uploads without explicit storage alias
# Support for older file uploads without explicit storage alias or bucket id
# Ingest the configured selected alias if none can be found in the metadata
try:
storage_alias = data["Storage alias"]
except KeyError:
LOG.warning(
"Could not find storage alias in metadata, populating with configured alias '%s' instead.",
selected_alias,
)
storage_alias = selected_alias
try:
bucket_id = data["Bucket ID"]
except KeyError:
LOG.warning(
"Could not find bucket ID in metadata, populating with configured alias '%s' instead.",
mephenor marked this conversation as resolved.
Show resolved Hide resolved
selected_bucket,
)
bucket_id = selected_bucket

file_id = data["File UUID"]
part_size = int(data["Part Size"].rpartition(" MiB")[0]) * 1024**2

return OutputMetadata(
alias=data["Alias"],
original_path=data["Original filesystem path"],
file_id=file_id,
bucket_id=bucket_id,
object_id=file_id,
part_size=part_size,
secret_id=data["Symmetric file encryption secret ID"],
@@ -157,6 +176,7 @@ def to_upload_metadata(self, file_id: str):
"""Convert internal output file representation to request model"""
return Metadata(
file_id=file_id,
bucket_id=self.bucket_id,
object_id=self.object_id,
part_size=self.part_size,
unencrypted_size=self.unencrypted_size,
@@ -205,24 +225,38 @@ def serialize(self, output_path: Path):
os.chmod(path=output_path, mode=0o400)

@classmethod
def load(cls, input_path: Path, selected_alias: str):
def load(cls, input_path: Path, selected_alias: str, selected_bucket: str):
"""Load metadata from serialized file"""
with input_path.open("r") as infile:
data = json.load(infile)

# Support for older file uploads without explicit storage alias
# Support for older file uploads without explicit storage alias or bucket id
# Ingest the configured selected alias if none can be found in the metadata
try:
storage_alias = data["Storage alias"]
except KeyError:
LOG.warning(
"Could not find storage alias in metadata, populating with configured alias '%s' instead.",
selected_alias,
)
storage_alias = selected_alias
try:
bucket_id = data["Bucket ID"]
except KeyError:
LOG.warning(
"Could not find bucket ID in metadata, populating with configured alias '%s' instead.",
TheByronHimes marked this conversation as resolved.
Show resolved Hide resolved
selected_bucket,
)
bucket_id = selected_bucket

file_id = data["File UUID"]
part_size = int(data["Part Size"].rpartition(" MiB")[0]) * 1024**2

return LegacyOutputMetadata(
alias=data["Alias"],
original_path=data["Original filesystem path"],
file_id=file_id,
bucket_id=bucket_id,
object_id=file_id,
part_size=part_size,
file_secret=data["Symmetric file encryption secret"],
@@ -238,6 +272,7 @@ def to_upload_metadata(self, file_id: str):
"""Convert internal output file representation to request model"""
return LegacyMetadata(
file_id=file_id,
bucket_id=self.bucket_id,
object_id=self.object_id,
part_size=self.part_size,
unencrypted_size=self.unencrypted_size,
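As a usage note for the `load()` signature changed above: a call like the following sketch would fall back to the configured bucket when an older (pre-v4.4.0) summary file has no "Bucket ID" key. The file name, alias, and bucket value are hypothetical.

```python
# Hypothetical usage of the new load() signature; names and values are made up.
from pathlib import Path

from ghga_datasteward_kit.models import OutputMetadata

metadata = OutputMetadata.load(
    input_path=Path("old_upload_summary.json"),  # a pre-v4.4.0 summary file
    selected_alias="example-node",       # used only if "Storage alias" is missing
    selected_bucket="example-bucket",    # used only if "Bucket ID" is missing
)
```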
3 changes: 2 additions & 1 deletion src/ghga_datasteward_kit/s3_upload/entrypoint.py
@@ -70,7 +70,6 @@ async def validate_and_transfer_content(
config=config,
unencrypted_file_size=file_size,
storage_cleaner=storage_cleaner,
debug_mode=config.debug,
)
await uploader.encrypt_and_upload()

@@ -163,6 +162,7 @@ async def async_main(input_path: Path, alias: str, config: Config, token: str):
metadata = models.OutputMetadata(
alias=uploader.alias,
file_id=uploader.file_id,
bucket_id=get_bucket_id(config=config),
object_id=uploader.file_id,
original_path=input_path,
part_size=config.part_size,
@@ -213,6 +213,7 @@ async def legacy_async_main(input_path: Path, alias: str, config: LegacyConfig):
metadata = models.LegacyOutputMetadata(
alias=uploader.alias,
file_id=uploader.file_id,
bucket_id=get_bucket_id(config=config),
object_id=uploader.file_id,
original_path=input_path,
part_size=config.part_size,
8 changes: 1 addition & 7 deletions src/ghga_datasteward_kit/s3_upload/uploader.py
@@ -38,10 +38,6 @@
httpx_client,
)

MAX_TIMEOUT_DEBUG = (
600 # maximum timeout for upload request used for debugging purposes
)


class UploadTaskHandler:
"""Wraps task scheduling details."""
@@ -63,14 +59,13 @@ async def gather(self):
class ChunkedUploader:
"""Handler class dealing with upload functionality"""

def __init__( # noqa: PLR0913
def __init__(
self,
input_path: Path,
alias: str,
config: LegacyConfig,
unencrypted_file_size: int,
storage_cleaner: StorageCleaner,
debug_mode: bool,
) -> None:
self.alias = alias
self.config = config
@@ -80,7 +75,6 @@ def __init__(
self.unencrypted_file_size = unencrypted_file_size
self.encrypted_file_size = 0
self._storage_cleaner = storage_cleaner
self.debug_mode = debug_mode

async def encrypt_and_upload(self):
"""Delegate encryption and perform multipart upload"""
13 changes: 10 additions & 3 deletions tests/fixtures/ingest.py
@@ -49,6 +49,9 @@
),
)

SELECTED_STORAGE_ALIAS = "test"
SELECTED_BUCKET_ID = "selected-test-bucket"


@dataclass
class IngestFixture:
@@ -70,11 +73,12 @@ def legacy_ingest_fixture() -> Generator[IngestFixture, None, None]:
keypair = generate_key_pair()

file_path = Path(input_dir) / "test.json"
file_id = "happy_little_object"
file_id = "happy-little-object"

metadata = LegacyOutputMetadata(
alias="test_alias",
file_id=file_id,
bucket_id="test-bucket",
object_id=file_id,
original_path=file_path,
part_size=16 * 1024**2,
@@ -95,7 +99,8 @@
input_dir=Path(input_dir),
map_files_fields=["study_files"],
submission_store_dir=Path(submission_store_dir),
selected_storage_alias="test",
selected_storage_alias=SELECTED_STORAGE_ALIAS,
selected_bucket_id=SELECTED_BUCKET_ID,
)

submission_store = SubmissionStore(config=config)
@@ -124,6 +129,7 @@ def ingest_fixture() -> Generator[IngestFixture, None, None]:
metadata = OutputMetadata(
alias="test_alias",
file_id=file_id,
bucket_id="test-bucket",
object_id=file_id,
original_path=file_path,
part_size=16 * 1024**2,
@@ -144,7 +150,8 @@
input_dir=Path(input_dir),
map_files_fields=["study_files"],
submission_store_dir=Path(submission_store_dir),
selected_storage_alias="test",
selected_storage_alias=SELECTED_STORAGE_ALIAS,
selected_bucket_id=SELECTED_BUCKET_ID,
)

submission_store = SubmissionStore(config=config)