From 928cbafec92b88dec7c29440eeb1886305dad186 Mon Sep 17 00:00:00 2001 From: Blue Date: Mon, 21 Feb 2022 13:45:40 -0800 Subject: [PATCH] Implement search API --- events/__init__.py | 72 +++++++++++++++++++++++++++++++++++---- events/test_events.py | 79 +++++++++++++++++++++++++++++++++++++++---- 2 files changed, 137 insertions(+), 14 deletions(-) diff --git a/events/__init__.py b/events/__init__.py index 0fc5bd9..eacf855 100644 --- a/events/__init__.py +++ b/events/__init__.py @@ -22,6 +22,7 @@ from flask_wtf.csrf import CSRFProtect from urllib.parse import quote_plus, urlparse from dateutil.rrule import rrulestr +from functools import cmp_to_key GCALENDAR_FILTER = r'-::~:~::~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~::~:~::-.*-::~:~::~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~:~::~:~::-' @@ -99,7 +100,10 @@ def validate_access_impl(token: str, path: str): raise InvalidToken() from e def get_event_fields(event): - component = next(e for e in event.subcomponents if 'summary' in e) + if 'summary' in event: + component = event + else: + component = next(e for e in event.subcomponents if 'summary' in e) event = {'title': str(component['summary'])} @@ -255,6 +259,23 @@ def subscribe_api(collection, event_id): return '', 200 +def event_json(collection: int, event) -> dict: + if not 'summary' in event: + event = next(e for e in event.subcomponents if 'summary' in e) + + content = get_event_fields(event) + fields = ['start', 'end', 'description', 'location', 'attendees'] + for e in fields: + if e not in content: + content[e] = None + + if 'dtstart' not in event or 'uid' not in event: + content['access_link'] = None + else: + ts = rationalize_time(event['dtstart'].dt) + content['access_link'] = generate_access_url(settings, f'/{collection}/{event["uid"]}', ts + timedelta(days=7)) + + return content @app.route('/api//', methods=['GET']) def event_api(collection, event_id): @@ -263,13 +284,50 @@ def event_api(collection, event_id): matched_collection = get_collection(collection) event = matched_collection.get_event(event_id) - content = get_event_fields(event) - fields = ['start', 'end', 'description', 'location', 'attendees'] - for e in fields: - if e not in content: - content[e] = None + return json.dumps(event_json(collection, event)), 200 + +@app.route('/api//search', methods=['POST']) +@csrf.exempt +def search_api(collection): + admin_only() + + body = json.loads(request.data) + search_term = body.get('pattern', '').lower() + + matched_collection = get_collection(collection) + matched_events = [e for e in matched_collection.all_events() if search_term in e['summary'].lower()] + + def compare_events(left, right): + def cmp(field: str) -> bool: + if field not in left and field not in right: + return 0 + + if not field in left: + return -1 + elif field not in right: + return 1 + + left_value = rationalize_time(left[field].dt) + right_value = rationalize_time(right[field].dt) + + if left_value == right_value: + return 0 + elif left_value > right_value: + return 1 + else: + return -1 + + res = cmp('LAST-MODIFIED') + if res == 0: + res = cmp('CREATED') + if res == 0: + res = cmp('DTSTART') + + return res + + sorted_events = sorted(matched_events, key=cmp_to_key(compare_events), reverse=True) - return json.dumps(content), 200 + return json.dumps([event_json(collection, e) for e in sorted_events]), 200 @app.route('/api/', methods=['POST']) @csrf.exempt diff --git a/events/test_events.py b/events/test_events.py index bb34ed3..c9ac258 100644 --- a/events/test_events.py +++ b/events/test_events.py @@ -95,6 +95,28 @@ event_10.add('dtend', datetime(2012, 10, 10, 10, 0, 0)) event_10['uid'] = 'event_10' +event_11 = Event() +event_11.add('dtstart', date(2012, 10, 10)) +event_11['summary'] = 'event_11' +event_11.add('dtend', datetime(2012, 10, 10, 10, 0, 0)) +event_11['uid'] = 'event_11' +event_11.add('last-modified', datetime(2012, 10, 10, 10, 0, 0)) + + +event_12 = Event() +event_12.add('dtstart', date(2012, 10, 10)) +event_12['summary'] = 'event_12' +event_12.add('dtend', datetime(2012, 10, 10, 10, 0, 0)) +event_12['uid'] = 'event_12' +event_12.add('last-modified', datetime(2013, 10, 10, 10, 0, 0)) + +event_13 = Event() +event_13.add('dtstart', date(2012, 10, 10)) +event_13['summary'] = 'event_13' +event_13.add('dtend', datetime(2012, 10, 10, 10, 0, 0)) +event_13['uid'] = 'event_13' +event_13.add('last-modified', datetime(2011, 10, 10, 10, 0, 0)) + save_event_override = None @@ -111,6 +133,9 @@ def __init__(self): 'event_8': event_8, 'event_9': event_9, 'event_10': event_10, + 'event_11': event_11, + 'event_12': event_12, + 'event_13': event_13, } def get_event_impl(self, name: str): @@ -132,7 +157,7 @@ def save_event(self, name: str, event): return save_event_override(name, event) def all_events(self): - return [event_1, event_2, event_3] + return self.content.values() class Config: collections = {'1': CollectionMock()} @@ -230,6 +255,35 @@ def test_admin_home(client): assert response.status_code == 200 +def test_search_api_without_admin(client): + response = client.post(f'api/1/search', data='') + assert response.status_code == 404 + +def test_search_api_one_match(client): + response = client.post(f'api/1/search', data='{"pattern": "event_2"}', headers={'X-Admin': 'true'}) + assert response.status_code == 200 + + content = json.loads(response.data) + del content[0]['access_link'] + + assert content == json.loads('[{"attendees": null, "description": null, "end": null, "location": "location_2", "start": "2011-10-10 10:00:00", "start_ts": 1318266000.0, "title": "event_2"}]') + +def test_search_api_no_match(client): + response = client.post(f'api/1/search', data='{"pattern": "nomatch"}', headers={'X-Admin': 'true'}) + assert response.status_code == 200 + + assert response.data == b'[]' + +def test_search_api_multiple_match_sorted(client): + response = client.post(f'api/1/search', data='{"pattern": "event_1"}', headers={'X-Admin': 'true'}) + assert response.status_code == 200 + + content = json.loads(response.data) + order = [e['title'] for e in content] + + assert order == ['event_12', 'event_11', 'event_13', 'event_10', 'event_1'] + + def test_view_event_admin(client): response = client.get('/1/event_1.ics', headers={'X-Admin': 'true'}) @@ -264,7 +318,6 @@ def test_view_event(client): def test_view_event_ics(client): token = quote_plus(generate_token(settings, '/1/event_1.ics', expires=datetime.now() + timedelta(days=1))) response = client.get(f'/1/event_1.ics/ics?t={token}') - assert response.status_code == 200 assert response.data.decode() == 'BEGIN:VCALENDAR\r\nBEGIN:VEVENT\r\nSUMMARY:event_1\r\nDTSTART;VALUE=DATE-TIME:20101010T100000\r\nUID:event_1\r\nLOCATION:Location_1\r\nEND:VEVENT\r\nEND:VCALENDAR\r\n' @@ -614,24 +667,36 @@ def test_event_api_non_admin(client): def test_event_api_admin(client): response = client.get(f'api/1/event_5.ics', headers={'X-Admin': 'true'}) assert response.status_code == 200 - assert response.data == b'{"title": "event_5", "start": "2012-10-10 10:00:00", "start_ts": 1349888400.0, "attendees": ["foo5@bar.com"], "end": null, "description": null, "location": null}' + + content = json.loads(response.data) + del content['access_link'] + assert json.dumps(content) == '{"title": "event_5", "start": "2012-10-10 10:00:00", "start_ts": 1349888400.0, "attendees": ["foo5@bar.com"], "end": null, "description": null, "location": null}' def test_event_api_admin_with_location(client): response = client.get(f'api/1/event_1.ics', headers={'X-Admin': 'true'}) assert response.status_code == 200 - assert response.data == b'{"title": "event_1", "start": "2010-10-10 10:00:00", "start_ts": 1286730000.0, "location": "Location_1", "attendees": ["foo@bar.com"], "end": null, "description": null}' + + content = json.loads(response.data) + del content['access_link'] + assert json.dumps(content) == '{"title": "event_1", "start": "2010-10-10 10:00:00", "start_ts": 1286730000.0, "location": "Location_1", "attendees": ["foo@bar.com"], "end": null, "description": null}' def test_event_api_admin_without_ics(client): response = client.get(f'api/1/event_4', headers={'X-Admin': 'true'}) assert response.status_code == 200 - assert response.data == b'{"title": "event_4", "start": "2012-10-10 10:00:00", "start_ts": 1349888400.0, "attendees": ["foo@bar.com", "foo2@bar.com", "foo3@bar.com"], "end": null, "description": null, "location": null}' + + content = json.loads(response.data) + del content['access_link'] + assert json.dumps(content) == '{"title": "event_4", "start": "2012-10-10 10:00:00", "start_ts": 1349888400.0, "attendees": ["foo@bar.com", "foo2@bar.com", "foo3@bar.com"], "end": null, "description": null, "location": null}' def test_event_api_admin_with_end(client): response = client.get(f'api/1/event_10', headers={'X-Admin': 'true'}) assert response.status_code == 200 - print(response.data) - assert response.data == b'{"title": "event_10", "start": "2012-10-10 00:00:00", "start_ts": 1349852400.0, "end": "2012-10-10 10:00:00", "end_ts": 1349888400.0, "description": null, "location": null, "attendees": null}' + + content = json.loads(response.data) + del content['access_link'] + + assert json.dumps(content) == '{"title": "event_10", "start": "2012-10-10 00:00:00", "start_ts": 1349852400.0, "end": "2012-10-10 10:00:00", "end_ts": 1349888400.0, "description": null, "location": null, "attendees": null}' def test_create_event_without_admin(client): response = client.post(f'api/1', data='')