-
Notifications
You must be signed in to change notification settings - Fork 6
/
s3-bucket-list
executable file
·121 lines (110 loc) · 4.12 KB
/
s3-bucket-list
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python3
import html
import http.client
import os
import shlex
import sys
import urllib.parse
# List the contents of a publicly listable S3 bucket via the ListObjects (v1) API,
# following the marker-based pagination until the listing is complete.

# Output format used when --format is not given
DEFAULT_FORMAT = '{url}'
# Number of attempts for a single list request before giving up on S3 internal errors
MAX_ATTEMPTS = 10
# Body fragment S3 returns for a transient internal error
INTERNAL_ERROR_BODY = b'<Error><Code>InternalError</Code><Message>We encountered an internal error. Please try again.</Message>'
# Every valid list response is expected to start with exactly these bytes
LIST_RESULT_START = b'<?xml version="1.0" encoding="UTF-8"?>\n<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
LIST_RESULT_END = b'</ListBucketResult>'
CONTENTS_START = b'<Contents>'
CONTENTS_END = b'</Contents>'


def parseArguments(argv):
    """Parse the command-line arguments (without the program name).

    Returns a tuple (withListUrls, startMarker, outputFormat, args) where args is
    the list of positional arguments. Prints the usage and exits with status 1 on
    --help; prints an error and exits with status 1 if --marker or --format is
    missing its value.
    """
    withListUrls = False
    startMarker = None
    outputFormat = DEFAULT_FORMAT
    args = []
    i = 0
    while i < len(argv):
        arg = argv[i]
        if arg == '--help':
            print('s3-bucket-list [options] BUCKETURL', file = sys.stderr)
            print('', file = sys.stderr)
            print('Options:', file = sys.stderr)
            print(f' --format FORMAT Modify the output format; FORMAT defaults to {outputFormat!r}; available fields: url, key, size, and all fields returned by S3 (e.g. LastModified)', file = sys.stderr)
            print( ' --marker KEY Start after a particular key instead of from the beginning', file = sys.stderr)
            print( ' --with-list-urls Enables printing the list URLs retrieved to FD 3', file = sys.stderr)
            sys.exit(1)
        elif arg == '--with-list-urls':
            withListUrls = True
        elif arg in ('--marker', '--format'):
            # Both options consume the following argument; fail cleanly if it is absent.
            if i + 1 >= len(argv):
                print(f'Error: {arg} requires a value', file = sys.stderr)
                sys.exit(1)
            if arg == '--marker':
                startMarker = argv[i + 1]
            else:
                outputFormat = argv[i + 1]
            i += 1
        else:
            args.append(arg)
        i += 1
    return withListUrls, startMarker, outputFormat, args


def splitBucketUrl(bucketUrl):
    """Validate a bucket URL and return (scheme, hostname, baseUrl).

    baseUrl is bucketUrl with a trailing slash appended if it did not have one;
    hostname is the part between '://' and the first following slash (it may
    include a port). Raises ValueError if the URL is not an http(s) URL or has
    no hostname.
    """
    if not bucketUrl.startswith(('http://', 'https://')):
        raise ValueError('Argument does not look like an HTTP URL')
    scheme, rest = bucketUrl.split('://', 1)
    if '/' not in rest or not bucketUrl.endswith('/'):
        bucketUrl = f'{bucketUrl}/'
    hostname = bucketUrl.split('://', 1)[1].split('/', 1)[0]
    if not hostname:
        raise ValueError('Argument does not look like an HTTP URL')
    return scheme, hostname, bucketUrl


def parseFields(content):
    """Parse the inside of one <Contents> element into a dict of its fields.

    content is the bytes following '<Contents>' up to and including
    '</Contents>'. Leaf elements become entries keyed by their tag name; nested
    elements are keyed by the '>'-joined tag path (e.g. 'Owner>ID'). Values are
    decoded as UTF-8 with entities unescaped. Raises RuntimeError on anything
    that is not a plain sequence of properly nested elements.
    """
    if not content.endswith(CONTENTS_END):
        raise RuntimeError(f'Unterminated Contents element: {content[:200]}...')
    # '>' never occurs in escaped text, so every piece is '[text]<tag' or '[text]</tag'.
    tokens = content[:-len(CONTENTS_END)].split(b'>')
    if tokens.pop() != b'':
        raise RuntimeError(f'Trailing data in Contents element: {content[:200]}...')
    fields = {}
    openTags = [] # Current open tag hierarchy as [name, hasChildren] pairs
    for token in tokens:
        text, lt, tag = token.rpartition(b'<')
        if not lt or tag in (b'', b'/'):
            raise RuntimeError(f'Malformed tag in Contents element: {token[:200]}')
        if tag.startswith(b'/'):
            # Closing tag: must match the innermost open element.
            if not openTags or openTags[-1][0] != tag[1:]:
                raise RuntimeError(f'Unexpected closing tag in Contents element: {token[:200]}')
            if not openTags[-1][1]:
                # Leaf element (possibly with empty text): record its value.
                path = b'>'.join(name for name, _ in openTags)
                fields[path.decode('utf-8')] = html.unescape(text.decode('utf-8'))
            elif text:
                raise RuntimeError(f'Unexpected text in Contents element: {token[:200]}')
            openTags.pop()
            continue
        if text:
            raise RuntimeError(f'Unexpected text in Contents element: {token[:200]}')
        if openTags:
            openTags[-1][1] = True
        openTags.append([tag, False])
    if openTags:
        raise RuntimeError(f'Unclosed tag in Contents element: {content[:200]}...')
    return fields


def parseContents(body):
    """Return one field dict (see parseFields) per <Contents> element in body.

    body is a complete ListBucketResult response. A listing without any
    <Contents> elements yields an empty list. Raises RuntimeError if the body
    does not have the expected structure.
    """
    # No risk, no fun! The response is split textually instead of being parsed as XML.
    chunks = body.split(CONTENTS_START)
    if not chunks[-1].endswith(LIST_RESULT_END):
        raise RuntimeError(f'Invalid end of body: ...{body[-200:]}')
    chunks[-1] = chunks[-1][:-len(LIST_RESULT_END)]
    entries = []
    for chunk in chunks[1:]:
        fields = parseFields(chunk)
        if 'Key' not in fields:
            raise RuntimeError(f'Contents element without key: {chunk[:200]}...')
        entries.append(fields)
    return entries


def isTruncated(body):
    """Return whether the list response says more results follow; raise RuntimeError if it says neither."""
    if b'<IsTruncated>true</IsTruncated>' in body:
        return True
    if b'<IsTruncated>false</IsTruncated>' in body:
        return False
    raise RuntimeError('Response does not contain a valid IsTruncated element')


def listBucket(conn, baseUrl, startMarker, outputFormat, listUrlsFile):
    """Retrieve all pages of the bucket listing and print one line per object.

    conn is an open http.client connection to the bucket host, baseUrl the
    slash-terminated bucket URL, startMarker the key to start after (or None),
    outputFormat a str.format template, and listUrlsFile a writable file that
    receives each list URL requested (or None). Returns the process exit
    status: 0 on completion or when stdout is closed, 1 when a request kept
    failing with an internal error. Raises RuntimeError on unexpected responses.
    """
    # Request path = everything from the first slash after the host.
    basePath = baseUrl[baseUrl.index('/', baseUrl.index('://') + 3):]
    params = {}
    if startMarker is not None:
        params['marker'] = startMarker
    attempt = 1
    while True:
        queryString = urllib.parse.urlencode(params)
        querySuffix = f'?{queryString}' if queryString else ''
        listUrl = f'{baseUrl}{querySuffix}'
        if listUrlsFile is not None:
            print(f'{listUrl}', file = listUrlsFile)
        conn.request('GET', f'{basePath}{querySuffix}')
        body = conn.getresponse().read()
        if INTERNAL_ERROR_BODY in body:
            willRetry = attempt < MAX_ATTEMPTS
            print(f'Got internal error on {listUrl} on attempt {attempt}; {"retrying" if willRetry else "aborting"}', file = sys.stderr)
            if not willRetry:
                if 'marker' in params:
                    print(f'To retry, use --marker {shlex.quote(params["marker"])}', file = sys.stderr)
                # The listing is incomplete, so report failure to the caller.
                return 1
            attempt += 1
            continue
        if not body.startswith(LIST_RESULT_START):
            raise RuntimeError(f'Invalid body: {body[:200]}...')
        if b'<Marker></Marker>' in body[:200] and 'marker' in params:
            raise RuntimeError('Marker loop (empty marker in response despite providing one)')
        lastKey = None
        for fields in parseContents(body):
            key = fields['Key']
            objectUrl = f'{baseUrl}{urllib.parse.quote(key)}'
            size = int(fields['Size']) if 'Size' in fields else None
            try:
                print(outputFormat.format(**fields, key = key, url = objectUrl, size = size))
            except BrokenPipeError:
                # The consumer of our output went away (e.g. `| head`); that is not an error.
                return 0
            lastKey = key
        if not isTruncated(body):
            return 0
        if lastKey is None:
            raise RuntimeError('Truncated response without any contents')
        if params.get('marker') == lastKey:
            raise RuntimeError('Marker loop (same last key as previous marker)')
        # Continue after the last key of this page; the retry budget is per request.
        params['marker'] = lastKey
        attempt = 1


def main(argv):
    """Run the tool with the given arguments (without the program name) and return the exit status."""
    withListUrls, startMarker, outputFormat, args = parseArguments(argv)
    if len(args) != 1:
        print('Error: Need one argument: bucket URL', file = sys.stderr)
        return 1
    try:
        scheme, hostname, baseUrl = splitBucketUrl(args[0])
    except ValueError as e:
        print(f'Error: {e}', file = sys.stderr)
        return 1
    listUrlsFile = None
    if withListUrls:
        try:
            listUrlsFile = os.fdopen(3, 'w')
        except OSError:
            print('Error: FD 3 not open', file = sys.stderr)
            return 1
    # Honour the URL scheme instead of always speaking TLS.
    connectionClass = http.client.HTTPSConnection if scheme == 'https' else http.client.HTTPConnection
    conn = connectionClass(hostname)
    try:
        return listBucket(conn, baseUrl, startMarker, outputFormat, listUrlsFile)
    finally:
        conn.close()


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))