From c80df55004919ba7ec4905d1b2fc1e3f6459eb88 Mon Sep 17 00:00:00 2001 From: signebedi Date: Sat, 23 Mar 2024 18:13:01 -0500 Subject: [PATCH] Added: form create group permission validation (#22) --- libreforms_fastapi/app/__init__.py | 47 ++++++++++ libreforms_fastapi/utils/sqlalchemy_models.py | 85 ++++++++++++++++--- 2 files changed, 121 insertions(+), 11 deletions(-) diff --git a/libreforms_fastapi/app/__init__.py b/libreforms_fastapi/app/__init__.py index e2492d1..a9c1676 100644 --- a/libreforms_fastapi/app/__init__.py +++ b/libreforms_fastapi/app/__init__.py @@ -52,6 +52,8 @@ User, TransactionLog, Signing, + Group, + ApprovalChains, ) from libreforms_fastapi.utils.scripts import ( @@ -208,6 +210,30 @@ async def start_test_logger(): logger.info('Relational database has been initialized') +# Create default group if it does not exist +with SessionLocal() as session: + # Check if a group with id 1 exists + default_group = session.query(Group).get(1) + + if not default_group: + # If not, create and add the new default group + default_permissions = [ + "example_form:create", + "example_form:read_own", + "example_form:read_all", + "example_form:update_own", + "example_form:update_all", + "example_form:delete_own", + "example_form:delete_all" + ] + default_group = Group(id=1, name="default", permissions=default_permissions) + session.add(default_group) + session.commit() + logger.info("Default group created.") + else: + print(default_group.get_permissions()) + logger.info("Default group already exists.") + # Initialize the document database if config.MONGODB_ENABLED: @@ -371,6 +397,12 @@ async def api_form_create(form_name: str, background_tasks: BackgroundTasks, req # the sqlalchemy-signing table is not optimized alongside the user model... user = session.query(User).filter_by(api_key=key).first() + try: + user.validate_permission(form_name=form_name, required_permission="create") + # print("\n\n\nUser has valid permissions\n\n\n") + except Exception as e: + raise HTTPException(status_code=403, detail=f"{e}") + # Set the document_id here, and pass to the DocumentDatabase document_id = str(ObjectId()) @@ -503,6 +535,11 @@ async def api_auth_create(user_request: CreateUserRequest, background_tasks: Bac api_key = signatures.write_key(scope=['api_key'], expiration=expiration, active=True, email=user_request.email) new_user.api_key = api_key + + # Add the user to the default group + group = session.query(Group).filter_by(name='default').first() + new_user.groups.append(group) + session.add(new_user) session.commit() @@ -565,6 +602,7 @@ async def api_auth_create(user_request: CreateUserRequest, background_tasks: Bac # Add new user # > paired with add newadmin UI route +# Modify user *** including disable user # Get Transaction Statistics # Paired with the Transaction Statistics @@ -575,6 +613,15 @@ async def api_auth_create(user_request: CreateUserRequest, background_tasks: Bac # Trigger site reload + +# Get all groups + +# Add new group + +# Update group + +# Delete group + ########################## ### UI Routes - Forms ########################## diff --git a/libreforms_fastapi/utils/sqlalchemy_models.py b/libreforms_fastapi/utils/sqlalchemy_models.py index f661721..3dd7490 100644 --- a/libreforms_fastapi/utils/sqlalchemy_models.py +++ b/libreforms_fastapi/utils/sqlalchemy_models.py @@ -3,6 +3,7 @@ from zoneinfo import ZoneInfo from sqlalchemy import ( + Table, Boolean, Column, ForeignKey, @@ -23,16 +24,27 @@ Base = declarative_base() +# Association table for the many-to-many relationship +user_group_association = Table('user_group_association', Base.metadata, + Column('user_id', Integer, ForeignKey('user.id'), primary_key=True), + Column('group_id', Integer, ForeignKey('group.id'), primary_key=True) +) + def tz_aware_datetime(): return datetime.now(config.TIMEZONE) +class InsufficientPermissionsError(Exception): + """Raised when users lack sufficient permissions""" + pass + class User(Base): __tablename__ = 'user' id = Column(Integer, primary_key=True) email = Column(String(1000)) password = Column(String(1000)) username = Column(String(1000), unique=True) - groups = Column(JSON, default=list) + # groups = Column(JSON, default=['default']) + groups = relationship('Group', secondary=user_group_association, back_populates='users') active = Column(Boolean) created_date = Column(DateTime, nullable=False, default=tz_aware_datetime) last_login = Column(DateTime, nullable=True, default=tz_aware_datetime) @@ -49,8 +61,67 @@ class User(Base): transaction_log = relationship("TransactionLog", order_by="TransactionLog.id", back_populates="user") def __repr__(self) -> str: - return f"User(id={self.id!r}, name={self.username!r}, site_admin={'Yes' if self.site_admin else 'No'}, " \ - f"active={'Yes' if self.active else 'No'}, groups={self.groups})" + + # Here we join the group names and represent them as a comma-separated string of values + groups = ", ".join([x.name for x in self.groups]) + + return f"User(id={self.id!r}, name={self.username!r}, email={self.email}, site_admin={'Yes' if self.site_admin else 'No'}, " \ + f"active={'Yes' if self.active else 'No'}, groups={groups})" + + def validate_permission(self, form_name: str, required_permission: str) -> bool: + """ + Checks if the user has the required permission for a given form across all assigned groups. + + :param form_name: The name of the form. + :param required_permission: The specific permission to check for. + :returns: True if at least one of the user's groups grants the required permission. + :raises InsufficientPermissionsError: If none of the groups grant the required permission. + """ + for group in self.groups: + # Utilize Group's method to unpack permissions + permissions = group.get_permissions() + + # Check if the group grants the required permission for the form + if form_name in permissions and required_permission in permissions[form_name]: + return True # Permission granted by this group + + # If no group grants the permission, raise an error + raise InsufficientPermissionsError(f"User does not have the required permission: {required_permission} for form: {form_name}") + +# Allow admins to define custom groups, see +# https://github.com/signebedi/libreforms-fastapi/issues/22 +class Group(Base): + __tablename__ = 'group' + id = Column(Integer, primary_key=True) + name = Column(String(1000), unique=True) + permissions = Column(JSON) + users = relationship('User', secondary=user_group_association, back_populates='groups') + + def get_permissions(self) -> dict: + """We expect permissions to be a list of permissions in the format of form_name:permission_granted - here, we unpack them""" + unpack_permissions = {} + + for item in self.permissions: + i = item.split(":") + form_name = i[0] + permission =i[1] + + if form_name not in unpack_permissions.keys(): + unpack_permissions[form_name] = [] + + unpack_permissions[form_name].append(permission) + + return unpack_permissions + + # def validate_permission(self, form_name, permission): + # permission_dict = self.get_permissions() + # if form_name not in permission_dict.keys(): + # raise InsufficientPermissionsError("User does not have the required permissions") + + # if permission not in permission_dict[form_name]: + # raise InsufficientPermissionsError("User does not have the required permissions") + + # return True # Many to one relationship with User table class TransactionLog(Base): @@ -65,14 +136,6 @@ class TransactionLog(Base): user = relationship("User", back_populates="transaction_log") -# Allow admins to define custom groups, see -# https://github.com/signebedi/libreforms-fastapi/issues/22 -class Group(Base): - __tablename__ = 'group' - id = Column(Integer, primary_key=True) - name = Column(String(1000), unique=True) - permissions = Column(JSON) - # Allow custom approval chains to be defined here class ApprovalChains(Base): __tablename__ = 'approval_chains'