diff --git a/libreforms_fastapi/app/__init__.py b/libreforms_fastapi/app/__init__.py
index 2a402a8..b230d54 100644
--- a/libreforms_fastapi/app/__init__.py
+++ b/libreforms_fastapi/app/__init__.py
@@ -413,6 +413,7 @@ async def api_form_create(
     #
     # Here we validate and coerce data into its proper type
     form_data = FormModel.model_validate(body)
     json_data = form_data.model_dump_json()
+    data_dict = form_data.model_dump()
     # Ugh, I'd like to find a more efficient way to get the user data. But alas, that
     # the sqlalchemy-signing table is not optimized alongside the user model...
@@ -443,7 +444,8 @@ async def api_form_create(
     #     doc_db.create_document,
     d = doc_db.create_document(
         form_name=form_name,
-        json_data=json_data,
+        # json_data=json_data,
+        data_dict=data_dict,
         metadata=metadata,
     )

@@ -626,6 +628,7 @@ async def api_form_update(
     #
     # Here we validate and coerce data into its proper type
     form_data = FormModel.model_validate(body)
     json_data = form_data.model_dump_json()
+    data_dict = form_data.model_dump()

     # print("\n\n\n", json_data)
@@ -660,7 +663,8 @@ async def api_form_update(
     d = doc_db.update_document(
         form_name=form_name,
         document_id=document_id,
-        json_data=json_data,
+        # json_data=json_data,
+        updated_data_dict=data_dict,
         metadata=metadata,
         limit_users=limit_query_to,
     )
diff --git a/libreforms_fastapi/utils/certificates.py b/libreforms_fastapi/utils/certificates.py
index 77bad48..8e273ed 100644
--- a/libreforms_fastapi/utils/certificates.py
+++ b/libreforms_fastapi/utils/certificates.py
@@ -113,29 +113,6 @@ def sign_data(self, data):
         )
         return signature

-    # def verify_signature(self, data, signature):
-    #     """
-    #     Verifies the signature of the data using the public key.
-    #     """
-    #     with open(self.get_public_key_file(), "rb") as key_file:
-    #         public_key = serialization.load_pem_public_key(
-    #             key_file.read(),
-    #             backend=default_backend()
-    #         )
-
-    #     try:
-    #         public_key.verify(
-    #             signature,
-    #             data,
-    #             padding.PSS(
-    #                 mgf=padding.MGF1(hashes.SHA256()),
-    #                 salt_length=padding.PSS.MAX_LENGTH
-    #             ),
-    #             hashes.SHA256()
-    #         )
-    #         return True
-    #     except Exception as e:
-    #         return False
     def verify_signature(self, data, signature, public_key=None):
         """
         Verifies the signature of the data using the provided public key.
diff --git a/libreforms_fastapi/utils/custom_tinydb.py b/libreforms_fastapi/utils/custom_tinydb.py
new file mode 100644
index 0000000..a5b8934
--- /dev/null
+++ b/libreforms_fastapi/utils/custom_tinydb.py
@@ -0,0 +1,144 @@
+import json
+from datetime import date
+from json import JSONEncoder
+from bson import ObjectId
+
+from tinydb import (
+    TinyDB,
+    Query,
+    Storage
+)
+from tinydb.table import (
+    Table as TinyTable,
+    Document
+)
+
+from typing import (
+    Mapping,
+    Union,
+    Iterable,
+    List,
+)
+
+
+class DateEncoder(JSONEncoder):
+    """We need to convert date objects to 'YYYY-MM-DD' format"""
+    def default(self, obj):
+        if isinstance(obj, date):
+            return obj.isoformat()
+        # Fall back to the superclass method for other types
+        return JSONEncoder.default(self, obj)
+
+# We want to modify TinyDB to use string representations of bson
+# ObjectIDs. As such, we will need to modify some underlying behavior,
+# see https://github.com/signebedi/libreforms-fastapi/issues/15.
+class CustomTable(TinyTable):
+    document_id_class = str # Use string IDs instead of integers
+
+    def _get_next_id(self, document_id=str(ObjectId())):
+        """
+        Generate a new BSON ObjectID string to use as the TinyDB document ID.
+        """
+        return document_id
+
+
+    def insert(self, document: Mapping, document_id:Union[str, bool]=False) -> int:
+        """
+        Insert a new document into the table.
+
+        :param document: the document to insert
+        :returns: the inserted document's ID
+        """
+
+        if not document_id:
+            document_id = str(ObjectId())
+
+        # Make sure the document implements the ``Mapping`` interface
+        if not isinstance(document, Mapping):
+            raise ValueError('Document is not a Mapping')
+
+        # First, we get the document ID for the new document
+        if isinstance(document, Document):
+            # For a `Document` object we use the specified ID
+            doc_id = document.doc_id
+
+            # We also reset the stored next ID so the next insert won't
+            # re-use document IDs by accident when storing an old value
+            self._next_id = None
+        else:
+            # In all other cases we use the next free ID
+            doc_id = self._get_next_id(document_id=document_id)
+
+        # Now, we update the table and add the document
+        def updater(table: dict):
+            if doc_id in table:
+                raise ValueError(f'Document with ID {str(doc_id)} '
+                                 f'already exists')
+
+            # By calling ``dict(document)`` we convert the data we got to a
+            # ``dict`` instance even if it was a different class that
+            # implemented the ``Mapping`` interface
+            table[doc_id] = dict(document)
+
+        # See below for details on ``Table._update``
+        self._update_table(updater)
+
+        return doc_id
+
+    def insert_multiple(self, documents: Iterable[Mapping], document_ids:Union[List, bool]=False) -> List[int]:
+        """
+        Insert multiple documents into the table.
+
+        :param documents: an Iterable of documents to insert
+        :returns: a list containing the inserted documents' IDs
+        """
+        doc_ids = []
+
+        if document_ids and len(document_ids) != len(documents):
+            raise Exception("When inserting multiple and passing your own document_ids, " \
+                "the list must be the same length as the document list")
+
+        def updater(table: dict):
+            # for document in documents:
+            for i, document in enumerate(documents):
+
+                # Make sure the document implements the ``Mapping`` interface
+                if not isinstance(document, Mapping):
+                    raise ValueError('Document is not a Mapping')
+
+                if isinstance(document, Document):
+                    # Check if document does not override an existing document
+                    if document.doc_id in table:
+                        raise ValueError(
+                            f'Document with ID {str(document.doc_id)} '
+                            f'already exists'
+                        )
+
+                    # Store the doc_id, so we can return all document IDs
+                    # later. Then save the document with its doc_id and
+                    # skip the rest of the current loop
+                    doc_id = document.doc_id
+                    doc_ids.append(doc_id)
+                    table[doc_id] = dict(document)
+                    continue
+
+                # Generate new document ID for this document
+                # Store the doc_id, so we can return all document IDs
+                # later, then save the document with the new doc_id
+                if not document_ids:
+                    document_id = str(ObjectId())
+                else:
+                    document_id = document_ids[i]
+                doc_id = self._get_next_id(document_id=document_id)
+                doc_ids.append(doc_id)
+                table[doc_id] = dict(document)
+
+        # See below for details on ``Table._update``
+        self._update_table(updater)
+
+        return doc_ids
+
+# Subclass TinyDB and override the table_class attribute with our new logic
+class CustomTinyDB(TinyDB):
+    table_class = CustomTable
+
diff --git a/libreforms_fastapi/utils/document_database.py b/libreforms_fastapi/utils/document_database.py
index 61fc9bc..5a3ccb5 100644
--- a/libreforms_fastapi/utils/document_database.py
+++ b/libreforms_fastapi/utils/document_database.py
@@ -5,20 +5,11 @@
 from zoneinfo import ZoneInfo

 from tinydb import (
-    TinyDB,
     Query,
-    Storage
-)
-from tinydb.table import (
-    Table as TinyTable,
-    Document
 )

 from typing import (
-    Mapping,
     Union,
-    Iterable,
-    List,
 )

 from abc import ABC, abstractmethod
@@ -27,120 +18,10 @@
 # This import is used to afix digital signatures to records
 from libreforms_fastapi.utils.certificates import sign_record, verify_record_signature

-
-# We want to modify TinyDB use use string representations of bson
-# ObjectIDs. As such, we will need to modify some underlying behavior,
-# see https://github.com/signebedi/libreforms-fastapi/issues/15.
-class CustomTable(TinyTable):
-    document_id_class = str # Use string IDs instead of integers
-
-    def _get_next_id(self, document_id=str(ObjectId())):
-        """
-        Generate a new BSON ObjectID string to use as the TinyDB document ID.
-        """
-        return document_id
-
-
-    def insert(self, document: Mapping, document_id:Union[str, bool]=False) -> int:
-        """
-        Insert a new document into the table.
-
-        :param document: the document to insert
-        :returns: the inserted document's ID
-        """
-
-        if not document_id:
-            document_id = str(ObjectId())
-
-        # Make sure the document implements the ``Mapping`` interface
-        if not isinstance(document, Mapping):
-            raise ValueError('Document is not a Mapping')
-
-        # First, we get the document ID for the new document
-        if isinstance(document, Document):
-            # For a `Document` object we use the specified ID
-            doc_id = document.doc_id
-
-            # We also reset the stored next ID so the next insert won't
-            # re-use document IDs by accident when storing an old value
-            self._next_id = None
-        else:
-            # In all other cases we use the next free ID
-            doc_id = self._get_next_id(document_id=document_id)
-
-        # Now, we update the table and add the document
-        def updater(table: dict):
-            if doc_id in table:
-                raise ValueError(f'Document with ID {str(doc_id)} '
-                                 f'already exists')
-
-            # By calling ``dict(document)`` we convert the data we got to a
-            # ``dict`` instance even if it was a different class that
-            # implemented the ``Mapping`` interface
-            table[doc_id] = dict(document)
-
-        # See below for details on ``Table._update``
-        self._update_table(updater)
-
-        return doc_id
-
-    def insert_multiple(self, documents: Iterable[Mapping], document_ids:Union[List, bool]=False) -> List[int]:
-        """
-        Insert multiple documents into the table.
-
-        :param documents: an Iterable of documents to insert
-        :returns: a list containing the inserted documents' IDs
-        """
-        doc_ids = []
-
-        if document_ids and len(document_ids) != len(documents):
-            raise Exception("When inserting multiple and passing your own document_ids," \
-                "the list must be the same length as the document list")
-
-        def updater(table: dict):
-            # for document in documents:
-            for i, document in enumerate(documents):
-
-                # Make sure the document implements the ``Mapping`` interface
-                if not isinstance(document, Mapping):
-                    raise ValueError('Document is not a Mapping')
-
-                if isinstance(document, Document):
-                    # Check if document does not override an existing document
-                    if document.doc_id in table:
-                        raise ValueError(
-                            f'Document with ID {str(document.doc_id)} '
-                            f'already exists'
-                        )
-
-                    # Store the doc_id, so we can return all document IDs
-                    # later. Then save the document with its doc_id and
-                    # skip the rest of the current loop
-                    doc_id = document.doc_id
-                    doc_ids.append(doc_id)
-                    table[doc_id] = dict(document)
-                    continue
-
-                # Generate new document ID for this document
-                # Store the doc_id, so we can return all document IDs
-                # later, then save the document with the new doc_id
-                if not document_ids:
-                    document_id = str(ObjectId())
-                else:
-                    document_id = document_ids[i]
-                doc_id = self._get_next_id()
-                doc_ids.append(doc_id)
-                table[doc_id] = dict(document)
-
-        # See below for details on ``Table._update``
-        self._update_table(updater)
-
-        return doc_ids
-
-# Subclass TinyDB and override the table_class attribute with our new logic
-class CustomTinyDB(TinyDB):
-    table_class = CustomTable
-
+from libreforms_fastapi.utils.custom_tinydb import (
+    DateEncoder,
+    CustomTinyDB,
+)


 class CollectionDoesNotExist(Exception):
@@ -370,7 +251,7 @@ def _initialize_database_collections(self):
         self.databases = {}
         for form_name in self.form_names_callable():
             # self.databases[form_name] = TinyDB(self._get_db_path(form_name))
-            self.databases[form_name] = CustomTinyDB(self._get_db_path(form_name))
+            self.databases[form_name] = CustomTinyDB(self._get_db_path(form_name), cls=DateEncoder)

     def _get_db_path(self, form_name:str):
         """Constructs a file path for the given form's database."""
@@ -387,19 +268,26 @@ def _check_form_exists(self, form_name:str):
         if form_name not in self.databases.keys():
             self._initialize_database_collections()

-    def create_document(self, form_name:str, json_data, metadata={}):
+    def create_document(
+        self,
+        form_name:str,
+        # json_data,
+        data_dict,
+        metadata={}
+    ):
         """Adds json data to the specified form's database."""
         self._check_form_exists(form_name)

         current_timestamp = datetime.now(self.timezone)

         # This is a little hackish but TinyDB write data to file as Python dictionaries, not JSON.
-        convert_data_to_dict = json.loads(json_data)
+        # convert_data_to_dict = json.loads(json_data)

         document_id = metadata.get(self.document_id_field, str(ObjectId()))

         data_dict = {
-            "data": convert_data_to_dict,
+            # "data": convert_data_to_dict,
+            "data": data_dict,
             "metadata": {
                 # self.document_id_field: document_id,
                 self.is_deleted_field: metadata.get(self.is_deleted_field, False),
@@ -427,7 +315,16 @@ def create_document(self, form_name:str, json_data, metadata={}):

         return data_dict

-    def update_document(self, form_name:str, document_id:str, json_data:str, metadata={}, limit_users:Union[bool, str]=False, exclude_deleted:bool=True):
+    def update_document(
+        self,
+        form_name:str,
+        document_id:str,
+        # json_data:str,
+        updated_data_dict:dict,
+        metadata={},
+        limit_users:Union[bool, str]=False,
+        exclude_deleted:bool=True
+    ):
         """Updates existing form in specified form's database."""
         self._check_form_exists(form_name)

@@ -459,7 +356,7 @@ def update_document(self, form_name:str, document_id:str, json_data:str, metadat
         # print("\n\n\n\nDocument: ", document)

         # This is a little hackish but TinyDB write data to file as Python dictionaries, not JSON.
-        updated_data_dict = json.loads(json_data)
+        # updated_data_dict = json.loads(json_data)

         # Here we remove data that has not been changed
         dropping_unchanged_data = {}
diff --git a/libreforms_fastapi/utils/pydantic_models.py b/libreforms_fastapi/utils/pydantic_models.py
index 3f0084f..6e0a13f 100644
--- a/libreforms_fastapi/utils/pydantic_models.py
+++ b/libreforms_fastapi/utils/pydantic_models.py
@@ -282,134 +282,3 @@ class Config:

     return model

-
-# Deprecated
-def __reconstruct_form_data(request, form_fields):
-    """
-    This repackages request data into a format that pydantic will be able to understand.
-
-    The flask request structure can be understood from the following resource https://stackoverflow.com/a/16664376/13301284.
-
-    We can start by getting the list of fields:
-
-    >>> list(request.form)
-
-    Then, we can iterate through each and get each value:
-
-    >>> for field in list(request.form):
-    ...     print(request.form.getlist(field))
-    """
-
-    reconstructed_form_data = {}
-
-    for field in list(request):
-
-        # Skip field if it's not supposed to be here
-        if not field in form_fields:
-            continue
-
-        field_config = form_fields[field]
-        reconstructed_form_data[field] = request[field]
-
-        target_type = form_fields[field]['output_type']
-
-        # Check if the output type calls for a collection or a scalar
-        if isinstance(reconstructed_form_data[field], list) and len(reconstructed_form_data[field]) == 1 and target_type != list:
-            reconstructed_form_data[field] = reconstructed_form_data[field][0]
-
-    return reconstructed_form_data
-
-
-
-# Deprecated
-def __generate_pydantic_models(form_config: dict):
-    """
-    Dynamically generates Pydantic models based on a specified form configuration. Each form is represented as a model,
-    with fields defined according to the configuration provided in `form_config`. This allows for the dynamic validation
-    of data according to administratively defined forms. Each field's type, default value, and optionality are considered
-    in the model creation process.
-
-    Parameters
-    ----------
-    form_config : dict
-        A dictionary containing the form configurations, where each key is a form name and its value is another dictionary
-        mapping field names to their specifications. Each field specification must at least include 'output_type' for the
-        field's data type, and may optionally include a 'default' value. If a 'default' value is not provided, the field
If a 'default' value is not provided, the field - is treated as optional. - - Example: - { - "form_name": { - "field_name": { - "output_type": Type, - "default": Any, # Optional - }, - ... - }, - ... - } - - Returns - ------- - dict - A dictionary where each key is a form name and its value is a dynamically created Pydantic model class. These models - can then be used to validate data according to the defined form configurations. - - Raises - ------ - TypeError - If an unrecognized type is provided in the form configuration, though this is primarily handled through Pydantic's - own type validation mechanisms. - - Example Usage - ------------- - form_config = { - "contact_form": { - "name": { - "output_type": str, - "default": "John Doe" - }, - "age": { - "output_type": int, - # No default implies optional - }, - ... - } - } - models = generate_pydantic_models(form_config) - ContactFormModel = models["contact_form"] - form_data = {"name": "Jane Doe", "age": 30} - validated_data = ContactFormModel(**form_data) - - A quick note - ----- - The function dynamically sets fields as optional if no default value is provided, unless the field is explicitly - marked as `Optional[Type]`. It also allows for arbitrary types to be used within the models through the - `arbitrary_types_allowed` configuration. - - This function is designed for use in applications where form fields and validation rules are configurable and - not known until runtime, providing flexibility in handling user submissions. - """ - models = {} - - for form_name, fields in form_config.items(): - field_definitions = {} - - for field_name, field_info in fields.items(): - python_type: Type = field_info["output_type"] - default = field_info.get("default", ...) - - # Ensure Optional is always used with a specific type - if default is ... and python_type != Optional: - python_type = Optional[python_type] - - field_definitions[field_name] = (python_type, default) - - # Creating the model dynamically, allowing arbitrary types - class Config: - arbitrary_types_allowed = True - - model = create_model(form_name, __config__=Config, **field_definitions) - models[form_name] = model - - return models \ No newline at end of file