How to stub S3.Object.wait_until_exists ? #2719
-
I have been tasked with writing tests for an s3 uploading function which uses S3.Object.wait_until_exists to wait for upload to complete and get the content length of the upload to return it. But so far I am failing to stub I have explored and found the waiter has two acceptors:
I don't know how to explain in text more so instead here is an MRE. from datetime import datetime
from io import BytesIO
import boto3
import botocore
import botocore.stub
testing_bucket = "bucket"
testing_key = "key/of/object"
testing_data = b"data"
s3 = boto3.resource("s3")
def put():
try:
o = s3.Object(testing_bucket, testing_key)
o.load() # head_object * 1
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "NoSuchKey":
etag = ""
else:
raise e
else:
etag = o.e_tag
try:
o.upload_fileobj(BytesIO(testing_data)) # put_object * 1
except botocore.exceptions.ClientError as e:
raise e
else:
o.wait_until_exists(IfNoneMatch=etag) # head_object * n until accepted
return o.content_length # not sure if calling head_object again
with botocore.stub.Stubber(s3.meta.client) as s3_stub:
s3_stub.add_response(
method="head_object",
service_response={
"ETag": "fffffffe",
"ContentLength": 0,
},
expected_params={
"Bucket": testing_bucket,
"Key": testing_key,
},
)
s3_stub.add_response(
method="put_object",
service_response={},
expected_params={
"Bucket": testing_bucket,
"Key": testing_key,
"Body": botocore.stub.ANY,
},
)
s3_stub.add_response( # cause time to increase by 5 seconds per response
method="head_object",
service_response={
"ETag": "ffffffff",
"AcceptRanges": "bytes",
"ContentLength": len(testing_data),
"LastModified": datetime.now(),
"Metadata": {},
"VersionId": "null",
},
expected_params={
"Bucket": testing_bucket,
"Key": testing_key,
"IfNoneMatch": "fffffffe",
},
)
print(put()) # should print 4 And running the above gives:
Or with 2 answer, same thing with |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
Managed to solve on my own after a fair bit of testing by adding a |
Beta Was this translation helpful? Give feedback.
Managed to solve on my own after a fair bit of testing by adding a
"ResponseMetadata": {"HTTPStatusCode": 200},
to the response.