From 249e2c2782fbba7ca45bf54eafdb1f0b578e38b0 Mon Sep 17 00:00:00 2001 From: Eric Hanson <5846501+ericphanson@users.noreply.github.com> Date: Tue, 27 Jul 2021 20:50:05 +0200 Subject: [PATCH] wip --- src/AWSS3.jl | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/AWSS3.jl b/src/AWSS3.jl index 5a29893c..2e815ded 100644 --- a/src/AWSS3.jl +++ b/src/AWSS3.jl @@ -667,23 +667,35 @@ function s3_complete_multipart_upload(aws::AbstractAWSConfig, upload, parts::Vec end -function s3_multipart_upload(aws::AbstractAWSConfig, bucket, path, io::IO, part_size_mb=50; kwargs...) +function s3_multipart_upload(aws::AbstractAWSConfig, bucket, path, io::IO, part_size_mb=50; backend = AWS.DEFAULT_BACKEND[], n_concurrent_uploads = 1, kwargs...) part_size = part_size_mb * 1024 * 1024 - upload = s3_begin_multipart_upload(aws, bucket, path) + upload = s3_begin_multipart_upload(aws, bucket, path, LittleDict("backend" => backend)) tags = Vector{String}() - buf = Vector{UInt8}(undef, part_size) + bufs = [ Vector{UInt8}(undef, part_size) for _ = 1:n_concurrent_uploads] + + other_kwargs = ( (k => v) for (k, v) in pairs(kwargs) if k !== :args) + args = get(kwargs, :args, LittleDict{String, Any}()) + args["backend"] = backend i = 0 - while (n = readbytes!(io, buf, part_size)) > 0 - if n < part_size - resize!(buf, n) + n = 1 + futures = Task[] + sizehint!(futures, n_concurrent_uploads) + while n > 0 + for buf in bufs + n = readbytes!(io, buf, part_size) + n == 0 && break + if n < part_size + resize!(buf, n) + end + push!(futures, @async(s3_upload_part(aws, upload, (i += 1), buf; args, other_kwargs...))) end - - push!(tags, s3_upload_part(aws, upload, (i += 1), buf; kwargs...)) + append!(tags, fetch.(futures)) + empty!(futures) end - s3_complete_multipart_upload(aws, upload, tags; kwargs...) + s3_complete_multipart_upload(aws, upload, tags, LittleDict("backend" => backend); kwargs...) end using MbedTLS