Skip to content

Commit

Permalink
[IMPORT] add permission check for habitat imported on existing station (
Browse files Browse the repository at this point in the history
#3209)

* feat(import,check) : when an habitat is imported on existing station, we now check permissions on the latter
* feat(occhab): change `Station.filter_by_scope()`  signature, now return a whereclause
  • Loading branch information
jacquesfize authored Oct 15, 2024
1 parent cd74e5f commit c7bb770
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ On importe une station qui existe déjà en base et un nouvel habitat, seul l
On importe une station et un habitat existant déjà en base;SKIP_EXISTING_UUID;SKIP_EXISTING_UUID;;EXISTING_STATION_UUID;;17/11/2023;17/11/2023;Toto;;POINT(3.634 44.399);St;EXISTING_HABITAT_UUID;prairie;24;;
technique collect vaut « autre » mais pas de précision fournise;OK !;CONDITIONAL_MANDATORY_FIELD_ERROR;;;VALID_DATASET_UUID;17/11/2023;17/11/2023;Toto;;POINT(3.634 44.399);St;;prairie;24;;10
technique collect vaut « autre » et une précision est bien fournies;OK !;OK !;;;VALID_DATASET_UUID;17/11/2023;17/11/2023;Toto;;POINT(3.634 44.399);St;;prairie;24;moyen précis;10
L’habitat est crée dans une station que existe en base mais sur laquelle nous n’avons pas les droits;DATASET_NOT_AUTHORIZED(unique_dataset_id);OK ! ;;STRANGER_STATION_UUID;;;;;;;;;prairie;24;moyen précis;10
jeu de données pas actif;DATASET_NOT_ACTIVE;ERRONEOUS_PARENT_ENTITY;;;INACTIVE_DATASET_UUID;17/11/2023;17/11/2023;Toto;;POINT(3.634 44.399);St;;prairie;24;moyen précis;10
36 changes: 33 additions & 3 deletions backend/geonature/tests/imports/test_imports_occhab.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,14 @@ def contentmapping(occhab_destination):

@pytest.fixture()
def uploaded_import(
client, users, datasets, station, habitat, import_file_name, display_unique_dataset_id
client,
users,
datasets,
station,
station_stranger_dataset,
habitat,
import_file_name,
display_unique_dataset_id,
):
with open(test_files_path / import_file_name, "rb") as f:
test_file_line_count = sum(1 for line in f) - 1 # remove headers
Expand All @@ -95,6 +102,10 @@ def uploaded_import(
b"EXISTING_STATION_UUID",
station.unique_id_sinp_station.hex.encode("ascii"),
)
content = content.replace(
b"STRANGER_STATION_UUID",
station_stranger_dataset.unique_id_sinp_station.hex.encode("ascii"),
)
content = content.replace(
b"EXISTING_HABITAT_UUID",
habitat.unique_id_sinp_hab.hex.encode("ascii"),
Expand Down Expand Up @@ -209,6 +220,18 @@ def station(datasets):
return station


@pytest.fixture(scope="function")
def station_stranger_dataset(datasets):
station = Station(
id_dataset=datasets["stranger_dataset"].id_dataset,
date_min=datetime.strptime("17/11/2023", "%d/%m/%Y"),
geom_4326=from_shape(Point(3.634, 44.399), 4326),
)
with db.session.begin_nested():
db.session.add(station)
return station


@pytest.fixture(scope="function")
def habitat(station):
habitat = OccurenceHabitat(
Expand Down Expand Up @@ -238,6 +261,7 @@ def no_default_uuid(monkeypatch):
class TestImportsOcchab:
@pytest.mark.parametrize("import_file_name", ["valid_file.csv"])
def test_import_valid_file(self, imported_import):

assert_import_errors(
imported_import,
{
Expand All @@ -254,11 +278,17 @@ def test_import_valid_file(self, imported_import):
"unique_dataset_id",
frozenset({6}),
),
(
ImportCodeError.DATASET_NOT_AUTHORIZED,
"habitat",
"",
frozenset({43}),
),
(
ImportCodeError.DATASET_NOT_ACTIVE,
"station",
"unique_dataset_id",
frozenset({43}),
frozenset({44}),
),
(
ImportCodeError.INVALID_UUID,
Expand Down Expand Up @@ -319,7 +349,7 @@ def test_import_valid_file(self, imported_import):
ImportCodeError.ERRONEOUS_PARENT_ENTITY,
"habitat",
"",
frozenset({5, 6, 9, 24, 43}),
frozenset({5, 6, 9, 24, 44}),
),
(
ImportCodeError.NO_PARENT_ENTITY,
Expand Down
7 changes: 5 additions & 2 deletions backend/geonature/tests/test_pr_occhab.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,8 +552,11 @@ def query_test_filter_by_params(params):
)

def test_filter_by_scope(self):
res = Station.filter_by_scope(0)
res = db.session.scalars(res).unique().all()
res = (
db.session.scalars(sa.select(Station).where(Station.filter_by_scope(scope=0)))
.unique()
.all()
)
assert not len(res) # <=> len(res) == 0

def test_has_instance_permission(self, stations):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ def get_joinedload_when_scope(scope):
@permissions.check_cruved_scope("R", module_code="OCCHAB", get_scope=True)
def list_stations(scope):
joinedload_when_scope = get_joinedload_when_scope(scope)
stations = Station.filter_by_params(request.args)
stations = Station.filter_by_scope(scope=scope, query=stations)
stations = Station.filter_by_params(request.args).where(Station.filter_by_scope(scope=scope))
stations = stations.order_by(Station.date_min.desc()).options(
raiseload("*"),
joinedload(Station.observers),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
check_mandatory_field,
)
from .checks import (
check_existing_station_permissions,
generate_id_station,
set_id_station_from_line_no,
)
Expand Down Expand Up @@ -191,6 +192,8 @@ def check_habitat_sql(imprt):
selected_fields.get("unique_id_sinp_station"),
],
)
check_existing_station_permissions(imprt)

set_parent_line_no(
imprt,
parent_entity=entity_station,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from geonature.core.gn_permissions.tools import get_scopes_by_action
from geonature.core.imports.checks.errors import ImportCodeError
from geonature.core.imports.checks.sql.utils import report_erroneous_rows
from geonature.core.imports.models import Entity, TImports
from gn_module_occhab.models import Station
import sqlalchemy as sa
from sqlalchemy.orm import aliased

Expand Down Expand Up @@ -59,3 +63,33 @@ def set_id_station_from_line_no(imprt: TImports, habitat_entity: Entity) -> None
.where(transient_station.c.line_no == transient_habitat.c.station_line_no)
.values({"id_station": transient_station.c.id_station})
)


def check_existing_station_permissions(imprt: TImports) -> None:
"""
Check that the user has update right on all stations associated with the newly imported habitats.
Parameters
----------
imprt : TImports
Current import
"""

transient_table = imprt.destination.get_transient_table()
entity_habitat = Entity.query.filter_by(code="habitat").one()

# Get User permissions on OCCHAB
author = imprt.authors[0]
cruved = get_scopes_by_action(id_role=author.id_role, module_code="OCCHAB")

# Return error when a station in the transition table is not updatable
report_erroneous_rows(
imprt,
entity=entity_habitat,
error_type=ImportCodeError.DATASET_NOT_AUTHORIZED,
error_column="",
whereclause=sa.and_(
transient_table.c.id_station == Station.id_station,
sa.not_(Station.filter_by_scope(scope=cruved["U"], user=author)),
),
)
39 changes: 27 additions & 12 deletions contrib/gn_module_occhab/backend/gn_module_occhab/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from utils_flask_sqla.serializers import serializable
from werkzeug.datastructures import TypeConversionDict

from flask_login import current_user

cor_station_observer = db.Table(
"cor_station_observer",
db.Column("id_cor_station_observer", db.Integer, primary_key=True),
Expand Down Expand Up @@ -126,23 +128,36 @@ def filter_by_params(cls, params, *, query):
query = query.where(Station.date_max <= date_up)
return query

@qfilter(query=True)
def filter_by_scope(cls, scope, user=None, *, query):
@qfilter
def filter_by_scope(cls, scope, user=None, **kwargs):
"""
Filter Station instances by scope and user.
Parameters
----------
scope : int
0, 1, 2 or 3
user : User, optional
user instance. If None, use current_user (default is None)
Returns
-------
sqlalchemy.sql.expression.BooleanClauseList
filter by scope and user
"""
if user is None:
user = g.current_user
user = current_user

if scope == 0:
query = query.where(sa.false())
return False
elif scope in (1, 2):
ds_list = Dataset.filter_by_scope(scope).with_only_columns(Dataset.id_dataset)
query = query.where(
sa.or_(
Station.observers.any(id_role=user.id_role),
Station.id_dataset.in_(
[ds.id_dataset for ds in db.session.execute(ds_list).all()]
),
)

return sa.or_(
Station.observers.any(id_role=user.id_role),
Station.id_dataset.in_([ds.id_dataset for ds in db.session.execute(ds_list).all()]),
)
return query
return True


@serializable
Expand Down

0 comments on commit c7bb770

Please sign in to comment.