From f3924042c69d5e982236646ce2a0d2e58875e8a0 Mon Sep 17 00:00:00 2001 From: liyanzhang505 Date: Fri, 8 Nov 2019 17:53:29 +0800 Subject: [PATCH] Update delete objects xml escape. --- oss2/utils.py | 21 ++++++++++++++ oss2/xml_utils.py | 37 +++++++++++++------------ tests/test_object.py | 11 ++++++++ tests/test_object_versioning.py | 49 +++++++++++++++++++++++++++++++++ 4 files changed, 101 insertions(+), 17 deletions(-) diff --git a/oss2/utils.py b/oss2/utils.py index 8c80d702..9dee6e44 100644 --- a/oss2/utils.py +++ b/oss2/utils.py @@ -803,3 +803,24 @@ def to_str(pos): return str(pos) return to_str(start) + '-' + to_str(last) + +def xml_escape(key): + if key is None: + return None + + new_key = "" + escape_keys = { + '\r' : ' ', + '\'' :'"', + '&' : '&', + '<' : '<', + '>' : '>' + } + + for i in range(0, len(key)): + if escape_keys.get(key[i]) is not None: + new_key += escape_keys.get(key[i]) + else: + new_key += key[i] + + return new_key diff --git a/oss2/xml_utils.py b/oss2/xml_utils.py index 5d7b7530..d4e70863 100644 --- a/oss2/xml_utils.py +++ b/oss2/xml_utils.py @@ -711,34 +711,37 @@ def to_complete_upload_request(parts): return _node_to_string(root) - def to_batch_delete_objects_request(keys, quiet): - root_node = ElementTree.Element('Delete') - - _add_text_child(root_node, 'Quiet', str(quiet).lower()) + xml_body = '''''' + xml_body += "" + xml_body += "" + str(quiet).lower() + "" for key in keys: - object_node = ElementTree.SubElement(root_node, 'Object') - _add_text_child(object_node, 'Key', key) - - return _node_to_string(root_node) + new_key = utils.xml_escape(to_unicode(key)) + object_body = "" + "" + new_key + "" + "" + xml_body += object_body -def to_batch_delete_objects_version_request(objectVersions, quiet): + xml_body += "" - root_node = ElementTree.Element('Delete') + return xml_body - _add_text_child(root_node, 'Quiet', str(quiet).lower()) +def to_batch_delete_objects_version_request(objectVersions, quiet): + xml_body = '''''' + xml_body += "" + xml_body += "" + str(quiet).lower() + "" objectVersionList = objectVersions.object_version_list - for ver in objectVersionList: - object_node = ElementTree.SubElement(root_node, 'Object') - _add_text_child(object_node, 'Key', ver.key) - if ver.versionid != '': - _add_text_child(object_node, 'VersionId', ver.versionid) + new_key = utils.xml_escape(to_unicode(ver.key)) + object_body = "" + object_body += "" + new_key + "" + object_body += "" + ver.versionid + "" + object_body += "" + xml_body += object_body - return _node_to_string(root_node) + xml_body += "" + return xml_body def to_put_bucket_config(bucket_config): root = ElementTree.Element('CreateBucketConfiguration') diff --git a/tests/test_object.py b/tests/test_object.py index 10df80ff..918d148f 100644 --- a/tests/test_object.py +++ b/tests/test_object.py @@ -387,6 +387,17 @@ def test_batch_delete_objects(self): for object in object_list: self.assertTrue(not self.bucket.object_exists(object)) + def test_batch_delete_objects_specificalChars(self): + object_name = "测试中文python-bswodnvsttpqvnzwsgifetwe\n\n\t@#$!\t<>!@#$%^&*()-=" + for i in range(1, 32): + object_name += chr(i) + + result = self.bucket.put_object(object_name, "123") + + key_list = [object_name] + self.bucket.batch_delete_objects(key_list) + self.assertFalse(self.bucket.object_exists(object_name)) + def test_batch_delete_objects_empty(self): try: self.bucket.batch_delete_objects([]) diff --git a/tests/test_object_versioning.py b/tests/test_object_versioning.py index 9c097bcc..8e5f4f19 100644 --- a/tests/test_object_versioning.py +++ b/tests/test_object_versioning.py @@ -1025,6 +1025,55 @@ def test_delete_object_versions_with_invalid_arguments(self): self.assertRaises(oss2.exceptions.ClientError, bucket.delete_object_versions, None) self.assertRaises(oss2.exceptions.ClientError, bucket.delete_object_versions, []) + def test_batch_delete_verions_with_specifical_chars(self): + + from oss2.models import BucketVersioningConfig + from oss2.models import BatchDeleteObjectVersion + from oss2.models import BatchDeleteObjectVersionList + + auth = oss2.Auth(OSS_ID, OSS_SECRET) + bucket_name = OSS_BUCKET + "-test-batch-delete-versions-spec-chars" + bucket = oss2.Bucket(auth, self.endpoint, bucket_name) + + bucket.create_bucket(oss2.BUCKET_ACL_PRIVATE) + + wait_meta_sync() + + config = BucketVersioningConfig() + config.status = 'Enabled' + + result = bucket.put_bucket_versioning(config) + + wait_meta_sync() + + result = bucket.get_bucket_info() + + object_name = "测试中文python-bswodnvsttpqvnzwsgifetwe\n\n\t@#$!\t<>!@#$%^&*()-=" + for i in range(1, 32): + object_name += chr(i) + + # put version 1 + result = bucket.put_object(object_name, "123") + self.assertEqual(int(result.status)/100, 2) + self.assertTrue(result.versionid != "") + versionid1 = result.versionid + + version_list = BatchDeleteObjectVersionList() + version_list.append(BatchDeleteObjectVersion(key=object_name, versionid=versionid1)) + + self.assertTrue(version_list.len(), 1) + + result = bucket.delete_object_versions(version_list) + + self.assertTrue(len(result.delete_versions) == 1) + self.assertTrue(result.delete_versions[0].versionid == versionid1) + + self.assertFalse(bucket.object_exists(object_name)) + + try: + bucket.delete_bucket() + except: + self.assertFalse(True, "should not get a exception") if __name__ == '__main__': unittest.main()