Implement multipart copy and copying a particular version #308

Open · wants to merge 1 commit into base: master
101 changes: 99 additions & 2 deletions src/AWSS3.jl
@@ -455,7 +455,7 @@ end
"""
s3_copy([::AbstractAWSConfig], bucket, path; acl::AbstractString="",
to_bucket=bucket, to_path=path, metadata::AbstractDict=SSDict(),
-            parse_response::Bool=true, kwargs...)
+            parse_response::Bool=true, version=nothing, kwargs...)

Copy the object at `path` in `bucket` to `to_path` in `to_bucket`.

@@ -464,6 +464,8 @@
See [here](https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl).
- `metadata::Dict=`; `x-amz-meta-` headers.
- `parse_response::Bool=`; when `false`, return raw `AWS.Response`
- `version=`; when not `nothing`, the specific `versionId` of the source object to copy,
otherwise the latest version is copied.
- `kwargs`; additional kwargs passed through into `S3.copy_object`

# API Calls
@@ -483,6 +485,7 @@ function s3_copy(
to_bucket=bucket,
to_path=path,
metadata::AbstractDict=SSDict(),
version::AbstractS3Version=nothing,
parse_response::Bool=true,
kwargs...,
)
@@ -495,10 +498,15 @@ function s3_copy(
headers["x-amz-acl"] = acl
end

source = "$bucket/$path"
if version !== nothing
source *= "?versionId=" * escapeuri(version)
end
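For reference, the `x-amz-copy-source` value built here has the documented shape `bucket/key?versionId=<id>` (CopyObject request syntax, with `?` as the separator before `versionId`). A minimal Python sketch of the same construction; the `copy_source` helper and the use of `urllib.parse.quote` are illustrative, not part of this PR:

```python
from urllib.parse import quote

def copy_source(bucket, key, version=None):
    """Build an x-amz-copy-source value: 'bucket/key', with the source
    version pinned via '?versionId=...' when one is given."""
    src = f"{bucket}/{quote(key)}"
    if version is not None:
        src += "?versionId=" + quote(version)
    return src
```

For example, `copy_source("b", "a key", "v1")` yields `b/a%20key?versionId=v1`; omitting `version` copies the latest version.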

response = S3.copy_object(
to_bucket,
to_path,
-        "$bucket/$path",
+        source,
Dict("headers" => headers);
aws_config=aws,
kwargs...,
Comment on lines 507 to 512 (Contributor):

[JuliaFormatter] reported by reviewdog 🐶

Suggested change:
-        to_bucket,
-        to_path,
-        source,
-        Dict("headers" => headers);
-        aws_config=aws,
-        kwargs...,
+        to_bucket, to_path, source, Dict("headers" => headers); aws_config=aws, kwargs...
@@ -1075,6 +1083,31 @@ function s3_upload_part(
return get_robust_case(Dict(response.headers), "ETag")
end

function s3_upload_part_copy(
aws::AbstractAWSConfig,
source,
upload,
part_number,
byte_range;
args=Dict{String,Any}(),
kwargs...,
)
args["x-amz-copy-source-range"] = string(first(byte_range), '-', last(byte_range))
Member Author:

Suggested change:
-    args["x-amz-copy-source-range"] = string(first(byte_range), '-', last(byte_range))
+    args["x-amz-copy-source-range"] = string("bytes=", first(byte_range), '-', last(byte_range))

See https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html#API_UploadPartCopy_RequestSyntax

Member Author:

Suggested change:
-    args["x-amz-copy-source-range"] = string(first(byte_range), '-', last(byte_range))
+    headers = Dict(
+        "x-amz-copy-source-range" => string(
+            "bytes=", first(byte_range), '-', last(byte_range)
+        )
+    )
+    mergewith!(_merge, args, Dict("headers" => headers))

Otherwise it gets added as a query parameter rather than a header.

response = S3.upload_part_copy(
upload["Bucket"],
upload["Key"],
part_number,
upload["UploadId"],
source,
args;
aws_config=aws,
kwargs...,
)

return get_robust_case(Dict(response.headers), "ETag")
end

function s3_complete_multipart_upload(
aws::AbstractAWSConfig,
upload,
@@ -1145,6 +1178,70 @@ function s3_multipart_upload(
return s3_complete_multipart_upload(aws, upload, tags; parse_response, kwargs...)
end

"""
s3_multipart_copy(aws::AbstractAWSConfig, bucket, path; to_bucket=bucket, to_path=path,
part_size_mb=50, version=nothing, parse_response::Bool=true,
kwargs...)

Copy the object at `path` in `bucket` to `to_path` in `to_bucket` using a
[multipart copy](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html).

# Optional Arguments
- `part_size_mb`: maximum size per uploaded part, in mebibytes (MiB).
Member Author:

I wonder if it's worth exposing an option that allows matching the part size between the source and destination. IIUC, that should make the range-based accesses faster while copying. If a file is big enough for a multipart copy, it was probably uploaded with a multipart upload, in which case the parts and their sizes can be obtained with `S3.get_object_attributes`. Lacking that permission, one can also get the part size with `S3.head_object` by passing `Dict("partNumber" => 1)` as a query parameter, and the number of parts will be in the entity tag of the source object.

- `file_size_mb`: size in mebibytes of the object to copy. If `nothing`, the size is
determined using `s3_get_meta`.
- `version`: when not `nothing`, the specific `versionId` of the source object to copy,
otherwise the latest version is copied.
- `parse_response`: when `false`, return raw `AWS.Response`
- `kwargs`: additional kwargs passed through to `s3_upload_part_copy` and `s3_complete_multipart_upload`

# API Calls

- [`HeadObject`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html) (if `file_size_mb` is not provided)
- [`CreateMultipartUpload`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html)
- [`UploadPartCopy`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html)
- [`CompleteMultipartUpload`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html)

# Permissions

- [`s3:PutObject`](https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazons3.html#amazons3-PutObject)
- [`s3:GetObject`](https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazons3.html#amazons3-GetObject)
"""
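A side note on the review comment above about recovering the source's part layout: for multipart-uploaded objects, S3 encodes the part count as a `-N` suffix on the ETag. A hedged Python sketch (the helper name is hypothetical and not part of this PR; single-part objects have a plain MD5 ETag with no suffix):

```python
def multipart_part_count(etag):
    """Return the part count from a multipart ETag such as
    '"9bb58f26192e4ba00f01e2e7b136bbd8-42"', or None for single-part
    objects whose ETag is a plain MD5 hex digest."""
    etag = etag.strip('"')
    checksum, sep, count = etag.rpartition("-")
    if sep and count.isdigit():
        return int(count)
    return None
```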
function s3_multipart_copy(
aws::AbstractAWSConfig,
bucket,
path;
to_bucket=bucket,
to_path=path,
part_size_mb::Integer=50,
file_size_mb::Union{Integer,Nothing}=nothing,
version::AbstractS3Version=nothing,
parse_response::Bool=true,
kwargs...,
)
if file_size_mb === nothing
file_meta = s3_get_meta(aws, bucket, path; version)
file_size = parse(Int, get_robust_case(file_meta, "Content-Length"))
else
file_size = file_size_mb * 1024 * 1024
end

part_size = part_size_mb * 1024 * 1024

source = bucket * '/' * path
if version !== nothing
source *= "?versionId=" * escapeuri(version)
end

upload = s3_begin_multipart_upload(aws, bucket, path)
tags = map(enumerate(0:part_size:file_size)) do (part, byte_offset)
byte_range = byte_offset:min(byte_offset + part_size - 1, file_size)
Member Author:

Suggested change:
-        byte_range = byte_offset:min(byte_offset + part_size - 1, file_size)
+        byte_range = byte_offset:(min(byte_offset + part_size, file_size) - 1)

Since it's 0-based.
return s3_upload_part_copy(aws, source, upload, part, byte_range; kwargs...)
end

return s3_complete_multipart_upload(aws, upload, tags; parse_response, kwargs...)
end
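The part loop above divides the object into 0-based, inclusive byte ranges (with the last index capped as the review comment suggests), which then feed the `bytes=first-last` form that `x-amz-copy-source-range` expects. A standalone Python sketch of that arithmetic; the helper names are illustrative, not the package's API:

```python
def part_ranges(file_size, part_size):
    """Split [0, file_size) into inclusive 0-based (first, last) byte
    ranges of at most part_size bytes each."""
    ranges = []
    offset = 0
    while offset < file_size:
        last = min(offset + part_size, file_size) - 1
        ranges.append((offset, last))
        offset += part_size
    return ranges

def range_header(first, last):
    """Format an inclusive byte range for x-amz-copy-source-range."""
    return f"bytes={first}-{last}"
```

For example, `part_ranges(10, 4)` gives `[(0, 3), (4, 7), (8, 9)]`, covering every byte exactly once.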

using MbedTLS

function _s3_sign_url_v2(
54 changes: 54 additions & 0 deletions src/s3path.jl
@@ -722,6 +722,60 @@ function Base.write(
end
end

const MiB = 1024 * 1024
const GiB = MiB * 1024

"""
Base.cp(src::S3Path, dest::S3Path; multipart::Bool=true, part_size_mb=50)

Copy the object at `src` to `dest`.

When `multipart=true` and `src` is larger than `part_size_mb` MiB, or when `src` is 5 GiB
or larger (above the single-request `CopyObject` limit), the copy is performed using
[`s3_multipart_copy`](@ref). Otherwise, [`s3_copy`](@ref) is used.
"""
function Base.cp(src::S3Path, dest::S3Path; multipart::Bool=true, part_size_mb=50)
if src.config !== nothing && dest.config !== nothing && src.config != dest.config
# Avoid breaking the case where the source and destination use different credentials
# by directly invoking the method that used to be called before this more specific
# method was added
return invoke(cp, Tuple{AbstractPath,AbstractPath}, src, dest)
end
config = @something(src.config, dest.config, global_aws_config())
head = s3_get_meta(config, src.bucket, src.key; src.version)
size = parse(Int, get_robust_case(head, "Content-Length"))
if (multipart && size > part_size_mb * MiB) || size >= 5 * GiB
response = s3_multipart_copy(
config,
src.bucket,
src.key;
to_bucket=dest.bucket,
to_path=dest.key,
version=src.version,
part_size_mb,
file_size_mb=div(size, MiB),
)
else
response = s3_copy(
config,
src.bucket,
src.key;
to_bucket=dest.bucket,
to_path=dest.key,
version=src.version,
)
end
return S3Path(
dest.bucket,
dest.key;
dest.isdirectory,
dest.config,
version=HTTP.header(response.headers, "x-amz-version-id", nothing),
)
end
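The threshold logic in `Base.cp` above reduces to a small predicate: multipart is opt-in (via `multipart=true`) for objects over `part_size_mb` MiB, and mandatory at 5 GiB, the upper bound for a single `CopyObject` request. A Python sketch with this PR's defaults; `use_multipart` is an illustrative name, not part of the package:

```python
MIB = 1024 ** 2
GIB = 1024 ** 3

def use_multipart(size, multipart=True, part_size_mb=50):
    """Mirror the dispatch: multipart when enabled and the object exceeds
    part_size_mb MiB, and always at or above the 5 GiB CopyObject limit."""
    return (multipart and size > part_size_mb * MIB) or size >= 5 * GIB
```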

function FilePathsBase.mktmpdir(parent::S3Path)
fp = parent / string(uuid4(), "/")
return mkdir(fp)
20 changes: 20 additions & 0 deletions test/awss3.jl
@@ -209,6 +209,26 @@ function awss3_tests(base_config)
@test isa(result, AWS.Response)
end

@testset "Multi-Part Copy" begin
config = assume_testset_role("MultipartUploadTestset"; base_config)
MIN_S3_CHUNK_SIZE = 5  # S3's minimum part size, in MiB (`part_size_mb` takes MiB, not bytes)
src_key_name = "multi-part-key"
dest_key_name = "multi-part-key-copy"
result = s3_multipart_copy(
config,
bucket_name,
src_key_name;
to_bucket=bucket_name,
to_path=dest_key_name,
part_size_mb=MIN_S3_CHUNK_SIZE,
)
@test s3_exists(config, bucket_name, dest_key_name)
@test isa(result, LittleDict)
src_bytes = s3_get(config, bucket_name, src_key_name; raw=true)
dest_bytes = s3_get(config, bucket_name, dest_key_name; raw=true)
@test src_bytes == dest_bytes
end

# these tests are needed because lack of functionality of the underlying AWS API makes certain
# seemingly inane tasks incredibly tricky: for example checking if an "object" (file or
# directory) exists is very subtle