From 3687cd0f359a4ccf568e97716c92fa21ccd96dd6 Mon Sep 17 00:00:00 2001 From: signebedi Date: Tue, 26 Mar 2024 14:33:23 -0500 Subject: [PATCH] Added: verification of signatures on sign (#71) (#59) --- libreforms_fastapi/app/__init__.py | 8 ++++-- libreforms_fastapi/utils/certificates.py | 12 ++++++--- libreforms_fastapi/utils/document_database.py | 27 ++++++++++++++----- libreforms_fastapi/utils/sqlalchemy_models.py | 1 + 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/libreforms_fastapi/app/__init__.py b/libreforms_fastapi/app/__init__.py index 7c2baa4..91141c5 100644 --- a/libreforms_fastapi/app/__init__.py +++ b/libreforms_fastapi/app/__init__.py @@ -79,6 +79,7 @@ DocumentIsDeleted, InsufficientPermissions, DocumentIsNotDeleted, + SignatureError, ) from libreforms_fastapi.utils.pydantic_models import ( @@ -1036,8 +1037,8 @@ async def api_form_sign( # Here we validate the user groups permit this level of access to the form try: - for group in user.groups: - print("\n\n", group.get_permissions()) + # for group in user.groups: + # print("\n\n", group.get_permissions()) user.validate_permission(form_name=form_name, required_permission="sign_own") except Exception as e: @@ -1072,6 +1073,9 @@ async def api_form_sign( except DocumentIsDeleted as e: raise HTTPException(status_code=410, detail=f"{e}") + except SignatureError as e: + raise HTTPException(status_code=403, detail=f"{e}") + # Send email if config.SMTP_ENABLED: diff --git a/libreforms_fastapi/utils/certificates.py b/libreforms_fastapi/utils/certificates.py index 281a240..77bad48 100644 --- a/libreforms_fastapi/utils/certificates.py +++ b/libreforms_fastapi/utils/certificates.py @@ -194,18 +194,18 @@ def serialize_record_for_signing(record): # print(select_data_fields) return json.dumps(select_data_fields, sort_keys=True) -def sign_record(record, username, env="development"): +def sign_record(record, username, env="development", private_key_path=None): """ Generates a signature for the given record and inserts it into the '_signature' field. """ - ds_manager = DigitalSignatureManager(username=username, env=env) + ds_manager = DigitalSignatureManager(username=username, env=env, private_key_path=private_key_path) serialized = serialize_record_for_signing(record) signature = ds_manager.sign_data(serialized.encode()) s = signature.hex() record['metadata']['_signature'] = s # Store the signature as a hex string return record, s -def verify_record_signature(record, username, env="development", public_key=None): +def verify_record_signature(record, username, env="development", public_key=None, private_key_path=None): """ Verifies the signature of the given record. Returns True if the signature is valid, False otherwise. @@ -213,12 +213,16 @@ def verify_record_signature(record, username, env="development", public_key=None if '_signature' not in record['metadata'] or record['metadata']['_signature'] is None: return False # No signature to verify - ds_manager = DigitalSignatureManager(username=username, env=env) + ds_manager = DigitalSignatureManager(username=username, env=env, private_key_path=private_key_path) record_copy = copy.deepcopy(record) signature_bytes = bytes.fromhex(record['metadata']['_signature']) serialized = serialize_record_for_signing(record_copy) + # This is hackish, because ds_manager.verify_signature should be able to accept bytes. It's a workaround for now. + if isinstance(public_key, bytes): + public_key = public_key.decode('utf-8') + try: return ds_manager.verify_signature(serialized.encode(), signature_bytes, public_key=public_key) except InvalidSignature: diff --git a/libreforms_fastapi/utils/document_database.py b/libreforms_fastapi/utils/document_database.py index 8e4f05f..16f7452 100644 --- a/libreforms_fastapi/utils/document_database.py +++ b/libreforms_fastapi/utils/document_database.py @@ -25,7 +25,7 @@ from libreforms_fastapi.utils.logging import set_logger # This import is used to afix digital signatures to records -from libreforms_fastapi.utils.certificates import sign_record +from libreforms_fastapi.utils.certificates import sign_record, verify_record_signature # We want to modify TinyDB use use string representations of bson @@ -170,11 +170,18 @@ def __init__(self, form_name, document_id): class InsufficientPermissions(Exception): """Exception raised when attempting to access a document that user lacks permissions for.""" def __init__(self, form_name, document_id, username): - message = f"User '{user}' has insufficinet permissions to perform the requested operation on document" \ + message = f"User '{username}' has insufficient permissions to perform the requested operation on document" \ f"with ID '{document_id}' collection '{form_name}'." super().__init__(message) +class SignatureError(Exception): + """Exception raised when attempting to sign a document but the process fails.""" + def __init__(self, form_name, document_id, username): + message = f"User '{username}' has failed to sign the document" \ + f"with ID '{document_id}' collection '{form_name}'." + super().__init__(message) +SignatureError # Pulled from https://github.com/signebedi/gita-api def fuzzy_search_normalized(text_string, search_term, segment_length=None): if segment_length is None: @@ -510,7 +517,8 @@ def sign_document( public_key=None, private_key_path=None, metadata={}, - exclude_deleted=True + exclude_deleted=True, + verify_on_sign=True, ): """ Manage signatures existing form in specified form's database. @@ -535,10 +543,15 @@ def sign_document( raise DocumentIsDeleted(form_name, document_id) # Now we afix the signature - _, signature = sign_record(record=document, username=username, env=self.env) + try: + r, signature = sign_record(record=document, username=username, env=self.env, private_key_path=private_key_path) - # Placeholder - before proceeding, should we verify the signature and raise an - # SignatureError exception if the verification fails? + if verify_on_sign: + verify = verify_record_signature(record=r, username=username, env=self.env, public_key=public_key, private_key_path=private_key_path) + assert (verify) + # print ("\n\n\n", a) + except: + raise SignatureError(form_name, document_id, username) current_timestamp = datetime.now(self.timezone) @@ -563,7 +576,7 @@ def sign_document( document['metadata'][self.signature_field] = signature - # Update only the fields that are provided in json_data and metadata, not replacing the entire + # Update only the fields that are provided in metadata, not replacing the entire # document. The partial approach will minimize the room for mistakes from overwriting entire documents. _ = self.databases[form_name].update(document, doc_ids=[document_id]) diff --git a/libreforms_fastapi/utils/sqlalchemy_models.py b/libreforms_fastapi/utils/sqlalchemy_models.py index f111232..e618ea8 100644 --- a/libreforms_fastapi/utils/sqlalchemy_models.py +++ b/libreforms_fastapi/utils/sqlalchemy_models.py @@ -51,6 +51,7 @@ class User(Base): last_login = Column(DateTime, nullable=True, default=tz_aware_datetime) locked_until = Column(DateTime, nullable=True, default=tz_aware_datetime) public_key = Column(LargeBinary(), nullable=True) + # public_key = Column(String, nullable=True) private_key_ref = Column(String, nullable=True) last_password_change = Column(DateTime, nullable=True, default=tz_aware_datetime) failed_login_attempts = Column(Integer, default=0)