-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathwebserver.py
310 lines (225 loc) · 9.01 KB
/
webserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
#!/usr/bin/env python3
from flask import Flask, session, render_template, abort, request, flash, url_for, redirect, g
from main import *
import dns.resolver
from config import *
from flask_sqlalchemy import SQLAlchemy
from flask_babel import Babel, gettext
def get_locale():
# otherwise try to guess the language from the user accept
# header the browser transmits. We support de/fr/en in this
# example. The best match wins.
return request.accept_languages.best_match(['de', 'fr', 'en'])
app = Flask(__name__)
app.secret_key = FLASK_SECRET_KEY
# This is necessary, since sqlalchemy-flask seems to
# expect things in a subfolder called "instance" normally now.
SQLITE_URI = 'sqlite:///'+ app.root_path + '/foo.db'
app.config['SQLALCHEMY_DATABASE_URI'] = SQLITE_URI
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['BABEL_DEFAULT_LOCALE'] = 'en'
babel = Babel(app, locale_selector=get_locale)
sqlalchemy = SQLAlchemy(app)
def get_db():
global sqlalchemy
return sqlalchemy.session
@app.errorhandler(404)
def page_not_found(e):
# note that we set the 404 status explicitly
return render_template('404.html'), 404
@app.route('/')
def hello_world():
return render_template("index.html")
@app.route('/node/<nodeid>')
def node(nodeid):
db = get_db()
nodeset = NodeSet()
nodeset.update_from_db(db)
node = nodeset.find_by_nodeid(nodeid)
if not node:
flash('Error: Node with nodeid ' + nodeid + " not found!", 'danger')
abort(404)
now = datetime.datetime.now()
subscription = node.get_subscription_by_user(db, get_user())
return render_template("node.html", node=node, now=now,
NODE_LINKS=NODE_LINKS, subscription=subscription)
@app.route('/node/<nodeid>/toggle_notifications')
def toggle_notifications(nodeid):
db = get_db()
nodeset = NodeSet()
nodeset.update_from_db(db)
user = get_user()
if not user:
flash('Error: You need to be logged in to toggle notifications.', 'danger')
return redirect('/')
node = nodeset.find_by_nodeid(nodeid)
if not node:
flash('Error: Node with nodeid ' + nodeid + " not found!", 'danger')
return redirect_to_last_page()
subscription = node.get_subscription_by_user(db, user)
if not subscription:
flash('Error: Toggling notifications failed. You were not subscribed to ' + node.name + "!", 'danger')
return redirect_to_last_page()
subscription.send_notifications = not subscription.send_notifications
db.add(subscription)
db.commit()
return redirect_to_last_page()
@app.route('/register', methods=['GET', 'POST'])
def register():
db = get_db()
def res(code):
return render_template("register.html"), code
if request.method == 'POST':
email = request.form.get('email')
if '@' not in email:
flash(gettext('Error: The entered mail does not contain an @.'), 'danger')
return res(400)
domain = email.rsplit('@', 1)[-1]
try:
dns.resolver.query(domain, 'MX')
except dns.resolver.Timeout:
flash(gettext('Error: Email invalid. The domain %(domain)s does not have an MX record.', domain=domain), 'danger')
return res(400)
except dns.resolver.NoAnswer:
flash(gettext('Error: Email invalid. The domain %(domain)s does not have an MX record.', domain=domain), 'danger')
return res(400)
except dns.resolver.NXDOMAIN:
flash(gettext('Error: Email invalid. The domain %(domain)s does not have an MX record.', domain=domain), 'danger')
return res(400)
user = User.find_by_email(db, email)
if user:
flash(gettext('User already registered. Resending login token.'), 'warning')
user.send_confirm_mail(url_for('login', _external=True))
return res(200)
user = User(email=email)
db.add(user)
db.commit()
user.send_confirm_mail(url_for('login', _external=True))
flash(gettext('Confirmation mail sent.'), 'info')
return res(200)
return res(200)
def get_user():
db = get_db()
email = session.get('email', None)
user = None
if email:
user = User.find_by_email(db, email)
return user
@app.context_processor
def inject_stuff():
db = get_db()
nodeset = NodeSet()
nodeset.update_from_db(db)
return dict(node_list=nodeset, user=get_user(), np=np)
@app.template_filter('show_constitution')
def show_constitution(node):
if node.constitution == 'ok':
css_class = 'text-success'
elif node.constitution == 'problem':
css_class = 'text-danger'
else:
css_class = 'text-muted'
return '<span class="%s">%s</span>' % (css_class, node.constitution)
@app.route('/login')
def login():
db = get_db()
def res(code):
return render_template("login.html"), code
if 'email' not in request.args:
flash(gettext('Error: Email was not given in request. Please use the link from your confirmation mail.'), 'danger')
return res(400)
if 'token' not in request.args:
flash(gettext('Error: Token was not given in request. Please use the link from your confirmation mail.'), 'danger')
return res(400)
user = User.find_by_email(db, request.args['email'])
if not user:
flash(gettext('Error: Email not found.'), 'danger')
return res(400)
if not user.try_confirm(db, request.args['token']):
flash(gettext('Error: The supplied token is invalid.'), 'danger')
return res(400)
session['email'] = user.email
flash(gettext('Success: Email confirmed. You are now logged in.'), 'success')
return redirect('/')
@app.route('/logout')
def logout():
if session['email'] is not None:
session['email'] = None
flash(gettext('Logged out.'), 'info')
return redirect_to_last_page()
def redirect_to_last_page():
referrer = request.headers.get("Referer", None)
if referrer:
return redirect(referrer)
return redirect('/')
@app.route('/subscribe')
def subscribe():
user = get_user()
if not user:
flash(gettext('Error: You need to be logged in to subscribe.'), 'danger')
return redirect('/')
def res(code):
return render_template("subscribe.html", nodes_json_cache=nodes_json_cache), code
def try_subscribe(node):
if user in node.subscribed_users:
flash(gettext('Error: You are already subscribed to %(node)s!', node=node.name), 'danger')
return redirect_to_last_page()
s = Subscription()
s.user = user
s.node = node
db.add(s)
db.add(node) # node might not be in db yet
db.commit()
flash(gettext('Subscribed to node %(node)s.', node=node.name), 'success')
if request.args.get('goto') == 'yes':
return redirect(url_for('node', nodeid=node.nodeid))
else:
return redirect_to_last_page()
db = get_db()
nodeset = NodeSet()
nodeset.update_from_db(db)
# First, we try to query the nodeset. By chance, someone is already
# subscribed to this node. This is usually faster than loading the
# nodes.json.
if "nodeid" in request.args:
node = nodeset.find_by_nodeid(request.args['nodeid'])
if node:
return try_subscribe(node)
nodes_json_cache = NodesJSONCache()
nodes_json_cache.update(nodeset)
if "nodeid" in request.args:
node = nodes_json_cache.find_by_nodeid(request.args['nodeid'])
if not node:
flash(gettext('Error: Node with nodeid %(nodeid)s not found!', nodeid=request.args['nodeid']), 'danger')
return res(400)
return try_subscribe(node)
return res(200)
@app.route('/unsubscribe')
def unsubscribe():
user = get_user()
if not user:
flash(gettext('Error: You need to be logged in to unsubscribe.'), 'danger')
return redirect('/')
if 'nodeid' not in request.args:
flash(gettext('Error: Unsubscribe failed. No nodeid was given.'), 'danger')
return redirect_to_last_page()
db = get_db()
nodeset = NodeSet()
nodeset.update_from_db(db)
node = nodeset.find_by_nodeid(request.args['nodeid'])
if not node:
flash(gettext('Error: Unsubscribe failed. Node with nodeid %(nodeid)s not found!', nodeid=request.args['nodeid']), 'danger')
return redirect_to_last_page()
subscription = node.get_subscription_by_user(db, user)
if not subscription:
flash(gettext('Error: Unsubscribe failed. You were not subscribed to %(node)s!', node=node.name), 'danger')
return redirect_to_last_page()
db.delete(subscription)
db.commit()
flash(gettext('Sucessfully unsubscribed from %(node)s!', node=node.name), 'info')
if len(node.subscriptions) == 0:
db.delete(node)
db.commit()
flash(gettext('Node %(node)s was removed, because nobody subscribes to it anymore.', node=node.name), 'info')
return redirect('/')
return redirect_to_last_page()