Skip to content

Commit

Permalink
Added: form create group permission validation (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
signebedi committed Mar 23, 2024
1 parent 53d4979 commit c80df55
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 11 deletions.
47 changes: 47 additions & 0 deletions libreforms_fastapi/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
User,
TransactionLog,
Signing,
Group,
ApprovalChains,
)

from libreforms_fastapi.utils.scripts import (
Expand Down Expand Up @@ -208,6 +210,30 @@ async def start_test_logger():

logger.info('Relational database has been initialized')

# Create default group if it does not exist
with SessionLocal() as session:
# Check if a group with id 1 exists
default_group = session.query(Group).get(1)

if not default_group:
# If not, create and add the new default group
default_permissions = [
"example_form:create",
"example_form:read_own",
"example_form:read_all",
"example_form:update_own",
"example_form:update_all",
"example_form:delete_own",
"example_form:delete_all"
]
default_group = Group(id=1, name="default", permissions=default_permissions)
session.add(default_group)
session.commit()
logger.info("Default group created.")
else:
print(default_group.get_permissions())
logger.info("Default group already exists.")


# Initialize the document database
if config.MONGODB_ENABLED:
Expand Down Expand Up @@ -371,6 +397,12 @@ async def api_form_create(form_name: str, background_tasks: BackgroundTasks, req
# the sqlalchemy-signing table is not optimized alongside the user model...
user = session.query(User).filter_by(api_key=key).first()

try:
user.validate_permission(form_name=form_name, required_permission="create")
# print("\n\n\nUser has valid permissions\n\n\n")
except Exception as e:
raise HTTPException(status_code=403, detail=f"{e}")

# Set the document_id here, and pass to the DocumentDatabase
document_id = str(ObjectId())

Expand Down Expand Up @@ -503,6 +535,11 @@ async def api_auth_create(user_request: CreateUserRequest, background_tasks: Bac
api_key = signatures.write_key(scope=['api_key'], expiration=expiration, active=True, email=user_request.email)
new_user.api_key = api_key


# Add the user to the default group
group = session.query(Group).filter_by(name='default').first()
new_user.groups.append(group)

session.add(new_user)
session.commit()

Expand Down Expand Up @@ -565,6 +602,7 @@ async def api_auth_create(user_request: CreateUserRequest, background_tasks: Bac
# Add new user
# > paired with add newadmin UI route

# Modify user *** including disable user

# Get Transaction Statistics
# Paired with the Transaction Statistics
Expand All @@ -575,6 +613,15 @@ async def api_auth_create(user_request: CreateUserRequest, background_tasks: Bac

# Trigger site reload


# Get all groups

# Add new group

# Update group

# Delete group

##########################
### UI Routes - Forms
##########################
Expand Down
85 changes: 74 additions & 11 deletions libreforms_fastapi/utils/sqlalchemy_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from zoneinfo import ZoneInfo

from sqlalchemy import (
Table,
Boolean,
Column,
ForeignKey,
Expand All @@ -23,16 +24,27 @@
Base = declarative_base()


# Association table for the many-to-many relationship
user_group_association = Table('user_group_association', Base.metadata,
Column('user_id', Integer, ForeignKey('user.id'), primary_key=True),
Column('group_id', Integer, ForeignKey('group.id'), primary_key=True)
)

def tz_aware_datetime():
return datetime.now(config.TIMEZONE)

class InsufficientPermissionsError(Exception):
"""Raised when users lack sufficient permissions"""
pass

class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
email = Column(String(1000))
password = Column(String(1000))
username = Column(String(1000), unique=True)
groups = Column(JSON, default=list)
# groups = Column(JSON, default=['default'])
groups = relationship('Group', secondary=user_group_association, back_populates='users')
active = Column(Boolean)
created_date = Column(DateTime, nullable=False, default=tz_aware_datetime)
last_login = Column(DateTime, nullable=True, default=tz_aware_datetime)
Expand All @@ -49,8 +61,67 @@ class User(Base):
transaction_log = relationship("TransactionLog", order_by="TransactionLog.id", back_populates="user")

def __repr__(self) -> str:
return f"User(id={self.id!r}, name={self.username!r}, site_admin={'Yes' if self.site_admin else 'No'}, " \
f"active={'Yes' if self.active else 'No'}, groups={self.groups})"

# Here we join the group names and represent them as a comma-separated string of values
groups = ", ".join([x.name for x in self.groups])

return f"User(id={self.id!r}, name={self.username!r}, email={self.email}, site_admin={'Yes' if self.site_admin else 'No'}, " \
f"active={'Yes' if self.active else 'No'}, groups={groups})"

def validate_permission(self, form_name: str, required_permission: str) -> bool:
"""
Checks if the user has the required permission for a given form across all assigned groups.
:param form_name: The name of the form.
:param required_permission: The specific permission to check for.
:returns: True if at least one of the user's groups grants the required permission.
:raises InsufficientPermissionsError: If none of the groups grant the required permission.
"""
for group in self.groups:
# Utilize Group's method to unpack permissions
permissions = group.get_permissions()

# Check if the group grants the required permission for the form
if form_name in permissions and required_permission in permissions[form_name]:
return True # Permission granted by this group

# If no group grants the permission, raise an error
raise InsufficientPermissionsError(f"User does not have the required permission: {required_permission} for form: {form_name}")

# Allow admins to define custom groups, see
# https://github.com/signebedi/libreforms-fastapi/issues/22
class Group(Base):
__tablename__ = 'group'
id = Column(Integer, primary_key=True)
name = Column(String(1000), unique=True)
permissions = Column(JSON)
users = relationship('User', secondary=user_group_association, back_populates='groups')

def get_permissions(self) -> dict:
"""We expect permissions to be a list of permissions in the format of form_name:permission_granted - here, we unpack them"""
unpack_permissions = {}

for item in self.permissions:
i = item.split(":")
form_name = i[0]
permission =i[1]

if form_name not in unpack_permissions.keys():
unpack_permissions[form_name] = []

unpack_permissions[form_name].append(permission)

return unpack_permissions

# def validate_permission(self, form_name, permission):
# permission_dict = self.get_permissions()
# if form_name not in permission_dict.keys():
# raise InsufficientPermissionsError("User does not have the required permissions")

# if permission not in permission_dict[form_name]:
# raise InsufficientPermissionsError("User does not have the required permissions")

# return True

# Many to one relationship with User table
class TransactionLog(Base):
Expand All @@ -65,14 +136,6 @@ class TransactionLog(Base):

user = relationship("User", back_populates="transaction_log")

# Allow admins to define custom groups, see
# https://github.com/signebedi/libreforms-fastapi/issues/22
class Group(Base):
__tablename__ = 'group'
id = Column(Integer, primary_key=True)
name = Column(String(1000), unique=True)
permissions = Column(JSON)

# Allow custom approval chains to be defined here
class ApprovalChains(Base):
__tablename__ = 'approval_chains'
Expand Down

0 comments on commit c80df55

Please sign in to comment.