From b620c2e51cb98268d0c01f8ff69c48d3d1e8afad Mon Sep 17 00:00:00 2001 From: Andrew Walker Date: Sat, 18 Jan 2025 13:35:06 -0600 Subject: [PATCH] Reduce situations in which hard-coded root privilege returned We should only return the wildcard allowlist when the root unix domain socket session is not created by an interactive shell session. This is achieved by storing the loginuid for the middleware client pid and checking as part of the check_permission call. This ensures that STIG restrictions are properly evaluated for users who use midclt, midcli, etc from shell. --- src/middlewared/middlewared/plugins/auth.py | 2 +- src/middlewared/middlewared/utils/auth.py | 20 +++++++++++++++ src/middlewared/middlewared/utils/origin.py | 27 +++++++++++++++++++++ tests/unit/test_stig.py | 17 +++++++++++++ 4 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 tests/unit/test_stig.py diff --git a/src/middlewared/middlewared/plugins/auth.py b/src/middlewared/middlewared/plugins/auth.py index 3f2be766a5362..495df9390d96f 100644 --- a/src/middlewared/middlewared/plugins/auth.py +++ b/src/middlewared/middlewared/plugins/auth.py @@ -1084,7 +1084,7 @@ async def check_permission(middleware, app): return if origin.is_unix_family: - if origin.uid == 0: + if origin.uid == 0 and not origin.session_is_interactive: user = await middleware.call('auth.authenticate_root') else: try: diff --git a/src/middlewared/middlewared/utils/auth.py b/src/middlewared/middlewared/utils/auth.py index 9c0ee461beddd..639a583e375a5 100644 --- a/src/middlewared/middlewared/utils/auth.py +++ b/src/middlewared/middlewared/utils/auth.py @@ -6,6 +6,8 @@ LEGACY_API_KEY_USERNAME = 'LEGACY_API_KEY' MAX_OTP_ATTEMPTS = 3 +AUID_UNSET = 2 ** 32 -1 +AUID_FAULTED = 2 ** 32 -2 class AuthMech(enum.StrEnum): @@ -174,3 +176,21 @@ def authenticate(self, uid: int, plaintext: str) -> OTPWResponse: OTPW_MANAGER = OnetimePasswordManager() + + +def get_login_uid(pid: int, raise_error=False) -> int: + """ + Get the login uid of the specified PID. By design it is set by pam_loginuid + on session login. If value is unitialized then the value will be UINT32_MAX -1 + Which is set as constant AUID_UNSET here and in auditd rules. If an error + occurs during read then either the exception will be raised or the special + value AUID_FAULTED (UINT32_MAX -2) will be returned (which is the default). + """ + try: + with open(f'/proc/{pid}/loginuid', 'r') as f: + return int(f.read().strip()) + except Exception: + if not raise_error: + return AUID_FAULTED + + raise diff --git a/src/middlewared/middlewared/utils/origin.py b/src/middlewared/middlewared/utils/origin.py index dbe039318d0a2..78779b243a95a 100644 --- a/src/middlewared/middlewared/utils/origin.py +++ b/src/middlewared/middlewared/utils/origin.py @@ -4,6 +4,8 @@ from pyroute2 import DiagSocket +from .auth import get_login_uid, AUID_UNSET + __all__ = ('ConnectionOrigin',) HA_HEARTBEAT_IPS = ('169.254.10.1', '169.254.10.2') @@ -35,6 +37,11 @@ class ConnectionOrigin: gid: int | None = None """If `family` is of type AF_UNIX, this represents the group id associated to the unix datagram connection""" + loginuid: int | None = None + """If `family` is of type AF_UNIX, this represents + the login uid associated to the unix datagram connection. + The login uid can be used to determine whether the connection was + created by an interactive shell session""" @classmethod def create(cls, request): @@ -42,10 +49,12 @@ def create(cls, request): sock = request.transport.get_extra_info("socket") if sock.family == AF_UNIX: pid, uid, gid = unpack("3i", sock.getsockopt(SOL_SOCKET, SO_PEERCRED, calcsize("3i"))) + login_uid = get_login_uid(pid) return cls( family=sock.family, pid=pid, uid=uid, + loginuid=login_uid, gid=gid ) elif sock.family in (AF_INET, AF_INET6): @@ -81,6 +90,24 @@ def match(self, origin) -> bool: def repr(self) -> str: return f"pid:{self.pid}" if self.is_unix_family else self.rem_addr + @property + def session_is_interactive(self) -> bool: + """ This is used to indicate whether session is internal + for purposes of STIG checks. This property is only set for + AF_UNIX connections and indicates whether its an interactive """ + if self.loginuid is None: + # Not AF_UNIX connection. Always apply restrictions + # for interactive sessions. + return True + + # self.loginuid may be set to AUID_FAULTED if for some reason + # we encountered a major issue in retrieving the loginuid. + # Since this value is used to determine whether to allow + # enhanced privileges in STIG mode we treat AUID_FAULTED + # as being an interactive session with less privileges afforded + # it. + return self.loginuid != AUID_UNSET + @property def is_tcp_ip_family(self) -> bool: return self.family in (AF_INET, AF_INET6) diff --git a/tests/unit/test_stig.py b/tests/unit/test_stig.py new file mode 100644 index 0000000000000..fefd1672c7841 --- /dev/null +++ b/tests/unit/test_stig.py @@ -0,0 +1,17 @@ +import pytest +from truenas_api_client import Client, ClientException + +@pytest.fixture(scope='function') +def enable_stig(): + with Client() as c: + c.call('datastore.update', 'system.security', 1, {'enable_gpos_stig': True}) + try: + yield c + finally: + c.call('datastore.update', 'system.security', 1, {'enable_gpos_stig': False}) + +def test__stig_restrictions_af_unix(enable_stig): + # STIG RBAC should still be effective despite root session + with pytest.raises(ClientException, match='Not authorized'): + with Client() as c: + c.call('virt.global.update', {}, job=True)