From 82766501655e0ce56c18557bccc24f73531ea852 Mon Sep 17 00:00:00 2001 From: David Cruciani Date: Wed, 7 Feb 2024 15:53:43 +0100 Subject: [PATCH] chg: [webiste] history --- .gitignore | 3 +- webiste/app/case/case.py | 591 ---------------------------- webiste/app/case/case_api.py | 602 ---------------------------- webiste/app/case/case_core.py | 610 ----------------------------- webiste/app/case/case_core_api.py | 160 -------- webiste/app/case/common_core.py | 251 ------------ webiste/app/case/form.py | 116 ------ webiste/app/case/task.py | 464 ---------------------- webiste/app/case/task_core.py | 555 -------------------------- webiste/app/session.py | 10 +- webiste/app/templates/history.html | 21 +- 11 files changed, 27 insertions(+), 3356 deletions(-) delete mode 100644 webiste/app/case/case.py delete mode 100644 webiste/app/case/case_api.py delete mode 100644 webiste/app/case/case_core.py delete mode 100644 webiste/app/case/case_core_api.py delete mode 100644 webiste/app/case/common_core.py delete mode 100644 webiste/app/case/form.py delete mode 100644 webiste/app/case/task.py delete mode 100644 webiste/app/case/task_core.py diff --git a/.gitignore b/.gitignore index 4c3db86e..f36ec063 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,5 @@ site* venv* #vscode -.vscode* \ No newline at end of file +.vscode* +*.sqlite diff --git a/webiste/app/case/case.py b/webiste/app/case/case.py deleted file mode 100644 index f5351f24..00000000 --- a/webiste/app/case/case.py +++ /dev/null @@ -1,591 +0,0 @@ -import json -from flask import Blueprint, render_template, redirect, jsonify, request, flash -from .form import CaseForm, CaseEditForm, AddOrgsCase, RecurringForm -from flask_login import login_required, current_user -from . import case_core as CaseModel -from . import common_core as CommonModel -from . import task_core as TaskModel -from ..db_class.db import Task_Template, Case_Template -from ..decorators import editor_required -from ..utils.utils import form_to_dict, check_tag - -case_blueprint = Blueprint( - 'case', - __name__, - template_folder='templates', - static_folder='static' -) - -from .task import task_blueprint -case_blueprint.register_blueprint(task_blueprint) - -########## -# Render # -########## - - -@case_blueprint.route("/", methods=['GET', 'POST']) -@login_required -def index(): - """List all cases""" - return render_template("case/case_index.html") - -@case_blueprint.route("/create_case", methods=['GET', 'POST']) -@login_required -def create_case(): - """Create a case""" - form = CaseForm() - form.template_select.choices = [(template.id, template.title) for template in Case_Template.query.all()] - form.template_select.choices.insert(0, (0," ")) - - form.tasks_templates.choices = [(template.id, template.title) for template in Task_Template.query.all()] - form.tasks_templates.choices.insert(0, (0," ")) - - if form.validate_on_submit(): - tag_list = request.form.getlist("tags_select") - cluster_list = request.form.getlist("clusters_select") - if CommonModel.check_tag(tag_list): - if CommonModel.check_cluster(cluster_list): - form_dict = form_to_dict(form) - form_dict["tags"] = tag_list - form_dict["clusters"] = cluster_list - case = CaseModel.create_case(form_dict, current_user) - flash("Case created", "success") - return redirect(f"/case/{case.id}") - return render_template("case/create_case.html", form=form) - return render_template("case/create_case.html", form=form) - return render_template("case/create_case.html", form=form) - -@case_blueprint.route("/", methods=['GET', 'POST']) -@login_required -def view(cid): - """View a case""" - case = CommonModel.get_case(cid) - if case: - present_in_case = CaseModel.get_present_in_case(cid, current_user) - return render_template("case/case_view.html", case=case.to_json(), present_in_case=present_in_case) - return render_template("404.html") - - -@case_blueprint.route("/edit/", methods=['GET','POST']) -@login_required -@editor_required -def edit_case(cid): - """Edit the case""" - if CommonModel.get_case(cid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - form = CaseEditForm() - - if form.validate_on_submit(): - tag_list = request.form.getlist("tags_select") - cluster_list = request.form.getlist("clusters_select") - if CommonModel.check_tag(tag_list): - if CommonModel.check_cluster(cluster_list): - form_dict = form_to_dict(form) - form_dict["tags"] = tag_list - form_dict["clusters"] = cluster_list - CaseModel.edit_case(form_dict, cid, current_user) - flash("Case edited", "success") - return redirect(f"/case/{cid}") - return render_template("case/edit_case.html", form=form) - return render_template("case/edit_case.html", form=form) - else: - case_modif = CommonModel.get_case(cid) - form.description.data = case_modif.description - form.title.data = case_modif.title - form.deadline_date.data = case_modif.deadline - form.deadline_time.data = case_modif.deadline - - return render_template("case/edit_case.html", form=form) - else: - flash("Access denied", "error") - else: - return render_template("404.html") - return redirect(f"/case/{id}") - - -@case_blueprint.route("//add_orgs", methods=['GET', 'POST']) -@login_required -@editor_required -def add_orgs(cid): - """Add orgs to the case""" - - if CommonModel.get_case(cid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - form = AddOrgsCase() - case_org = CommonModel.get_org_in_case_by_case_id(cid) - org_list = list() - for org in CommonModel.get_org_order_by_name(): - if case_org: - flag = False - for c_o in case_org: - if c_o.org_id == org.id: - flag = True - if not flag: - org_list.append((org.id, f"{org.name}")) - else: - org_list.append((org.id, f"{org.name}")) - - form.org_id.choices = org_list - form.case_id.data = cid - - if form.validate_on_submit(): - form_dict = form_to_dict(form) - CaseModel.add_orgs_case(form_dict, cid, current_user) - flash("Orgs added", "success") - return redirect(f"/case/{cid}") - - return render_template("case/add_orgs.html", form=form) - else: - flash("Access denied", "error") - else: - return render_template("404.html") - return redirect(f"/case/{cid}") - - -@case_blueprint.route("//recurring", methods=['GET', 'POST']) -@login_required -@editor_required -def recurring(cid): - """Recurring form""" - - if CommonModel.get_case(cid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - form = RecurringForm() - form.case_id.data = cid - - # List orgs and users in and verify if all users of an org are currently notify - orgs_in_case = CommonModel.get_orgs_in_case(cid) - orgs_to_return = list() - for org in orgs_in_case: - loc = org.to_json() - loc["users"] = list() - cp_checked_user = 0 - cp_users = 0 - for user in org.users: - cp_users += 1 - loc_user = user.to_json() - if CommonModel.get_recu_notif_user(cid, user.id): - loc_user["checked"] = True - cp_checked_user += 1 - else: - loc_user["checked"] = False - loc["users"].append(loc_user) - # if all users in an org are notify, then check the org checkbox - if cp_checked_user == cp_users: - loc["checked"] = True - else: - loc["checked"] = False - orgs_to_return.append(loc) - - if form.validate_on_submit(): - form_dict = form_to_dict(form) - if not CaseModel.change_recurring(form_dict, cid, current_user): - flash("Recurring empty", "error") - return redirect(f"/case/{cid}/recurring") - if not form_dict["remove"]: - CaseModel.notify_user_recurring(request.form.to_dict(), cid, orgs_in_case) - flash("Recurring set", "success") - return redirect(f"/case/{cid}") - - return render_template("case/case_recurring.html", form=form, orgs=orgs_to_return) - - flash("Action not allowed", "warning") - return redirect(f"/case/{cid}") - - return render_template("404.html") - - -############ -# Function # -# Route # -############ - -@case_blueprint.route("/get_cases_page", methods=['GET']) -@login_required -def get_cases(): - """Return all cases""" - page = request.args.get('page', 1, type=int) - tags = request.args.get('tags') - taxonomies = request.args.get('taxonomies') - or_and = request.args.get("or_and") - - cases = CaseModel.sort_by_status(page, tags, taxonomies, or_and, completed=False) - role = CommonModel.get_role(current_user).to_json() - - loc = CaseModel.regroup_case_info(cases, current_user) - return jsonify({"cases": loc["cases"], "role": role, "nb_pages": cases.pages}), 200 - - -@case_blueprint.route("/search", methods=['GET']) -@login_required -def search(): - """Return cases matching search terms""" - text_search = request.args.get("text") - cases = CommonModel.search(text_search) - if cases: - return {"cases": [case.to_json() for case in cases]}, 200 - return {"message": "No case", 'toast_class': "danger-subtle"}, 404 - - -@case_blueprint.route("//delete", methods=['GET']) -@login_required -@editor_required -def delete(cid): - """Delete the case""" - - if CommonModel.get_case(cid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if CaseModel.delete_case(cid, current_user): - return {"message": "Case deleted", "toast_class": "success-subtle"}, 200 - else: - return {"message": "Error case deleted", 'toast_class': "danger-subtle"}, 400 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "Case no found", 'toast_class': "danger-subtle"}, 404 - - -@case_blueprint.route("//get_case_info", methods=['GET']) -@login_required -def get_case_info(cid): - """Return all info of the case""" - case = CommonModel.get_case(cid) - if case: - tasks = TaskModel.sort_by_status_task_core(case, current_user, completed=False) - - o_in_c = CommonModel.get_orgs_in_case(case.id) - orgs_in_case = [o_c.to_json() for o_c in o_in_c] - permission = CommonModel.get_role(current_user).to_json() - present_in_case = CaseModel.get_present_in_case(cid, current_user) - - return jsonify({"case": case.to_json(), "tasks": tasks, "orgs_in_case": orgs_in_case, "permission": permission, "present_in_case": present_in_case, "current_user": current_user.to_json()}), 200 - return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404 - - -@case_blueprint.route("//complete_case", methods=['GET']) -@login_required -@editor_required -def complete_case(cid): - """Complete the case""" - if CommonModel.get_case(cid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if CaseModel.complete_case(cid, current_user): - flash("Case Completed") - if request.args.get('revived', 1) == "true": - return {"message": "Case Revived", "toast_class": "success-subtle"}, 200 - return {"message": "Case completed", "toast_class": "success-subtle"}, 200 - else: - if request.args.get('revived', 1) == "true": - return {"message": "Error case revived", 'toast_class': "danger-subtle"}, 400 - return {"message": "Error case completed", 'toast_class': "danger-subtle"}, 400 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404 - - -@case_blueprint.route("//remove_org/", methods=['GET']) -@login_required -@editor_required -def remove_org_case(cid, oid): - """Remove an org to the case""" - - if CommonModel.get_case(cid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if CaseModel.remove_org_case(cid, oid, current_user): - return {"message": "Org removed from case", "toast_class": "success-subtle"}, 200 - return {"message": "Error removing org from case", "toast_class": "danger-subtle"}, 400 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404 - - -@case_blueprint.route("//change_status", methods=['POST']) -@login_required -@editor_required -def change_status(cid): - """Change the status of the case""" - - status = request.json["status"] - case = CommonModel.get_case(cid) - - if CommonModel.get_case(cid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - CaseModel.change_status_core(status, case, current_user) - return {"message": "Status changed", "toast_class": "success-subtle"}, 200 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404 - - -@case_blueprint.route("/get_status", methods=['GET']) -@login_required -def get_status(): - """Get status""" - - status = CommonModel.get_all_status() - status_list = list() - for s in status: - status_list.append(s.to_json()) - return jsonify({"status": status_list}), 200 - - -@case_blueprint.route("/sort_by_ongoing", methods=['GET']) -@login_required -def sort_by_ongoing(): - """Sort Case by living one""" - page = request.args.get('page', 1, type=int) - tags = request.args.get('tags') - taxonomies = request.args.get('taxonomies') - or_and_taxo = request.args.get("or_and_taxo") - - galaxies = request.args.get('galaxies') - clusters = request.args.get('clusters') - or_and_galaxies = request.args.get("or_and_galaxies") - - cases_list = CaseModel.sort_by_status(page, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=False) - return CaseModel.regroup_case_info(cases_list, current_user) - - - -@case_blueprint.route("/sort_by_finished", methods=['GET']) -@login_required -def sort_by_finished(): - """Sort Case by finished one""" - page = request.args.get('page', 1, type=int) - tags = request.args.get('tags') - taxonomies = request.args.get('taxonomies') - or_and_taxo = request.args.get("or_and_taxo") - - galaxies = request.args.get('galaxies') - clusters = request.args.get('clusters') - or_and_galaxies = request.args.get("or_and_galaxies") - - cases_list = CaseModel.sort_by_status(page, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=True) - return CaseModel.regroup_case_info(cases_list, current_user) - - -@case_blueprint.route("/ongoing", methods=['GET']) -@login_required -def ongoing_sort_by_filter(): - """Sort by filter for living case""" - page = request.args.get('page', 1, type=int) - filter = request.args.get('filter') - tags = request.args.get('tags') - taxonomies = request.args.get('taxonomies') - or_and_taxo = request.args.get("or_and_taxo") - - galaxies = request.args.get('galaxies') - clusters = request.args.get('clusters') - or_and_galaxies = request.args.get("or_and_galaxies") - - if filter: - cases_list, nb_pages = CaseModel.sort_by_filter(filter, page, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=False) - return CaseModel.regroup_case_info(cases_list, current_user, nb_pages) - return {"message": "No filter pass"} - - -@case_blueprint.route("/finished", methods=['GET']) -@login_required -def finished_sort_by_filter(): - """Sort by filter for finished task""" - page = request.args.get('page', 1, type=int) - filter = request.args.get('filter') - tags = request.args.get('tags') - taxonomies = request.args.get('taxonomies') - or_and_taxo = request.args.get("or_and_taxo") - - galaxies = request.args.get('galaxies') - clusters = request.args.get('clusters') - or_and_galaxies = request.args.get("or_and_galaxies") - - if filter: - cases_list, nb_pages = CaseModel.sort_by_filter(filter, page, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=True) - return CaseModel.regroup_case_info(cases_list, current_user, nb_pages) - return {"message": "No filter pass"} - - -@case_blueprint.route("//get_all_users", methods=['GET']) -@login_required -def get_all_users(cid): - """Get all user in case""" - - case = CommonModel.get_case(cid) - if case: - users_list = list() - orgs = CommonModel.get_all_users_core(case) - for org in orgs: - for user in org.users: - if not user == current_user: - users_list.append(user.to_json()) - return {"users_list": users_list} - return {"message": "Case not found"}, 404 - - -@case_blueprint.route("//get_assigned_users/", methods=['GET']) -@login_required -def get_assigned_users(cid, tid): - """Get assigned users to the task""" - - if CommonModel.get_case(cid): - users, _ = TaskModel.get_users_assign_task(tid, current_user) - return users - return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404 - - -@case_blueprint.route("//download", methods=['GET']) -@login_required -def download_case(cid): - """Download a case""" - - case = CommonModel.get_case(cid) - if case: - task_list = list() - for task in case.tasks: - task_list.append(task.download()) - return_dict = case.download() - return_dict["tasks"] = task_list - return jsonify(return_dict), 200, {'Content-Disposition': f'attachment; filename=case_{case.title}.json'} - return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404 - - - -@case_blueprint.route("//fork", methods=['POST']) -@login_required -def fork_case(cid): - """Assign current user to the task""" - - if CommonModel.get_case(cid): - if "case_title_fork" in request.json: - case_title_fork = request.json["case_title_fork"] - - new_case = CaseModel.fork_case_core(cid, case_title_fork, current_user) - if type(new_case) == dict: - return new_case - return {"new_case_id": new_case.id}, 201 - return {"message": "'case_title_fork' is missing", 'toast_class': "danger-subtle"}, 400 - return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404 - - -@case_blueprint.route("/get_all_case_title", methods=['GET']) -@login_required -def get_all_case_title(): - data_dict = dict(request.args) - if CommonModel.get_case_by_title(data_dict["title"]): - flag = True - else: - flag = False - - return {"title_already_exist": flag} - - -@case_blueprint.route("//create_template", methods=['POST']) -@login_required -@editor_required -def create_template(cid): - if CommonModel.get_case(cid): - if "case_title_template" in request.json: - case_title_template = request.json["case_title_template"] - - new_template = CaseModel.create_template_from_case(cid, case_title_template) - if type(new_template) == dict: - return new_template - return {"template_id": new_template.id}, 201 - return {"message": "'case_title_template' is missing", 'toast_class': "danger-subtle"}, 400 - return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404 - - -@case_blueprint.route("/get_all_case_template_title", methods=['GET']) -@login_required -def get_all_case_template_title(): - data_dict = dict(request.args) - if CommonModel.get_case_template_by_title(data_dict["title"]): - flag = True - else: - flag = False - - return {"title_already_exist": flag} - - -@case_blueprint.route("/history/", methods=['GET']) -@login_required -def history(cid): - case = CommonModel.get_case(cid) - if case: - history = CommonModel.get_history(case.uuid) - if history: - return {"history": history} - return {"history": None} - return {"message": "Case Not found", 'toast_class': "danger-subtle"}, 404 - - -@case_blueprint.route("/get_taxonomies", methods=['GET']) -@login_required -def get_taxonomies(): - return {"taxonomies": CommonModel.get_taxonomies()}, 200 - -@case_blueprint.route("/get_tags", methods=['GET']) -@login_required -def get_tags(): - data_dict = dict(request.args) - if "taxonomies" in data_dict: - taxos = json.loads(data_dict["taxonomies"]) - return {"tags": CommonModel.get_tags(taxos)}, 200 - return {"message": "'taxonomies' is missing", 'toast_class': "warning-subtle"}, 400 - - -@case_blueprint.route("/get_taxonomies_case/", methods=['GET']) -@login_required -def get_taxonomies_case(cid): - case = CommonModel.get_case(cid) - if case: - tags = CommonModel.get_case_tags(case.id) - taxonomies = [] - if tags: - taxonomies = [tag.split(":")[0] for tag in tags] - return {"tags": tags, "taxonomies": taxonomies} - return {"message": "Case Not found", 'toast_class': "danger-subtle"}, 404 - - -@case_blueprint.route("/get_galaxies", methods=['GET']) -@login_required -def get_galaxies(): - return {"galaxies": CommonModel.get_galaxies()}, 200 - - -@case_blueprint.route("/get_clusters", methods=['GET']) -@login_required -def get_clusters(): - if "galaxies" in request.args: - galaxies = request.args.get("galaxies") - galaxies = json.loads(galaxies) - return {"clusters": CommonModel.get_clusters_galaxy(galaxies)}, 200 - return {"message": "'galaxies' is missing", 'toast_class': "warning-subtle"}, 400 - - -@case_blueprint.route("/get_galaxies_case/", methods=['GET']) -@login_required -def get_galaxies_case(cid): - case = CommonModel.get_case(cid) - if case: - clusters = CommonModel.get_case_clusters(case.id) - galaxies = [] - if clusters: - for cluster in clusters: - loc_g = CommonModel.get_galaxy(cluster.galaxy_id) - if not loc_g.name in galaxies: - galaxies.append(loc_g.name) - index = clusters.index(cluster) - clusters[index] = cluster.tag - return {"clusters": clusters, "galaxies": galaxies} - return {"message": "Case Not found", 'toast_class': "danger-subtle"}, 404 - - -@case_blueprint.route("/get_modules", methods=['GET']) -@login_required -def get_modules(): - return {"modules": CaseModel.get_modules()}, 200 - # return {"message": "'galaxies' is missing", 'toast_class': "warning-subtle"}, 400 - - -@case_blueprint.route("/get_instance_module", methods=['GET']) -@login_required -def get_instance_module(): - if "module" in request.args: - module = request.args.get("module") - return {"instances": CaseModel.get_instance_module_core(module, current_user.id)}, 200 diff --git a/webiste/app/case/case_api.py b/webiste/app/case/case_api.py deleted file mode 100644 index 05a8c65b..00000000 --- a/webiste/app/case/case_api.py +++ /dev/null @@ -1,602 +0,0 @@ -from flask import Blueprint, request -from . import case_core as CaseModel -from . import common_core as CommonModel -from . import task_core as TaskModel -from . import case_core_api as CaseModelApi - -from flask_restx import Api, Resource -from ..decorators import api_required, editor_required - -api_case_blueprint = Blueprint('api_case', __name__) -api = Api(api_case_blueprint, - title='Flowintel-cm API', - description='API to manage a case management instance.', - version='0.1', - default='GenericAPI', - default_label='Generic Flowintel-cm API', - doc='/doc/' - ) - - - -@api.route('/all') -@api.doc(description='Get all cases') -class GetCases(Resource): - method_decorators = [api_required] - def get(self): - cases = CommonModel.get_all_cases() - return {"cases": [case.to_json() for case in cases]}, 200 - -@api.route('/not_completed') -@api.doc(description='Get all not completed cases') -class GetCases_not_completed(Resource): - method_decorators = [api_required] - def get(self): - cases = CommonModel.get_case_by_completed(False) - return {"cases": [case.to_json() for case in cases]}, 200 - -@api.route('/completed') -@api.doc(description='Get all completed cases') -class GetCases_not_completed(Resource): - method_decorators = [api_required] - def get(self): - cases = CommonModel.get_case_by_completed(True) - return {"cases": [case.to_json() for case in cases]}, 200 - - -@api.route('/') -@api.doc(description='Get a case', params={'cid': 'id of a case'}) -class GetCase(Resource): - method_decorators = [api_required] - def get(self, cid): - case = CommonModel.get_case(cid) - if case: - case_json = case.to_json() - orgs = CommonModel.get_orgs_in_case(cid) - case_json["orgs"] = list() - for org in orgs: - case_json["orgs"].append({"id": org.id, "uuid": org.uuid, "name": org.name}) - - return case_json, 200 - return {"message": "Case not found"}, 404 - - -@api.route('/title', methods=["POST"]) -@api.doc(description='Get a case by title') -class GetCaseTitle(Resource): - method_decorators = [api_required] - @api.doc(params={"title": "Title of a case"}) - def post(self): - if "title" in request.json: - case = CommonModel.get_case_by_title(request.json["title"]) - if case: - case_json = case.to_json() - orgs = CommonModel.get_orgs_in_case(case.id) - case_json["orgs"] = [{"id": org.id, "uuid": org.uuid, "name": org.name} for org in orgs] - return case_json, 200 - return {"message": "Case not found"}, 404 - return {"message": "Need to pass a title"}, 404 - - -@api.route('//complete') -@api.doc(description='Complete a case', params={'cid': 'id of a case'}) -class CompleteCase(Resource): - method_decorators = [editor_required, api_required] - def get(self, cid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - case = CommonModel.get_case(cid) - if case: - if CaseModel.complete_case(cid, current_user): - return {"message": f"Case {cid} completed"}, 200 - return {"message": f"Error case {cid} completed"}, 400 - return {"message": "Case not found"}, 404 - return {"message": "Permission denied"}, 403 - - -@api.route('//create_template', methods=["POST"]) -@api.doc(description='Create a template form case', params={'cid': 'id of a case'}) -class CreateTemplate(Resource): - method_decorators = [editor_required, api_required] - @api.doc(params={"title_template": "Title for the template that will be create"}) - def post(self, cid): - if "title_template" in request.json: - if CommonModel.get_case(cid): - new_template = CaseModel.create_template_from_case(cid, request.json["title_template"]) - if type(new_template) == dict: - return new_template - return {"template_id": new_template.id}, 201 - return {"message": "Case not found"}, 404 - return {"message": "'title_template' is missing"}, 400 - - -@api.route('//recurring', methods=['POST']) -@api.doc(description='Set a case recurring') -class RecurringCase(Resource): - method_decorators = [editor_required, api_required] - @api.doc(params={ - "once": "Date(%Y-%m-%d)", - "daily": "Boolean", - "weekly": "Date(%Y-%m-%d). Start date.", - "monthly": "Date(%Y-%m-%d). Start date.", - "remove": "Boolean" - }) - def post(self, cid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if request.json: - verif_dict = CaseModelApi.verif_set_recurring(request.json) - - if "message" not in verif_dict: - CaseModel.change_recurring(verif_dict, cid, current_user) - return {"message": "Recurring changed"}, 200 - return verif_dict - return {"message": "Please give data"}, 400 - return {"message": "Permission denied"}, 403 - - -@api.route('//tasks') -@api.doc(description='Get all tasks for a case', params={'cid': 'id of a case'}) -class GetTasks(Resource): - method_decorators = [api_required] - def get(self, cid): - case = CommonModel.get_case(cid) - if case: - tasks = list() - for task in case.tasks: - tasks.append(task.to_json()) - - return tasks, 200 - return {"message": "Case not found"}, 404 - - -@api.route('//task/') -@api.doc(description='Get a specific task for a case', params={"cid": "id of a case", "tid": "id of a task"}) -class GetTask(Resource): - method_decorators = [api_required] - def get(self, cid, tid): - task = CommonModel.get_task(tid) - if task: - if int(cid) == task.case_id: - loc = dict() - loc["users_assign"], loc["is_current_user_assign"] = TaskModel.get_users_assign_task(task.id, CaseModelApi.get_user_api(request.headers["X-API-KEY"])) - loc["task"] = task.to_json() - return loc, 200 - else: - return {"message": "Task not in this case"}, 404 - return {"message": "Task not found"}, 404 - -@api.route('//delete') -@api.doc(description='Delete a case', params={'cid': 'id of a case'}) -class DeleteCase(Resource): - method_decorators = [editor_required, api_required] - def get(self, cid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if CaseModel.delete_case(cid, current_user): - return {"message": "Case deleted"}, 200 - return {"message": "Error case deleted"}, 400 - return {"message": "Permission denied"}, 403 - - -@api.route('//task//delete') -@api.doc(description='Delete a specific task in a case', params={'cid': 'id of a case', "tid": "id of a task"}) -class DeleteTask(Resource): - method_decorators = [editor_required, api_required] - def get(self, cid, tid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - task = CommonModel.get_task(tid) - if task: - if int(cid) == task.case_id: - if TaskModel.delete_task(tid, current_user): - return {"message": "Task deleted"}, 200 - else: - return {"message": "Error task deleted"}, 400 - else: - return {"message": "Task not in this case"}, 404 - return {"message": "Task not found"}, 404 - return {"message": "Permission denied"}, 403 - - -@api.route('/create', methods=['POST']) -@api.doc(description='Create a case') -class CreateCase(Resource): - method_decorators = [editor_required, api_required] - @api.doc(params={ - "title": "Required. Title for a case", - "description": "Description of a case", - "deadline_date": "Date(%Y-%m-%d)", - "deadline_time": "Time(%H-%M)" - }) - def post(self): - user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - - if request.json: - verif_dict = CaseModelApi.verif_create_case_task(request.json, True) - - if "message" not in verif_dict: - case = CaseModel.create_case(verif_dict, user) - return {"message": f"Case created, id: {case.id}"}, 201 - - return verif_dict, 400 - return {"message": "Please give data"}, 400 - - -@api.route('//create_task', methods=['POST']) -@api.doc(description='Create a new task to a case', params={'cid': 'id of a case'}) -class CreateTask(Resource): - method_decorators = [editor_required, api_required] - @api.doc(params={ - "title": "Required. Title for a task", - "description": "Description of a task", - "url": "Link to a tool or a ressource", - "deadline_date": "Date(%Y-%m-%d)", - "deadline_time": "Time(%H-%M)" - }) - def post(self, cid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if request.json: - verif_dict = CaseModelApi.verif_create_case_task(request.json, False) - - if "message" not in verif_dict: - task = TaskModel.create_task(verif_dict, cid, current_user) - return {"message": f"Task created for case id: {cid}"}, 201 - return verif_dict, 400 - return {"message": "Please give data"}, 400 - return {"message": "Permission denied"}, 403 - - -@api.route('//edit', methods=['POST']) -@api.doc(description='Edit a case', params={'id': 'id of a case'}) -class EditCase(Resource): - method_decorators = [editor_required, api_required] - @api.doc(params={"title": "Title for a case", "description": "Description of a case", "deadline_date": "Date(%Y-%m-%d)", "deadline_time": "Time(%H-%M)"}) - def post(self, id): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(id, current_user) or current_user.is_admin(): - if request.json: - verif_dict = CaseModelApi.verif_edit_case(request.json, id) - - if "message" not in verif_dict: - CaseModel.edit_case(verif_dict, id, current_user) - return {"message": f"Case {id} edited"}, 200 - - return verif_dict, 400 - return {"message": "Please give data"}, 400 - return {"message": "Permission denied"}, 403 - - -@api.route('//task//edit', methods=['POST']) -@api.doc(description='Edit a task in a case', params={'cid': 'id of a case', "tid": "id of a task"}) -class EditTake(Resource): - method_decorators = [editor_required, api_required] - @api.doc(params={"title": "Title for a case", "description": "Description of a case", "deadline_date": "Date(%Y-%m-%d)", "deadline_time": "Time(%H-%M)"}) - def post(self, cid, tid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if request.json: - task = CommonModel.get_task(tid) - if task: - if int(cid) == task.case_id: - verif_dict = CaseModelApi.verif_edit_task(request.json, tid) - - if "message" not in verif_dict: - TaskModel.edit_task_core(verif_dict, tid, current_user) - return {"message": f"Task {tid} edited"}, 200 - - return verif_dict, 400 - else: - return {"message": "Task not in this case"}, 404 - else: - return {"message": "Task not found"}, 404 - return {"message": "Please give data"}, 400 - return {"message": "Permission denied"}, 403 - - -@api.route('//task//complete') -@api.doc(description='Complete a task in a case', params={'cid': 'id of a case', "tid": "id of a task"}) -class CompleteTake(Resource): - method_decorators = [editor_required, api_required] - def get(self, cid, tid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - task = CommonModel.get_task(tid) - if task: - if int(cid) == task.case_id: - if TaskModel.complete_task(tid, current_user): - return {"message": f"Task {tid} completed"}, 200 - return {"message": f"Error task {tid} completed"}, 400 - else: - return {"message": "Task not in this case"}, 404 - return {"message": "Task not found"}, 404 - return {"message": "Permission denied"}, 403 - - -@api.route('//task//get_note') -@api.doc(description='Get note of a task in a case', params={'cid': 'id of a case', "tid": "id of a task"}) -class GetNoteTask(Resource): - method_decorators = [api_required] - def get(self, cid, tid): - task = CommonModel.get_task(tid) - if task: - if int(cid) == task.case_id: - return {"note": task.notes}, 200 - else: - return {"message": "Task not in this case"}, 404 - return {"message": "Task not found"}, 404 - - -@api.route('//task//modif_note', methods=['POST']) -@api.doc(description='Edit note of a task in a case', params={'cid': 'id of a case', "tid": "id of a task"}) -class ModifNoteTask(Resource): - method_decorators = [editor_required, api_required] - @api.doc(params={"note": "note to create or modify"}) - def post(self, cid, tid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if "note" in request.json: - task = CommonModel.get_task(tid) - if task: - if int(cid) == task.case_id: - if TaskModel.modif_note_core(tid, current_user, request.json["note"]): - return {"message": f"Note for task {tid} edited"}, 200 - return {"message": f"Error Note for task {tid} edited"}, 400 - else: - return {"message": "Task not in this case"}, 404 - return {"message": "Task not found"}, 404 - return {"message": "Key 'note' not found"}, 400 - return {"message": "Permission denied"}, 403 - - - -@api.route('//add_org', methods=['POST']) -@api.doc(description='Add an org to the case', params={'cid': 'id of a case'}) -class AddOrgCase(Resource): - method_decorators = [editor_required, api_required] - @api.doc(params={"name": "Name of the organisation", "oid": "id of the organisation"}) - def post(self, cid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if "name" in request.json: - org = CommonModel.get_org_by_name(request.json["name"]) - elif "oid" in request.json: - org = CommonModel.get_org(request.json["oid"]) - else: - return {"message": "Required an id or a name of an Org"}, 400 - - if org: - if not CommonModel.get_org_in_case(org.id, cid): - if CaseModel.add_orgs_case({"org_id": [org.id]}, cid, current_user): - return {"message": f"Org added to case {cid}"}, 200 - return {"message": f"Error Org added to case {cid}"}, 400 - return {"message": "Org already in case"}, 400 - return {"message": "Org not found"}, 404 - return {"message": "Permission denied"}, 403 - - -@api.route('//remove_org/', methods=['GET']) -@api.doc(description='Add an org to the case', params={'cid': 'id of a case', "oid": "id of an org"}) -class RemoveOrgCase(Resource): - method_decorators = [editor_required, api_required] - def get(self, cid, oid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - org = CommonModel.get_org(oid) - - if org: - if CommonModel.get_org_in_case(org.id, cid): - if CaseModel.remove_org_case(cid, org.id, current_user): - return {"message": f"Org deleted from case {cid}"}, 200 - return {"message": f"Error Org deleted from case {cid}"}, 400 - return {"message": "Org not in case"}, 404 - return {"message": "Org not found"}, 404 - return {"message": "Permission denied"}, 403 - - -@api.route('//take_task/', methods=['GET']) -@api.doc(description='Assign current user to the task', params={'cid': 'id of a case', "tid": "id of a task"}) -class AssignTask(Resource): - method_decorators = [editor_required, api_required] - def get(self, cid, tid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - task = CommonModel.get_task(tid) - - if task: - if int(cid) == task.case_id: - if TaskModel.assign_task(tid, user=current_user, current_user=current_user, flag_current_user=True): - return {"message": f"Task Take"}, 200 - return {"message": f"Error Task Take"}, 400 - return {"message": "Task not in this case"}, 404 - return {"message": "Task not found"}, 404 - return {"message": "Permission denied"}, 403 - - -@api.route('//remove_assignment/', methods=['GET']) -@api.doc(description='Remove assigment of current user to the task', params={'cid': 'id of a case', "tid": "id of a task"}) -class RemoveOrgCase(Resource): - method_decorators = [editor_required, api_required] - def get(self, cid, tid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - task = CommonModel.get_task(tid) - if task: - if int(cid) == task.case_id: - if TaskModel.remove_assign_task(tid, user=current_user, current_user=current_user, flag_current_user=True): - return {"message": f"Removed from assignment"}, 200 - return {"message": f"Error Removed from assignment"}, 400 - return {"message": "Task not in this case"}, 404 - return {"message": "Task not found"}, 404 - return {"message": "Permission denied"}, 403 - - -@api.route('//get_all_users', methods=['GET']) -@api.doc(description='Get list of user that can be assign', params={'cid': 'id of a case'}) -class GetAllUsers(Resource): - method_decorators = [api_required] - def get(self, cid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - case = CommonModel.get_case(cid) - if case: - users_list = list() - for org in CommonModel.get_all_users_core(case): - for user in org.users: - if not user == current_user: - users_list.append(user.to_json()) - return {"users": users_list}, 200 - return {"message": "Case not found"}, 404 - - -@api.route('//task//assign_users', methods=['POST']) -@api.doc(description='Assign users to a task', params={'cid': 'id of a case', "tid": "id of a task"}) -class AssignUser(Resource): - method_decorators = [editor_required, api_required] - @api.doc(params={"users_id": "List of user id"}) - def post(self, cid, tid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - task = CommonModel.get_task(tid) - if task: - if int(cid) == task.case_id: - users_list = request.json["users_id"] - for user in users_list: - TaskModel.assign_task(tid, user=user, current_user=current_user, flag_current_user=False) - return {"message": "Users Assigned"}, 200 - return {"message": "Task not in this case"}, 404 - return {"message": "Task not found"}, 404 - return {"message": "Permission denied"}, 403 - - -@api.route('//task//remove_assign_user', methods=['POST']) -@api.doc(description='Remove an assign user to a task', params={'cid': 'id of a case', "tid": "id of a task"}) -class AssignUser(Resource): - method_decorators = [editor_required, api_required] - @api.doc(params={"user_id": "Id of a user"}) - def post(self, cid, tid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - task = CommonModel.get_task(tid) - if task: - if int(cid) == task.case_id: - user_id = request.json["user_id"] - if TaskModel.remove_assign_task(tid, user=user_id, current_user=current_user, flag_current_user=False): - return {"message": "User Removed from assignment"}, 200 - return {"message": "Task not in this case"}, 404 - return {"message": "Task not found"}, 404 - return {"message": "Permission denied"}, 403 - - - -@api.route('//task//change_status', methods=['POST']) -@api.doc(description='Change status of a task', params={'cid': 'id of a case', "tid": "id of a task"}) -class ChangeStatus(Resource): - method_decorators = [editor_required, api_required] - @api.doc(params={"status_id": "Id of the new status"}) - def post(self, cid, tid): - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - task = CommonModel.get_task(tid) - if task: - if int(cid) == task.case_id: - if TaskModel.change_task_status(request.json["status_id"], task, current_user): - return {"message": "Status changed"}, 200 - return {"message": "Task not in this case"}, 404 - return {"message": "Task not found"}, 404 - return {"message": "Permission denied"}, 403 - - -@api.route('/list_status', methods=['GET']) -@api.doc(description='List all status') -class ChangeStatus(Resource): - method_decorators = [api_required] - def get(self): - return [status.to_json() for status in CommonModel.get_all_status()], 200 - - -@api.route('//history', methods=['GET']) -@api.doc(description='Get history of a case', params={'cid': 'id of a case'}) -class ChangeStatus(Resource): - method_decorators = [api_required] - def get(self, cid): - case = CommonModel.get_case(cid) - if case: - history = CommonModel.get_history(case.uuid) - if history: - return {"history": history} - return {"history": None} - return {"message": "Case Not found"}, 404 - - -@api.route('//task//files') -@api.doc(description='Get list of files', params={"cid": "id of a case", "tid": "id of a task"}) -class DownloadFile(Resource): - method_decorators = [api_required] - def get(self, cid, tid): - case = CommonModel.get_case(cid) - if case: - task = CommonModel.get_task(tid) - if task: - file_list = [file.to_json() for file in task.files] - return {"files": file_list}, 200 - return {"message": "Task Not found"}, 404 - return {"message": "Case Not found"}, 404 - -@api.route('//task//upload_file') -@api.doc(description='Upload a file') -class UploadFile(Resource): - method_decorators = [api_required] - @api.doc(params={}) - def post(self, cid, tid): - case = CommonModel.get_case(cid) - if case: - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - task = CommonModel.get_task(tid) - if task: - if TaskModel.add_file_core(task, request.files, current_user): - return {"message": "File added"}, 200 - return {"message": "Task Not found"}, 404 - return {"message": "Permission denied"}, 403 - return {"message": "Case Not found"}, 404 - - -@api.route('//task//download_file/') -@api.doc(description='Download a file', params={"cid": "id of a case", "tid": "id of a task", "fid": "id of a file"}) -class DownloadFile(Resource): - method_decorators = [api_required] - def get(self, cid, tid, fid): - case = CommonModel.get_case(cid) - if case: - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - task = CommonModel.get_task(tid) - if task: - file = CommonModel.get_file(fid) - if file and file in task.files: - return TaskModel.download_file(file) - return {"message": "Task Not found"}, 404 - return {"message": "Permission denied"}, 403 - return {"message": "Case Not found"}, 404 - -@api.route('//task//delete_file/') -@api.doc(description='Delete a file', params={"cid": "id of a case", "tid": "id of a task", "fid": "id of a file"}) -class DeleteFile(Resource): - method_decorators = [api_required] - @api.doc(params={ - }) - def get(self, cid, tid, fid): - case = CommonModel.get_case(cid) - if case: - current_user = CaseModelApi.get_user_api(request.headers["X-API-KEY"]) - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - task = CommonModel.get_task(tid) - if task: - file = CommonModel.get_file(fid) - if file and file in task.files: - if TaskModel.delete_file(file, task, current_user): - return {"message": "File Deleted"}, 200 - return {"message": "Task Not found"}, 404 - return {"message": "Permission denied"}, 403 - return {"message": "Case Not found"}, 404 \ No newline at end of file diff --git a/webiste/app/case/case_core.py b/webiste/app/case/case_core.py deleted file mode 100644 index a3d2fac3..00000000 --- a/webiste/app/case/case_core.py +++ /dev/null @@ -1,610 +0,0 @@ -import os -import ast -import uuid -import datetime - -from app.utils.utils import get_modules_list, MODULES, MODULES_CONFIG -from .. import db -from ..db_class.db import * -from sqlalchemy import desc, and_ -from ..notification import notification_core as NotifModel -from dateutil import relativedelta -from ..tools.tools_core import create_case_from_template - -from . import common_core as CommonModel -from . import task_core as TaskModel - - -def delete_case(cid, current_user): - """Delete a case by is id""" - case = CommonModel.get_case(cid) - if case: - # Delete all tasks in the case - for task in case.tasks: - TaskModel.delete_task(task.id, current_user) - - history_path = os.path.join(CommonModel.HISTORY_FOLDER, str(case.uuid)) - if os.path.isfile(history_path): - try: - os.remove(history_path) - except: - return False - - NotifModel.create_notification_all_orgs(f"Case: '{case.id}-{case.title}' was deleted", cid, html_icon="fa-solid fa-trash", current_user=current_user) - - Case_Tags.query.filter_by(case_id=case.id).delete() - Case_Galaxy_Tags.query.filter_by(case_id=case.id).delete() - Case_Org.query.filter_by(case_id=case.id).delete() - db.session.delete(case) - db.session.commit() - return True - return False - - -def complete_case(cid, current_user): - """Complete case by is id""" - case = CommonModel.get_case(cid) - if case is not None: - case.completed = not case.completed - if case.completed: - case.status_id = Status.query.filter_by(name="Finished").first().id - for task in case.tasks: - TaskModel.complete_task(task.id, current_user) - NotifModel.create_notification_all_orgs(f"Case: '{case.id}-{case.title}' is now completed", cid, html_icon="fa-solid fa-square-check", current_user=current_user) - else: - case.status_id = Status.query.filter_by(name="Created").first().id - NotifModel.create_notification_all_orgs(f"Case: '{case.id}-{case.title}' is now revived", cid, html_icon="fa-solid fa-heart-circle-plus", current_user=current_user) - - CommonModel.update_last_modif(cid) - db.session.commit() - CommonModel.save_history(case.uuid, current_user, "Case completed") - return True - return False - - -def create_case(form_dict, user): - """Add a case to the DB""" - if "template_select" in form_dict and not 0 in form_dict["template_select"]: - for template in form_dict["template_select"]: - if Case_Template.query.get(template): - case = create_case_from_template(template, form_dict["title_template"], user) - else: - deadline = CommonModel.deadline_check(form_dict["deadline_date"], form_dict["deadline_time"]) - case = Case( - title=form_dict["title"], - description=form_dict["description"], - uuid=str(uuid.uuid4()), - creation_date=datetime.datetime.now(tz=datetime.timezone.utc), - last_modif=datetime.datetime.now(tz=datetime.timezone.utc), - deadline=deadline, - status_id=1, - owner_org_id=user.org_id - ) - db.session.add(case) - db.session.commit() - - for tags in form_dict["tags"]: - tag = CommonModel.get_tag(tags) - - case_tag = Case_Tags( - tag_id=tag.id, - case_id=case.id - ) - db.session.add(case_tag) - db.session.commit() - - for clusters in form_dict["clusters"]: - cluster = CommonModel.get_cluster_by_name(clusters) - - case_galaxy = Case_Galaxy_Tags( - cluster_id=cluster.id, - case_id=case.id - ) - db.session.add(case_galaxy) - db.session.commit() - - if "tasks_templates" in form_dict and not 0 in form_dict["tasks_templates"]: - for tid in form_dict["tasks_templates"]: - task = Task_Template.query.get(tid) - t = Task( - uuid=str(uuid.uuid4()), - title=task.title, - description=task.description, - url=task.url, - creation_date=datetime.datetime.now(tz=datetime.timezone.utc), - last_modif=datetime.datetime.now(tz=datetime.timezone.utc), - case_id=case.id, - status_id=1 - ) - db.session.add(t) - db.session.commit() - - for t_t in Task_Template_Tags.query.filter_by(task_id=task.id).all(): - task_tag = Task_Tags( - task_id=t.id, - tag_id=t_t.tag_id - ) - db.session.add(task_tag) - db.session.commit() - - for t_t in Task_Template_Galaxy_Tags.query.filter_by(task_id=task.id).all(): - task_galaxy = Task_Galaxy_Tags( - task_id=t.id, - cluster_id=t_t.cluster_id - ) - db.session.add(task_galaxy) - db.session.commit() - - # Add the current user's org to the case - case_org = Case_Org( - case_id=case.id, - org_id=user.org_id - ) - db.session.add(case_org) - db.session.commit() - - CommonModel.save_history(case.uuid, user, "Case Created") - - return case - - -def edit_case(form_dict, cid, current_user): - """Edit a case to the DB""" - case = CommonModel.get_case(cid) - deadline = CommonModel.deadline_check(form_dict["deadline_date"], form_dict["deadline_time"]) - - case.title = form_dict["title"] - case.description=form_dict["description"] - case.deadline=deadline - - ## Tags - case_tag_db = Case_Tags.query.filter_by(case_id=case.id).all() - for tags in form_dict["tags"]: - tag = CommonModel.get_tag(tags) - - if not tags in case_tag_db: - case_tag = Case_Tags( - tag_id=tag.id, - case_id=case.id - ) - db.session.add(case_tag) - db.session.commit() - - for c_t_db in case_tag_db: - if not c_t_db in form_dict["tags"]: - Case_Tags.query.filter_by(id=c_t_db.id).delete() - db.session.commit() - - ## Clusters - case_cluster_db = Case_Galaxy_Tags.query.filter_by(case_id=case.id).all() - for clusters in form_dict["clusters"]: - cluster = CommonModel.get_cluster_by_name(clusters) - - if not clusters in case_cluster_db: - case_galaxy_tag = Case_Galaxy_Tags( - cluster_id=cluster.id, - case_id=case.id - ) - db.session.add(case_galaxy_tag) - db.session.commit() - - for c_t_db in case_cluster_db: - if not c_t_db in form_dict["clusters"]: - Case_Galaxy_Tags.query.filter_by(id=c_t_db.id).delete() - db.session.commit() - - CommonModel.update_last_modif(cid) - db.session.commit() - - CommonModel.save_history(case.uuid, current_user, f"Case edited") - - -def add_orgs_case(form_dict, cid, current_user): - """Add orgs to case in th DB""" - for org_id in form_dict["org_id"]: - case_org = Case_Org( - case_id=cid, - org_id=org_id - ) - db.session.add(case_org) - case = CommonModel.get_case(cid) - NotifModel.create_notification_org(f"{CommonModel.get_org(org_id).name} add to case: '{case.id}-{case.title}'", cid, org_id, html_icon="fa-solid fa-sitemap", current_user=current_user) - - CommonModel.update_last_modif(cid) - db.session.commit() - case = CommonModel.get_case(cid) - CommonModel.save_history(case.uuid, current_user, f"Org {org_id} added") - return True - - -def remove_org_case(case_id, org_id, current_user): - """Remove an org from a case""" - case_org = Case_Org.query.filter_by(case_id=case_id, org_id=org_id).first() - if case_org: - db.session.delete(case_org) - - case = CommonModel.get_case(case_id) - NotifModel.create_notification_org(f"{CommonModel.get_org(org_id).name} removed from case: '{case.id}-{case.title}'", case_id, org_id, html_icon="fa-solid fa-door-open", current_user=current_user) - - CommonModel.update_last_modif(case_id) - db.session.commit() - case = CommonModel.get_case(case_id) - CommonModel.save_history(case.uuid, current_user, f"Org {org_id} removed") - return True - return False - - -def get_present_in_case(case_id, user): - """Return if current user is present in a case""" - orgs_in_case = CommonModel.get_orgs_in_case(case_id) - - present_in_case = False - for org in orgs_in_case: - if org.id == user.org_id: - present_in_case = True - - return present_in_case - - -def change_status_core(status, case, current_user): - case.status_id = status - CommonModel.update_last_modif(case.id) - db.session.commit() - CommonModel.save_history(case.uuid, current_user, "Case Status changed") - - return True - - -def regroup_case_info(cases, user, nb_pages=None): - loc = dict() - loc["cases"] = list() - - for case in cases: - present_in_case = get_present_in_case(case.id, user) - case_loc = case.to_json() - case_loc["present_in_case"] = present_in_case - case_loc["current_user_permission"] = CommonModel.get_role(user).to_json() - - loc["cases"].append(case_loc) - if nb_pages: - loc["nb_pages"] = nb_pages - else: - try: - loc["nb_pages"] = cases.pages - except: - pass - - return loc - - -def build_case_query(page, completed, tags=None, taxonomies=None, galaxies=None, clusters=None, filter=None): - query = Case.query - conditions = [Case.completed == completed] - - if tags or taxonomies: - query = query.join(Case_Tags, Case_Tags.case_id == Case.id) - query = query.join(Tags, Case_Tags.tag_id == Tags.id) - if tags: - tags = ast.literal_eval(tags) - conditions.append(Tags.name.in_(list(tags))) - - if taxonomies: - taxonomies = ast.literal_eval(taxonomies) - query = query.join(Taxonomy, Taxonomy.id == Tags.taxonomy_id) - conditions.append(Taxonomy.name.in_(list(taxonomies))) - - if clusters or galaxies: - query = query.join(Case_Galaxy_Tags, Case_Galaxy_Tags.case_id == Case.id) - query = query.join(Cluster, Case_Galaxy_Tags.cluster_id == Cluster.id) - if clusters: - clusters = ast.literal_eval(clusters) - conditions.append(Cluster.name.in_(list(clusters))) - - if galaxies: - galaxies = ast.literal_eval(galaxies) - query = query.join(Galaxy, Galaxy.id == Cluster.galaxy_id) - conditions.append(Galaxy.name.in_(list(galaxies))) - - if filter: - query.order_by(desc(filter)) - - return query.filter(and_(*conditions)).paginate(page=page, per_page=25, max_per_page=50) - - -def sort_by_status(page, taxonomies=[], galaxies=[], tags=[], clusters=[], or_and_taxo="true", or_and_galaxies="true", completed=False): - cases = build_case_query(page, completed, tags, taxonomies, galaxies, clusters) - - if tags: - tags = ast.literal_eval(tags) - if taxonomies: - taxonomies = ast.literal_eval(taxonomies) - - if galaxies: - galaxies = ast.literal_eval(galaxies) - if clusters: - clusters = ast.literal_eval(clusters) - - if tags or taxonomies or galaxies or clusters: - if or_and_taxo == "false": - glob_list = [] - - for case in cases: - tags_db = case.to_json()["tags"] - loc_tag = [tag["name"] for tag in tags_db] - taxo_list = [Taxonomy.query.get(tag["taxonomy_id"]).name for tag in tags_db] - - if (not tags or all(item in loc_tag for item in tags)) and \ - (not taxonomies or all(item in taxo_list for item in taxonomies)): - glob_list.append(case) - - cases = glob_list - if or_and_galaxies == "false": - glob_list = [] - - for case in cases: - clusters_db = case.to_json()["clusters"] - loc_cluster = [cluster["name"] for cluster in clusters_db] - galaxies_list = [Galaxy.query.get(cluster["galaxy_id"]).name for cluster in clusters_db] - - if (not clusters or all(item in loc_cluster for item in clusters)) and \ - (not galaxies or all(item in galaxies_list for item in galaxies)): - glob_list.append(case) - - cases = glob_list - else: - cases = Case.query.filter_by(completed=completed).paginate(page=page, per_page=25, max_per_page=50) - return cases - - -def sort_by_filter(filter, page, taxonomies=[], galaxies=[], tags=[], clusters=[], or_and_taxo="true", or_and_galaxies="true", completed=False): - cases = build_case_query(page, completed, tags, taxonomies, galaxies, clusters, filter) - nb_pages = cases.pages - if tags: - tags = ast.literal_eval(tags) - if taxonomies: - taxonomies = ast.literal_eval(taxonomies) - - if galaxies: - galaxies = ast.literal_eval(galaxies) - if clusters: - clusters = ast.literal_eval(clusters) - - if tags or taxonomies or galaxies or clusters: - if or_and_taxo == "false": - glob_list = [] - - for case in cases: - tags_db = case.to_json()["tags"] - loc_tag = [tag["name"] for tag in tags_db] - taxo_list = [Taxonomy.query.get(tag["taxonomy_id"]).name for tag in tags_db] - - if (not tags or all(item in loc_tag for item in tags)) and \ - (not taxonomies or all(item in taxo_list for item in taxonomies)): - glob_list.append(case) - - cases = glob_list - if or_and_galaxies == "false": - glob_list = [] - - for case in cases: - clusters_db = case.to_json()["clusters"] - loc_cluster = [cluster["name"] for cluster in clusters_db] - galaxies_list = [Galaxy.query.get(cluster["galaxy_id"]).name for cluster in clusters_db] - - if (not clusters or all(item in loc_cluster for item in clusters)) and \ - (not galaxies or all(item in galaxies_list for item in galaxies)): - glob_list.append(case) - - cases = glob_list - else: - cases = Case.query.filter_by(completed=completed).order_by(desc(filter)).paginate(page=page, per_page=25, max_per_page=50) - nb_pages = cases.pages - - # for deadline filter, only case with a deadline defined is required - loc = list() - for case in cases: - if getattr(case, filter): - loc.append(case) - return loc, nb_pages - - -def fork_case_core(cid, case_title_fork, user): - case_title_stored = CommonModel.get_case_by_title(case_title_fork) - if case_title_stored: - return {"message": "Error, title already exist"} - case = CommonModel.get_case(cid) - - case_json = case.to_json() - case_json["title"] = case_title_fork - - if case.deadline: - case_json["deadline_date"] = datetime.datetime.strptime(case_json["deadline"].split(" ")[0], "%Y-%m-%d").date() - case_json["deadline_time"] = datetime.datetime.strptime(case_json["deadline"].split(" ")[1], "%H:%M").time() - else: - case_json["deadline_date"] = None - case_json["deadline_time"] = None - - new_case = create_case(case_json, user) - - for task in case.tasks: - task_json = task.to_json() - if task.deadline: - task_json["deadline_date"] = datetime.datetime.strptime(task_json["deadline"].split(" ")[0], "%Y-%m-%d").date() - task_json["deadline_time"] = datetime.datetime.strptime(task_json["deadline"].split(" ")[1], "%H:%M").time() - else: - task_json["deadline_date"] = None - task_json["deadline_time"] = None - - TaskModel.create_task(task_json, new_case.id, user) - return new_case - - -def create_template_from_case(cid, case_title_template): - if Case_Template.query.filter_by(title=case_title_template).first(): - return {"message": "Error, title already exist"} - - case = CommonModel.get_case(cid) - new_template = Case_Template( - uuid=str(uuid.uuid4()), - title=case_title_template, - description=case.description - ) - db.session.add(new_template) - db.session.commit() - - for c_t in Case_Tags.query.filter_by(case_id=case.id).all(): - case_tag = Case_Template_Tags( - case_id=new_template.id, - tag_id=c_t.tag_id - ) - db.session.add(case_tag) - db.session.commit() - - for c_t in Case_Galaxy_Tags.query.filter_by(case_id=case.id).all(): - case_cluster = Case_Template_Galaxy_Tags( - template_id=new_template.id, - cluster_id=c_t.cluster_id - ) - db.session.add(case_cluster) - db.session.commit() - - for task in case.tasks: - task_exist = Task_Template.query.filter_by(title=task.title).first() - if not task_exist: - task_template = Task_Template( - uuid=str(uuid.uuid4()), - title=task.title, - description=task.description, - url=task.url - ) - db.session.add(task_template) - db.session.commit() - - ## Tags - for t_t in Task_Tags.query.filter_by(task_id=task.id).all(): - task_tag = Task_Template_Tags( - task_id=task_template.id, - tag_id=t_t.tag_id - ) - db.session.add(task_tag) - db.session.commit() - - ## Clusters - for t_t in Task_Galaxy_Tags.query.filter_by(task_id=task.id).all(): - task_cluster = Task_Template_Galaxy_Tags( - template_id=task_template.id, - cluster_id=t_t.cluster_id - ) - db.session.add(task_cluster) - db.session.commit() - - ## Task Connectors - for t_c in Task_Connector_Instance.query.filter_by(task_id=task.id).all(): - task_connector = Task_Template_Connector_Instance( - template_id=task_template.id, - instance_id=t_c.instance_id - ) - db.session.add(task_connector) - db.session.commit() - - case_task_template = Case_Task_Template( - case_id=new_template.id, - task_id=task_template.id - ) - db.session.add(case_task_template) - db.session.commit() - else: - case_task_template = Case_Task_Template( - case_id=new_template.id, - task_id=task_exist.id - ) - db.session.add(case_task_template) - db.session.commit() - - return new_template - - -def change_recurring(form_dict, cid, current_user): - case = CommonModel.get_case(cid) - recurring_status = Status.query.filter_by(name="Recurring").first() - created_status = Status.query.filter_by(name="Created").first() - - if "once" in form_dict and form_dict["once"]: - case.recurring_type = "once" - case.recurring_date = form_dict["once"] - case.status_id = recurring_status.id - elif "daily" in form_dict and form_dict["daily"]: - case.recurring_type = "daily" - case.recurring_date = datetime.datetime.today() + datetime.timedelta(days=1) - case.status_id = recurring_status.id - elif "weekly" in form_dict and form_dict["weekly"]: - case.recurring_type = "weekly" - if form_dict["weekly"]/create_task", methods=['GET', 'POST']) -@login_required -def create_task(cid): - """View of a case""" - if CommonModel.get_case(cid): - present_in_case = CaseModel.get_present_in_case(cid, current_user) - if present_in_case or current_user.is_admin(): - form = TaskForm() - form.template_select.choices = [(template.id, template.title) for template in CommonModel.get_task_templates()] - form.template_select.choices.insert(0, (0," ")) - - if form.validate_on_submit(): - tag_list = request.form.getlist("tags_select") - cluster_list = request.form.getlist("clusters_select") - connector_list = request.form.getlist("connectors_select") - if CommonModel.check_tag(tag_list): - if CommonModel.check_cluster(cluster_list): - form_dict = form_to_dict(form) - form_dict["tags"] = tag_list - form_dict["clusters"] = cluster_list - form_dict["connectors"] = connector_list - if TaskModel.create_task(form_dict, cid, current_user): - flash("Task created", "success") - else: - flash("Error Task Created", "error") - return redirect(f"/case/{cid}") - return render_template("case/create_task.html", form=form) - return render_template("case/create_task.html", form=form) - return render_template("case/create_task.html", form=form) - return redirect(f"/case/{cid}") - return render_template("404.html") - -@task_blueprint.route("//edit_task/", methods=['GET','POST']) -@login_required -@editor_required -def edit_task(cid, tid): - """Edit the task""" - if CommonModel.get_case(cid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - form = TaskEditForm() - - if form.validate_on_submit(): - tag_list = request.form.getlist("tags_select") - cluster_list = request.form.getlist("clusters_select") - connector_list = request.form.getlist("connectors_select") - if CommonModel.check_tag(tag_list): - if CommonModel.check_cluster(cluster_list): - form_dict = form_to_dict(form) - form_dict["tags"] = tag_list - form_dict["clusters"] = cluster_list - form_dict["connectors"] = connector_list - TaskModel.edit_task_core(form_dict, tid, current_user) - flash("Task edited", "success") - return redirect(f"/case/{cid}") - return render_template("case/create_task.html", form=form) - return render_template("case/create_task.html", form=form) - else: - task_modif = CommonModel.get_task(tid) - form.description.data = task_modif.description - form.title.data = task_modif.title - form.url.data = task_modif.url - form.deadline_date.data = task_modif.deadline - form.deadline_time.data = task_modif.deadline - - return render_template("case/edit_task.html", form=form) - else: - flash("Access denied", "error") - return redirect(f"/case/{cid}") - return render_template("404.html") - - -@task_blueprint.route("/complete_task/", methods=['GET']) -@login_required -@editor_required -def complete_task(tid): - """Complete the task""" - - task = CommonModel.get_task(str(tid)) - if task: - if CaseModel.get_present_in_case(task.case_id, current_user) or current_user.is_admin(): - if TaskModel.complete_task(tid, current_user): - return {"message": "Task completed", "toast_class": "success-subtle"}, 200 - return {"message": "Error task completed", "toast_class": "danger-subtle"}, 400 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "Task not found", "toast_class": "danger-subtle"}, 404 - - - -@task_blueprint.route("//delete_task/", methods=['GET']) -@login_required -@editor_required -def delete_task(cid, tid): - """Delete the task""" - if CommonModel.get_case(cid): - if CommonModel.get_task(tid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if TaskModel.delete_task(tid, current_user): - return {"message": "Task deleted", "toast_class": "success-subtle"}, 200 - return {"message": "Error task deleted", "toast_class": "danger-subtle"}, 400 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "Task not found", "toast_class": "danger-subtle"}, 404 - return {"message": "Case not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("//modif_note/", methods=['POST']) -@login_required -@editor_required -def modif_note(cid, tid): - """Modify note of the task""" - if CommonModel.get_case(cid): - if CommonModel.get_task(tid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - notes = request.json["notes"] - if TaskModel.modif_note_core(tid, current_user, notes): - return {"message": "Note added", "toast_class": "success-subtle"}, 200 - return {"message": "Error add/modify note", "toast_class": "danger-subtle"}, 400 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "Task not found", "toast_class": "danger-subtle"}, 404 - return {"message": "Case not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("//get_note/", methods=['GET']) -@editor_required -def get_note(cid, tid): - """Get not of a task in text format""" - if CommonModel.get_case(cid): - task = CommonModel.get_task(tid) - if task: - return {"note": task.notes}, 201 - return {"message": "Task not found", "toast_class": "danger-subtle"}, 404 - return {"message": "Case not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("//take_task/", methods=['GET']) -@login_required -@editor_required -def take_task(cid, tid): - """Assign current user to the task""" - if CommonModel.get_case(cid): - if CommonModel.get_task(tid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if TaskModel.assign_task(tid, user=current_user, current_user=current_user, flag_current_user=True): - return {"message": "User Assigned", "toast_class": "success-subtle"}, 200 - return {"message": "Error assignment", "toast_class": "danger-subtle"}, 400 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "Task not found", "toast_class": "danger-subtle"}, 404 - return {"message": "Case not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("//assign_users/", methods=['POST']) -@login_required -@editor_required -def assign_user(cid, tid): - """Assign a list of users to the task""" - if CommonModel.get_case(cid): - if "users_id" in request.json: - users_list = request.json["users_id"] - - if CommonModel.get_task(tid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - for user in users_list: - TaskModel.assign_task(tid, user=user, current_user=current_user, flag_current_user=False) - return {"message": "Users Assigned", "toast_class": "success-subtle"}, 200 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "Task not found", "toast_class": "danger-subtle"}, 404 - return {"message": "'users_id' is missing", "toast_class": "danger-subtle"}, 400 - return {"message": "Case not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("//remove_assignment/", methods=['GET']) -@login_required -@editor_required -def remove_assign_task(cid, tid): - """Remove current user assignment to the task""" - if CommonModel.get_case(cid): - if CommonModel.get_task(tid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if TaskModel.remove_assign_task(tid, user=current_user, current_user=current_user, flag_current_user=True): - return {"message": "User Removed from assignment", "toast_class": "success-subtle"}, 200 - return {"message": "Error removed assignment", "toast_class": "danger-subtle"}, 400 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "Task not found", "toast_class": "danger-subtle"}, 404 - return {"message": "Case not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("//remove_assigned_user/", methods=['POST']) -@login_required -@editor_required -def remove_assigned_user(cid, tid): - """Assign current user to the task""" - if CommonModel.get_case(cid): - if "user_id" in request.json: - user_id = request.json["user_id"] - if CommonModel.get_task(tid): - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if TaskModel.remove_assign_task(tid, user=user_id, current_user=current_user, flag_current_user=False): - return {"message": "User Removed from assignment", "toast_class": "success-subtle"}, 200 - return {"message": "Error removed assignment", "toast_class": "danger-subtle"}, 400 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "Task not found", "toast_class": "danger-subtle"}, 404 - return {"message": "'user_id' is missing", "toast_class": "danger-subtle"}, 400 - return {"message": "Case not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("//change_task_status/", methods=['POST']) -@login_required -@editor_required -def change_task_status(cid, tid): - """Change the status of the task""" - if CommonModel.get_case(cid): - if "status" in request.json: - status = request.json["status"] - task = CommonModel.get_task(tid) - if task: - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if TaskModel.change_task_status(status, task, current_user): - return {"message": "Status changed", "toast_class": "success-subtle"}, 200 - return {"message": "Error changed status", "toast_class": "danger-subtle"}, 400 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "Task not found", "toast_class": "danger-subtle"}, 404 - return {"message": "'status' is missing", "toast_class": "danger-subtle"}, 400 - return {"message": "Case not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("/task//download_file/", methods=['GET']) -@login_required -@editor_required -def download_file(tid, fid): - """Download the file""" - task = CommonModel.get_task(tid) - file = CommonModel.get_file(fid) - if file and file in task.files: - if CaseModel.get_present_in_case(task.case_id, current_user) or current_user.is_admin(): - return TaskModel.download_file(file) - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "File not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("/task//delete_file/", methods=['GET']) -@login_required -@editor_required -def delete_file(tid, fid): - """Delete the file""" - task = CommonModel.get_task(tid) - file = CommonModel.get_file(fid) - if file and file in task.files: - if CaseModel.get_present_in_case(task.case_id, current_user) or current_user.is_admin(): - if TaskModel.delete_file(file, task, current_user): - return {"message": "File Deleted", "toast_class": "success-subtle"}, 200 - return {"message": "Error deleting file"}, 400 - return {"message": "Action not allowed", "toast_class": "warning-subtle"}, 401 - return {"message": "File not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("//add_files/", methods=['POST']) -@login_required -@editor_required -def add_files(cid, tid): - """Add files to a task""" - if CommonModel.get_case(cid): - task = CommonModel.get_task(tid) - if task: - if len(request.files) > 0: - if TaskModel.add_file_core(task=task, files_list=request.files, current_user=current_user): - return {"message":"Files added", "toast_class": "success-subtle"}, 200 - return {"message":"Something goes wrong adding files", "toast_class": "danger-subtle"}, 400 - return {"message":"No Files given", "toast_class": "warning-subtle"}, 400 - return {"message":"Task not found", "toast_class": "danger-subtle"}, 404 - return {"message":"Case not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("//get_files/", methods=['GET']) -@login_required -@editor_required -def get_files(cid, tid): - """Get files of a task""" - if CommonModel.get_case(cid): - task = CommonModel.get_task(tid) - if task: - file_list = [file.to_json() for file in task.files] - return {"files": file_list}, 200 - return {"message":"Task not found", "toast_class": "danger-subtle"}, 404 - return {"message":"Case not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("//sort_by_ongoing_task", methods=['GET']) -@login_required -def sort_by_ongoing_task(cid): - """Sort Task by living one""" - case = CommonModel.get_case(cid) - tags = request.args.get('tags') - or_and_taxo = request.args.get("or_and_taxo") - taxonomies = request.args.get('taxonomies') - - galaxies = request.args.get('galaxies') - clusters = request.args.get('clusters') - or_and_galaxies = request.args.get("or_and_galaxies") - - return TaskModel.sort_by_status_task_core(case, current_user, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=False) - - -@task_blueprint.route("//sort_by_finished_task", methods=['GET']) -@login_required -def sort_by_finished_task(cid): - """Sort task by finished one""" - case = CommonModel.get_case(cid) - tags = request.args.get('tags') - or_and_taxo = request.args.get("or_and_taxo") - taxonomies = request.args.get('taxonomies') - - galaxies = request.args.get('galaxies') - clusters = request.args.get('clusters') - or_and_galaxies = request.args.get("or_and_galaxies") - - return TaskModel.sort_by_status_task_core(case, current_user, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=True) - - -@task_blueprint.route("//tasks/ongoing", methods=['GET']) -@login_required -def ongoing_tasks_sort_by_filter(cid): - """Sort by filter for living task""" - tags = request.args.get('tags') - or_and_taxo = request.args.get("or_and_taxo") - taxonomies = request.args.get('taxonomies') - - galaxies = request.args.get('galaxies') - clusters = request.args.get('clusters') - or_and_galaxies = request.args.get("or_and_galaxies") - filter = request.args.get('filter') - - if filter: - case = CommonModel.get_case(cid) - return TaskModel.sort_tasks_by_filter(case, current_user, filter, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=False) - return {"message": "No filter pass"}, 400 - - -@task_blueprint.route("//tasks/finished", methods=['GET']) -@login_required -def finished_tasks_sort_by_filter(cid): - """Sort by filter for finished task""" - tags = request.args.get('tags') - or_and_taxo = request.args.get("or_and_taxo") - taxonomies = request.args.get('taxonomies') - filter = request.args.get('filter') - - galaxies = request.args.get('galaxies') - clusters = request.args.get('clusters') - or_and_galaxies = request.args.get("or_and_galaxies") - - if filter: - case = CommonModel.get_case(cid) - return TaskModel.sort_tasks_by_filter(case, current_user, filter, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed=True) - return {"message": "No filter pass"}, 400 - - -@task_blueprint.route("//task//notify_user", methods=['POST']) -@login_required -@editor_required -def notify_user(cid, tid): - """Notify a user about a task""" - if CommonModel.get_case(cid): - if "user_id" in request.json: - user = request.json["user_id"] - task = CommonModel.get_task(tid) - if task: - if CaseModel.get_present_in_case(cid, current_user) or current_user.is_admin(): - if CaseModel.notify_user(task, user): - return {"message":"User notified", "toast_class": "success-subtle"}, 200 - return {"message":"Something goes wrong", "toast_class": "danger-subtle"}, 400 - return {"message":"Action not Allowed", "toast_class": "warning-subtle"}, 400 - return {"message":"Task not found", "toast_class": "danger-subtle"}, 404 - return {"message": "'user_id' is missing", "toast_class": "danger-subtle"}, 404 - return {"message": "Case not found", "toast_class": "danger-subtle"}, 404 - - -@task_blueprint.route("//task//export_notes", methods=['GET']) -@login_required -def export_notes(cid, tid): - """Export note of a task as pdf""" - - case = CommonModel.get_case(cid) - if case: - task = CommonModel.get_task(tid) - if task: - data_dict = dict(request.args) - if "type" in data_dict: - type_req = data_dict["type"] - res = TaskModel.export_notes(task, type_req) - CommonModel.delete_temp_folder() - return res - return {"message": "'type' is missing", 'toast_class': "warning-subtle"}, 400 - return {"message": "Task not found", 'toast_class': "danger-subtle"}, 404 - return {"message": "Case not found", 'toast_class': "danger-subtle"}, 404 - - -@task_blueprint.route("/get_taxonomies_task/", methods=['GET']) -@login_required -def get_taxonomies_case(tid): - task = CommonModel.get_task(tid) - if task: - tags = CommonModel.get_task_tags(task.id) - taxonomies = [] - if tags: - taxonomies = [tag.split(":")[0] for tag in tags] - return {"tags": tags, "taxonomies": taxonomies} - return {"message": "task Not found", 'toast_class': "danger-subtle"}, 404 - -@task_blueprint.route("/get_galaxies_task/", methods=['GET']) -@login_required -def get_galaxies_task(tid): - task = CommonModel.get_task(tid) - if task: - clusters = CommonModel.get_task_clusters(task.id) - galaxies = [] - if clusters: - for cluster in clusters: - loc_g = CommonModel.get_galaxy(cluster.galaxy_id) - if not loc_g.name in galaxies: - galaxies.append(loc_g.name) - index = clusters.index(cluster) - clusters[index] = cluster.tag - return {"clusters": clusters, "galaxies": galaxies} - return {"message": "task Not found", 'toast_class': "danger-subtle"}, 404 - - -@task_blueprint.route("/get_connectors", methods=['GET']) -@login_required -def get_connectors(): - connectors_list = CommonModel.get_connectors() - connectors_dict = dict() - for connector in connectors_list: - loc = list() - for instance in connector.instances: - if CommonModel.get_user_instance_both(user_id=current_user.id, instance_id=instance.id): - loc.append(instance.to_json()) - if loc: - connectors_dict[connector.name] = loc - - return jsonify({"connectors": connectors_dict}), 200 - -@task_blueprint.route("/get_connectors_task/", methods=['GET']) -@login_required -def get_connectors_task(tid): - task = CommonModel.get_task(tid) - if task: - return {"connectors": [CommonModel.get_instance(task_instance.instance_id).name for task_instance in CommonModel.get_task_connectors(task.id) ]} - return {"message": "task Not found", 'toast_class': "danger-subtle"}, 404 diff --git a/webiste/app/case/task_core.py b/webiste/app/case/task_core.py deleted file mode 100644 index 58ea9e25..00000000 --- a/webiste/app/case/task_core.py +++ /dev/null @@ -1,555 +0,0 @@ -import os -import ast -import uuid -import shutil -import datetime -import subprocess -from .. import db -from ..db_class.db import * -from ..utils.utils import create_specific_dir - -from sqlalchemy import desc, and_ -from flask import request, send_file -from werkzeug.utils import secure_filename -from ..notification import notification_core as NotifModel - -from . import common_core as CommonModel - - -UPLOAD_FOLDER = os.path.join(os.getcwd(), "uploads") -FILE_FOLDER = os.path.join(UPLOAD_FOLDER, "files") -TEMP_FOLDER = os.path.join(os.getcwd(), "temp") - - -def delete_task(tid, current_user): - """Delete a task by is id""" - task = CommonModel.get_task(tid) - if task is not None: - for file in task.files: - try: - os.remove(os.path.join(FILE_FOLDER, file.uuid)) - except: - return False - db.session.delete(file) - db.session.commit() - - case = CommonModel.get_case(task.case_id) - task_users = Task_User.query.where(Task_User.task_id==task.id).all() - for task_user in task_users: - user = User.query.get(task_user.user_id) - NotifModel.create_notification_user(f"Task '{task.id}-{task.title}' of case '{case.id}-{case.title}' was deleted", task.case_id, user_id=user.id, html_icon="fa-solid fa-trash") - - Task_Tags.query.filter_by(task_id=task.id).delete() - Task_Galaxy_Tags.query.filter_by(task_id=task.id).delete() - Task_User.query.filter_by(task_id=task.id).delete() - Task_Connector_Instance.query.filter_by(task_id=task.id).delete() - db.session.delete(task) - CommonModel.update_last_modif(task.case_id) - db.session.commit() - - CommonModel.save_history(case.uuid, current_user, f"Task '{task.title}' deleted") - return True - return False - - -def complete_task(tid, current_user): - """Complete task by is id""" - task = CommonModel.get_task(tid) - if task is not None: - task.completed = not task.completed - - case = CommonModel.get_case(task.case_id) - task_users = Task_User.query.where(Task_User.task_id==task.id).all() - if task.completed: - task.status_id = Status.query.filter_by(name="Finished").first().id - message = f"Task '{task.id}-{task.title}' of case '{case.id}-{case.title}' completed" - else: - task.status_id = Status.query.filter_by(name="Created").first().id - message = f"Task '{task.id}-{task.title}' of case '{case.id}-{case.title}' revived" - for task_user in task_users: - user = User.query.get(task_user.user_id) - if task.completed: - message = f"Task '{task.id}-{task.title}' of case '{case.id}-{case.title}' completed" - NotifModel.create_notification_user(message, task.case_id, user_id=user.id, html_icon="fa-solid fa-check") - else: - message = f"Task '{task.id}-{task.title}' of case '{case.id}-{case.title}' revived" - NotifModel.create_notification_user(message, task.case_id, user_id=user.id, html_icon="fa-solid fa-heart-circle-bolt") - - CommonModel.update_last_modif(task.case_id) - CommonModel.update_last_modif_task(task.id) - db.session.commit() - CommonModel.save_history(case.uuid, current_user, f"Task '{task.title}' completed") - return True - return False - - -def create_task(form_dict, cid, current_user): - """Add a task to the DB""" - if "template_select" in form_dict and not 0 in form_dict["template_select"]: - template = Task_Template.query.get(form_dict["template_select"]) - task = Task( - uuid=str(uuid.uuid4()), - title=template.title, - description=template.description, - url=template.url, - creation_date=datetime.datetime.now(tz=datetime.timezone.utc), - last_modif=datetime.datetime.now(tz=datetime.timezone.utc), - case_id=cid, - status_id=1 - ) - db.session.add(task) - db.session.commit() - - for t_t in Task_Template_Tags.query.filter_by(task_id=task.id).all(): - task_tag = Task_Tags( - task_id=task.id, - tag_id=t_t.tag_id - ) - db.session.add(task_tag) - db.session.commit() - - for t_t in Task_Template_Galaxy_Tags.query.filter_by(task_id=task.id).all(): - task_tag = Task_Galaxy_Tags( - task_id=task.id, - cluster_id=t_t.cluster_id - ) - db.session.add(task_tag) - db.session.commit() - - for t_t in Task_Template_Connector_Instance.query.filter_by(template_id=task.id).all(): - task_instance = Task_Connector_Instance( - task_id=task.id, - instance_id=t_t.instance_id - ) - db.session.add(task_instance) - db.session.commit() - else: - deadline = CommonModel.deadline_check(form_dict["deadline_date"], form_dict["deadline_time"]) - - ## Check if instance is from current user - for instance in form_dict["connectors"]: - instance = CommonModel.get_instance_by_name(instance) - if not CommonModel.get_user_instance_by_instance(instance.id): - return False - - task = Task( - uuid=str(uuid.uuid4()), - title=form_dict["title"], - description=form_dict["description"], - url=form_dict["url"], - creation_date=datetime.datetime.now(tz=datetime.timezone.utc), - last_modif=datetime.datetime.now(tz=datetime.timezone.utc), - deadline=deadline, - case_id=cid, - status_id=1 - ) - db.session.add(task) - db.session.commit() - - for tags in form_dict["tags"]: - tag = CommonModel.get_tag(tags) - - task_tag = Task_Tags( - tag_id=tag.id, - task_id=task.id - ) - db.session.add(task_tag) - db.session.commit() - - for clusters in form_dict["clusters"]: - cluster = CommonModel.get_cluster_by_name(clusters) - - task_galaxy_tag = Task_Galaxy_Tags( - cluster_id=cluster.id, - task_id=task.id - ) - db.session.add(task_galaxy_tag) - db.session.commit() - - for instance in form_dict["connectors"]: - instance = CommonModel.get_instance_by_name(instance) - task_instance = Task_Connector_Instance( - task_id=task.id, - instance_id=instance.id - ) - db.session.add(task_instance) - db.session.commit() - - CommonModel.update_last_modif(cid) - - case = CommonModel.get_case(cid) - CommonModel.save_history(case.uuid, current_user, f"Task '{task.title}' Created") - - return task - - -def edit_task_core(form_dict, tid, current_user): - """Edit a task to the DB""" - task = CommonModel.get_task(tid) - deadline = CommonModel.deadline_check(form_dict["deadline_date"], form_dict["deadline_time"]) - - ## Check if instance is from current user - for instance in form_dict["connectors"]: - instance = CommonModel.get_instance_by_name(instance) - if not CommonModel.get_user_instance_by_instance(instance.id): - return False - - task.title = form_dict["title"] - task.description=form_dict["description"] - task.url=form_dict["url"] - task.deadline=deadline - - ## Tags - task_tag_db = Task_Tags.query.filter_by(task_id=task.id).all() - for tags in form_dict["tags"]: - tag = CommonModel.get_tag(tags) - - if not tags in task_tag_db: - task_tag = Task_Tags( - tag_id=tag.id, - task_id=task.id - ) - db.session.add(task_tag) - db.session.commit() - - for c_t_db in task_tag_db: - if not c_t_db in form_dict["tags"]: - Task_Tags.query.filter_by(id=c_t_db.id).delete() - db.session.commit() - - ## Clusters - task_tag_db = Task_Galaxy_Tags.query.filter_by(task_id=task.id).all() - for clusters in form_dict["clusters"]: - cluster = CommonModel.get_cluster_by_name(clusters) - - if not clusters in task_tag_db: - task_tag = Task_Galaxy_Tags( - cluster_id=cluster.id, - task_id=task.id - ) - db.session.add(task_tag) - db.session.commit() - - for c_t_db in task_tag_db: - if not c_t_db in form_dict["clusters"]: - Task_Galaxy_Tags.query.filter_by(id=c_t_db.id).delete() - db.session.commit() - - ## Connectors - task_connector_db = Task_Connector_Instance.query.filter_by(task_id=task.id).all() - for connectors in form_dict["connectors"]: - instance = CommonModel.get_instance_by_name(connectors) - - if not connectors in task_connector_db: - task_tag = Task_Connector_Instance( - instance_id=instance.id, - task_id=task.id - ) - db.session.add(task_tag) - db.session.commit() - - for c_t_db in task_connector_db: - if not c_t_db in form_dict["connectors"]: - Task_Connector_Instance.query.filter_by(id=c_t_db.id).delete() - db.session.commit() - - CommonModel.update_last_modif(task.case_id) - CommonModel.update_last_modif_task(task.id) - db.session.commit() - - case = CommonModel.get_case(task.case_id) - CommonModel.save_history(case.uuid, current_user, f"Task '{task.title}' edited") - - -def add_file_core(task, files_list, current_user): - create_specific_dir(UPLOAD_FOLDER) - create_specific_dir(FILE_FOLDER) - for file in files_list: - if files_list[file].filename: - uuid_loc = str(uuid.uuid4()) - filename = secure_filename(files_list[file].filename) - try: - file_data = request.files[file].read() - with open(os.path.join(FILE_FOLDER, uuid_loc), "wb") as write_file: - write_file.write(file_data) - except Exception as e: - print(e) - return False - - f = File( - name=filename, - task_id=task.id, - uuid = uuid_loc - ) - db.session.add(f) - CommonModel.update_last_modif(task.case_id) - CommonModel.update_last_modif_task(task.id) - db.session.commit() - case = CommonModel.get_case(task.case_id) - CommonModel.save_history(case.uuid, current_user, f"File added for task '{task.title}'") - return True - -def modif_note_core(tid, current_user, notes): - """Modify a noe of a task to the DB""" - task = CommonModel.get_task(tid) - if task: - task.notes = notes - CommonModel.update_last_modif(task.case_id) - CommonModel.update_last_modif_task(task.id) - db.session.commit() - case = CommonModel.get_case(task.case_id) - CommonModel.save_history(case.uuid, current_user, f"Notes for '{task.title}' modified") - return True - return False - - -def assign_task(tid, user, current_user, flag_current_user): - """Assign current user to a task""" - task = CommonModel.get_task(tid) - case = CommonModel.get_case(task.case_id) - if task: - if type(user) == str or type(user) == int: - user = User.query.get(user) - - task_user = Task_User(task_id=task.id, user_id=user.id) - if not flag_current_user: - NotifModel.create_notification_user(f"You have been assign to: '{task.id}-{task.title}' of case '{case.id}-{case.title}'", task.case_id, user_id=user.id, html_icon="fa-solid fa-hand") - - if not Task_User.query.filter_by(task_id=task.id, user_id=user.id).first(): - db.session.add(task_user) - CommonModel.update_last_modif(task.case_id) - CommonModel.update_last_modif_task(task.id) - db.session.commit() - CommonModel.save_history(case.uuid, current_user, f"Task '{task.id}-{task.title}' assigned to {user.first_name} {user.last_name}") - return True - return False - return False - -def get_users_assign_task(task_id, user): - """Return users assigned to a task""" - task_users = Task_User.query.filter_by(task_id=task_id).all() - users = list() - flag = False - for task_user in task_users: - u = User.query.get(task_user.user_id) - # if current user is assigned to the task - if u.id == user.id: - flag = True - users.append(u.to_json()) - return users, flag - - -def remove_assign_task(tid, user, current_user, flag_current_user): - """Remove current user to the assignement to a task""" - task = CommonModel.get_task(tid) - case = CommonModel.get_case(task.case_id) - if task: - if type(user) == int or type(user) == str: - user = User.query.get(user) - task_users = Task_User.query.filter_by(task_id=task.id, user_id=user.id).all() - if not flag_current_user: - NotifModel.create_notification_user(f"Your assignment have been removed: '{task.id}-{task.title}' of case '{case.id}-{case.title}'", task.case_id, user_id=user.id, html_icon="fa-solid fa-handshake-slash") - - for task_user in task_users: - db.session.delete(task_user) - - CommonModel.update_last_modif(task.case_id) - CommonModel.update_last_modif_task(task.id) - db.session.commit() - CommonModel.save_history(case.uuid, current_user, f"Assignment '{task.title}' removed to {user.first_name} {user.last_name}") - return True - return False - - -def change_task_status(status, task, current_user): - task.status_id = status - CommonModel.update_last_modif(task.case_id) - CommonModel.update_last_modif_task(task.id) - db.session.commit() - - case = CommonModel.get_case(task.case_id) - CommonModel.save_history(case.uuid, current_user, f"Status changed for task '{task.title}'") - - return True - - -def download_file(file): - return send_file(os.path.join(FILE_FOLDER, file.uuid), as_attachment=True, download_name=file.name) - - -def delete_file(file, task, current_user): - try: - os.remove(os.path.join(FILE_FOLDER, file.uuid)) - except: - return False - - db.session.delete(file) - db.session.commit() - case = CommonModel.get_case(task.case_id) - CommonModel.save_history(case.uuid, current_user, f"File deleted for task '{task.title}'") - return True - - -def get_task_info(tasks_list, user): - tasks = list() - for task in tasks_list: - case = CommonModel.get_case(task.case_id) - users, is_current_user_assigned = get_users_assign_task(task.id, user) - file_list = list() - for file in task.files: - file_list.append(file.to_json()) - finalTask = task.to_json() - finalTask["users"] = users - finalTask["is_current_user_assigned"] = is_current_user_assigned - finalTask["files"] = file_list - finalTask["case_title"] = case.title - - finalTask["instances"] = list() - for task_connector in CommonModel.get_task_connectors(task.id): - loc_instance = CommonModel.get_instance(task_connector.instance_id).to_json() - loc_instance["icon"] = Icon_File.query.join(Connector_Icon, Connector_Icon.file_icon_id==Icon_File.id)\ - .join(Connector, Connector.icon_id==Connector_Icon.id)\ - .join(Connector_Instance, Connector_Instance.connector_id==Connector.id)\ - .where(Connector_Instance.id==task_connector.instance_id)\ - .first().uuid - finalTask["instances"].append(loc_instance) - - tasks.append(finalTask) - return tasks - - -def build_task_query(completed, tags=None, taxonomies=None, galaxies=None, clusters=None, filter=None): - query = Task.query - conditions = [Task.completed == completed] - - if tags or taxonomies: - query = query.join(Task_Tags, Task_Tags.task_id == Task.id) - query = query.join(Tags, Task_Tags.tag_id == Tags.id) - if tags: - tags = ast.literal_eval(tags) - conditions.append(Tags.name.in_(list(tags))) - - if taxonomies: - taxonomies = ast.literal_eval(taxonomies) - query = query.join(Taxonomy, Taxonomy.id == Tags.taxonomy_id) - conditions.append(Taxonomy.name.in_(list(taxonomies))) - - if clusters or galaxies: - query = query.join(Task_Galaxy_Tags, Task_Galaxy_Tags.task_id == Task.id) - query = query.join(Cluster, Task_Galaxy_Tags.cluster_id == Cluster.id) - if clusters: - clusters = ast.literal_eval(clusters) - conditions.append(Cluster.name.in_(list(clusters))) - - if galaxies: - galaxies = ast.literal_eval(galaxies) - query = query.join(Galaxy, Galaxy.id == Cluster.galaxy_id) - conditions.append(Galaxy.name.in_(list(galaxies))) - - if filter: - query.order_by(desc(filter)) - - return query.filter(and_(*conditions)).all() - -def sort_by_status_task_core(case, user, taxonomies=[], galaxies=[], tags=[], clusters=[], or_and_taxo="true", or_and_galaxies="true", completed=False, no_info=False, filter=False): - tasks = build_task_query(completed, tags, taxonomies, galaxies, clusters, filter) - - if tags: - tags = ast.literal_eval(tags) - if taxonomies: - taxonomies = ast.literal_eval(taxonomies) - - if galaxies: - galaxies = ast.literal_eval(galaxies) - if clusters: - clusters = ast.literal_eval(clusters) - - if tags or taxonomies or galaxies or clusters: - if or_and_taxo == "false": - glob_list = [] - - for task in tasks: - tags_db = task.to_json()["tags"] - loc_tag = [tag["name"] for tag in tags_db] - taxo_list = [Taxonomy.query.get(tag["taxonomy_id"]).name for tag in tags_db] - - if (not tags or all(item in loc_tag for item in tags)) and \ - (not taxonomies or all(item in taxo_list for item in taxonomies)): - glob_list.append(task) - - tasks = glob_list - if or_and_galaxies == "false": - glob_list = [] - - for task in tasks: - clusters_db = task.to_json()["clusters"] - loc_cluster = [cluster["name"] for cluster in clusters_db] - galaxies_list = [Galaxy.query.get(cluster["galaxy_id"]).name for cluster in clusters_db] - - if (not clusters or all(item in loc_cluster for item in clusters)) and \ - (not galaxies or all(item in galaxies_list for item in galaxies)): - glob_list.append(task) - - tasks = glob_list - else: - tasks = Task.query.filter_by(case_id=case.id, completed=completed).all() - - if no_info: - return tasks - return get_task_info(tasks, user) - - -def sort_tasks_by_filter(case, user, filter, taxonomies=[], galaxies=[], tags=[], clusters=[], or_and_taxo="true", or_and_galaxies="true", completed=False): - tasks_list = sort_by_status_task_core(case, user, taxonomies, galaxies, tags, clusters, or_and_taxo, or_and_galaxies, completed, no_info=True, filter=filter) - - loc_list = list() - if filter == "assigned_tasks": - for task in tasks_list: - if Task_User.query.filter_by(task_id=task.id).first(): - loc_list.append(task) - tasks_list = loc_list - - elif filter == "my_assignment": - for task in tasks_list: - if Task_User.query.filter_by(task_id=task.id, user_id=user.id).first(): - task.is_current_user_assigned = True - loc_list.append(task) - tasks_list = loc_list - - elif filter == "deadline": - # for deadline filter, only task with a deadline defined is required - loc = list() - for task in tasks_list: - if getattr(task, filter): - loc.append(task) - tasks_list = loc - else: - # status, last_modif, title - tasks_list.sort(key=lambda x: getattr(x, filter)) - - return get_task_info(tasks_list, user) - - -def export_notes(task, type_req): - if not os.path.isdir(TEMP_FOLDER): - os.mkdir(TEMP_FOLDER) - - download_filename = f"export_note_task_{task.id}.{type_req}" - temp_md = os.path.join(TEMP_FOLDER, "index.md") - temp_export = os.path.join(TEMP_FOLDER, f"output.{type_req}") - - with open(temp_md, "w")as write_file: - write_file.write(task.notes) - if type_req == "pdf": - process = subprocess.Popen(["pandoc", temp_md, "--pdf-engine=xelatex", "-V", "colorlinks=true", "-V", "linkcolor=blue", "-V", "urlcolor=red", "-V", "tocolor=gray"\ - "--number-sections", "--toc", "--template", "eisvogel", "-o", temp_export, "--filter=pandoc-mermaid"], stdout=subprocess.PIPE) - elif type_req == "docx": - process = subprocess.Popen(["pandoc", temp_md, "-o", temp_export, "--filter=mermaid-filter"], stdout=subprocess.PIPE) - process.wait() - - try: - shutil.rmtree(os.path.join(os.getcwd(), "mermaid-images")) - except: - pass - - return send_file(temp_export, as_attachment=True, download_name=download_filename) \ No newline at end of file diff --git a/webiste/app/session.py b/webiste/app/session.py index 665f606d..23caf261 100644 --- a/webiste/app/session.py +++ b/webiste/app/session.py @@ -153,9 +153,13 @@ def save_info(self): db.session.commit() histories = History.query.all() - while len(histories) > 100: - history = History.query.filter_by(func.min(History.id)) - history.delete() + + while len(histories) > 3: + history = History.query.order_by(History.id).all() + Session_db.query.filter_by(id=history[0].session_id).delete() + History.query.filter_by(id=history[0].id).delete() + histories = History.query.all() + db.session.commit() return \ No newline at end of file diff --git a/webiste/app/templates/history.html b/webiste/app/templates/history.html index f179000b..b73aaaf6 100644 --- a/webiste/app/templates/history.html +++ b/webiste/app/templates/history.html @@ -11,9 +11,24 @@
-
- [[h]] -
+
[Go Back Top]