-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathyoggi.py
221 lines (164 loc) · 6.46 KB
/
yoggi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
from os import getenv, listdir
from os.path import join
from magic import from_buffer, from_file
from requests import get
from json import dumps
from mimetypes import types_map
import string
from urllib.parse import urlencode
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.wrappers import Request, Response
from werkzeug.utils import redirect
import s3
class Auth:
def __init__(self):
self.login_frontend_url = getenv('LOGIN_FRONTEND_URL', 'https://sso.datasektionen.se/legacyapi')
self.login_api_url = getenv('LOGIN_API_URL', 'https://sso.datasektionen.se/legacyapi')
self.api_key = getenv('LOGIN_API_KEY')
self.token_alphabet = set(string.ascii_letters + string.digits + "-_")
def any(self, request, response):
login_url = self.login_frontend_url + '/login?' + urlencode({'callback': request.base_url + '?token='})
token = request.cookies.get('token') or request.args.get('token') or request.form.get('token')
if token == None or token == "":
return redirect(login_url)
response.user = self.validate_user(token)
if not response.user:
# The token might have been invalidated. Clear cookies.
resp = redirect(login_url)
resp.set_cookie('token', max_age=0) # max_age=0 unsets the cookie.
return resp
response.set_cookie('token', token, httponly=True, samesite='Lax')
return response
def validate_user(self, token):
if not self.validate_token(token):
return False
url = '{}/verify/{}'.format(self.login_api_url, token)
params = {'api_key': self.api_key}
response = get(url, params=params)
if response.status_code != 200:
return False
return response.json()['user']
def validate_token(self, token):
for letter in token:
if letter not in self.token_alphabet:
return False
return True
class ListFiles:
def GET(self, request, response):
list_type = request.args.get('list')
if not list_type is None:
response.data = dumps(s3.list(request.path[1:]))
return response
class PlsPermission:
def any(self, request, response):
self.pls_url = getenv('PLS_URL', 'https://pls.datasektionen.se')
response.permissions = self.has_permission(response.user)
def has_permission(self, user):
url = self.pls_url + '/api/user/{}/yoggi/'.format(user)
res = get(url)
return res.json()
class Static:
def __init__(self, path):
self.path = path
self.files = listdir(path)
def GET(self, request, response):
if request.path.endswith('/'):
request.path = '/index.html'
filename = request.path[1:]
if filename not in self.files:
return None
real_path = join(self.path, filename)
response.response = open(real_path, 'rb')
response.mimetype = from_file(real_path)
if response.user:
response.set_cookie('user', response.user)
response.set_cookie('permissions', ', '.join(response.permissions))
return response
class S3Handler:
def has_access(self, response, path):
if not response.user:
return False
if '*' in response.permissions:
return True
path_items = path.split('/')
folder = path_items[0] if len(path_items) > 1 else '~'
if folder == '~':
return True
return folder in response.permissions
def GET(self, request, response):
url = s3.get_url(request.path[1:])
if url:
response = redirect(url)
response.cache_control.max_age = 1800
else:
response.data = 'Cannot GET ' + request.path
response.status_code = 404
return response
def POST(self, request, response):
path = request.path[1:]
folder = path.split('/')
if self.has_access(response, path):
file = request.files['file']
mimetype = from_buffer(file.stream.read(1024), mime=True)
public = request.args.get('public') or "False"
s3.put(path, file, response.user, mimetype, public)
response.data = 'That probably worked...'
else:
response.data = 'Permission denied in this folder.'
response.status_code = 401
return response
def PUT(self, request, response):
path = request.path[1:]
state = request.args.get('public')
if state != "True" and state != "False":
response.data = 'Invalid mapping for "public". Should be "?public=True" or "?public=False"'
response.status_code = 400
return response
if self.has_access(response, path):
s3.put_permissions(path, state)
response.data = 'That probably worked...'
else:
response.data = 'Permission denied in this folder.'
response.status_code = 401
return response
def DELETE(self, request, response):
path = request.path[1:]
folder = path.split('/')
if s3.owner(path) == response.user or self.has_access(response, path):
s3.delete(path)
response.data = 'That probably worked...'
else:
response.data = 'Not allowed'
response.status_code = 401
return response
middlewarez = [
Auth(),
ListFiles(),
PlsPermission(),
Static('build'),
S3Handler()
]
supported_methods = set(["GET", "PUT", "POST", "DELETE"])
@Request.application
def request_handler(request):
response = Response()
if request.method not in supported_methods:
response.data = 'Method Not Allowed'
response.status_code = 405
return response
for middleware in middlewarez:
d = dir(middleware)
if 'any' in d:
finished_response = middleware.any(request, response)
elif request.method in d:
finished_response = middleware.__getattribute__(request.method)(request, response)
if finished_response == None:
continue
elif finished_response.data == b'':
response = finished_response
else:
return finished_response
yoggi = ProxyFix(request_handler)
if __name__ == '__main__':
from werkzeug.serving import run_simple
run_simple('localhost', getenv('PORT') or 5000, request_handler, use_debugger=True, use_reloader=True)