Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent workspace admins to modify the system user on a workspace #5664

Merged
merged 2 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion temba/api/v2/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5300,7 +5300,7 @@ def test_users(self):
},
],
# one query per user for their settings
num_queries=NUM_BASE_SESSION_QUERIES + 3,
num_queries=NUM_BASE_SESSION_QUERIES + 2,
)

# filter by roles
Expand Down
2 changes: 1 addition & 1 deletion temba/api/v2/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3543,7 +3543,7 @@ def derive_queryset(self):
else:
roles = None

return org.get_users(roles=roles).prefetch_related("settings")
return org.get_users(roles=roles)

def get_serializer_context(self):
context = super().get_serializer_context()
Expand Down
2 changes: 1 addition & 1 deletion temba/orgs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,7 @@ def get_users(self, *, roles: list = None, with_perm: str = None):
"""
Gets users in this org, filtered by role or permission.
"""
qs = self.users.filter(is_active=True)
qs = self.users.filter(is_active=True).select_related("settings")

if with_perm:
app_label, codename = with_perm.split(".")
Expand Down
52 changes: 50 additions & 2 deletions temba/orgs/views/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1728,6 +1728,13 @@ class UserCRUDLTest(TembaTest, CRUDLTestMixin):
def test_list(self):
list_url = reverse("orgs.user_list")

system_user = self.create_user("[email protected]")
system_user.settings.is_system = True
system_user.settings.save(update_fields=("is_system",))

# add system user to workspace
self.org.add_user(system_user, OrgRole.ADMINISTRATOR)

# nobody can access if users feature not enabled
response = self.requestView(list_url, self.admin)
self.assertRedirect(response, reverse("orgs.org_workspace"))
Expand All @@ -1748,12 +1755,21 @@ def test_list(self):
response = self.assertListFetch(
list_url, [self.admin], context_objects=[self.admin, self.agent, self.editor, self.user]
)
self.assertEqual(response.context["admin_count"], 1)
self.assertContains(response, "(All Topics)")

# can search by name or email
self.assertListFetch(list_url + "?search=andy", [self.admin], context_objects=[self.admin])
self.assertListFetch(list_url + "[email protected]", [self.admin], context_objects=[self.editor])

response = self.requestView(list_url, self.customer_support, choose_org=self.org)
self.assertEqual(
set(list(response.context["object_list"])),
{self.admin, self.agent, self.editor, self.user, system_user},
)
self.assertContains(response, "(All Topics)")
self.assertEqual(response.context["admin_count"], 2)

def test_team(self):
team_url = reverse("orgs.user_team", args=[self.org.default_ticket_team.id])

Expand All @@ -1775,6 +1791,10 @@ def test_team(self):
self.assertContentMenu(team_url, self.admin, ["Edit", "Delete"])

def test_update(self):
system_user = self.create_user("[email protected]")
system_user.settings.is_system = True
system_user.settings.save(update_fields=("is_system",))

update_url = reverse("orgs.user_update", args=[self.agent.id])

# nobody can access if users feature not enabled
Expand Down Expand Up @@ -1825,16 +1845,34 @@ def test_update(self):
self.assertEqual({self.user, self.editor}, set(self.org.get_users(roles=[OrgRole.EDITOR])))
self.assertEqual({self.admin}, set(self.org.get_users(roles=[OrgRole.ADMINISTRATOR])))

# even if we add system user to workspace
self.org.add_user(system_user, OrgRole.ADMINISTRATOR)
response = self.assertUpdateSubmit(update_url, self.admin, {"role": "E"}, object_unchanged=self.admin)
self.assertRedirect(response, reverse("orgs.user_list"))
self.assertEqual({self.user, self.editor}, set(self.org.get_users(roles=[OrgRole.EDITOR])))
self.assertEqual({self.admin, system_user}, set(self.org.get_users(roles=[OrgRole.ADMINISTRATOR])))

# add another admin to workspace and try again
self.org.add_user(self.admin2, OrgRole.ADMINISTRATOR)

response = self.assertUpdateSubmit(update_url, self.admin, {"role": "E"}, object_unchanged=self.admin)
self.assertRedirect(response, reverse("orgs.org_start")) # no longer have access to user list page

self.assertEqual({self.user, self.editor, self.admin}, set(self.org.get_users(roles=[OrgRole.EDITOR])))
self.assertEqual({self.admin2}, set(self.org.get_users(roles=[OrgRole.ADMINISTRATOR])))
self.assertEqual({self.admin2, system_user}, set(self.org.get_users(roles=[OrgRole.ADMINISTRATOR])))

# cannot update system user on a workspace
update_url = reverse("orgs.user_update", args=[system_user.id])
response = self.requestView(update_url, self.admin2)
self.assertRedirect(response, reverse("orgs.org_choose"))
self.assertEqual({self.user, self.editor, self.admin}, set(self.org.get_users(roles=[OrgRole.EDITOR])))
self.assertEqual({self.admin2, system_user}, set(self.org.get_users(roles=[OrgRole.ADMINISTRATOR])))

def test_delete(self):
system_user = self.create_user("[email protected]")
system_user.settings.is_system = True
system_user.settings.save(update_fields=("is_system",))

delete_url = reverse("orgs.user_delete", args=[self.agent.id])

# nobody can access if users feature not enabled
Expand Down Expand Up @@ -1868,6 +1906,16 @@ def test_delete(self):
self.assertRedirect(response, reverse("orgs.user_list"))
self.assertEqual({self.user, self.editor, self.admin}, set(self.org.get_users()))

# cannot still even when the other admin is a system user
self.org.add_user(system_user, OrgRole.ADMINISTRATOR)
response = self.assertDeleteSubmit(delete_url, self.admin, object_unchanged=self.admin)
self.assertRedirect(response, reverse("orgs.user_list"))
self.assertEqual({self.user, self.editor, self.admin, system_user}, set(self.org.get_users()))

# cannot remove system user too
self.assertRequestDisallowed(reverse("orgs.user_delete", args=[system_user.id]), [self.admin])
self.assertEqual({self.user, self.editor, self.admin, system_user}, set(self.org.get_users()))

# add another admin to workspace and try again
self.org.add_user(self.admin2, OrgRole.ADMINISTRATOR)

Expand All @@ -1876,7 +1924,7 @@ def test_delete(self):
# this time we could remove ourselves
response = self.assertDeleteSubmit(delete_url, self.admin, object_unchanged=self.admin)
self.assertRedirect(response, reverse("orgs.org_choose"))
self.assertEqual({self.user, self.editor, self.admin2}, set(self.org.get_users()))
self.assertEqual({self.user, self.editor, self.admin2, system_user}, set(self.org.get_users()))

def test_account(self):
self.login(self.agent)
Expand Down
34 changes: 27 additions & 7 deletions temba/orgs/views/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,19 @@ class List(RequireFeatureMixin, SpaMixin, BaseListView):
search_fields = ("email__icontains", "first_name__icontains", "last_name__icontains")

def derive_queryset(self, **kwargs):
return (
qs = (
super(BaseListView, self)
.derive_queryset(**kwargs)
.filter(id__in=self.request.org.get_users().values_list("id", flat=True))
.order_by(Lower("email"))
.select_related("settings")
)

if not self.request.user.is_staff:
qs = qs.exclude(settings__is_system=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to update admin_count below on line 435


return qs

def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)

Expand All @@ -427,7 +432,12 @@ def get_context_data(self, **kwargs):

context["has_viewers"] = self.request.org.get_users(roles=[OrgRole.VIEWER]).exists()
context["has_teams"] = Org.FEATURE_TEAMS in self.request.org.features
context["admin_count"] = self.request.org.get_users(roles=[OrgRole.ADMINISTRATOR]).count()

admins = self.request.org.get_users(roles=[OrgRole.ADMINISTRATOR])
if not self.request.user.is_staff:
admins = admins.exclude(settings__is_system=True)
context["admin_count"] = admins.count()

return context

class Team(RequireFeatureMixin, SpaMixin, ContextMenuMixin, BaseListView):
Expand Down Expand Up @@ -504,7 +514,7 @@ def get_object_org(self):
return self.request.org

def get_queryset(self):
return self.request.org.get_users()
return self.request.org.get_users().exclude(settings__is_system=True)

def derive_exclude(self):
return [] if Org.FEATURE_TEAMS in self.request.org.features else ["team"]
Expand All @@ -528,7 +538,12 @@ def save(self, obj):
team = (team or self.request.org.default_ticket_team) if role == OrgRole.AGENT else None

# don't update if user is the last administrator and role is being changed to something else
has_other_admins = self.request.org.get_users(roles=[OrgRole.ADMINISTRATOR]).exclude(id=obj.id).exists()
has_other_admins = (
self.request.org.get_users(roles=[OrgRole.ADMINISTRATOR])
.exclude(id=obj.id)
.exclude(settings__is_system=True)
.exists()
)
if role != OrgRole.ADMINISTRATOR and not has_other_admins:
return obj

Expand All @@ -550,7 +565,7 @@ def get_object_org(self):
return self.request.org

def get_queryset(self):
return self.request.org.get_users()
return self.request.org.get_users().exclude(settings__is_system=True)

def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
Expand All @@ -560,8 +575,13 @@ def get_context_data(self, **kwargs):
def post(self, request, *args, **kwargs):
user = self.get_object()

# only actually remove user if they're not the last administator
if self.request.org.get_users(roles=[OrgRole.ADMINISTRATOR]).exclude(id=user.id).exists():
# only actually remove user if they're not the last administator or a system user only for staff
if (
self.request.org.get_users(roles=[OrgRole.ADMINISTRATOR])
.exclude(id=user.id)
.exclude(settings__is_system=True)
.exists()
):
self.request.org.remove_user(user)

return HttpResponseRedirect(self.get_redirect_url())
Expand Down
Loading