Skip to content

Commit

Permalink
Added: verification of signatures on sign (#71) (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
signebedi committed Mar 26, 2024
1 parent ebd0d9d commit 3687cd0
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 13 deletions.
8 changes: 6 additions & 2 deletions libreforms_fastapi/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
DocumentIsDeleted,
InsufficientPermissions,
DocumentIsNotDeleted,
SignatureError,
)

from libreforms_fastapi.utils.pydantic_models import (
Expand Down Expand Up @@ -1036,8 +1037,8 @@ async def api_form_sign(

# Here we validate the user groups permit this level of access to the form
try:
for group in user.groups:
print("\n\n", group.get_permissions())
# for group in user.groups:
# print("\n\n", group.get_permissions())

user.validate_permission(form_name=form_name, required_permission="sign_own")
except Exception as e:
Expand Down Expand Up @@ -1072,6 +1073,9 @@ async def api_form_sign(
except DocumentIsDeleted as e:
raise HTTPException(status_code=410, detail=f"{e}")

except SignatureError as e:
raise HTTPException(status_code=403, detail=f"{e}")


# Send email
if config.SMTP_ENABLED:
Expand Down
12 changes: 8 additions & 4 deletions libreforms_fastapi/utils/certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,31 +194,35 @@ def serialize_record_for_signing(record):
# print(select_data_fields)
return json.dumps(select_data_fields, sort_keys=True)

def sign_record(record, username, env="development"):
def sign_record(record, username, env="development", private_key_path=None):
"""
Generates a signature for the given record and inserts it into the '_signature' field.
"""
ds_manager = DigitalSignatureManager(username=username, env=env)
ds_manager = DigitalSignatureManager(username=username, env=env, private_key_path=private_key_path)
serialized = serialize_record_for_signing(record)
signature = ds_manager.sign_data(serialized.encode())
s = signature.hex()
record['metadata']['_signature'] = s # Store the signature as a hex string
return record, s

def verify_record_signature(record, username, env="development", public_key=None):
def verify_record_signature(record, username, env="development", public_key=None, private_key_path=None):
"""
Verifies the signature of the given record.
Returns True if the signature is valid, False otherwise.
"""
if '_signature' not in record['metadata'] or record['metadata']['_signature'] is None:
return False # No signature to verify

ds_manager = DigitalSignatureManager(username=username, env=env)
ds_manager = DigitalSignatureManager(username=username, env=env, private_key_path=private_key_path)

record_copy = copy.deepcopy(record)
signature_bytes = bytes.fromhex(record['metadata']['_signature'])
serialized = serialize_record_for_signing(record_copy)

# This is hackish, because ds_manager.verify_signature should be able to accept bytes. It's a workaround for now.
if isinstance(public_key, bytes):
public_key = public_key.decode('utf-8')

try:
return ds_manager.verify_signature(serialized.encode(), signature_bytes, public_key=public_key)
except InvalidSignature:
Expand Down
27 changes: 20 additions & 7 deletions libreforms_fastapi/utils/document_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from libreforms_fastapi.utils.logging import set_logger

# This import is used to afix digital signatures to records
from libreforms_fastapi.utils.certificates import sign_record
from libreforms_fastapi.utils.certificates import sign_record, verify_record_signature


# We want to modify TinyDB use use string representations of bson
Expand Down Expand Up @@ -170,11 +170,18 @@ def __init__(self, form_name, document_id):
class InsufficientPermissions(Exception):
"""Exception raised when attempting to access a document that user lacks permissions for."""
def __init__(self, form_name, document_id, username):
message = f"User '{user}' has insufficinet permissions to perform the requested operation on document" \
message = f"User '{username}' has insufficient permissions to perform the requested operation on document" \
f"with ID '{document_id}' collection '{form_name}'."
super().__init__(message)

class SignatureError(Exception):
"""Exception raised when attempting to sign a document but the process fails."""
def __init__(self, form_name, document_id, username):
message = f"User '{username}' has failed to sign the document" \
f"with ID '{document_id}' collection '{form_name}'."
super().__init__(message)

SignatureError
# Pulled from https://github.com/signebedi/gita-api
def fuzzy_search_normalized(text_string, search_term, segment_length=None):
if segment_length is None:
Expand Down Expand Up @@ -510,7 +517,8 @@ def sign_document(
public_key=None,
private_key_path=None,
metadata={},
exclude_deleted=True
exclude_deleted=True,
verify_on_sign=True,
):
"""
Manage signatures existing form in specified form's database.
Expand All @@ -535,10 +543,15 @@ def sign_document(
raise DocumentIsDeleted(form_name, document_id)

# Now we afix the signature
_, signature = sign_record(record=document, username=username, env=self.env)
try:
r, signature = sign_record(record=document, username=username, env=self.env, private_key_path=private_key_path)

# Placeholder - before proceeding, should we verify the signature and raise an
# SignatureError exception if the verification fails?
if verify_on_sign:
verify = verify_record_signature(record=r, username=username, env=self.env, public_key=public_key, private_key_path=private_key_path)
assert (verify)
# print ("\n\n\n", a)
except:
raise SignatureError(form_name, document_id, username)

current_timestamp = datetime.now(self.timezone)

Expand All @@ -563,7 +576,7 @@ def sign_document(
document['metadata'][self.signature_field] = signature


# Update only the fields that are provided in json_data and metadata, not replacing the entire
# Update only the fields that are provided in metadata, not replacing the entire
# document. The partial approach will minimize the room for mistakes from overwriting entire documents.
_ = self.databases[form_name].update(document, doc_ids=[document_id])

Expand Down
1 change: 1 addition & 0 deletions libreforms_fastapi/utils/sqlalchemy_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class User(Base):
last_login = Column(DateTime, nullable=True, default=tz_aware_datetime)
locked_until = Column(DateTime, nullable=True, default=tz_aware_datetime)
public_key = Column(LargeBinary(), nullable=True)
# public_key = Column(String, nullable=True)
private_key_ref = Column(String, nullable=True)
last_password_change = Column(DateTime, nullable=True, default=tz_aware_datetime)
failed_login_attempts = Column(Integer, default=0)
Expand Down

0 comments on commit 3687cd0

Please sign in to comment.