From fafb25549ff69ae3b6c5dd488230a203de9e6dcd Mon Sep 17 00:00:00 2001
From: Thomas Zajac
Date: Mon, 3 Jul 2023 18:03:20 +0200
Subject: [PATCH 1/7] Added alias->accession mapping to get meaningful ID (#18)

---
 ghga_datasteward_kit/__init__.py    |  2 +-
 ghga_datasteward_kit/cli/file.py    | 10 +---
 ghga_datasteward_kit/file_ingest.py | 62 +++++++++++++-------
 ghga_datasteward_kit/s3_upload.py   | 15 +++--
 ingest_config.md                    |  2 +
 setup.cfg                           |  4 +-
 tests/fixtures/ingest.py            | 91 +++++++++++++++++++----------
 tests/test_file_ingest.py           | 33 +++++++----
 tests/test_s3_upload.py             |  4 ++
 9 files changed, 140 insertions(+), 83 deletions(-)

diff --git a/ghga_datasteward_kit/__init__.py b/ghga_datasteward_kit/__init__.py
index 56dbb82..e62b876 100644
--- a/ghga_datasteward_kit/__init__.py
+++ b/ghga_datasteward_kit/__init__.py
@@ -15,4 +15,4 @@

 """A utils package for GHGA data stewards."""

-__version__ = "0.4.0"
+__version__ = "0.4.1"
diff --git a/ghga_datasteward_kit/cli/file.py b/ghga_datasteward_kit/cli/file.py
index 4241275..522d5ad 100644
--- a/ghga_datasteward_kit/cli/file.py
+++ b/ghga_datasteward_kit/cli/file.py
@@ -66,15 +66,7 @@ def ingest_upload_metadata(
 ):
     """Upload all output metdata files from the given directory to the file ingest service"""

-    def dummy_generator():
-        """Placeholder, needs replacement with actual implementation"""
-        while True:
-            yield "test_id"
-
-    errors = file_ingest.main(
-        config_path=config_path,
-        id_generator=dummy_generator,
-    )
+    errors = file_ingest.main(config_path=config_path)

     if errors:
         print(f"Encountered {len(errors)} errors during processing.")
diff --git a/ghga_datasteward_kit/file_ingest.py b/ghga_datasteward_kit/file_ingest.py
index 4162437..ff96f23 100644
--- a/ghga_datasteward_kit/file_ingest.py
+++ b/ghga_datasteward_kit/file_ingest.py
@@ -14,17 +14,20 @@
 # limitations under the License.
 """Interaction with file ingest service"""

-from itertools import islice
 from pathlib import Path
 from typing import Callable

 import httpx
-from pydantic import BaseSettings, Field, ValidationError
+from metldata.submission_registry.submission_store import (
+    SubmissionStore,
+    SubmissionStoreConfig,
+)
+from pydantic import Field, ValidationError

 from ghga_datasteward_kit import models, utils


-class IngestConfig(BaseSettings):
+class IngestConfig(SubmissionStoreConfig):
     """Config options for calling the file ingest endpoint"""

     file_ingest_url: str = Field(
@@ -40,9 +43,30 @@ class IngestConfig(BaseSettings):
     )


+def alias_to_accession(alias: str, submission_store: SubmissionStore) -> str:
+    """Get all submissions to retrieve valid accessions from corresponding file aliases"""
+
+    submission_ids = submission_store.get_all_submission_ids()
+
+    all_submission_map = {}
+
+    for submission_id in submission_ids:
+        all_submission_map.update(
+            submission_store.get_by_id(submission_id=submission_id).accession_map[
+                "files"
+            ]
+        )
+
+    accession = all_submission_map.get(alias)
+
+    if accession is None:
+        raise ValueError(f"No accession exists for file alias {alias}")
+
+    return accession
+
+
 def main(
     config_path: Path,
-    id_generator: Callable[[], str],
 ):
     """Handle ingestion of a folder of s3 upload file metadata"""
@@ -51,23 +75,11 @@ def main(

     errors = {}

-    # pre generate paths/ids to make sure generator procudes a sufficient amount of ids
-    file_paths = [
-        file_path
-        for file_path in config.input_dir.iterdir()
-        if file_path.suffix == ".json"
-    ]
-    file_ids = list(islice(id_generator(), len(file_paths)))
-
-    if len(file_paths) != len(file_ids):
-        raise ValueError(
-            "Provided ID generator function does not create the correct amount of IDs."
-            + f"\nRequired: {len(file_paths)}, generated {len(file_ids)}"
-        )
-
-    for in_path, file_id in zip(file_paths, file_ids):
+    for in_path in config.input_dir.iterdir():
+        if in_path.suffix != ".json":
+            continue
         try:
-            file_ingest(in_path=in_path, file_id=file_id, token=token, config=config)
+            file_ingest(in_path=in_path, token=token, config=config)
         except (ValidationError, ValueError) as error:
             errors[in_path.resolve()] = str(error)
             continue
@@ -75,13 +87,21 @@ def main(
     return errors


-def file_ingest(in_path: Path, file_id: str, token: str, config: IngestConfig):
+def file_ingest(
+    in_path: Path,
+    token: str,
+    config: IngestConfig,
+    alias_to_id: Callable[[str, SubmissionStore], str] = alias_to_accession,
+):
     """
     Transform from s3 upload output representation to what the file ingest service expects.

     Then call the ingest endpoint
     """
+    submission_store = SubmissionStore(config=config)
+
     output_metadata = models.OutputMetadata.load(input_path=in_path)
+    file_id = alias_to_id(output_metadata.alias, submission_store)

     upload_metadata = output_metadata.to_upload_metadata(file_id=file_id)
     encrypted = upload_metadata.encrypt_metadata(pubkey=config.file_ingest_pubkey)
diff --git a/ghga_datasteward_kit/s3_upload.py b/ghga_datasteward_kit/s3_upload.py
index 28cc506..44a2742 100755
--- a/ghga_datasteward_kit/s3_upload.py
+++ b/ghga_datasteward_kit/s3_upload.py
@@ -35,9 +35,8 @@
 import crypt4gh.header  # type: ignore
 import crypt4gh.keys  # type: ignore
 import crypt4gh.lib  # type: ignore
-import requests  # type: ignore
+from ghga_connector.core.client import HttpxClientState, httpx_client
 from ghga_connector.core.file_operations import read_file_parts
-from ghga_connector.core.session import RequestsSession
 from hexkit.providers.s3 import S3Config, S3ObjectStorage  # type: ignore
 from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt
 from pydantic import BaseSettings, Field, SecretStr, validator
@@ -46,15 +45,13 @@
 from ghga_datasteward_kit.utils import load_config_yaml


-def configure_session() -> requests.Session:
+def configure_session():
     """Configure session with exponential backoff retry"""
-    RequestsSession.configure(6)
-    return RequestsSession.session
+    HttpxClientState.configure(6)


 LOGGER = logging.getLogger("s3_upload")
 PART_SIZE = 16 * 1024**2
-SESSION = configure_session()


 def expand_env_vars_in_path(path: Path) -> Path:
@@ -187,7 +184,8 @@ def _download_parts(self, download_url):
         ):
             headers = {"Range": f"bytes={start}-{stop}"}
             LOGGER.debug("Downloading part number %i. %s", part_no, headers)
-            response = SESSION.get(download_url, timeout=60, headers=headers)
+            with httpx_client() as client:
+                response = client.get(download_url, timeout=60, headers=headers)
             yield response.content

     async def download(self):
@@ -399,7 +397,8 @@ async def send_part(self, part: bytes, part_number: int):
                 object_id=self.file_id,
                 part_number=part_number,
             )
-            SESSION.put(url=upload_url, data=part)
+            with httpx_client() as client:
+                client.put(url=upload_url, data=part)
         except (  # pylint: disable=broad-except
             Exception,
             KeyboardInterrupt,
diff --git a/ingest_config.md b/ingest_config.md
index d58b649..f746fe5 100644
--- a/ingest_config.md
+++ b/ingest_config.md
@@ -7,6 +7,8 @@

 ## Properties

+- **`submission_store_dir`** *(string)*: The directory where the submission JSONs will be stored.
+
 - **`file_ingest_url`** *(string)*: Base URL under which the /ingest endpoint is available.

 - **`file_ingest_pubkey`** *(string)*: Public key used for encryption of the payload.
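Note: conceptually, the lookup introduced in file_ingest.py above flattens the "files" section of every stored submission's accession map into one dictionary and indexes it by alias. A minimal sketch with plain dicts standing in for the metldata Submission objects (the aliases and accession values here are made up):

    submissions = [
        {"accession_map": {"files": {"alias_a": "accession_a"}}},
        {"accession_map": {"files": {"alias_b": "accession_b"}}},
    ]

    def alias_to_accession(alias: str) -> str:
        """Merge the 'files' maps of all submissions, then look up the alias."""
        merged: dict[str, str] = {}
        for submission in submissions:
            merged.update(submission["accession_map"]["files"])
        accession = merged.get(alias)
        if accession is None:
            raise ValueError(f"No accession exists for file alias {alias}")
        return accession

    assert alias_to_accession("alias_b") == "accession_b"

Since dict.update overwrites earlier entries, a later submission wins if two submissions map the same alias.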
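Note: together with the new `submission_store_dir`, the properties documented in ingest_config.md make up the YAML file that the ingest command loads. A sketch with placeholder values (none of these point at a real deployment):

    submission_store_dir: /path/to/submission/store
    file_ingest_url: https://ingest.example.org/ingest
    file_ingest_pubkey: <base64-encoded public key>
    input_dir: /path/to/upload/output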
diff --git a/setup.cfg b/setup.cfg
index b686726..bfc04e5 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -36,9 +36,9 @@ include_package_data = True
 packages = find:
 install_requires =
     hexkit[mongodb,s3]==0.10.0
-    ghga-connector==0.3.3
+    ghga-connector==0.3.5
     ghga-transpiler==1.0.3
-    metldata==0.2.3
+    metldata==0.3.6

 python_requires = >= 3.9
diff --git a/tests/fixtures/ingest.py b/tests/fixtures/ingest.py
index 80d39d4..58310ce 100644
--- a/tests/fixtures/ingest.py
+++ b/tests/fixtures/ingest.py
@@ -23,10 +23,31 @@
 import pytest
 from ghga_service_commons.utils.crypt import KeyPair, encode_key, generate_key_pair
 from ghga_service_commons.utils.simple_token import generate_token_and_hash
+from ghga_service_commons.utils.utc_dates import now_as_utc
+from metldata.submission_registry.models import (
+    StatusChange,
+    Submission,
+    SubmissionStatus,
+)
+from metldata.submission_registry.submission_store import SubmissionStore

 from ghga_datasteward_kit.file_ingest import IngestConfig
 from ghga_datasteward_kit.models import OutputMetadata

+EXAMPLE_SUBMISSION = Submission(
+    title="test",
+    description="test",
+    content={"test_class": [{"alias": "test_alias"}]},
+    accession_map={"files": {"test_alias": "test_accession"}},
+    id="testsubmission001",
+    status_history=(
+        StatusChange(
+            timestamp=now_as_utc(),
+            new_status=SubmissionStatus.COMPLETED,
+        ),
+    ),
+)
+

 @dataclass
 class IngestFixture:
@@ -44,35 +65,41 @@ def ingest_fixture() -> Generator[IngestFixture, None, None]:
     """Generate necessary data for file ingest."""

     with TemporaryDirectory() as input_dir:
-        token, token_hash = generate_token_and_hash()
-        keypair = generate_key_pair()
-
-        file_path = Path(input_dir) / "test.json"
-
-        metadata = OutputMetadata(
-            alias="test",
-            file_uuid="happy_little_object",
-            original_path=file_path,
-            part_size=16 * 1024**2,
-            unencrypted_size=50 * 1024**2,
-            encrypted_size=50 * 1024**2 + 128,
-            file_secret=os.urandom(32),
-            unencrypted_checksum="def",
-            encrypted_md5_checksums=["a", "b", "c"],
-            encrypted_sha256_checksums=["a", "b", "c"],
-        )
-
-        metadata.serialize(file_path)
-
-        config = IngestConfig(
-            file_ingest_url="https://not-a-valid-url",
-            file_ingest_pubkey=encode_key(keypair.public),
-            input_dir=Path(input_dir),
-        )
-        yield IngestFixture(
-            config=config,
-            file_path=file_path,
-            token=token,
-            token_hash=token_hash,
-            keypair=keypair,
-        )
+        with TemporaryDirectory() as submission_store_dir:
+            token, token_hash = generate_token_and_hash()
+            keypair = generate_key_pair()
+
+            file_path = Path(input_dir) / "test.json"
+
+            metadata = OutputMetadata(
+                alias="test_alias",
+                file_uuid="happy_little_object",
+                original_path=file_path,
+                part_size=16 * 1024**2,
+                unencrypted_size=50 * 1024**2,
+                encrypted_size=50 * 1024**2 + 128,
+                file_secret=os.urandom(32),
+                unencrypted_checksum="def",
+                encrypted_md5_checksums=["a", "b", "c"],
+                encrypted_sha256_checksums=["a", "b", "c"],
+            )
+
+            metadata.serialize(file_path)
+
+            config = IngestConfig(
+                file_ingest_url="https://not-a-valid-url",
+                file_ingest_pubkey=encode_key(keypair.public),
+                input_dir=Path(input_dir),
+                submission_store_dir=Path(submission_store_dir),
+            )
+
+            submission_store = SubmissionStore(config=config)
+            submission_store.insert_new(submission=EXAMPLE_SUBMISSION)
+
+            yield IngestFixture(
+                config=config,
+                file_path=file_path,
+                token=token,
+                token_hash=token_hash,
+                keypair=keypair,
+            )
diff --git a/tests/test_file_ingest.py b/tests/test_file_ingest.py
index 5d60d1a..9ee248c 100644
--- a/tests/test_file_ingest.py
+++ b/tests/test_file_ingest.py
@@ -16,17 +16,32 @@

 import pytest
 import yaml
 from ghga_service_commons.utils.simple_token import generate_token
+from metldata.submission_registry.submission_store import SubmissionStore
 from pytest_httpx import HTTPXMock

+from ghga_datasteward_kit import models
 from ghga_datasteward_kit.cli.file import ingest_upload_metadata
-from ghga_datasteward_kit.file_ingest import file_ingest
-from tests.fixtures.ingest import IngestFixture, ingest_fixture  # noqa: F401
+from ghga_datasteward_kit.file_ingest import alias_to_accession, file_ingest
+from tests.fixtures.ingest import (  # noqa: F401
+    EXAMPLE_SUBMISSION,
+    IngestFixture,
+    ingest_fixture,
+)


-def id_generator():
-    """Generate dummy IDs."""
-    for i in [1, 2]:
-        yield f"test_{i}"
+@pytest.mark.asyncio
+async def test_alias_to_accession(ingest_fixture: IngestFixture):  # noqa: F811
+    """Test alias->accession mapping"""
+
+    submission_store = SubmissionStore(config=ingest_fixture.config)
+    metadata = models.OutputMetadata.load(input_path=ingest_fixture.file_path)
+
+    accession = alias_to_accession(metadata.alias, submission_store=submission_store)
+    example_accession = list(EXAMPLE_SUBMISSION.accession_map["files"].values())[0]
+    assert accession == example_accession
+
+    with pytest.raises(ValueError):
+        alias_to_accession("invalid_alias", submission_store=submission_store)


 @pytest.mark.asyncio
@@ -40,7 +55,6 @@ async def test_ingest_directly(
     httpx_mock.add_response(url=ingest_fixture.config.file_ingest_url, status_code=202)

     file_ingest(
         in_path=ingest_fixture.file_path,
-        file_id="test",
         token=token,
         config=ingest_fixture.config,
     )
@@ -53,7 +67,6 @@ async def test_ingest_directly(
     with pytest.raises(ValueError, match="Unauthorized"):
         file_ingest(
             in_path=ingest_fixture.file_path,
-            file_id="test",
             token=token,
             config=ingest_fixture.config,
         )
@@ -68,7 +81,6 @@ async def test_ingest_directly(
     ):
         file_ingest(
             in_path=ingest_fixture.file_path,
-            file_id="test",
             token=token,
             config=ingest_fixture.config,
         )
@@ -87,9 +99,10 @@ async def test_main(

     config = ingest_fixture.config.dict()
     config["input_dir"] = str(config["input_dir"])
+    config["submission_store_dir"] = str(config["submission_store_dir"])

     with config_path.open("w") as config_file:
-        yaml.dump(config, config_file)
+        yaml.safe_dump(config, config_file)

     monkeypatch.setattr("ghga_datasteward_kit.utils.read_token", generate_token)
diff --git a/tests/test_s3_upload.py b/tests/test_s3_upload.py
index b8e7ebd..c62005a 100644
--- a/tests/test_s3_upload.py
+++ b/tests/test_s3_upload.py
@@ -19,6 +19,7 @@
 from pathlib import Path

 import pytest
+from ghga_connector.core.client import HttpxClientState
 from ghga_service_commons.utils.temp_files import big_temp_file  # type: ignore
 from hexkit.providers.s3.testutils import (  # type: ignore
     config_from_localstack_container,
@@ -36,6 +37,9 @@
 @pytest.mark.asyncio
 async def test_process(config_fixture: Config):  # noqa: F811
     """Test whole upload/download process for s3_upload script"""
+
+    HttpxClientState.configure(3)
+
     with LocalStackContainer(image="localstack/localstack:0.14.2").with_services(
         "s3"
     ) as localstack:
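Note: the switch from requests to the connector's httpx helpers above moves retry handling into the HTTP transport. Patch 4 below vendors that helper into this repository, so its behavior is visible in this series; stripped to its core, it is roughly:

    import httpx

    # Transport-level retries: httpx retries connection establishment
    # (not failed responses) with backoff.
    transport = httpx.HTTPTransport(retries=6)
    with httpx.Client(transport=transport) as client:
        response = client.get("https://example.org/file", timeout=60)

The short-lived client per request replaces the module-level SESSION object that the patch removes.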
From 2e814f2f5d4df36470316b673a3b61c0726dc297 Mon Sep 17 00:00:00 2001
From: Thomas Zajac
Date: Tue, 4 Jul 2023 10:53:19 +0200
Subject: [PATCH 2/7] Moved retry configuration to correct location

Changed put argument from data to content (httpx warning)
---
 ghga_datasteward_kit/__init__.py  |  2 +-
 ghga_datasteward_kit/s3_upload.py | 11 ++++-------
 tests/test_s3_upload.py           |  3 ---
 3 files changed, 5 insertions(+), 11 deletions(-)

diff --git a/ghga_datasteward_kit/__init__.py b/ghga_datasteward_kit/__init__.py
index e62b876..b5299a8 100644
--- a/ghga_datasteward_kit/__init__.py
+++ b/ghga_datasteward_kit/__init__.py
@@ -15,4 +15,4 @@

 """A utils package for GHGA data stewards."""

-__version__ = "0.4.1"
+__version__ = "0.4.2"
diff --git a/ghga_datasteward_kit/s3_upload.py b/ghga_datasteward_kit/s3_upload.py
index 44a2742..71921fd 100755
--- a/ghga_datasteward_kit/s3_upload.py
+++ b/ghga_datasteward_kit/s3_upload.py
@@ -44,12 +44,6 @@
 from ghga_datasteward_kit import models
 from ghga_datasteward_kit.utils import load_config_yaml

-
-def configure_session():
-    """Configure session with exponential backoff retry"""
-    HttpxClientState.configure(6)
-
-
 LOGGER = logging.getLogger("s3_upload")
 PART_SIZE = 16 * 1024**2

@@ -398,7 +392,7 @@ async def send_part(self, part: bytes, part_number: int):
                 part_number=part_number,
             )
             with httpx_client() as client:
-                client.put(url=upload_url, data=part)
+                client.put(url=upload_url, content=part)
         except (  # pylint: disable=broad-except
             Exception,
             KeyboardInterrupt,
@@ -526,6 +520,9 @@ async def async_main(input_path: Path, alias: str, config: Config):
     file_size = input_path.stat().st_size
     check_adjust_part_size(config=config, file_size=file_size)

+    # set retry policy
+    HttpxClientState.configure(5)
+
     uploader = ChunkedUploader(
         input_path=input_path,
         alias=alias,
diff --git a/tests/test_s3_upload.py b/tests/test_s3_upload.py
index c62005a..f938035 100644
--- a/tests/test_s3_upload.py
+++ b/tests/test_s3_upload.py
@@ -19,7 +19,6 @@
 from pathlib import Path

 import pytest
-from ghga_connector.core.client import HttpxClientState
 from ghga_service_commons.utils.temp_files import big_temp_file  # type: ignore
 from hexkit.providers.s3.testutils import (  # type: ignore
     config_from_localstack_container,
@@ -38,8 +37,6 @@
 async def test_process(config_fixture: Config):  # noqa: F811
     """Test whole upload/download process for s3_upload script"""

-    HttpxClientState.configure(3)
-
     with LocalStackContainer(image="localstack/localstack:0.14.2").with_services(
         "s3"
     ) as localstack:
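Note: the data= to content= change silences an httpx deprecation warning: httpx reserves data= for form-encoded payloads, while raw bytes belong in content=. With a placeholder URL, the corrected call pattern is:

    import httpx

    with httpx.Client() as client:
        # Raw bytes via data= still work but warn; content= is the
        # supported spelling for request bodies.
        client.put("https://example.org/part", content=b"some raw bytes")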
From 01e8f9685d185e9d7139c9242767a0e768ffb38c Mon Sep 17 00:00:00 2001
From: Thomas Zajac
Date: Thu, 6 Jul 2023 13:51:44 +0200
Subject: [PATCH 3/7] Made accession map lookup field name configurable

* Made accession map lookup field name configurable
* Multiple file fields
* Updated config
---
 ghga_datasteward_kit/__init__.py    |  2 +-
 ghga_datasteward_kit/file_ingest.py | 27 +++++++++++++++++++--------
 ingest_config.md                    |  4 ++++
 tests/fixtures/ingest.py            |  3 ++-
 tests/test_file_ingest.py           | 25 ++++++++++++++++++++++---
 5 files changed, 48 insertions(+), 13 deletions(-)

diff --git a/ghga_datasteward_kit/__init__.py b/ghga_datasteward_kit/__init__.py
index b5299a8..09fd669 100644
--- a/ghga_datasteward_kit/__init__.py
+++ b/ghga_datasteward_kit/__init__.py
@@ -15,4 +15,4 @@

 """A utils package for GHGA data stewards."""

-__version__ = "0.4.2"
+__version__ = "0.4.3"
diff --git a/ghga_datasteward_kit/file_ingest.py b/ghga_datasteward_kit/file_ingest.py
index ff96f23..8ef60b6 100644
--- a/ghga_datasteward_kit/file_ingest.py
+++ b/ghga_datasteward_kit/file_ingest.py
@@ -41,9 +41,16 @@ class IngestConfig(SubmissionStoreConfig):
         description="Path to directory containing output files from the "
         + "upload/batch_upload command.",
     )
+    map_files_fields: list[str] = Field(
+        ["study_files"],
+        description="Names of the accession map fields for looking up the"
+        + " alias->accession mapping.",
+    )


-def alias_to_accession(alias: str, submission_store: SubmissionStore) -> str:
+def alias_to_accession(
+    alias: str, map_fields: list[str], submission_store: SubmissionStore
+) -> str:
     """Get all submissions to retrieve valid accessions from corresponding file aliases"""

     submission_ids = submission_store.get_all_submission_ids()
@@ -51,11 +58,13 @@ def alias_to_accession(alias: str, submission_store: SubmissionStore) -> str:
     all_submission_map = {}

     for submission_id in submission_ids:
-        all_submission_map.update(
-            submission_store.get_by_id(submission_id=submission_id).accession_map[
-                "files"
-            ]
-        )
+        submission = submission_store.get_by_id(submission_id=submission_id)
+        for field in map_fields:
+            if field not in submission.accession_map:
+                raise ValueError(
+                    f"Configured field {field} not found in accession map."
+                )
+            all_submission_map.update(submission.accession_map[field])

     accession = all_submission_map.get(alias)

@@ -91,7 +100,7 @@ def file_ingest(
     in_path: Path,
     token: str,
     config: IngestConfig,
-    alias_to_id: Callable[[str, SubmissionStore], str] = alias_to_accession,
+    alias_to_id: Callable[[str, list[str], SubmissionStore], str] = alias_to_accession,
 ):
     """
     Transform from s3 upload output representation to what the file ingest service expects.
@@ -101,7 +110,9 @@ def file_ingest(
     submission_store = SubmissionStore(config=config)

     output_metadata = models.OutputMetadata.load(input_path=in_path)
-    file_id = alias_to_id(output_metadata.alias, submission_store)
+    file_id = alias_to_id(
+        output_metadata.alias, config.map_files_fields, submission_store
+    )

     upload_metadata = output_metadata.to_upload_metadata(file_id=file_id)
     encrypted = upload_metadata.encrypt_metadata(pubkey=config.file_ingest_pubkey)
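Note: besides the configurable map fields, file_ingest takes the lookup as an injectable alias_to_id callable, which makes the function testable without a populated submission store. A stub matching the new signature might look like this (the returned accession is a placeholder):

    from metldata.submission_registry.submission_store import SubmissionStore

    def fixed_accession(
        alias: str, map_fields: list[str], submission_store: SubmissionStore
    ) -> str:
        """Test stub for alias_to_id: ignores its inputs entirely."""
        return "placeholder_accession"

    # file_ingest(in_path=..., token=..., config=..., alias_to_id=fixed_accession)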
diff --git a/ingest_config.md b/ingest_config.md
index f746fe5..c77f91a 100644
--- a/ingest_config.md
+++ b/ingest_config.md
@@ -14,3 +14,7 @@
 - **`file_ingest_pubkey`** *(string)*: Public key used for encryption of the payload.

 - **`input_dir`** *(string)*: Path to directory containing output files from the upload/batch_upload command.
+
+- **`map_files_fields`** *(array)*: Names of the accession map fields for looking up the alias->accession mapping. Default: `['study_files']`.
+
+  - **Items** *(string)*
diff --git a/tests/fixtures/ingest.py b/tests/fixtures/ingest.py
index 58310ce..d80ca13 100644
--- a/tests/fixtures/ingest.py
+++ b/tests/fixtures/ingest.py
@@ -38,7 +38,7 @@
     title="test",
     description="test",
     content={"test_class": [{"alias": "test_alias"}]},
-    accession_map={"files": {"test_alias": "test_accession"}},
+    accession_map={"study_files": {"test_alias": "test_accession"}},
     id="testsubmission001",
     status_history=(
         StatusChange(
@@ -90,6 +90,7 @@ def ingest_fixture() -> Generator[IngestFixture, None, None]:
                 file_ingest_url="https://not-a-valid-url",
                 file_ingest_pubkey=encode_key(keypair.public),
                 input_dir=Path(input_dir),
+                map_files_fields=["study_files"],
                 submission_store_dir=Path(submission_store_dir),
             )

diff --git a/tests/test_file_ingest.py b/tests/test_file_ingest.py
index 9ee248c..9f41a37 100644
--- a/tests/test_file_ingest.py
+++ b/tests/test_file_ingest.py
@@ -36,12 +36,31 @@ async def test_alias_to_accession(ingest_fixture: IngestFixture):  # noqa: F811
     submission_store = SubmissionStore(config=ingest_fixture.config)
     metadata = models.OutputMetadata.load(input_path=ingest_fixture.file_path)

-    accession = alias_to_accession(metadata.alias, submission_store=submission_store)
-    example_accession = list(EXAMPLE_SUBMISSION.accession_map["files"].values())[0]
+    accession = alias_to_accession(
+        alias=metadata.alias,
+        map_fields=ingest_fixture.config.map_files_fields,
+        submission_store=submission_store,
+    )
+    example_accession = list(
+        EXAMPLE_SUBMISSION.accession_map[
+            ingest_fixture.config.map_files_fields[0]
+        ].values()
+    )[0]
     assert accession == example_accession

     with pytest.raises(ValueError):
-        alias_to_accession("invalid_alias", submission_store=submission_store)
+        alias_to_accession(
+            alias="invalid_alias",
+            map_fields=ingest_fixture.config.map_files_fields,
+            submission_store=submission_store,
+        )
+
+    with pytest.raises(ValueError):
+        alias_to_accession(
+            alias=metadata.alias,
+            map_fields=["study_files", "sample_files"],
+            submission_store=submission_store,
+        )


 @pytest.mark.asyncio
From 2432e5f0d09f59bdd2d87bf0b51be3a176f01f06 Mon Sep 17 00:00:00 2001
From: Thomas Zajac
Date: Fri, 14 Jul 2023 10:28:28 +0200
Subject: [PATCH 4/7] Moved some connector function into DS kit to remove
 dependency (#21)

---
 ghga_datasteward_kit/__init__.py  |  2 +-
 ghga_datasteward_kit/s3_upload.py | 54 +++++++++++++++++++++++++++++--
 setup.cfg                         |  2 +-
 3 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/ghga_datasteward_kit/__init__.py b/ghga_datasteward_kit/__init__.py
index 09fd669..227d057 100644
--- a/ghga_datasteward_kit/__init__.py
+++ b/ghga_datasteward_kit/__init__.py
@@ -15,4 +15,4 @@

 """A utils package for GHGA data stewards."""

-__version__ = "0.4.3"
+__version__ = "0.4.4"
diff --git a/ghga_datasteward_kit/s3_upload.py b/ghga_datasteward_kit/s3_upload.py
index 71921fd..715539a 100755
--- a/ghga_datasteward_kit/s3_upload.py
+++ b/ghga_datasteward_kit/s3_upload.py
@@ -25,18 +25,18 @@
 import os
 import subprocess  # nosec
 import sys
+from contextlib import contextmanager
 from functools import partial
 from io import BufferedReader
 from pathlib import Path
 from time import time
-from typing import Generator
+from typing import Generator, Iterator
 from uuid import uuid4

 import crypt4gh.header  # type: ignore
 import crypt4gh.keys  # type: ignore
 import crypt4gh.lib  # type: ignore
-from ghga_connector.core.client import HttpxClientState, httpx_client
-from ghga_connector.core.file_operations import read_file_parts
+import httpx
 from hexkit.providers.s3 import S3Config, S3ObjectStorage  # type: ignore
 from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt
 from pydantic import BaseSettings, Field, SecretStr, validator
@@ -405,6 +405,54 @@ async def send_part(self, part: bytes, part_number: int):
             raise exc


+class HttpxClientState:
+    """Helper class to make max_retries user configurable"""
+
+    max_retries: int
+
+    @classmethod
+    def configure(cls, max_retries: int):
+        """Configure client with exponential backoff retry (using httpx's 0.5 default)"""
+
+        # can't be negative - should we log this?
+        cls.max_retries = max(0, max_retries)
+
+
+@contextmanager
+def httpx_client():
+    """Yields a context manager httpx client and closes it afterward"""
+
+    transport = httpx.HTTPTransport(retries=HttpxClientState.max_retries)
+
+    with httpx.Client(transport=transport) as client:
+        yield client
+
+
+def read_file_parts(
+    file: BufferedReader, *, part_size: int, from_part: int = 1
+) -> Iterator[bytes]:
+    """
+    Returns an iterator to iterate through file parts of the given size (in bytes).
+
+    By default it starts with the first part but you may also start from a specific part
+    in the middle of the file using the `from_part` argument. This might be useful to
+    resume an interrupted reading process.
+
+    Please note: opening and closing of the file MUST happen outside of this function.
+    """
+
+    initial_offset = part_size * (from_part - 1)
+    file.seek(initial_offset)
+
+    while True:
+        file_part = file.read(part_size)
+
+        if len(file_part) == 0:
+            return
+
+        yield file_part
+
+
 def objectstorage(config: Config):
     """Configure S3 and return S3 DAO"""
     s3_config = S3Config(
diff --git a/setup.cfg b/setup.cfg
index bfc04e5..6ddd7cb 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -35,8 +35,8 @@ zip_safe = False
 include_package_data = True
 packages = find:
 install_requires =
+    crypt4gh==1.6
     hexkit[mongodb,s3]==0.10.0
-    ghga-connector==0.3.5
     ghga-transpiler==1.0.3
     metldata==0.3.6
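Note: with read_file_parts vendored in, the upload script no longer needs ghga-connector at all, hence the dependency swap in setup.cfg above. A hypothetical driver for the helper; the from_part arithmetic means resuming at part 3 first seeks to part_size * (3 - 1) bytes:

    from ghga_datasteward_kit.s3_upload import PART_SIZE, read_file_parts

    with open("example.bin", "rb") as source:
        for number, part in enumerate(
            read_file_parts(source, part_size=PART_SIZE), start=1
        ):
            print(f"part {number}: {len(part)} bytes")  # at most 16 MiB each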
From fbbeafc1ab3b2f06191dc05dcbfdc40a151e68dc Mon Sep 17 00:00:00 2001
From: Christoph Zwerschke
Date: Wed, 19 Jul 2023 16:29:23 +0200
Subject: [PATCH 5/7] Relax requirements (#22)

---
 .devcontainer/devcontainer.json     | 122 ++++++++++++++--------------
 .devcontainer/docker-compose.yml    |  51 ------------
 .github/workflows/pypi_publish.yaml |   6 +-
 ghga_datasteward_kit/__init__.py    |   2 +-
 requirements-dev.txt                |   3 +-
 setup.cfg                           |   4 +-
 6 files changed, 69 insertions(+), 119 deletions(-)

diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
index 5695b0c..95a6240 100644
--- a/.devcontainer/devcontainer.json
+++ b/.devcontainer/devcontainer.json
@@ -2,68 +2,68 @@
 // https://github.com/microsoft/vscode-dev-containers/tree/v0.177.0/containers/python-3-postgres
 // Update the VARIANT arg in docker-compose.yml to pick a Python version: 3, 3.8, 3.7, 3.6
 {
-    "name": "Data Steward scripts",
+    "name": "Data Steward Kit",
     "dockerComposeFile": "docker-compose.yml",
     "service": "app",
     "workspaceFolder": "/workspace",
-    // Set *default* container specific settings.json values on container create.
-    "settings": {
-        "terminal.integrated.shell.linux": "/bin/bash",
-        "sqltools.connections": [
-            {
-                "name": "Container database",
-                "driver": "PostgreSQL",
-                "previewLimit": 50,
-                "server": "localhost",
-                "port": 5432,
-                "database": "postgres",
-                "username": "postgres",
-                "password": "postgres"
-            }
-        ],
-        "python.pythonPath": "/usr/local/bin/python",
-        "python.languageServer": "Pylance",
-        "python.linting.enabled": true,
-        "python.linting.pylintEnabled": true,
-        "python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8",
-        "python.formatting.blackPath": "/usr/local/py-utils/bin/black",
-        "python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf",
-        "python.formatting.provider": "black",
-        "editor.formatOnSave": true,
-        "python.linting.banditPath": "/usr/local/py-utils/bin/bandit",
-        "python.linting.mypyPath": "/usr/local/py-utils/bin/mypy",
-        "python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle",
-        "python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle",
-        "python.linting.pylintPath": "/usr/local/py-utils/bin/pylint",
-        "python.testing.pytestPath": "/usr/local/py-utils/bin/pytest",
-        "python.testing.unittestEnabled": false,
-        "python.testing.pytestEnabled": true,
-        "editor.renderWhitespace": "all",
-        "editor.rulers": [
-            88
-        ],
-        "licenser.license": "AL2",
-        "licenser.author": "Universität Tübingen, DKFZ and EMBL\nfor the German Human Genome-Phenome Archive (GHGA)",
+    "customizations": {
+        "vscode": {
+            // Set *default* container specific settings.json values on container create.
+            "settings": {
+                "terminal.integrated.profiles.linux": {
+                    "bash": {
+                        "path": "/bin/bash"
+                    }
+                },
+                "python.pythonPath": "/usr/local/bin/python",
+                "python.languageServer": "Pylance",
+                "python.linting.enabled": true,
+                "python.linting.pylintEnabled": true,
+                "python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8",
+                "python.formatting.blackPath": "/usr/local/py-utils/bin/black",
+                "python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf",
+                "python.formatting.provider": "black",
+                "python.analysis.typeCheckingMode": "basic",
+                "python.linting.banditPath": "/usr/local/py-utils/bin/bandit",
+                "python.linting.mypyPath": "/usr/local/py-utils/bin/mypy",
+                "python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle",
+                "python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle",
+                "python.linting.pylintPath": "/usr/local/py-utils/bin/pylint",
+                "python.testing.pytestPath": "/usr/local/py-utils/bin/pytest",
+                "python.testing.unittestEnabled": false,
+                "python.testing.pytestEnabled": true,
+                "editor.formatOnSave": true,
+                "editor.renderWhitespace": "all",
+                "editor.rulers": [
+                    88
+                ],
+                "licenser.license": "Custom",
+                "licenser.customHeaderFile": "/workspace/.devcontainer/license_header.txt"
+            },
+            // Add the IDs of extensions you want installed when the container is created.
+            "extensions": [
+                "mikestead.dotenv",
+                "ms-azuretools.vscode-docker",
+                "ms-python.black-formatter",
+                "ms-python.python",
+                "ms-python.isort",
+                "ms-python.vscode-pylance",
+                "ms-toolsai.jupyter",
+                "vtenentes.bdd",
+                "njpwerner.autodocstring",
+                "redhat.vscode-yaml",
+                "42crunch.vscode-openapi",
+                "arjun.swagger-viewer",
+                "eamodio.gitlens",
+                "github.vscode-pull-request-github",
+                "streetsidesoftware.code-spell-checker",
+                "yzhang.markdown-all-in-one",
+                "visualstudioexptteam.vscodeintellicode",
+                "ymotongpoo.licenser",
+                "editorconfig.editorconfig"
+            ]
+        }
     },
-    // Add the IDs of extensions you want installed when the container is created.
-    "extensions": [
-        "ms-python.python",
-        "ms-python.vscode-pylance",
-        "mtxr.sqltools",
-        "mtxr.sqltools-driver-pg",
-        "42crunch.vscode-openapi",
-        "eamodio.gitlens",
-        "formulahendry.terminal",
-        "tyriar.terminal-tabs",
-        "alexcvzz.vscode-sqlite",
-        "njpwerner.autodocstring",
-        "arjun.swagger-viewer",
-        "ms-toolsai.jupyter",
-        "redhat.vscode-yaml",
-        "ymotongpoo.licenser",
-        "ms-azuretools.vscode-docker",
-        "EditorConfig.EditorConfig"
-    ],
     // Use 'forwardPorts' to make a list of ports inside the container available locally.
     // "forwardPorts": [5000, 5432],
     // Use 'postCreateCommand' to run commands after the container is created.
@@ -71,9 +71,11 @@
     // Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
     "remoteUser": "vscode",
     "features": {
-        "docker-in-docker": {
+        "ghcr.io/devcontainers/features/docker-in-docker:2": {
             "version": "latest",
-            "moby": true
+            "enableNonRootDocker": "true",
+            "moby": true,
+            "azureDnsAutoDetection": false
         }
     }
 }
diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml
index 693e90e..a56c9b2 100644
--- a/.devcontainer/docker-compose.yml
+++ b/.devcontainer/docker-compose.yml
@@ -27,54 +27,3 @@ services:
     user: vscode
     # Use "forwardPorts" in **devcontainer.json** to forward an app port locally.
     # (Adding the "ports" property to this file will not forward from a Codespace.)
-
-
-  # Please uncomment, add, or remove services as needed:
-
-  # postgresql:
-  #   image: postgres:latest
-  #   restart: unless-stopped
-  #   volumes:
-  #     - postgres_fs:/var/lib/postgresql/data
-  #   environment:
-  #     POSTGRES_USER: postgres
-  #     POSTGRES_DB: postgres
-  #     POSTGRES_PASSWORD: postgres
-  #   # Add "forwardPorts": ["5432"] to **devcontainer.json** to forward PostgreSQL locally.
-  #   # (Adding the "ports" property to this file will not forward from a Codespace.)
-
-  # rabbitmq:
-  #   image: rabbitmq:3-management
-  #   ports:
-  #     - 5672:5672
-  #     - 15672:15672
-
-  # localstack:
-  #   image: localstack/localstack
-  #   ports:
-  #     - "4566:4566"
-  #   environment:
-  #     SERVICES: s3
-  #     DEFAULT_REGION: eu-west-1
-  #     AWS_DEFAULT_REGION: eu-west-1
-  #     # accessible at localhost
-  #     HOSTNAME_EXTERNAL: localhost
-  #     USE_SSL: "false"
-  #     DATA_DIR: /tmp/localstack/data
-  #     DEBUG: 1
-  #   volumes:
-  #     - type: volume
-  #       source: s3_fs
-  #       target: /tmp/localstack
-  #       volume:
-  #         nocopy: true
-
-  # mongodb:
-  #   image: mongo:5.0.4
-  #   restart: unless-stopped
-  #   volumes:
-  #     - mongo_fs:/data/db
-
-# volumes:
-#   postgres_fs: {}
-#   s3_fs: {}
-#   mongo_fs: {}
diff --git a/.github/workflows/pypi_publish.yaml b/.github/workflows/pypi_publish.yaml
index 07d9f65..029e60f 100644
--- a/.github/workflows/pypi_publish.yaml
+++ b/.github/workflows/pypi_publish.yaml
@@ -61,12 +61,12 @@ jobs:
           pytest .

       - name: Publish distribution package to PyPI (test)
-        uses: pypa/gh-action-pypi-publish@master
+        uses: pypa/gh-action-pypi-publish@release/v1
         with:
           password: ${{ secrets.TEST_PYPI_API_TOKEN }}
-          repository_url: https://test.pypi.org/legacy/
+          repository-url: https://test.pypi.org/legacy/

       - name: Publish distribution package to PyPI (production)
-        uses: pypa/gh-action-pypi-publish@master
+        uses: pypa/gh-action-pypi-publish@release/v1
         with:
           password: ${{ secrets.PYPI_API_TOKEN }}
diff --git a/ghga_datasteward_kit/__init__.py b/ghga_datasteward_kit/__init__.py
index 227d057..3b0c193 100644
--- a/ghga_datasteward_kit/__init__.py
+++ b/ghga_datasteward_kit/__init__.py
@@ -15,4 +15,4 @@

 """A utils package for GHGA data stewards."""

-__version__ = "0.4.4"
+__version__ = "0.4.5"
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 7e16927..46f9b65 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -2,5 +2,4 @@

 -r requirements-dev-common.txt

-# additional requirements can be listed here
-testcontainers[kafka,mongo,postgresql]==3.4.2
+testcontainers==3.4.1
diff --git a/setup.cfg b/setup.cfg
index 6ddd7cb..fa20166 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -36,8 +36,8 @@ include_package_data = True
 packages = find:
 install_requires =
     crypt4gh==1.6
-    hexkit[mongodb,s3]==0.10.0
-    ghga-transpiler==1.0.3
+    hexkit[s3]>=0.10,<0.11
+    ghga-transpiler>=1.0.3,<1.1
     metldata==0.3.6

 python_requires = >= 3.9

From 8a983b703b443d9fe2bf304eb1b7a667060caf87 Mon Sep 17 00:00:00 2001
From: Christoph Zwerschke
Date: Wed, 26 Jul 2023 12:52:10 +0200
Subject: [PATCH 6/7] Update metldata (#23)

---
 ghga_datasteward_kit/__init__.py | 2 +-
 setup.cfg                        | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/ghga_datasteward_kit/__init__.py b/ghga_datasteward_kit/__init__.py
index 3b0c193..8e22b3d 100644
--- a/ghga_datasteward_kit/__init__.py
+++ b/ghga_datasteward_kit/__init__.py
@@ -15,4 +15,4 @@

 """A utils package for GHGA data stewards."""

-__version__ = "0.4.5"
+__version__ = "0.4.6"
diff --git a/setup.cfg b/setup.cfg
index fa20166..5c6e640 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -38,7 +38,7 @@ install_requires =
     crypt4gh==1.6
     hexkit[s3]>=0.10,<0.11
     ghga-transpiler>=1.0.3,<1.1
-    metldata==0.3.6
+    metldata>=0.3.7,<0.4

 python_requires = >= 3.9

From 2154b930550b02ef0bb7f32de91582270512425e Mon Sep 17 00:00:00 2001
From: Seyit Zor
Date: Wed, 26 Jul 2023 17:55:43 +0200
Subject: [PATCH 7/7] Update the s3_upload module to run as script (#24)

* update s3_upload to run as the main program
* bump version number
---
 ghga_datasteward_kit/__init__.py  |  2 +-
 ghga_datasteward_kit/s3_upload.py | 12 +++++++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/ghga_datasteward_kit/__init__.py b/ghga_datasteward_kit/__init__.py
index 8e22b3d..36c1238 100644
--- a/ghga_datasteward_kit/__init__.py
+++ b/ghga_datasteward_kit/__init__.py
@@ -15,4 +15,4 @@

 """A utils package for GHGA data stewards."""

-__version__ = "0.4.6"
+__version__ = "0.4.7"
diff --git a/ghga_datasteward_kit/s3_upload.py b/ghga_datasteward_kit/s3_upload.py
index 715539a..1d0c482 100755
--- a/ghga_datasteward_kit/s3_upload.py
+++ b/ghga_datasteward_kit/s3_upload.py
@@ -37,6 +37,7 @@
 import crypt4gh.keys  # type: ignore
 import crypt4gh.lib  # type: ignore
 import httpx
+import typer
 from hexkit.providers.s3 import S3Config, S3ObjectStorage  # type: ignore
 from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt
 from pydantic import BaseSettings, Field, SecretStr, validator
@@ -612,7 +613,11 @@ async def async_main(input_path: Path, alias: str, config: Config):
     metadata.serialize(output_path)


-def main(input_path, alias: str, config_path: Path):
+def main(
+    input_path: Path = typer.Option(..., help="Local path of the input file"),
+    alias: str = typer.Option(..., help="A human readable file alias"),
+    config_path: Path = typer.Option(..., help="Path to a config YAML."),
+):
     """
     Custom script to encrypt data using Crypt4GH and directly uploading it to S3
     objectstorage.
@@ -620,3 +625,8 @@ def main(input_path, alias: str, config_path: Path):

     config = load_config_yaml(config_path, Config)
     asyncio.run(async_main(input_path=input_path, alias=alias, config=config))
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    typer.run(main)
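Note: with the typer wrapper in place, the module now also works as a standalone script. typer derives the option names from the main() parameters, so a direct invocation would look roughly like this (paths and alias are placeholders):

    python ghga_datasteward_kit/s3_upload.py \
        --input-path ./file.bin \
        --alias my_file \
        --config-path ./config.yaml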