Skip to content

Commit

Permalink
faster s3 upload/download
Browse files Browse the repository at this point in the history
  • Loading branch information
b8raoult committed Jun 19, 2024
1 parent 1150845 commit 3622dc2
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ dynamic = [
"version",
]
dependencies = [
"anemoi-utils[provenance]>=0.3",
"anemoi-utils[provenance]>=0.3.4",
"numpy",
"pyyaml",
"semantic-version",
Expand Down
75 changes: 61 additions & 14 deletions src/anemoi/datasets/commands/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from concurrent.futures import as_completed

import tqdm
from anemoi.utils.s3 import download
from anemoi.utils.s3 import upload

from . import Command

Expand All @@ -22,26 +24,49 @@
except AttributeError:
isatty = False

"""

~/.aws/credentials
class S3Downloader:
def __init__(self, source, target, transfers, overwrite, resume, progress, **kwargs):
self.source = source
self.target = target
self.transfers = transfers
self.overwrite = overwrite
self.resume = resume
self.progress = progress

[default]
endpoint_url = https://object-store.os-api.cci1.ecmwf.int
aws_access_key_id=xxx
aws_secret_access_key=xxxx
def run(self):
download(
self.source + "/" if not self.source.endswith("/") else self.source,
self.target,
overwrite=self.overwrite,
ignore_existing=self.resume,
threads=self.transfers,
show_progress=self.progress,
)

Then:

anemoi-datasets copy aifs-ea-an-oper-0001-mars-o96-1979-2022-1h-v3.zarr/
s3://ml-datasets/stable/aifs-ea-an-oper-0001-mars-o96-1979-2022-1h-v3.zarr
class S3Uploader:
def __init__(self, source, target, transfers, overwrite, resume, progress, **kwargs):
self.source = source
self.target = target
self.transfers = transfers
self.overwrite = overwrite
self.resume = resume
self.progress = progress

zinfo https://object-store.os-api.cci1.ecmwf.int/
ml-datasets/stable/aifs-ea-an-oper-0001-mars-o96-1979-2022-1h-v3.zarr
"""
def run(self):
upload(
self.source,
self.target,
self.transfers,
overwrite=self.overwrite,
ignore_existing=self.resume,
threads=self.transfers,
show_progress=self.progress,
)


class Copier:
class DefaultCopier:
def __init__(self, source, target, transfers, block_size, overwrite, resume, progress, nested, rechunk, **kwargs):
self.source = source
self.target = target
Expand Down Expand Up @@ -295,7 +320,29 @@ def add_arguments(self, command_parser):
command_parser.add_argument("target", help="Target location.")

def run(self, args):
Copier(**vars(args)).run()
if args.source == args.target:
raise ValueError("Source and target are the same.")

kwargs = vars(args)

if args.overwrite and args.resume:
raise ValueError("Cannot use --overwrite and --resume together.")

source_in_s3 = args.source.startswith("s3://")
target_in_s3 = args.target.startswith("s3://")

copier = None

if args.rechunk or (source_in_s3 and target_in_s3):
copier = DefaultCopier(**kwargs)
else:
if source_in_s3:
copier = S3Downloader(**kwargs)

if target_in_s3:
copier = S3Uploader(**kwargs)

copier.run()


class Copy(CopyMixin, Command):
Expand Down

0 comments on commit 3622dc2

Please sign in to comment.