diff --git a/src/server/operandi_server/routers/admin_panel.py b/src/server/operandi_server/routers/admin_panel.py index 65fc831..0c8bdf8 100644 --- a/src/server/operandi_server/routers/admin_panel.py +++ b/src/server/operandi_server/routers/admin_panel.py @@ -1,10 +1,16 @@ from logging import getLogger +from datetime import datetime +from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import HTTPBasic, HTTPBasicCredentials -from operandi_server.models import PYUserInfo +from operandi_server.models import PYUserInfo, WorkflowJobRsrc, WorkspaceRsrc, WorkflowRsrc from operandi_utils.constants import AccountType, ServerApiTag -from operandi_utils.database import db_get_all_user_accounts, db_get_processing_stats +from operandi_utils.database import ( + db_get_all_user_accounts, db_get_processing_stats, db_get_all_jobs_by_user, + db_get_workflow, db_get_workspace, db_get_all_workspaces_by_user, db_get_all_workflows_by_user + +) from operandi_utils.utils import send_bag_to_ola_hd from .user import RouterUser from .workspace_utils import create_workspace_bag, get_db_workspace_with_handling, validate_bag_with_handling @@ -30,7 +36,21 @@ def __init__(self): endpoint=self.get_processing_stats_for_user, methods=["GET"], status_code=status.HTTP_200_OK, summary="Get processing stats for a specific user by user_id" ) - + self.router.add_api_route( + path="/admin/{user_id}/workflow_jobs", + endpoint=self.user_workflow_jobs, methods=["GET"], status_code=status.HTTP_200_OK, + summary="Get all workflow jobs submitted by the user identified by user_id" + ) + self.router.add_api_route( + path="/admin/{user_id}/workspaces", + endpoint=self.user_workspaces, methods=["GET"], status_code=status.HTTP_200_OK, + summary="Get all workspaces submitted by the user identified by user_id" + ) + self.router.add_api_route( + path="/admin/{user_id}/workflows", + endpoint=self.user_workflows, methods=["GET"], status_code=status.HTTP_200_OK, + summary="Get all workflows submitted by the user identified by user_id" + ) async def push_to_ola_hd(self, workspace_id: str, auth: HTTPBasicCredentials = Depends(HTTPBasic())): py_user_action = await self.user_authenticator.user_login(auth) if py_user_action.account_type != AccountType.ADMIN: @@ -91,4 +111,74 @@ async def get_processing_stats_for_user(self, user_id: str, auth: HTTPBasicCrede raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=message) # Return the processing stats in the response model - return db_processing_stats \ No newline at end of file + return db_processing_stats + async def user_workflow_jobs( + self, + user_id: str, + auth: HTTPBasicCredentials = Depends(HTTPBasic()), + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None + ) -> List: + # Authenticate the admin user + py_user_action = await self.user_authenticator.user_login(auth) + if py_user_action.account_type != AccountType.ADMIN: + message = f"Admin privileges required for the endpoint" + self.logger.error(f"{message}") + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=message) + # Retrieve workflow jobs for the user identified with user_id with optional date filtering + db_workflow_jobs = await db_get_all_jobs_by_user( + user_id=user_id, + start_date=start_date, + end_date=end_date + ) + response = [] + for db_workflow_job in db_workflow_jobs: + db_workflow = await db_get_workflow(db_workflow_job.workflow_id) + db_workspace = await db_get_workspace(db_workflow_job.workspace_id) + response.append(WorkflowJobRsrc.from_db_workflow_job(db_workflow_job, db_workflow, db_workspace)) + return response + + + async def user_workspaces( + self, + user_id: str, + auth: HTTPBasicCredentials = Depends(HTTPBasic()), + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None + ) -> List: + # Authenticate the admin user + py_user_action = await self.user_authenticator.user_login(auth) + if py_user_action.account_type != AccountType.ADMIN: + message = f"Admin privileges required for the endpoint" + self.logger.error(f"{message}") + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=message) + # Retrieve workspaces for the user with optional date filtering + db_workspaces = await db_get_all_workspaces_by_user( + user_id=user_id, + start_date=start_date, + end_date=end_date + ) + + return [WorkspaceRsrc.from_db_workspace(db_workspace) for db_workspace in db_workspaces] + + async def user_workflows( + self, + user_id: str, + auth: HTTPBasicCredentials = Depends(HTTPBasic()), + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None + ) -> List: + # Authenticate the admin user + py_user_action = await self.user_authenticator.user_login(auth) + if py_user_action.account_type != AccountType.ADMIN: + message = f"Admin privileges required for the endpoint" + self.logger.error(f"{message}") + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=message) + # Retrieve workspaces for the user with optional date filtering + db_workflows = await db_get_all_workflows_by_user( + user_id=user_id, + start_date=start_date, + end_date=end_date + ) + + return [WorkflowRsrc.from_db_workflow(db_workflow) for db_workflow in db_workflows] diff --git a/src/server/operandi_server/routers/user.py b/src/server/operandi_server/routers/user.py index 1e9bff6..bc6243e 100644 --- a/src/server/operandi_server/routers/user.py +++ b/src/server/operandi_server/routers/user.py @@ -1,11 +1,16 @@ from logging import getLogger +from typing import List, Optional +from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import HTTPBasic, HTTPBasicCredentials from operandi_utils.constants import AccountType, ServerApiTag -from operandi_utils.database import db_get_processing_stats, db_get_user_account_with_email +from operandi_utils.database import ( + db_get_processing_stats, db_get_all_jobs_by_user, db_get_user_account_with_email, + db_get_workflow, db_get_workspace, db_get_all_workspaces_by_user, db_get_all_workflows_by_user +) from operandi_server.exceptions import AuthenticationError -from operandi_server.models import PYUserAction +from operandi_server.models import PYUserAction, WorkflowJobRsrc, WorkspaceRsrc, WorkflowRsrc from operandi_utils.database.models import DBProcessingStatistics from .user_utils import user_auth, user_register_with_handling @@ -32,6 +37,25 @@ def __init__(self): summary="Get user account statistics of the current account", response_model=DBProcessingStatistics, response_model_exclude_unset=True, response_model_exclude_none=True ) + self.router.add_api_route( + path="/user/workflow_jobs", + endpoint=self.user_workflow_jobs, methods=["GET"], status_code=status.HTTP_200_OK, + summary="Get all workflow jobs submitted by the user", + response_model=List, response_model_exclude_unset=True, response_model_exclude_none=True + ) + self.router.add_api_route( + path="/user/workspaces", + endpoint=self.user_workspaces, methods=["GET"], status_code=status.HTTP_200_OK, + summary="Get all workspaces submitted by the user", + response_model=List, response_model_exclude_unset=True, response_model_exclude_none=True + ) + self.router.add_api_route( + path="/user/workflows", + endpoint=self.user_workflows, methods=["GET"], status_code=status.HTTP_200_OK, + summary="Get all workflows submitted by the user", + response_model=List, response_model_exclude_unset=True, response_model_exclude_none=True + ) + async def user_login(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())) -> PYUserAction: """ @@ -82,3 +106,64 @@ async def user_processing_stats(self, auth: HTTPBasicCredentials = Depends(HTTPB db_user_account = await db_get_user_account_with_email(email=auth.username) db_processing_stats = await db_get_processing_stats(db_user_account.user_id) return db_processing_stats + + async def user_workflow_jobs( + self, + auth: HTTPBasicCredentials = Depends(HTTPBasic()), + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None + ) -> List: + await self.user_login(auth) + # Fetch user account details + db_user_account = await db_get_user_account_with_email(email=auth.username) + # Retrieve workflow jobs for the user with optional date filtering + db_workflow_jobs = await db_get_all_jobs_by_user( + user_id=db_user_account.user_id, + start_date=start_date, + end_date=end_date + ) + response = [] + for db_workflow_job in db_workflow_jobs: + db_workflow = await db_get_workflow(db_workflow_job.workflow_id) + db_workspace = await db_get_workspace(db_workflow_job.workspace_id) + response.append(WorkflowJobRsrc.from_db_workflow_job(db_workflow_job, db_workflow, db_workspace)) + return response + + + async def user_workspaces( + self, + auth: HTTPBasicCredentials = Depends(HTTPBasic()), + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None + ) -> List: + + await self.user_login(auth) + # Fetch user account details + db_user_account = await db_get_user_account_with_email(email=auth.username) + # Retrieve workspaces for the user with optional date filtering + db_workspaces = await db_get_all_workspaces_by_user( + user_id=db_user_account.user_id, + start_date=start_date, + end_date=end_date + ) + + return [WorkspaceRsrc.from_db_workspace(db_workspace) for db_workspace in db_workspaces] + + async def user_workflows( + self, + auth: HTTPBasicCredentials = Depends(HTTPBasic()), + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None + ) -> List: + + await self.user_login(auth) + # Fetch user account details + db_user_account = await db_get_user_account_with_email(email=auth.username) + # Retrieve workflow for the user with optional date filtering + db_workflows = await db_get_all_workflows_by_user( + user_id=db_user_account.user_id, + start_date=start_date, + end_date=end_date + ) + + return [WorkflowRsrc.from_db_workflow(db_workflow) for db_workflow in db_workflows] diff --git a/src/utils/operandi_utils/database/__init__.py b/src/utils/operandi_utils/database/__init__.py index c2639de..e1460b8 100644 --- a/src/utils/operandi_utils/database/__init__.py +++ b/src/utils/operandi_utils/database/__init__.py @@ -16,8 +16,11 @@ "db_get_user_account", "db_get_user_account_with_email", "db_get_workflow", + "db_get_all_workflows_by_user", "db_get_workflow_job", + "db_get_all_jobs_by_user", "db_get_workspace", + "db_get_all_workspaces_by_user", "db_increase_processing_stats", "db_increase_processing_stats_with_handling", "db_initiate_database", @@ -38,8 +41,11 @@ "sync_db_get_user_account", "sync_db_get_user_account_with_email", "sync_db_get_workflow", + "sync_db_get_all_workflows_by_user", "sync_db_get_workflow_job", + "sync_db_get_all_jobs_by_user", "sync_db_get_workspace", + "sync_db_get_all_workspaces_by_user", "sync_db_increase_processing_stats", "sync_db_initiate_database", "sync_db_update_hpc_slurm_job", @@ -74,26 +80,32 @@ from .db_workflow import ( db_create_workflow, db_get_workflow, + db_get_all_workflows_by_user, db_update_workflow, sync_db_create_workflow, sync_db_get_workflow, + sync_db_get_all_workflows_by_user, sync_db_update_workflow ) from .db_workflow_job import ( db_create_workflow_job, db_get_workflow_job, + db_get_all_jobs_by_user, db_update_workflow_job, sync_db_create_workflow_job, sync_db_get_workflow_job, + sync_db_get_all_jobs_by_user, sync_db_update_workflow_job ) from .db_workspace import ( db_create_workspace, db_get_workspace, + db_get_all_workspaces_by_user, db_update_workspace, sync_db_create_workspace, sync_db_get_workspace, - sync_db_update_workspace + sync_db_update_workspace, + sync_db_get_all_workspaces_by_user ) from .db_processing_statistics import ( db_create_processing_stats, diff --git a/src/utils/operandi_utils/database/db_workflow.py b/src/utils/operandi_utils/database/db_workflow.py index 03e0d07..91b3da8 100644 --- a/src/utils/operandi_utils/database/db_workflow.py +++ b/src/utils/operandi_utils/database/db_workflow.py @@ -1,4 +1,5 @@ from datetime import datetime +from typing import List, Optional from operandi_utils import call_sync from .models import DBWorkflow @@ -48,6 +49,22 @@ async def db_get_workflow(workflow_id: str) -> DBWorkflow: raise RuntimeError(f"No DB workflow entry found for id: {workflow_id}") return db_workflow +async def db_get_all_workflows_by_user(user_id: str, start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None) -> List[DBWorkflow]: + # Start with the user_id filter + query = {"user_id": user_id} + + # Add date filters conditionally + if start_date or end_date: + query["datetime"] = {} + if start_date: + query["datetime"]["$gte"] = start_date + if end_date: + query["datetime"]["$lte"] = end_date + + # Execute the query + db_workflows = await DBWorkflow.find_many(query).to_list() + return db_workflows @call_sync async def sync_db_get_workflow(workflow_id: str) -> DBWorkflow: @@ -83,3 +100,7 @@ async def db_update_workflow(find_workflow_id: str, **kwargs) -> DBWorkflow: @call_sync async def sync_db_update_workflow(find_workflow_id: str, **kwargs) -> DBWorkflow: return await db_update_workflow(find_workflow_id=find_workflow_id, **kwargs) + +@call_sync +async def sync_db_get_all_workflows_by_user(user_id: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None) -> List[DBWorkflow]: + return await db_get_all_workflows_by_user(user_id, start_date, end_date) \ No newline at end of file diff --git a/src/utils/operandi_utils/database/db_workflow_job.py b/src/utils/operandi_utils/database/db_workflow_job.py index 4c4ebc2..a96a511 100644 --- a/src/utils/operandi_utils/database/db_workflow_job.py +++ b/src/utils/operandi_utils/database/db_workflow_job.py @@ -1,7 +1,8 @@ from datetime import datetime +from typing import List, Optional from operandi_utils import call_sync from operandi_utils.constants import StateJob -from .models import DBWorkflowJob +from operandi_utils.database.models import DBWorkflowJob async def db_create_workflow_job( @@ -37,6 +38,24 @@ async def db_get_workflow_job(job_id: str) -> DBWorkflowJob: return db_workflow_job +async def db_get_all_jobs_by_user(user_id: str, start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None) -> List[DBWorkflowJob]: + # Start with the user_id filter + query = {"user_id": user_id} + + # Add date filters conditionally + if start_date or end_date: + query["datetime"] = {} + if start_date: + query["datetime"]["$gte"] = start_date + if end_date: + query["datetime"]["$lte"] = end_date + + # Execute the query + db_workflow_jobs = await DBWorkflowJob.find_many(query).to_list() + return db_workflow_jobs + + @call_sync async def sync_db_get_workflow_job(job_id: str) -> DBWorkflowJob: return await db_get_workflow_job(job_id) @@ -77,3 +96,7 @@ async def db_update_workflow_job(find_job_id: str, **kwargs) -> DBWorkflowJob: @call_sync async def sync_db_update_workflow_job(find_job_id: str, **kwargs) -> DBWorkflowJob: return await db_update_workflow_job(find_job_id=find_job_id, **kwargs) + +@call_sync +async def sync_db_get_all_jobs_by_user(user_id: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None) -> List[DBWorkflowJob]: + return await db_get_all_jobs_by_user(user_id, start_date, end_date) \ No newline at end of file diff --git a/src/utils/operandi_utils/database/db_workspace.py b/src/utils/operandi_utils/database/db_workspace.py index fd7a675..7895d05 100644 --- a/src/utils/operandi_utils/database/db_workspace.py +++ b/src/utils/operandi_utils/database/db_workspace.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, Optional from datetime import datetime from os.path import join from operandi_utils import call_sync @@ -75,6 +75,23 @@ async def db_get_workspace(workspace_id: str) -> DBWorkspace: return db_workspace +async def db_get_all_workspaces_by_user(user_id: str, start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None) -> List[DBWorkspace]: + # Start with the user_id filter + query = {"user_id": user_id} + + # Add date filters conditionally + if start_date or end_date: + query["datetime"] = {} + if start_date: + query["datetime"]["$gte"] = start_date + if end_date: + query["datetime"]["$lte"] = end_date + + # Execute the query + db_workspaces = await DBWorkspace.find_many(query).to_list() + return db_workspaces + @call_sync async def sync_db_get_workspace(workspace_id: str) -> DBWorkspace: return await db_get_workspace(workspace_id) @@ -121,3 +138,7 @@ async def db_update_workspace(find_workspace_id: str, **kwargs) -> DBWorkspace: @call_sync async def sync_db_update_workspace(find_workspace_id: str, **kwargs) -> DBWorkspace: return await db_update_workspace(find_workspace_id=find_workspace_id, **kwargs) + +@call_sync +async def sync_db_get_all_workspaces_by_user(user_id: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None) -> List[DBWorkspace]: + return await db_get_all_workspaces_by_user(user_id, start_date, end_date) \ No newline at end of file