Skip to content

Commit

Permalink
Add method get_all_units_cached() to class Source
Browse files Browse the repository at this point in the history
  • Loading branch information
Josef-Friedrich committed Mar 2, 2024
1 parent f44606f commit 29f02a9
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 141 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
test:
test: test_unmocked
poetry run tox

test_unmocked:
Expand Down
199 changes: 90 additions & 109 deletions check_systemd.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,73 @@ def _check_load_state(state: object) -> LoadState | None:
return state


class DataSource:
class Source:
class Unit:
"""This class bundles all state related informations of a systemd unit in a
object. This class is inherited by the class ``DbusUnit`` and the
attributes are overwritten by properties.
"""

name: str
"""The name of the system unit, for example ``nginx.service``. In the
command line table of the command ``systemctl list-units`` is the
column containing unit names titled with “UNIT”.
"""

active_state: ActiveState

sub_state: SubState

load_state: LoadState

def __check_active_state(self, state: object) -> ActiveState:
states: tuple[ActiveState] = get_args(ActiveState)
if state in states:
return state
raise ValueError(f"Invalid active state: {state}")

def __check_sub_state(self, state: object) -> SubState:
states: tuple[SubState] = get_args(SubState)
if state in states:
return state
raise ValueError(f"Invalid sub state: {state}")

def __check_load_state(self, state: object) -> LoadState:
states: tuple[LoadState] = get_args(LoadState)
if state in states:
return state
raise ValueError(f"Invalid load state: {state}")

def __init__(
self,
name: str,
active_state: Optional[object] = None,
sub_state: Optional[object] = None,
load_state: Optional[object] = None,
) -> None:
self.name = name
self.active_state = self.__check_active_state(active_state)
self.sub_state = self.__check_sub_state(sub_state)
self.load_state = self.__check_load_state(load_state)

def convert_to_exitcode(self) -> ServiceState:
"""Convert the different systemd states into a Nagios compatible
exit code.
:return: A Nagios compatible exit code: 0, 1, 2, 3
"""
if opts.expected_state and opts.expected_state.lower() != self.active_state:
return Critical
if self.load_state == "error" or self.active_state == "failed":
return Critical
return Ok

@dataclass
class Timer:
name: str
next: Optional[float]
passed: Optional[float]

class UnitNameFilter:
"""This class stores all system unit names (e. g. ``nginx.service`` or
``fstrim.timer``) and provides a interface to filter the names by regular
Expand Down Expand Up @@ -335,40 +401,19 @@ def filter(
class UnitCache:
"""This class is a container class for systemd units."""

__units: dict[str, Unit]
__units: dict[str, Source.Unit]

__name_filter: DataSource.UnitNameFilter
__name_filter: Source.UnitNameFilter

def __init__(self) -> None:
self.__units = {}
self.__name_filter = DataSource.UnitNameFilter()
self.__name_filter = Source.UnitNameFilter()

def __add_unit(self, unit: Unit) -> None:
def add(self, unit: Source.Unit) -> None:
self.__units[unit.name] = unit
self.__name_filter.add(unit.name)

def add_unit(
self,
unit: Optional[Unit] = None,
name: Optional[str] = None,
active_state: Optional[ActiveState] = None,
sub_state: Optional[SubState] = None,
load_state: Optional[LoadState] = None,
) -> Unit:
if not unit:
unit = Unit()
if name:
unit.name = name
if active_state:
unit.active_state = active_state
if sub_state:
unit.sub_state = sub_state
if load_state:
unit.load_state = load_state
self.__add_unit(unit)
return unit

def get(self, name: Optional[str] = None) -> Unit | None:
def get(self, name: Optional[str] = None) -> Source.Unit | None:
if name:
return self.__units[name]
return None
Expand All @@ -377,7 +422,7 @@ def filter(
self,
include: str | Sequence[str] | None = None,
exclude: str | Sequence[str] | None = None,
) -> Generator[Unit, None, None]:
) -> Generator[Source.Unit, None, None]:
"""
List all units or apply filters (``include`` or ``exclude``) to
the list of unit.
Expand Down Expand Up @@ -427,83 +472,23 @@ def count_by_states(

return counter

class Unit:
"""This class bundles all state related informations of a systemd unit in a
object. This class is inherited by the class ``DbusUnit`` and the
attributes are overwritten by properties.
"""

name: str
"""The name of the system unit, for example ``nginx.service``. In the
command line table of the command ``systemctl list-units`` is the
column containing unit names titled with “UNIT”.
"""

active_state: ActiveState

sub_state: SubState

load_state: LoadState

def __check_active_state(self, state: object) -> ActiveState:
states: tuple[ActiveState] = get_args(ActiveState)
if state in states:
return state
raise ValueError(f"Invalid active state: {state}")

def __check_sub_state(self, state: object) -> SubState:
states: tuple[SubState] = get_args(SubState)
if state in states:
return state
raise ValueError(f"Invalid sub state: {state}")

def __check_load_state(self, state: object) -> LoadState:
states: tuple[LoadState] = get_args(LoadState)
if state in states:
return state
raise ValueError(f"Invalid load state: {state}")

def __init__(
self,
name: str,
active_state: Optional[object],
sub_state: Optional[object],
load_state: Optional[object],
) -> None:
self.name = name
self.active_state = self.__check_active_state(active_state)
self.sub_state = self.__check_sub_state(sub_state)
self.load_state = self.__check_load_state(load_state)

def convert_to_exitcode(self) -> ServiceState:
"""Convert the different systemd states into a Nagios compatible
exit code.
:return: A Nagios compatible exit code: 0, 1, 2, 3
"""
if opts.expected_state and opts.expected_state.lower() != self.active_state:
return Critical
if self.load_state == "error" or self.active_state == "failed":
return Critical
return Ok

@dataclass
class Timer:
name: str
next: Optional[float]
passed: Optional[float]

def get_all_units(self, user: bool) -> Generator[DataSource.Unit, Any, None]:
def get_all_units(self, user: bool = False) -> Generator[Source.Unit, Any, None]:
...

def get_all_units_cached(self, user: bool = False) -> Source.UnitCache:
cache = Source.UnitCache()
for unit in self.get_all_units(user):
cache.add(unit)
return cache

def get_startup_time(self) -> float | None:
...

def get_all_timers(self) -> Generator[DataSource.Timer, Any, None]:
def get_all_timers(self) -> Generator[Source.Timer, Any, None]:
...


class CliSource(DataSource):
class CliSource(Source):
class Table:
"""This class reads the text tables that some systemd commands like
``systemctl list-units`` or ``systemctl list-timers`` produce."""
Expand Down Expand Up @@ -747,9 +732,7 @@ def get_unit(self, name: str) -> Unit:
load_state=_check_load_state(properties["LoadState"]),
)

def get_all_units(
self, user: bool = False
) -> Generator[DataSource.Unit, None, None]:
def get_all_units(self, user: bool = False) -> Generator[Source.Unit, None, None]:
command = ["systemctl", "list-units", "--all"]
if user:
command += ["--user"]
Expand Down Expand Up @@ -790,7 +773,7 @@ def get_startup_time(self) -> float | None:
if match:
return format_timespan_to_seconds(match.group(1))

def get_all_timers(self) -> Generator[DataSource.Timer, Any, None]:
def get_all_timers(self) -> Generator[Source.Timer, Any, None]:
stdout = CliSource.__execute_cli(["systemctl", "list-timers", "--all"])

# NEXT LEFT
Expand All @@ -816,7 +799,7 @@ def get_all_timers(self) -> Generator[DataSource.Timer, Any, None]:
if row["passed"] != "n/a":
passed = CliSource.__convert_to_sec(row["passed"])

yield DataSource.Timer(name=unit, next=next, passed=passed)
yield Source.Timer(name=unit, next=next, passed=passed)


class DbusSource(CliSource):
Expand Down Expand Up @@ -889,9 +872,7 @@ def __get_manager(self, user: bool = False) -> DbusSource.DbusApi:
),
)

def get_all_units(
self, user: bool = False
) -> Generator[DataSource.Unit, None, None]:
def get_all_units(self, user: bool = False) -> Generator[Source.Unit, None, None]:
for (
name,
_,
Expand Down Expand Up @@ -1674,7 +1655,7 @@ def probe(self) -> Generator[Metric, None, None]:


class UnitsContext(Context):
def __init__(self):
def __init__(self) -> None:
super(UnitsContext, self).__init__("units")

def evaluate(self, metric: Metric, resource: Resource) -> Result:
Expand Down Expand Up @@ -1793,9 +1774,9 @@ class StartupTimeResource(Resource):
`src/analyze/analyze-time-data.c <https://github.com/systemd/systemd/blob/1f901c24530fb9b111126381a6ea101af8040e65/src/analyze/analyze-time-data.c#L141-L197>`_
"""

__source: DataSource
__source: Source

def __init__(self, source: DataSource) -> None:
def __init__(self, source: Source) -> None:
super(StartupTimeResource, self).__init__()
self.__source = source

Expand Down Expand Up @@ -2272,7 +2253,7 @@ def main() -> None:
logger.set_level(opts.debug)
logger.show_levels()

source: DataSource
source: Source
if is_gi and opts.data_source == "dbus":
source = DbusSource()
else:
Expand Down
25 changes: 18 additions & 7 deletions tests/_test_unmocked.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Run this test in an unmocked environment."""

from __future__ import annotations

import pytest

from check_systemd import CliSource, DbusSource
from check_systemd import CliSource, DbusSource, Source


@pytest.fixture
Expand All @@ -17,33 +18,43 @@ def dbus() -> DbusSource:


class TestGetAllUnits:
def test_cli(self, cli: CliSource) -> None:
def test_cli(self, cli: Source) -> None:
for unit in cli.get_all_units():
assert unit

def test_cli_user(self, cli: CliSource) -> None:
def test_cli_user(self, cli: Source) -> None:
for unit in cli.get_all_units(True):
assert unit

def test_dbus(self, dbus: DbusSource) -> None:
def test_dbus(self, dbus: Source) -> None:
for unit in dbus.get_all_units():
assert unit

def test_dbus_user(self, dbus: CliSource) -> None:
def test_dbus_user(self, dbus: Source) -> None:
for unit in dbus.get_all_units(True):
assert unit

def test_compare(self, cli: CliSource, dbus: DbusSource) -> None:
def test_compare(self, cli: Source, dbus: Source) -> None:
list_cli = list(cli.get_all_units())
list_dbus = list(dbus.get_all_units())
assert len(list_cli) == len(list_dbus)

def test_compare_user(self, cli: CliSource, dbus: DbusSource) -> None:
def test_compare_user(self, cli: Source, dbus: DbusSource) -> None:
list_cli = list(cli.get_all_units(True))
list_dbus = list(dbus.get_all_units(True))
assert len(list_cli) == len(list_dbus)


class TestGetAllUnitsCached:
def test_cli(self, cli: Source) -> None:
cache = cli.get_all_units_cached()
assert cache.count > 0

def test_cli_user(self, cli: Source) -> None:
cache = cli.get_all_units_cached(True)
assert cache.count > 0


class TestGetUnit:
def test_cli(self) -> None:
cli = CliSource()
Expand Down
Loading

0 comments on commit 29f02a9

Please sign in to comment.