Skip to content

Commit

Permalink
Added: update api route (#34) (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
signebedi committed Mar 24, 2024
1 parent d2dae30 commit 7e0c44d
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 19 deletions.
85 changes: 81 additions & 4 deletions libreforms_fastapi/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ async def start_test_logger():
default_group = Group(id=1, name="default", permissions=default_permissions)
session.add(default_group)
session.commit()
logger.info("Default group created.")
logger.info("Default group created")
else:
# print(default_group.get_permissions())
logger.info("Default group already exists.")
logger.info("Default group already exists")


# Initialize the document database
Expand Down Expand Up @@ -568,8 +568,85 @@ async def api_form_read_all(form_name: str, background_tasks: BackgroundTasks, r
# # *** Should we use PATCH instead of PUT? In libreForms-flask, we only pass
# the changed details ... But maybe pydantic can handle the journaling and
# metadata. See https://github.com/signebedi/libreforms-fastapi/issues/20.
# @app.put("/api/form/update/{form_name}")
# async def api_form_update():
@app.patch("/api/form/update/{form_name}/{document_id}", dependencies=[Depends(api_key_auth)])
async def api_form_update(form_name: str, document_id: str, background_tasks: BackgroundTasks, request: Request, session: SessionLocal = Depends(get_db), key: str = Depends(X_API_KEY), body: Dict = Body(...)):

if form_name not in get_form_names():
raise HTTPException(status_code=404, detail=f"Form '{form_name}' not found")

# Yield the pydantic form model, setting update to True, which will mark
# all fields as Optional
FormModel = get_form_config(form_name=form_name, update=True)

# # Here we validate and coerce data into its proper type
form_data = FormModel.model_validate(body)
json_data = form_data.model_dump_json()

# Ugh, I'd like to find a more efficient way to get the user data. But alas, that
# the sqlalchemy-signing table is not optimized alongside the user model...
user = session.query(User).filter_by(api_key=key).first()

# Here we validate the user groups permit this level of access to the form
try:
user.validate_permission(form_name=form_name, required_permission="update_own")
# print("\n\n\nUser has valid permissions\n\n\n")
except Exception as e:
raise HTTPException(status_code=403, detail=f"{e}")

# Here, if the user is not able to see other user's data, then we denote the constraint.
try:
user.validate_permission(form_name=form_name, required_permission="update_all")
limit_query_to = False
except Exception as e:
limit_query_to = user.username

metadata={
DocumentDatabase.last_editor_field: user.username,
}

# Add the remote addr host if enabled
if config.COLLECT_USAGE_STATISTICS:
metadata[DocumentDatabase.ip_address_field] = request.client.host

# Process the validated form submission as needed
_ = DocumentDatabase.update_document(
form_name=form_name,
document_id=document_id,
json_data=json_data,
metadata=metadata
)

# Send email
if config.SMTP_ENABLED:
background_tasks.add_task(
mailer.send_mail,
subject="Form Submitted",
content=document_id,
to_address=user.email,
)


# Write this query to the TransactionLog
if config.COLLECT_USAGE_STATISTICS:

endpoint = request.url.path
remote_addr = request.client.host

background_tasks.add_task(
write_api_call_to_transaction_log,
api_key=key,
endpoint=endpoint,
remote_addr=remote_addr,
query_params=json_data,
)

return {
"message": "Form updated received and validated",
"document_id": document_id,
"data": json_data,
}



# Delete form
# @app.delete("/api/form/delete/{form_name}", dependencies=[Depends(api_key_auth)])
Expand Down
1 change: 1 addition & 0 deletions libreforms_fastapi/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ def cli_id(username, environment):
f"ID: {user.id}\n"
f"Username: {user.username}\n"
f"Email: {user.email}\n"
f"Groups: {user.email}\n"
f"Active: {user.active}\n"
f"Created Date: {user.created_date.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Last Login: {user.last_login.strftime('%Y-%m-%d %H:%M:%S') if user.last_login else 'Never'}\n"
Expand Down
66 changes: 58 additions & 8 deletions libreforms_fastapi/utils/document_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ def create_document(self, form_name:str, json_data, metadata={}):
self.approved_field: metadata.get(self.approved_field, None),
self.approved_by_field: metadata.get(self.approved_by_field, None),
self.approval_signature_field: metadata.get(self.approval_signature_field, None),
self.journal_field: []
}
}

Expand All @@ -345,37 +346,86 @@ def create_document(self, form_name:str, json_data, metadata={}):

return document_id

def update_document(self, form_name:str, json_data, metadata={}):
def update_document(self, form_name:str, document_id:str, json_data, metadata={}, limit_users:Union[bool, str]=False, exclude_deleted:bool=True):
"""Updates existing form in specified form's database."""

self._check_form_exists(form_name)
if self.use_logger:
self.logger.info(f"Starting update for {form_name} with document_id {document_id}")

# Ensure the document exists
existing_document = self.databases[form_name].get(document_id=document_id)
if not existing_document:
raise ValueError(f"No document found with ID {document_id} in form {form_name}")
document = self.databases[form_name].get(doc_id=document_id)
if not document:
if self.use_logger:
self.logger.warning(f"No document for {form_name} with document_id {document_id}")
return None

# If exclude_deleted is set, then we return None if the document is marked as deleted
if exclude_deleted and document['metadata'][self.is_deleted_field] == True:
if self.use_logger:
self.logger.warning(f"Document for {form_name} with document_id {document_id} is deleted and was not updated")
return None

# If we are limiting user access based on group-based access controls, and this user is
# not the document creator, then return None
if isinstance(limit_users, str) and document['metadata'][self.created_by_field] != limit_users:
if self.use_logger:
self.logger.warning(f"Insufficient permissions to update document for {form_name} with document_id {document_id}")
return None

current_timestamp = datetime.now(self.timezone)

# This is a little hackish but TinyDB write data to file as Python dictionaries, not JSON.
updated_data_dict = json.loads(json_data)

# Here we remove data that has not been changed
dropping_unchanged_data = {}
for field in updated_data_dict.keys():
if field in document['data'].keys():
if updated_data_dict[field] != document['data'][field]:
dropping_unchanged_data[field] = updated_data_dict[field]


# Build the journal

journal = document['metadata'].get(self.journal_field)
journal.append (
{
self.last_modified_field: current_timestamp.isoformat(),
self.last_editor_field: metadata.get(self.last_editor_field, None),
self.ip_address_field: metadata.get(self.ip_address_field, None),
**dropping_unchanged_data,
}
)

# Prepare the updated data and metadata
update_dict = {
"data": updated_data_dict,
"metadata": {
# Here we update only a few metadata fields ... fields like approval and signature should be
# handled through separate API calls.
self.last_modified_field: current_timestamp.isoformat(),
self.last_editor_field: metadata.get(self.last_editor_field, existing_document["metadata"][self.last_editor_field]),
self.last_editor_field: metadata.get(self.last_editor_field, None),
self.ip_address_field: metadata.get(self.ip_address_field, None),
self.journal_field: journal,

# These fields should all remain the same
self.form_name_field: document['metadata'].get(self.form_name_field),
self.is_deleted_field: document['metadata'].get(self.is_deleted_field),
self.document_id_field: document['metadata'].get(self.document_id_field),
self.timezone_field: document['metadata'].get(self.timezone_field),
self.created_at_field: document['metadata'].get(self.created_at_field),
self.created_by_field: document['metadata'].get(self.created_by_field),
self.signature_field: document['metadata'].get(self.signature_field),
self.approved_field: document['metadata'].get(self.approved_field),
self.approved_by_field: document['metadata'].get(self.approved_by_field),
self.approval_signature_field: document['metadata'].get(self.approval_signature_field),
}
}

update_dict['metadata'][self.journal_field] = update_dict.copy()

# Update only the fields that are provided in json_data and metadata, not replacing the entire
# document. The partial approach will minimize the room for mistakes from overwriting entire documents.
_ = self.databases[form_name].update(update_dict, document_id=document_id)
_ = self.databases[form_name].update(update_dict, doc_ids=[document_id])

if self.use_logger:
self.logger.info(f"Updated document for {form_name} with document_id {document_id}")
Expand Down
46 changes: 39 additions & 7 deletions libreforms_fastapi/utils/pydantic_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def passwords_match(cls, data: Any) -> Any:
"field_name": "text_input",
"default": "Default Text",
"validators": [],
"required": False,
"options": None
},
"number_input": {
Expand All @@ -78,6 +79,7 @@ def passwords_match(cls, data: Any) -> Any:
"field_name": "number_input",
"default": 42,
"validators": [],
"required": False,
"options": None
},
"email_input": {
Expand All @@ -86,6 +88,7 @@ def passwords_match(cls, data: Any) -> Any:
"field_name": "email_input",
"default": "[email protected]",
"validators": [],
"required": False,
"options": None
},
"date_input": {
Expand All @@ -94,6 +97,7 @@ def passwords_match(cls, data: Any) -> Any:
"field_name": "date_input",
"default": "2024-01-01",
"validators": [],
"required": False,
"options": None
},
"checkbox_input": {
Expand All @@ -102,6 +106,7 @@ def passwords_match(cls, data: Any) -> Any:
"field_name": "checkbox_input",
"options": ["Option1", "Option2", "Option3"],
"validators": [],
"required": False,
"default": ["Option1", "Option3"]
},
"radio_input": {
Expand All @@ -110,6 +115,7 @@ def passwords_match(cls, data: Any) -> Any:
"field_name": "radio_input",
"options": ["Option1", "Option2"],
"validators": [],
"required": False,
"default": "Option2"
},
"select_input": {
Expand All @@ -118,6 +124,7 @@ def passwords_match(cls, data: Any) -> Any:
"field_name": "select_input",
"options": ["Option1", "Option2", "Option3"],
"validators": [],
"required": False,
"default": "Option2"
},
"textarea_input": {
Expand All @@ -126,14 +133,16 @@ def passwords_match(cls, data: Any) -> Any:
"field_name": "textarea_input",
"default": "Default textarea content.",
"validators": [],
"required": False,
"options": None
},
"file_input": {
"input_type": "file",
"output_type": Optional[bytes],
"output_type": bytes,
"field_name": "file_input",
"options": None,
"validators": [],
"required": False,
"default": None # File inputs can't have default values
},
},
Expand Down Expand Up @@ -295,8 +304,12 @@ def get_form_names(config_path=config.FORM_CONFIG_PATH):
# print("Config file does not exist. Using the default configuration.")
return form_config.keys()

def get_form_config(form_name, config_path=config.FORM_CONFIG_PATH):
"""Yields a single config dict for the form name passed, following a factory pattern approach"""
def get_form_config(form_name, config_path=config.FORM_CONFIG_PATH, update=False):
"""
Yields a single config dict for the form name passed, following a factory pattern approach.
If update is set to True, all fields will be set to optional.
"""
# Try to open config_path and if not existent or empty, use example config
form_config = example_form_config # Default to example_form_config

Expand All @@ -318,21 +331,40 @@ def get_form_config(form_name, config_path=config.FORM_CONFIG_PATH):
field_definitions = {}

for field_name, field_info in fields.items():

# Should we consider making an Enum for fields with a limited set of options...
# essentially requiring that the values passed are in the Enum of acceptable
# values? Bit difficult to implement for List and other data types, but may
# be worthwhile.

python_type: Type = field_info["output_type"]
default = field_info.get("default", ...)

required = field_info.get("required", False) # Default to not required field
validators = field_info.get("validators", [])

# Ensure Optional is always used with a specific type
if default is ... and python_type != Optional:
if (default is ... and python_type != Optional) or (not required) or (update):
python_type = Optional[python_type]

field_definitions[field_name] = (python_type, default)


for validator_func in validators:
# This assumes `validator_func` is callable that accepts a single
# value and returns a value or raises an exception
pass

# Creating the model dynamically, allowing arbitrary types
class Config:
arbitrary_types_allowed = True

model = create_model(form_name, __config__=Config, **field_definitions)


for field_name, field_info in fields.items():
validators = field_info.get("validators", [])
for v in validators:
# Placeholder for adding the validators to the model here
pass

return model


Expand Down

0 comments on commit 7e0c44d

Please sign in to comment.