diff --git a/check_systemd.py b/check_systemd.py index 8437df0..df73f34 100755 --- a/check_systemd.py +++ b/check_systemd.py @@ -365,14 +365,14 @@ def __init__(self, unit_names: Sequence[str] = ()) -> None: self.__unit_names = set(unit_names) @staticmethod - def __match(unit_name: str, regexes: str | Sequence[str]) -> bool: + def match(unit_name: str, regexes: str | Sequence[str]) -> bool: """ Match multiple regular expressions against a unit name. :param unit_name: The unit name to be matched. :param regexes: A single regular expression (``include='.*service'``) or a - list of regular expressions (``include=('.*service', '.*mount')``). + list of regular expressions (``include=('.*service', '.*mount')``). :return: True if one regular expression matches""" if isinstance(regexes, str): @@ -417,7 +417,7 @@ def filter( regular expression (``exclude='.*service'``) or a list of regular expressions (``exclude=('.*service', '.*mount')``). """ - match = Source.NameFilter.__match + match = Source.NameFilter.match for name in self.__unit_names: output: Optional[str] = name if include and not match(name, include): @@ -503,6 +503,9 @@ def count_by_states( return counter + def get_unit(self, name: str) -> Source.Unit: + ... + def get_all_units(self, user: bool = False) -> Generator[Source.Unit, Any, None]: ... @@ -727,9 +730,7 @@ def __convert_to_sec(fmt_timespan: str) -> float: result += float(value) * seconds[unit] return round(float(result), 3) - def get_unit(self, name: str) -> Unit: - properties = _collect_properties(name) - + def get_unit(self, name: str) -> Source.Unit: stdout = CliSource.__execute_cli( [ "systemctl", @@ -756,7 +757,7 @@ def get_unit(self, name: str) -> Unit: logger.debug("Properties of unit '%s': %s", name, properties) - return Unit( + return Source.Unit( name=properties["Id"], active_state=_check_active_state(properties["ActiveState"]), sub_state=_check_sub_state(properties["SubState"]), @@ -1321,66 +1322,6 @@ class CheckSystemdRegexpError(CheckSystemdError): pass -def match_multiple(unit_name: str, regexes: str | Sequence[str]) -> bool: - """ - Match multiple regular expressions against a unit name. - - :param unit_name: The unit name to be matched. - - :param regexes: A single regular expression (``include='.*service'``) or a - list of regular expressions (``include=('.*service', '.*mount')``). - - :return: True if one regular expression matches""" - if isinstance(regexes, str): - regexes = [regexes] - for regex in regexes: - try: - if re.match(regex, unit_name): - return True - except Exception: - raise CheckSystemdRegexpError( - "Invalid regular expression: '{}'".format(regex) - ) - return False - - -class Unit: - """This class bundles all state related informations of a systemd unit in a - object. This class is inherited by the class ``DbusUnit`` and the - attributes are overwritten by properties. - """ - - name: str - """The name of the system unit, for example ``nginx.service``. In the - command line table of the command ``systemctl list-units`` is the - column containing unit names titled with “UNIT”. - """ - - active_state: ActiveState - - sub_state: SubState - - load_state: LoadState - - def __init__(self, **kwargs) -> None: - self.name = kwargs.get("name") - self.active_state = kwargs.get("active_state") - self.sub_state = kwargs.get("sub_state") - self.load_state = kwargs.get("load_state") - - def convert_to_exitcode(self) -> ServiceState: - """Convert the different systemd states into a Nagios compatible - exit code. - - :return: A Nagios compatible exit code: 0, 1, 2, 3 - """ - if opts.expected_state and opts.expected_state.lower() != self.active_state: - return Critical - if self.load_state == "error" or self.active_state == "failed": - return Critical - return Ok - - class SystemdUnitTypesList(MutableSequence[str]): unit_types: list[str] @@ -1432,248 +1373,20 @@ def convert_to_regexp(self): return r".*\.({})$".format("|".join(self.unit_types)) -class UnitNameFilter: - """This class stores all system unit names (e. g. ``nginx.service`` or - ``fstrim.timer``) and provides a interface to filter the names by regular - expressions.""" - - __unit_names: set[str] - - def __init__(self, unit_names: Sequence[str] = ()) -> None: - self.__unit_names = set(unit_names) - - def add(self, unit_name: str) -> None: - """Add one unit name. - - :param unit_name: The name of the unit, for example ``apt.timer``. - """ - self.__unit_names.add(unit_name) - - def get(self) -> set[str]: - """Get all stored unit names.""" - return self.__unit_names - - def list( - self, - include: str | Sequence[str] | None = None, - exclude: str | Sequence[str] | None = None, - ) -> Generator[str, None, None]: - """ - List all unit names or apply filters (``include`` or ``exclude``) to - the list of unit names. - - :param include: If the unit name matches the provided regular - expression, it is included in the list of unit names. A single - regular expression (``include='.*service'``) or a list of regular - expressions (``include=('.*service', '.*mount')``). - - :param exclude: If the unit name matches the provided regular - expression, it is excluded from the list of unit names. A single - regular expression (``exclude='.*service'``) or a list of regular - expressions (``exclude=('.*service', '.*mount')``). - """ - for name in self.__unit_names: - output: Optional[str] = name - if include and not match_multiple(name, include): - output = None - - if output and exclude and match_multiple(name, exclude): - output = None - - if output: - yield output - - -class UnitCache: - """This class is a container class for systemd units.""" - - __units: dict[str, Unit] - - __name_filter: UnitNameFilter - - def __init__(self) -> None: - self.__units = {} - self.__name_filter = UnitNameFilter() - - def __add_unit(self, unit: Unit) -> None: - self.__units[unit.name] = unit - self.__name_filter.add(unit.name) - - def add_unit( - self, - unit: Optional[Unit] = None, - name: Optional[str] = None, - active_state: Optional[ActiveState] = None, - sub_state: Optional[SubState] = None, - load_state: Optional[LoadState] = None, - ) -> Unit: - if not unit: - unit = Unit() - if name: - unit.name = name - if active_state: - unit.active_state = active_state - if sub_state: - unit.sub_state = sub_state - if load_state: - unit.load_state = load_state - self.__add_unit(unit) - return unit - - def get(self, name: Optional[str] = None) -> Unit | None: - if name: - return self.__units[name] - return None - - def list( - self, - include: str | Sequence[str] | None = None, - exclude: str | Sequence[str] | None = None, - ) -> Generator[Unit, None, None]: - """ - List all units or apply filters (``include`` or ``exclude``) to - the list of unit. - - :param include: If the unit name matches the provided regular - expression, it is included in the list of unit names. A single - regular expression (``include='.*service'``) or a list of regular - expressions (``include=('.*service', '.*mount')``). - - :param exclude: If the unit name matches the provided regular - expression, it is excluded from the list of unit names. A single - regular expression (``exclude='.*service'``) or a list of regular - expressions (``exclude=('.*service', '.*mount')``). - """ - for name in self.__name_filter.list(include=include, exclude=exclude): - yield self.__units[name] - - @property - def count(self) -> int: - return len(self.__units) - - def count_by_states( - self, - states: Sequence[str], - include: str | Sequence[str] | None = None, - exclude: str | Sequence[str] | None = None, - ) -> dict[str, int]: - states_normalized: list[dict[str, str]] = [] - counter: dict[str, int] = {} - for state_spec in states: - # state_proerty:state_value - # for example: active_state:failed - state_property = state_spec.split(":")[0] - state_value = state_spec.split(":")[1] - state: dict[str, str] = { - "property": state_property, - "value": state_value, - "spec": state_spec, - } - states_normalized.append(state) - counter[state_spec] = 0 - - for unit in self.list(include=include, exclude=exclude): - for state in states_normalized: - if getattr(unit, state["property"]) == state["value"]: - counter[state["spec"]] += 1 - - return counter - - -def _collect_properties(unit_name: str) -> dict[str, str]: - stdout = execute_cli( - [ - "systemctl", - "show", - "--property", - "Id", - "--property", - "ActiveState", - "--property", - "SubState", - "--property", - "LoadState", - unit_name, - ] - ) - if stdout is None: - raise CheckSystemdError(f"The unit '{unit_name}' couldn't be found.") - rows = stdout.splitlines() - - properties: dict[str, str] = {} - for row in rows: - index_equal_sign = row.index("=") - properties[row[:index_equal_sign]] = row[index_equal_sign + 1 :] - - logger.debug("Properties of unit '%s': %s", unit_name, properties) - - return properties - - -class CliUnitCache(UnitCache): - def __init__(self, with_user_units: bool = False) -> None: - super().__init__() - command = ["systemctl", "list-units", "--all"] - if with_user_units: - command += ["--user"] - stdout = execute_cli(command) - if stdout: - table_parser = TableParser(stdout) - table_parser.check_header(("unit", "active", "sub", "load")) - for row in table_parser.list_rows(): - self.add_unit( - name=row["unit"], - active_state=_check_active_state(row["active"]), - sub_state=_check_sub_state(row["sub"]), - load_state=_check_load_state(row["load"]), - ) - - if opts.include_unit is not None: - properties = _collect_properties(opts.include_unit) - self.add_unit( - name=properties["Id"], - active_state=_check_active_state(properties["ActiveState"]), - sub_state=_check_sub_state(properties["SubState"]), - load_state=_check_load_state(properties["LoadState"]), - ) - - -class DbusUnitCache(UnitCache): - def __init__(self) -> None: - super().__init__() - if dbus_manager is None: - raise CheckSystemdError( - "The package PyGObject (gi) is not available. " - "The D-Bus backend can't be used." - ) - all_units = dbus_manager.manager.ListUnits() - for name, _, load_state, active_state, sub_state, _, _, _, _, _ in all_units: - logger.debug( - "Dbus ListUnits(): name: %s load_state: %s active_state: %s sub_state: %s", - name, - load_state, - active_state, - sub_state, - ) - self.add_unit( - name=name, - active_state=active_state, - sub_state=sub_state, - load_state=load_state, - ) - - -unit_cache: UnitCache = None -"""An instance of :class:`DbusUnitCache` or :class:`CliUnitCache`""" - +Units = Source.Cache[Source.Unit] # scope: units ################################################################ class UnitsResource(Resource): + units: Units + + def __init__(self, units: Units) -> None: + self.units = units + def probe(self) -> Generator[Metric, None, None]: counter = 0 - for unit in unit_cache.list(include=opts.include, exclude=opts.exclude): + for unit in self.units.filter(include=opts.include, exclude=opts.exclude): yield Metric(name=unit.name, value=unit, context="units") counter += 1 @@ -1698,7 +1411,7 @@ def evaluate(self, metric: Metric, resource: Resource) -> Result: :returns: :class:`~.result.Result` """ - if isinstance(metric.value, Unit): + if isinstance(metric.value, Source.Unit): unit = metric.value exitcode = unit.convert_to_exitcode() if exitcode != 0: @@ -1744,7 +1457,7 @@ def __init__(self, source: Source) -> None: def probe(self) -> Generator[Metric, None, None]: for timer in self.source.get_all_timers(): - if match_multiple(timer.name, opts.exclude): + if Source.NameFilter.match(timer.name, opts.exclude): continue state = Ok @@ -1803,7 +1516,7 @@ def probe(self) -> Generator[Metric, None, None]: class StartupTimeContext(ScalarContext): def __init__(self) -> None: - super(StartupTimeContext, self).__init__("startup_time") + super().__init__("startup_time") if opts.scope_startup_time: self.warning = Range(opts.warning) self.critical = Range(opts.critical) @@ -1826,8 +1539,13 @@ def performance(self, metric: Metric, resource: Resource): class PerformanceDataResource(Resource): + units: Units + + def __init__(self, units: Units) -> None: + self.units = units + def probe(self) -> Generator[Metric, None, None]: - for state_spec, count in unit_cache.count_by_states( + for state_spec, count in self.units.count_by_states( ( "active_state:failed", "active_state:active", @@ -1843,7 +1561,7 @@ def probe(self) -> Generator[Metric, None, None]: ) yield Metric( - name="count_units", value=unit_cache.count, context="performance_data" + name="count_units", value=self.units.count, context="performance_data" ) @@ -1851,7 +1569,7 @@ class PerformanceDataContext(Context): def __init__(self) -> None: super().__init__("performance_data") - def performance(self, metric: Metric, resource: Resource): + def performance(self, metric: Metric, resource: Resource) -> Performance: """Derives performance data from a given metric. :param metric: associated metric from which performance data are @@ -2270,14 +1988,14 @@ def main() -> None: else: source = CliSource() - global unit_cache - if opts.data_source == "dbus": - unit_cache = DbusUnitCache() - else: - unit_cache = CliUnitCache(with_user_units=opts.with_user_units) + units = source.get_all_units_cached(user=opts.with_user_units) + + if opts.include_unit is not None: + unit = source.get_unit(opts.include_unit) + units.add(unit.name, unit) tasks: list[Union[Resource, Context, Summary]] = [ - UnitsResource(), + UnitsResource(units), UnitsContext(), SystemdSummary(), StartupTimeResource(source), @@ -2292,7 +2010,7 @@ def main() -> None: if opts.performance_data: tasks += [ - PerformanceDataResource(), + PerformanceDataResource(units), PerformanceDataContext(), ] diff --git a/tests/test_data_source_cli.py b/tests/test_data_source_cli.py index 620e30d..ec852b2 100644 --- a/tests/test_data_source_cli.py +++ b/tests/test_data_source_cli.py @@ -3,15 +3,15 @@ from unittest.mock import patch -from check_systemd import _collect_properties # type: ignore +from check_systemd import CliSource from tests.helper import get_mocks_for_popen def test_collect_properties() -> None: with patch("check_systemd.subprocess.Popen") as Popen: Popen.return_value = get_mocks_for_popen("systemctl-show-nginx_active.txt")[0] - properties = _collect_properties("unit") - assert properties["Id"] == "nginx.service" - assert properties["ActiveState"] == "active" - assert properties["SubState"] == "running" - assert properties["LoadState"] == "loaded" + unit = CliSource().get_unit("nginx.service") + assert unit.name == "nginx.service" + assert unit.active_state == "active" + assert unit.sub_state == "running" + assert unit.load_state == "loaded" diff --git a/tests/test_unit.py b/tests/test_unit.py index 77077a0..55cc1ab 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -2,14 +2,8 @@ from __future__ import annotations -from unittest.mock import patch - -import pytest -from nagiosplugin import CheckError - import check_systemd -from check_systemd import SystemdUnitTypesList, execute_cli -from tests.helper import MPopen +from check_systemd import SystemdUnitTypesList class TestMethodConvertToSec: @@ -49,19 +43,3 @@ def test_initialization(self) -> None: def test_convert_to_regexp(self) -> None: unit_types = SystemdUnitTypesList("service", "timer") assert ".*\\.(service|timer)$" == unit_types.convert_to_regexp() - - -class TestFunctionExecuteCli: - def test_execute_cli_stdout(self) -> None: - with patch("check_systemd.subprocess.Popen") as Popen: - Popen.return_value = MPopen(stdout="ok") - stdout = execute_cli(["ls"]) - assert "ok" == stdout - - def test_execute_cli_stderr(self) -> None: - with patch("check_systemd.subprocess.Popen") as Popen: - Popen.side_effect = (MPopen(stdout="ok"), MPopen(stderr="Not ok")) - stdout = execute_cli(["ls"]) - assert "ok" == stdout - with pytest.raises(CheckError): - execute_cli(["ls"])