diff --git a/libreforms_fastapi/app/__init__.py b/libreforms_fastapi/app/__init__.py index 66ec6ef..e57ce83 100644 --- a/libreforms_fastapi/app/__init__.py +++ b/libreforms_fastapi/app/__init__.py @@ -230,10 +230,10 @@ async def start_test_logger(): default_group = Group(id=1, name="default", permissions=default_permissions) session.add(default_group) session.commit() - logger.info("Default group created.") + logger.info("Default group created") else: # print(default_group.get_permissions()) - logger.info("Default group already exists.") + logger.info("Default group already exists") # Initialize the document database @@ -568,8 +568,85 @@ async def api_form_read_all(form_name: str, background_tasks: BackgroundTasks, r # # *** Should we use PATCH instead of PUT? In libreForms-flask, we only pass # the changed details ... But maybe pydantic can handle the journaling and # metadata. See https://github.com/signebedi/libreforms-fastapi/issues/20. - # @app.put("/api/form/update/{form_name}") - # async def api_form_update(): +@app.patch("/api/form/update/{form_name}/{document_id}", dependencies=[Depends(api_key_auth)]) +async def api_form_update(form_name: str, document_id: str, background_tasks: BackgroundTasks, request: Request, session: SessionLocal = Depends(get_db), key: str = Depends(X_API_KEY), body: Dict = Body(...)): + + if form_name not in get_form_names(): + raise HTTPException(status_code=404, detail=f"Form '{form_name}' not found") + + # Yield the pydantic form model, setting update to True, which will mark + # all fields as Optional + FormModel = get_form_config(form_name=form_name, update=True) + + # # Here we validate and coerce data into its proper type + form_data = FormModel.model_validate(body) + json_data = form_data.model_dump_json() + + # Ugh, I'd like to find a more efficient way to get the user data. But alas, that + # the sqlalchemy-signing table is not optimized alongside the user model... + user = session.query(User).filter_by(api_key=key).first() + + # Here we validate the user groups permit this level of access to the form + try: + user.validate_permission(form_name=form_name, required_permission="update_own") + # print("\n\n\nUser has valid permissions\n\n\n") + except Exception as e: + raise HTTPException(status_code=403, detail=f"{e}") + + # Here, if the user is not able to see other user's data, then we denote the constraint. + try: + user.validate_permission(form_name=form_name, required_permission="update_all") + limit_query_to = False + except Exception as e: + limit_query_to = user.username + + metadata={ + DocumentDatabase.last_editor_field: user.username, + } + + # Add the remote addr host if enabled + if config.COLLECT_USAGE_STATISTICS: + metadata[DocumentDatabase.ip_address_field] = request.client.host + + # Process the validated form submission as needed + _ = DocumentDatabase.update_document( + form_name=form_name, + document_id=document_id, + json_data=json_data, + metadata=metadata + ) + + # Send email + if config.SMTP_ENABLED: + background_tasks.add_task( + mailer.send_mail, + subject="Form Submitted", + content=document_id, + to_address=user.email, + ) + + + # Write this query to the TransactionLog + if config.COLLECT_USAGE_STATISTICS: + + endpoint = request.url.path + remote_addr = request.client.host + + background_tasks.add_task( + write_api_call_to_transaction_log, + api_key=key, + endpoint=endpoint, + remote_addr=remote_addr, + query_params=json_data, + ) + + return { + "message": "Form updated received and validated", + "document_id": document_id, + "data": json_data, + } + + # Delete form # @app.delete("/api/form/delete/{form_name}", dependencies=[Depends(api_key_auth)]) diff --git a/libreforms_fastapi/cli/__init__.py b/libreforms_fastapi/cli/__init__.py index d912d05..4dc77ef 100644 --- a/libreforms_fastapi/cli/__init__.py +++ b/libreforms_fastapi/cli/__init__.py @@ -602,6 +602,7 @@ def cli_id(username, environment): f"ID: {user.id}\n" f"Username: {user.username}\n" f"Email: {user.email}\n" + f"Groups: {user.email}\n" f"Active: {user.active}\n" f"Created Date: {user.created_date.strftime('%Y-%m-%d %H:%M:%S')}\n" f"Last Login: {user.last_login.strftime('%Y-%m-%d %H:%M:%S') if user.last_login else 'Never'}\n" diff --git a/libreforms_fastapi/utils/document_database.py b/libreforms_fastapi/utils/document_database.py index def974a..3d534f2 100644 --- a/libreforms_fastapi/utils/document_database.py +++ b/libreforms_fastapi/utils/document_database.py @@ -334,6 +334,7 @@ def create_document(self, form_name:str, json_data, metadata={}): self.approved_field: metadata.get(self.approved_field, None), self.approved_by_field: metadata.get(self.approved_by_field, None), self.approval_signature_field: metadata.get(self.approval_signature_field, None), + self.journal_field: [] } } @@ -345,21 +346,58 @@ def create_document(self, form_name:str, json_data, metadata={}): return document_id - def update_document(self, form_name:str, json_data, metadata={}): + def update_document(self, form_name:str, document_id:str, json_data, metadata={}, limit_users:Union[bool, str]=False, exclude_deleted:bool=True): """Updates existing form in specified form's database.""" self._check_form_exists(form_name) + if self.use_logger: + self.logger.info(f"Starting update for {form_name} with document_id {document_id}") # Ensure the document exists - existing_document = self.databases[form_name].get(document_id=document_id) - if not existing_document: - raise ValueError(f"No document found with ID {document_id} in form {form_name}") + document = self.databases[form_name].get(doc_id=document_id) + if not document: + if self.use_logger: + self.logger.warning(f"No document for {form_name} with document_id {document_id}") + return None + + # If exclude_deleted is set, then we return None if the document is marked as deleted + if exclude_deleted and document['metadata'][self.is_deleted_field] == True: + if self.use_logger: + self.logger.warning(f"Document for {form_name} with document_id {document_id} is deleted and was not updated") + return None + + # If we are limiting user access based on group-based access controls, and this user is + # not the document creator, then return None + if isinstance(limit_users, str) and document['metadata'][self.created_by_field] != limit_users: + if self.use_logger: + self.logger.warning(f"Insufficient permissions to update document for {form_name} with document_id {document_id}") + return None current_timestamp = datetime.now(self.timezone) # This is a little hackish but TinyDB write data to file as Python dictionaries, not JSON. updated_data_dict = json.loads(json_data) + # Here we remove data that has not been changed + dropping_unchanged_data = {} + for field in updated_data_dict.keys(): + if field in document['data'].keys(): + if updated_data_dict[field] != document['data'][field]: + dropping_unchanged_data[field] = updated_data_dict[field] + + + # Build the journal + + journal = document['metadata'].get(self.journal_field) + journal.append ( + { + self.last_modified_field: current_timestamp.isoformat(), + self.last_editor_field: metadata.get(self.last_editor_field, None), + self.ip_address_field: metadata.get(self.ip_address_field, None), + **dropping_unchanged_data, + } + ) + # Prepare the updated data and metadata update_dict = { "data": updated_data_dict, @@ -367,15 +405,27 @@ def update_document(self, form_name:str, json_data, metadata={}): # Here we update only a few metadata fields ... fields like approval and signature should be # handled through separate API calls. self.last_modified_field: current_timestamp.isoformat(), - self.last_editor_field: metadata.get(self.last_editor_field, existing_document["metadata"][self.last_editor_field]), + self.last_editor_field: metadata.get(self.last_editor_field, None), + self.ip_address_field: metadata.get(self.ip_address_field, None), + self.journal_field: journal, + + # These fields should all remain the same + self.form_name_field: document['metadata'].get(self.form_name_field), + self.is_deleted_field: document['metadata'].get(self.is_deleted_field), + self.document_id_field: document['metadata'].get(self.document_id_field), + self.timezone_field: document['metadata'].get(self.timezone_field), + self.created_at_field: document['metadata'].get(self.created_at_field), + self.created_by_field: document['metadata'].get(self.created_by_field), + self.signature_field: document['metadata'].get(self.signature_field), + self.approved_field: document['metadata'].get(self.approved_field), + self.approved_by_field: document['metadata'].get(self.approved_by_field), + self.approval_signature_field: document['metadata'].get(self.approval_signature_field), } } - update_dict['metadata'][self.journal_field] = update_dict.copy() - # Update only the fields that are provided in json_data and metadata, not replacing the entire # document. The partial approach will minimize the room for mistakes from overwriting entire documents. - _ = self.databases[form_name].update(update_dict, document_id=document_id) + _ = self.databases[form_name].update(update_dict, doc_ids=[document_id]) if self.use_logger: self.logger.info(f"Updated document for {form_name} with document_id {document_id}") diff --git a/libreforms_fastapi/utils/pydantic_models.py b/libreforms_fastapi/utils/pydantic_models.py index 73a7621..fb1e02d 100644 --- a/libreforms_fastapi/utils/pydantic_models.py +++ b/libreforms_fastapi/utils/pydantic_models.py @@ -70,6 +70,7 @@ def passwords_match(cls, data: Any) -> Any: "field_name": "text_input", "default": "Default Text", "validators": [], + "required": False, "options": None }, "number_input": { @@ -78,6 +79,7 @@ def passwords_match(cls, data: Any) -> Any: "field_name": "number_input", "default": 42, "validators": [], + "required": False, "options": None }, "email_input": { @@ -86,6 +88,7 @@ def passwords_match(cls, data: Any) -> Any: "field_name": "email_input", "default": "user@example.com", "validators": [], + "required": False, "options": None }, "date_input": { @@ -94,6 +97,7 @@ def passwords_match(cls, data: Any) -> Any: "field_name": "date_input", "default": "2024-01-01", "validators": [], + "required": False, "options": None }, "checkbox_input": { @@ -102,6 +106,7 @@ def passwords_match(cls, data: Any) -> Any: "field_name": "checkbox_input", "options": ["Option1", "Option2", "Option3"], "validators": [], + "required": False, "default": ["Option1", "Option3"] }, "radio_input": { @@ -110,6 +115,7 @@ def passwords_match(cls, data: Any) -> Any: "field_name": "radio_input", "options": ["Option1", "Option2"], "validators": [], + "required": False, "default": "Option2" }, "select_input": { @@ -118,6 +124,7 @@ def passwords_match(cls, data: Any) -> Any: "field_name": "select_input", "options": ["Option1", "Option2", "Option3"], "validators": [], + "required": False, "default": "Option2" }, "textarea_input": { @@ -126,14 +133,16 @@ def passwords_match(cls, data: Any) -> Any: "field_name": "textarea_input", "default": "Default textarea content.", "validators": [], + "required": False, "options": None }, "file_input": { "input_type": "file", - "output_type": Optional[bytes], + "output_type": bytes, "field_name": "file_input", "options": None, "validators": [], + "required": False, "default": None # File inputs can't have default values }, }, @@ -295,8 +304,12 @@ def get_form_names(config_path=config.FORM_CONFIG_PATH): # print("Config file does not exist. Using the default configuration.") return form_config.keys() -def get_form_config(form_name, config_path=config.FORM_CONFIG_PATH): - """Yields a single config dict for the form name passed, following a factory pattern approach""" +def get_form_config(form_name, config_path=config.FORM_CONFIG_PATH, update=False): + """ + Yields a single config dict for the form name passed, following a factory pattern approach. + + If update is set to True, all fields will be set to optional. + """ # Try to open config_path and if not existent or empty, use example config form_config = example_form_config # Default to example_form_config @@ -318,21 +331,40 @@ def get_form_config(form_name, config_path=config.FORM_CONFIG_PATH): field_definitions = {} for field_name, field_info in fields.items(): + + # Should we consider making an Enum for fields with a limited set of options... + # essentially requiring that the values passed are in the Enum of acceptable + # values? Bit difficult to implement for List and other data types, but may + # be worthwhile. + python_type: Type = field_info["output_type"] default = field_info.get("default", ...) - + required = field_info.get("required", False) # Default to not required field + validators = field_info.get("validators", []) + # Ensure Optional is always used with a specific type - if default is ... and python_type != Optional: + if (default is ... and python_type != Optional) or (not required) or (update): python_type = Optional[python_type] field_definitions[field_name] = (python_type, default) - + + for validator_func in validators: + # This assumes `validator_func` is callable that accepts a single + # value and returns a value or raises an exception + pass + # Creating the model dynamically, allowing arbitrary types class Config: arbitrary_types_allowed = True model = create_model(form_name, __config__=Config, **field_definitions) - + + for field_name, field_info in fields.items(): + validators = field_info.get("validators", []) + for v in validators: + # Placeholder for adding the validators to the model here + pass + return model