-
Notifications
You must be signed in to change notification settings - Fork 361
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add write_get_object_response (#364)
- Loading branch information
1 parent
0881767
commit 99de76d
Showing
5 changed files
with
186 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
# -*- coding: utf-8 -*- | ||
import os | ||
import oss2 | ||
import logging | ||
import json | ||
|
||
# Specify access information, such as AccessKeyId, AccessKeySecret, and Endpoint. | ||
# You can obtain access information from evironment variables or replace sample values in the code, such as <your AccessKeyId> with actual values. | ||
# | ||
# For example, if your bucket is located in the China (Hangzhou) region, you can set Endpoint to one of the following values: | ||
# http://oss-cn-hangzhou.aliyuncs.com | ||
# https://oss-cn-hangzhou.aliyuncs.com | ||
|
||
|
||
# access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<yourAccessKeyId>') | ||
# access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<yourAccessKeySecret>') | ||
# bucket_name = os.getenv('OSS_TEST_BUCKET', '<yourBucketName>') | ||
# endpoint = os.getenv('OSS_TEST_ENDPOINT', '<yourEndpoint>') | ||
|
||
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<yourAccessKeyId>') | ||
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<yourAccessKeySecret>') | ||
endpoint = '' | ||
|
||
route = 'test-ap-zxl-process-name-43-1283******516515-opap.oss-cn-beijing-internal.oss-object-process.aliyuncs.com' | ||
token = 'OSSV1#UMoA43+Bi9b6Q1Lu6UjhLXnmq4I/wIFac3uZfBkgJtg2xtHkZJ4bZglDWyOgWRlGTrA8y/i6D9eH8PmAiq2NL2R/MD/UX6zvRhT8WMHUewgc9QWPs9LPHiZytkUZnGa39mnv/73cyPWTuxgxyk4dNhlzEE6U7PdzmCCu8gIrjuYLPrA9psRn0ZC8J2/DCZGVx0BE7AmIJTcNtLKTSjxsJyTts/******' | ||
fwd_status = '200' | ||
content = 'a' * 1024 | ||
|
||
# Fc function entry | ||
def handler(event, context): | ||
|
||
headers = dict() | ||
headers['x-oss-fwd-header-Content-Type'] = 'application/octet-stream' | ||
headers['x-oss-fwd-header-ETag'] = 'testetag' | ||
|
||
logger = logging.getLogger() | ||
logger.info(event) | ||
logger.info("enter request") | ||
evt = json.loads(event) | ||
event_ctx = evt["getObjectContext"] | ||
route = event_ctx["outputRoute"] | ||
token = event_ctx["outputToken"] | ||
print('outputRoute: '+route) | ||
print('outputToken: '+token) | ||
|
||
endpoint = route | ||
|
||
service = oss2.Service(oss2.Auth(access_key_id, access_key_secret), endpoint) | ||
resp = service.write_get_object_response(route, token, fwd_status, content, headers) | ||
|
||
logger.info(resp) | ||
logger.info("end request") | ||
return 'success' | ||
|
||
|
||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
from .common import * | ||
import tempfile | ||
|
||
class TestWriteGetObjectResponse(OssTestCase): | ||
def test_write_get_object_response_normal(self): | ||
|
||
# The fc function returns route | ||
route = "test-ap-zxl-process-name-43-1283******516515-opap.oss-cn-beijing-internal.oss-object-process.aliyuncs.com" | ||
# The fc function returns token | ||
token = "CAISgQJ1q6Ft5B2yfSjIr5DFE/7HiepJj6ugYUv5jzdgO8tnhZLNtjz2IHxMdHJsCeAcs/Q0lGFR5/sflrEtFsUUHBDqPJNewMh8qCr7PqOb45eewOBcYi8hpLLKWXDBx8b3T7jTbrG0I4WACT3tkit03sGJF1GLVECkNpukkINuas9tMCCzcTtBAqU9RGIg0rh4U0HcLvGwKBXnr3PNBU5zwGpGhHh49L60z7/3iHOcriWjkL9O%2Bdioesb4MpAyYc4iabrvgrwqLJim%2BTVL9h1H%2BJ1xiKF54jrdtrmfeQINu0Xdb7qEqIA/d18oP/lnRrQmtvH5kuZjpuHIi5RmB9P71C6/ORqAAS1lAA7hnd1vUHQxjsCeTLBccf9wgeDefOqht46RyE5pxroD0XyHG1Jj/25HuSvGcwGUafW7hLCup3x4wzeL9aDOX%2BE6Pd4yPqQrk6%2BX%2BXYyFWxhxVXrUyQ18MkgI65mDkY8pN9Jysg2sdTjxwxAkfzqTf62DVuCuaAzktvLps18IAA%3D&Expires=1697191658&OSSAccessKeyId=STS.NSpXDsd5h8iKcmHk757DKjWfT&Signature******" | ||
|
||
fwd_status = '200' | ||
content = 'a' * 1024 | ||
|
||
headers = dict() | ||
headers['x-oss-fwd-header-Content-Type'] = 'application/octet-stream' | ||
headers['x-oss-fwd-header-ETag'] = 'testetag' | ||
|
||
try: | ||
service = oss2.Service(oss2.Auth(OSS_ID, OSS_SECRET), route) | ||
# testing is only supported in FC | ||
result = service.write_get_object_response(route, token, fwd_status, content, headers) | ||
except oss2.exceptions.RequestError as e: | ||
pass | ||
except oss2.exceptions.ClientError as e2: | ||
pass | ||
|
||
def test_write_get_object_response_file_like(self): | ||
|
||
# The fc function returns route | ||
route = "test-ap-zxl-process-name-43-1283******516515-opap.oss-cn-beijing-internal.oss-object-process.aliyuncs.com" | ||
# The fc function returns token | ||
token = "CAISgQJ1q6Ft5B2yfSjIr5DFE/7HiepJj6ugYUv5jzdgO8tnhZLNtjz2IHxMdHJsCeAcs/Q0lGFR5/sflrEtFsUUHBDqPJNewMh8qCr7PqOb45eewOBcYi8hpLLKWXDBx8b3T7jTbrG0I4WACT3tkit03sGJF1GLVECkNpukkINuas9tMCCzcTtBAqU9RGIg0rh4U0HcLvGwKBXnr3PNBU5zwGpGhHh49L60z7/3iHOcriWjkL9O%2Bdioesb4MpAyYc4iabrvgrwqLJim%2BTVL9h1H%2BJ1xiKF54jrdtrmfeQINu0Xdb7qEqIA/d18oP/lnRrQmtvH5kuZjpuHIi5RmB9P71C6/ORqAAS1lAA7hnd1vUHQxjsCeTLBccf9wgeDefOqht46RyE5pxroD0XyHG1Jj/25HuSvGcwGUafW7hLCup3x4wzeL9aDOX%2BE6Pd4yPqQrk6%2BX%2BXYyFWxhxVXrUyQ18MkgI65mDkY8pN9Jysg2sdTjxwxAkfzqTf62DVuCuaAzktvLps18IAA%3D&Expires=1697191658&OSSAccessKeyId=STS.NSpXDsd5h8iKcmHk757DKjWfT&Signature******" | ||
|
||
fwd_status = '200' | ||
|
||
headers = dict() | ||
headers['x-oss-fwd-header-Content-Type'] = 'application/octet-stream' | ||
headers['x-oss-fwd-header-ETag'] = 'testetag' | ||
|
||
try: | ||
service = oss2.Service(oss2.Auth(OSS_ID, OSS_SECRET), route) | ||
# testing is only supported in FC | ||
# 通过with语句创建临时文件,with会自动关闭临时文件 | ||
with tempfile.TemporaryFile() as fp: | ||
fp.write(b'hello world!') | ||
fp.seek(0) | ||
|
||
f = tempfile.TemporaryFile() | ||
f.write(b' hello world2!') | ||
f.seek(0) | ||
|
||
resp = service.write_get_object_response(route, token, fwd_status, f, headers) | ||
except oss2.exceptions.RequestError as e: | ||
pass | ||
except oss2.exceptions.ClientError as e2: | ||
pass | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters