Add support for Cloudflare's R2 storage #888

Merged: 12 commits, Sep 30, 2024
s3fs/core.py: 58 changes (30 additions & 28 deletions)
@@ -296,6 +296,7 @@ def __init__(
         asynchronous=False,
         loop=None,
         max_concurrency=1,
+        fixed_upload_size: bool = False,
         **kwargs,
     ):
         if key and username:
@@ -333,6 +334,7 @@ def __init__(
         self.cache_regions = cache_regions
         self._s3 = None
         self.session = session
+        self.fixed_upload_size = fixed_upload_size
         if max_concurrency < 1:
             raise ValueError("max_concurrency must be >= 1")
         self.max_concurrency = max_concurrency
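
For context, a usage sketch of the new flag against Cloudflare R2. The endpoint URL, bucket name, and credentials below are placeholders for illustration, not values taken from this PR. R2 rejects multipart uploads whose non-final parts differ in size, which is the behavior fixed_upload_size=True guarantees:

import s3fs

# Placeholder R2 endpoint and credentials (illustrative only).
fs = s3fs.S3FileSystem(
    key="<access-key-id>",
    secret="<secret-access-key>",
    endpoint_url="https://<account-id>.r2.cloudflarestorage.com",
    fixed_upload_size=True,  # every non-final part is exactly blocksize bytes
)

# Writes larger than the block size take the multipart path, where the
# fixed-size guarantee matters.
with fs.open("my-bucket/large-file.bin", "wb") as f:
    f.write(b"x" * 100 * 2**20)  # 100 MiB
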
@@ -2331,41 +2333,41 @@ def _upload_chunk(self, final=False):
             and self.tell() < self.blocksize
         ):
             # only happens when closing small file, use one-shot PUT
-            data1 = False
+            pass
         else:
             self.buffer.seek(0)
-            (data0, data1) = (None, self.buffer.read(self.blocksize))
-
-        while data1:
-            (data0, data1) = (data1, self.buffer.read(self.blocksize))
-            data1_size = len(data1)
-
-            if 0 < data1_size < self.blocksize:
-                remainder = data0 + data1
-                remainder_size = self.blocksize + data1_size
-
-                if remainder_size <= self.part_max:
-                    (data0, data1) = (remainder, None)
-                else:
-                    partition = remainder_size // 2
-                    (data0, data1) = (remainder[:partition], remainder[partition:])
-
-            part = len(self.parts) + 1
-            logger.debug("Upload chunk %s, %s" % (self, part))
-
-            out = self._call_s3(
-                "upload_part",
-                Bucket=bucket,
-                PartNumber=part,
-                UploadId=self.mpu["UploadId"],
-                Body=data0,
-                Key=key,
-            )
-
-            part_header = {"PartNumber": part, "ETag": out["ETag"]}
-            if "ChecksumSHA256" in out:
-                part_header["ChecksumSHA256"] = out["ChecksumSHA256"]
-            self.parts.append(part_header)
+
+        def upload_part(part_data: bytes):
+            if len(part_data) == 0:
+                return
+            part = len(self.parts) + 1
+            logger.debug("Upload chunk %s, %s" % (self, part))
+
+            out = self._call_s3(
+                "upload_part",
+                Bucket=bucket,
+                PartNumber=part,
+                UploadId=self.mpu["UploadId"],
+                Body=part_data,
+                Key=key,
+            )
+
+            part_header = {"PartNumber": part, "ETag": out["ETag"]}
+            if "ChecksumSHA256" in out:
+                part_header["ChecksumSHA256"] = out["ChecksumSHA256"]
+            self.parts.append(part_header)
+
+        def n_bytes_left() -> int:
+            return len(self.buffer.getbuffer()) - self.buffer.tell()
+
+        min_chunk = 1 if final else self.blocksize
+        if self.fs.fixed_upload_size:
+            # all chunks have fixed size, exception: last one can be smaller
+            while n_bytes_left() >= min_chunk:
+                upload_part(self.buffer.read(self.blocksize))
+        else:
+            while n_bytes_left() >= min_chunk:
+                upload_part(self.buffer.read(self.part_max))

         if self.autocommit and final:
             self.commit()
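
To make the behavioral difference concrete, a minimal standalone sketch of the two part-splitting strategies above (the function name and the sizes are illustrative, not part of the PR). In fixed mode every read is exactly blocksize bytes, so all non-final parts are equal, as R2 requires; the default mode packs up to part_max bytes per part, so part sizes can drift over the course of an upload:

import io

MiB = 2 ** 20

def part_sizes(data: bytes, blocksize: int, part_max: int,
               fixed_upload_size: bool, final: bool = True) -> list:
    # Mirrors the loops above: fixed mode reads exactly blocksize per part,
    # default mode greedily reads up to part_max per part.
    buffer = io.BytesIO(data)
    read_size = blocksize if fixed_upload_size else part_max
    min_chunk = 1 if final else blocksize
    sizes = []
    while len(buffer.getbuffer()) - buffer.tell() >= min_chunk:
        sizes.append(len(buffer.read(read_size)) / MiB)
    return sizes

data = b"x" * (12 * MiB)  # illustrative 12 MiB payload

print(part_sizes(data, blocksize=5 * MiB, part_max=8 * MiB, fixed_upload_size=True))
# [5.0, 5.0, 2.0] -- uniform parts; only the final part is smaller
print(part_sizes(data, blocksize=5 * MiB, part_max=8 * MiB, fixed_upload_size=False))
# [8.0, 4.0] -- greedy packing, so part sizes vary between flushes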