Skip to content

Commit

Permalink
Add route to send event update to attendees
Browse files Browse the repository at this point in the history
  • Loading branch information
OneBlue committed Nov 2, 2024
1 parent d2299f4 commit 0622fe3
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 18 deletions.
51 changes: 38 additions & 13 deletions events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def get_event(collection: str, event: str):
if not matched_event:
raise NotFoundException(f'Event {event} not found in collection {collection}')

return matched_event
return matched_event, matched_collection

def rationalize_time(ts) -> datetime:
if not isinstance(ts, datetime): # In case of date, convert to datetime (set 00:00)
Expand Down Expand Up @@ -180,21 +180,20 @@ def event_page(collection, event):
validate_access()

try:
event_data = get_event(collection, event)
event_data, _ = get_event(collection, event)
except EventRedirect as e:
logging.info(f'Redirecting event {event} to {e.filename}')
assert event.lower() != e.filename.lower()
return redirect(f'/{collection}/{e.filename}')

return render_event(collection, event, event_data)
return render_event(collection, event, event_data, admin=settings.is_admin(request))

@app.route('/<collection>/<event_id>/subscribe', methods=['POST'])
def subscribe(collection, event_id):
token = request.form.get('t', None)
validate_access(token)

matched_collection = get_collection(collection)
event = matched_collection.get_event(event_id)
event, matched_collection = get_event(collection, event_id)
if not event:
raise NotFoundException(f'Event {event_id} not found in collection {collection}')

Expand All @@ -220,22 +219,49 @@ def subscribe(collection, event_id):
send_event_email(event, email, settings, matched_collection.default_organizer)

if already_subscribed:
response = Response(render_event(collection, event_id, event, notification= f'{email} is already subscribed to this event. New invite sent. Check your spam folder', token=token))
notification = f'{email} is already subscribed to this event. New invite sent. Check your spam folder'
else:
response = Response(render_event(collection, event_id, event, notification=f'Event sent to {email}, check your spam folder', token=token))
notification = f'Event sent to {email}, check your spam folder'

response = Response(render_event(collection, event_id, event, notification=notification, token=token, admin= not token and settings.is_admin(request)))
response.headers['Set-Cookie'] = f'email={email}; Secure; SameSite=Strict; Path=/'
return response
except InvalidEmailAddress as e:
return render_event(collection, event_id, event, notification= f'Invalid email: {str(e)}')
return render_event(collection, event_id, event, notification= f'Invalid email: {str(e)}', token=token, admin=not token and settings.is_admin(request))

@app.route('/<collection>/<event_id>/update', methods=['POST'])
def send_event_update(collection: str, event_id: str):
admin_only()

event, matched_collection = get_event(collection, event_id)
component = get_event_component(event)

attendees = component.get('attendee', None)
if isinstance(attendees, icalendar.vCalAddress):
emails = [attendees.title().lower()]
elif attendees:
emails = [e.title().lower() for e in attendees]
else:
emails = []

emails = [e.replace('mailto:', '') for e in emails]
if not emails:
notification = 'Event has no attendees'
else:
for e in emails:
logging.info(f'Emailing event {event_id} to {e}')
send_event_email(event, e, settings, matched_collection.default_organizer)

notification = 'Event sent to: ' + ', '.join(emails)

return render_event(collection, event_id, event, notification=notification, admin=True)

@app.route('/api/<collection>/<event_id>/subscribe', methods=['POST'])
@csrf.exempt
def subscribe_api(collection, event_id):
admin_only()

matched_collection = get_collection(collection)
event = matched_collection.get_event(event_id)
event, matched_collection = get_event(collection, event_id)
if not event:
raise NotFoundException(f'Event {event_id} not found in collection {collection}')

Expand Down Expand Up @@ -280,8 +306,7 @@ def event_json(collection: int, event) -> dict:
def event_api(collection, event_id):
admin_only()

matched_collection = get_collection(collection)
event = matched_collection.get_event(event_id)
event, matched_collection = get_event(collection, event_id)

return json.dumps(event_json(collection, event)), 200

Expand Down Expand Up @@ -367,7 +392,7 @@ def create_api(collection):
def event_ics(collection, event):
validate_access()

content = get_event(collection, event).to_ical()
content = get_event(collection, event)[0].to_ical()
response = Response(content)
response.headers['Content-Disposition'] = f'Attachement; filename="{event}"'
response.headers['Content-Type'] = f'content-type:text/calendar'
Expand Down
11 changes: 8 additions & 3 deletions events/templates/event.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,14 @@
{% if notification is defined %}
<b style="color:green">{{ notification }}</b> <br/>
{% endif %}
<a href="/{{ collection }}/{{ event }}/ics{% if token is defined and token %}?t={{token}}{% endif %}">
<button>Download ICS</button>
</a>
<a href="/{{ collection }}/{{ event }}/ics{% if token is defined and token %}?t={{token}}{% endif %}"><button>Download ICS</button></a>
{% if admin is defined %}
<form onsubmit="return confirm('Send event update ?');" action="/{{ collection }}/{{ event }}/update" method="post" style="display: inline;">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<input type="submit" value="Send update (admin)">
</form>
{% endif %}

</body>

{% if admin_links is defined %}
Expand Down
66 changes: 64 additions & 2 deletions events/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,72 @@ def test_view_event_ics_admin(client):
assert response.status_code == 200
assert response.data.decode() == 'BEGIN:VCALENDAR\r\nBEGIN:VEVENT\r\nSUMMARY:event_1\r\nDTSTART;VALUE=DATE-TIME:20101010T100000\r\nUID:event_1\r\nLOCATION:Location_1\r\nEND:VEVENT\r\nEND:VCALENDAR\r\n'

def test_update_no_csrf(client):
response = client.post(f'/1/event_1.ics/update', data='')

assert response.status_code == 400

def test_update_csrf_no_admin(client):
response = client.post(f'/1/event_1.ics/update', data={'csrf_token': client.csrf_token})
assert response.status_code == 404

def test_update_csrf_no_admin_with_token(client):
token = quote_plus(generate_token(settings, '/1/event_1.ics', expires=datetime.now() + timedelta(days=1)))
response = client.post(f'/1/event_1.ics/update', data={'csrf_token': client.csrf_token, 't': token})
assert response.status_code == 404

def test_update_no_attendees(client):
response = client.post(f'/1/event_1.ics/update', data={'csrf_token': client.csrf_token}, headers={'X-Admin': 'true'})

assert response.status_code == 200
assert 'Event has no attendees' in response.data.decode()

def test_update_one_attendee(client):
called = False
def send_email(source: str, destination: list, content: str):
nonlocal called
assert source == settings.email_from
assert destination == ['[email protected]']
assert 'Subject: event_3' in content
called = True

def save_event(*args):
assert False

save_event_override = save_event
override_send_email(send_email)

response = client.post(f'/1/event_3.ics/update', data={'csrf_token': client.csrf_token}, headers={'X-Admin': 'true'})

assert response.status_code == 200
assert 'Event sent to: [email protected]' in response.data.decode()
assert called

def test_update_two_attendees(client):
emails = []
def send_email(source: str, destination: list, content: str):
nonlocal emails
assert source == settings.email_from
emails += destination
assert 'Subject: event_4' in content
called = True

def save_event(*args):
assert False

save_event_override = save_event
override_send_email(send_email)

response = client.post(f'/1/event_4.ics/update', data={'csrf_token': client.csrf_token}, headers={'X-Admin': 'true'})

assert response.status_code == 200
assert 'Event sent to: [email protected], [email protected]' in response.data.decode()
assert emails == ['[email protected]', '[email protected]']

def test_subscribe_no_csrf(client):
response = client.get(f'/1/event_1.ics/subscribe', data='[email protected]&updates=on')
response = client.post(f'/1/event_1.ics/subscribe', data='[email protected]&updates=on')

assert response.status_code == 405
assert response.status_code == 400

def test_subscribe_with_csrf_no_token(client):
token = quote_plus(generate_token(settings, '/1/event_1.ics', expires=datetime.now() + timedelta(days=1)))
Expand Down

0 comments on commit 0622fe3

Please sign in to comment.