diff --git a/src/inet_nm/cli_commission.py b/src/inet_nm/cli_commission.py
index d0d8118..50f6f8b 100644
--- a/src/inet_nm/cli_commission.py
+++ b/src/inet_nm/cli_commission.py
@@ -3,6 +3,8 @@
 
 import inet_nm.commissioner as cmr
 import inet_nm.config as cfg
+from inet_nm.data_types import NmNode
+from inet_nm.power_control import DEFAULT_MAX_ALLOWED_NODES, PowerControl
 from inet_nm.usb_ctrl import TtyNotPresent, get_devices_from_tty
 
 
@@ -36,33 +38,64 @@ def _main():
     bi_cfg.check_file(writable=False)
     saved_nodes = nodes_cfg.load()
 
-    nm_nodes = (
-        get_devices_from_tty() if args.no_cache else get_devices_from_tty(saved_nodes)
-    )
-    print(f"Found {len(saved_nodes)} saved nodes in {args.config}")
-    if args.mock_dev:
-        nm_nodes.append(cmr.mock_device())
+    nm_nodes = []
+    with PowerControl(
+        locations=cfg.LocationConfig(args.config).load(),
+        nodes=saved_nodes,
+        max_powered_devices=DEFAULT_MAX_ALLOWED_NODES,
+    ) as pc:
+        while not pc.power_on_complete:
+            pc.power_on_chunk()
+            nm_nodes.extend(
+                get_devices_from_tty()
+                if args.no_cache
+                else get_devices_from_tty(saved_nodes)
+            )
+            pc.power_off_unused()
 
-    try:
-        selected_node = cmr.select_available_node(nm_nodes)
-    except ValueError:
-        print("No available nodes found")
-        sys.exit(1)
-    except TtyNotPresent as exc:
+        # filter out duplicate nodes
+        nm_node: NmNode
+        found_ids = set()
+        filtered_nodes = []
+        for nm_node in nm_nodes:
+            if nm_node.uid in found_ids:
+                continue
+            found_ids.add(nm_node.uid)
+            filtered_nodes.append(nm_node)
+        nm_nodes = filtered_nodes
+
+        print(f"Found {len(saved_nodes)} saved nodes in {args.config}")
         if args.mock_dev:
-            selected_node = nm_nodes[-1]
+            nm_nodes.append(cmr.mock_device())
+
+        try:
+            selected_node = cmr.select_available_node(nm_nodes)
+        except ValueError:
+            print("No available nodes found")
+            sys.exit(1)
+        except TtyNotPresent as exc:
+            if args.mock_dev:
+                selected_node = nm_nodes[-1]
+            else:
+                raise exc
+        if args.ignore:
+            selected_node.ignore = True
         else:
-            raise exc
-    if args.ignore:
-        selected_node.ignore = True
-    else:
-        binfo = bi_cfg.load()
-        selected_node.board = args.board or cmr.select_board(
-            list(binfo.keys()), selected_node
-        )
-        if selected_node.board in binfo:
-            selected_node.features_provided = binfo[selected_node.board]
-    cmr.check_and_set_uninitialized_sn(selected_node)
+            binfo = bi_cfg.load()
+            selected_node.board = args.board or cmr.select_board(
+                list(binfo.keys()), selected_node
+            )
+            if selected_node.board in binfo:
+                selected_node.features_provided = binfo[selected_node.board]
+        if cmr.is_uninitialized_sn(selected_node):
+            uid = selected_node.uid
+            # Normally we would not need to power this node on, as it only
+            # gets powered down if it was already in the nodes list. If the
+            # no-cache option is used, however, it may be powered down and we
+            # need to power it up again. Slow but safe.
+            pc.power_on_uid(uid)
+            cmr.set_uninitialized_sn(selected_node)
+            pc.power_off_uid(uid)
     nodes = cmr.add_node_to_nodes(saved_nodes, selected_node)
     nodes_cfg.save(nodes)
     print(f"Updated {nodes_cfg.file_path}")
diff --git a/src/inet_nm/cli_update_cache.py b/src/inet_nm/cli_update_cache.py
index 0114d2d..9f70621 100644
--- a/src/inet_nm/cli_update_cache.py
+++ b/src/inet_nm/cli_update_cache.py
@@ -3,6 +3,7 @@
 import inet_nm.config as cfg
 import inet_nm.location as loc
 from inet_nm._helpers import nm_print
+from inet_nm.power_control import DEFAULT_MAX_ALLOWED_NODES, PowerControl
 
 
 def _main():
@@ -14,9 +15,17 @@ def _main():
     nodes = cfg.NodesConfig(config_dir=args.config).load()
     loc_cache = cfg.LocationCache(config_dir=args.config)
     loc_cache.check_file(writable=True)
-
-    cache = loc.get_location_cache(nodes, loc_mapping)
-
+    caches = []
+    with PowerControl(
+        locations=loc_mapping,
+        nodes=nodes,
+        max_powered_devices=DEFAULT_MAX_ALLOWED_NODES,
+    ) as pc:
+        while not pc.power_on_complete:
+            pc.power_on_chunk()
+            caches.append(loc.get_location_cache(nodes, loc_mapping))
+            pc.power_off_unused()
+    cache = loc.merge_location_cache_chunks(caches)
     loc_cache.save(cache)
     nm_print(f"Updated {loc_cache.file_path}")
 
diff --git a/src/inet_nm/commissioner.py b/src/inet_nm/commissioner.py
index b5dd9c5..52c4fe6 100644
--- a/src/inet_nm/commissioner.py
+++ b/src/inet_nm/commissioner.py
@@ -14,7 +14,7 @@
 from inet_nm.usb_ctrl import get_ttys_from_nm_node
 
 
-def check_and_set_uninitialized_sn(node: NmNode, sns: List = None):
+def is_uninitialized_sn(node: NmNode, sns: List = None):
     """
     Check if a given NmNode has an uninitialized serial number and prompt
     the user to set it.
@@ -22,12 +22,28 @@
     Args:
         node: An NmNode object.
         sns: List of serial numbers to check against.
-    """
+
+    """
+    # We cannot do anything with the sns if we don't have the cp210x module
     if cp210x is None:
-        return
+        return False
     sns = sns or ["0001"]
+    if node.serial not in sns:
+        return False
+    return True
+
+
+def set_uninitialized_sn(node: NmNode):
+    """
+    Set the serial number of a given NmNode.
+
+    Args:
+        node: An NmNode object.
+    """
+
     if cp210x is None:
         return
     pid_vid_sn = {
diff --git a/src/inet_nm/location.py b/src/inet_nm/location.py
index da61eb8..7bf0fc6 100644
--- a/src/inet_nm/location.py
+++ b/src/inet_nm/location.py
@@ -4,6 +4,39 @@
 from inet_nm.data_types import NmNode
 
 
+def merge_location_cache_chunks(caches: List[List[Dict]]):
+    """
+    Merge location cache chunks into a single cache.
+
+    Since only a chunk of nodes can be powered on at a time, we need to go
+    through each of the location cache chunks and, for every id_path that
+    has a missing state, check whether it is available in another chunk.
+    If it is, it was most likely just powered off.
+
+    Args:
+        caches: List of location cache chunks.
+
+    Returns:
+        The merged location cache.
+    """
+    # TODO: Also check if all id_paths that are attached have the same node_uid
+    tmp_cache = {}
+    for chunk in caches:
+        for entry in chunk:
+            # If entry is empty, skip it
+            if not entry:
+                continue
+            if entry["state"] != "missing":
+                tmp_cache[entry["id_path"]] = entry
+                continue
+            if entry["state"] == "missing" and entry["id_path"] not in tmp_cache:
+                tmp_cache[entry["id_path"]] = entry
+    # Convert tmp_cache to list
+    cache = list(tmp_cache.values())
+    cache.sort(key=lambda x: x["id_path"])
+    return cache
+
+
 def get_location_cache(nodes: List[NmNode], id_paths: Dict):
     """
     Get the location cache for a list of NmNode objects.
diff --git a/src/inet_nm/power_control.py b/src/inet_nm/power_control.py
new file mode 100644
index 0000000..d8233e8
--- /dev/null
+++ b/src/inet_nm/power_control.py
@@ -0,0 +1,190 @@
+import logging
+import subprocess
+from time import sleep
+from typing import List
+
+import inet_nm.locking as lck
+import inet_nm.usb_ctrl as ucl
+from inet_nm.data_types import NmNode
+
+DEFAULT_MAX_ALLOWED_NODES = 14
+
+
+class PowerControl:
+    DEFAULT_POWER_ON_WAIT = 10
+    DEFAULT_POWER_OFF_WAIT = 1
+    MAX_ALLOWED_NODES = 256
+
+    def __init__(self, locations, nodes: List[NmNode], max_powered_devices=None):
+        self.logging = logging.getLogger(__name__)
+        self.id_path_to_node_uid = {}
+        self.powered_locations = {}
+        self.powered_id_paths = set()
+        self.node_uids = {node.uid for node in nodes if not node.ignore}
+        for id, loc in locations.items():
+            if loc["power_control"]:
+                self.powered_locations[id] = loc
+                self.powered_id_paths.add(id)
+        self.max_powered_devices = max_powered_devices
+        self._running = False
+        self._power_on_procs = []
+        self._power_off_procs = []
+        self._powered_on = set()
+
+    def _available(self, powered_devs) -> int:
+        if self.max_powered_devices is not None:
+            return self.max_powered_devices - len(powered_devs)
+        return self.MAX_ALLOWED_NODES
+
+    def power_on_uid(self, uid: str):
+        if uid not in self.id_path_to_node_uid.values():
+            raise ValueError(
+                f"Node with uid {uid} not found, "
+                "must have been collected during power iterations."
+            )
+        for id_path, node_uid in self.id_path_to_node_uid.items():
+            if node_uid == uid:
+                if id_path in self.powered_locations:
+                    self._power_on(id_path)
+
+    def power_off_uid(self, uid: str):
+        if uid not in self.id_path_to_node_uid.values():
+            raise ValueError(
+                f"Node with uid {uid} not found, "
+                "must have been collected during power iterations."
+            )
+        for id_path, node_uid in self.id_path_to_node_uid.items():
+            if node_uid == uid:
+                if id_path in self.powered_locations:
+                    self._power_off(id_path)
+
+    def _power_off(self, id_path):
+        self.logging.debug("Powering off %s", id_path)
+        usb_info = self.powered_locations[id_path]
+        self._power_off_procs.append(
+            subprocess.Popen(
+                [
+                    "sudo",
+                    "uhubctl",
+                    "-l",
+                    usb_info["hub"],
+                    "-p",
+                    usb_info["port"],
+                    "-a",
+                    "off",
+                ],
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                text=True,
+            )
+        )
+
+    def _power_on(self, id_path):
+        self.logging.debug("Powering on %s", id_path)
+        usb_info = self.powered_locations[id_path]
+        self._power_on_procs.append(
+            subprocess.Popen(
+                [
+                    "sudo",
+                    "uhubctl",
+                    "-l",
+                    usb_info["hub"],
+                    "-p",
+                    usb_info["port"],
+                    "-a",
+                    "on",
+                ],
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                text=True,
+            )
+        )
+
+    def power_on_chunk(self):
+        self._running = True
+        powered_devs = ucl.get_connected_id_paths()
+        available = self._available(powered_devs)
+
+        if available < 0:
+            raise ValueError(
+                f"More than {self.max_powered_devices} nodes are "
+                f"already powered on, {-available} over."
+            )
+        self.logging.debug(
+            "%s nodes powered, %s of %s available",
+            len(powered_devs),
+            available,
+            self.max_powered_devices or self.MAX_ALLOWED_NODES,
+        )
+        for id_path in self.powered_locations:
+            if id_path in powered_devs:
+                self._powered_on.add(id_path)
+                continue
+            if id_path in self._powered_on:
+                continue
+            self._power_on(id_path)
+            # It takes a while to actually show up as a tty device,
+            # so we just add it manually.
+            self._powered_on.add(id_path)
+            powered_devs.add(id_path)
+            if self._available(powered_devs) == 0:
+                break
+        self.wait_for_power_on()
+
+    @property
+    def power_on_complete(self) -> bool:
+        if not self._running:
+            return False
+        # check if all powered_id_paths are powered on
+        if self._powered_on == self.powered_id_paths:
+            self._running = False
+            return True
+        return False
+
+    def _map_id_path_to_node_uid(self):
+        powered_id_paths = ucl.get_connected_id_paths()
+        for id_path in powered_id_paths:
+            if id_path in self.id_path_to_node_uid:
+                continue
+            uid = ucl.get_uid_from_id_path(id_path)
+            self.id_path_to_node_uid[id_path] = uid
+
+    def power_off_unused(self) -> None:
+        self.logging.debug("Powering off")
+        # check locked devices from lockfiles
+        locked_uids = lck.get_locked_uids()
+        unused_uids = self.node_uids - set(locked_uids)
+
+        for id_path, usb_info in self.powered_locations.items():
+            uid = ucl.get_uid_from_id_path(id_path)
+            if uid is None:
+                continue
+            if uid in unused_uids:
+                self._power_off(id_path)
+        self.wait_for_power_off()
+
+    def wait_for_power_off(self):
+        for proc in self._power_off_procs:
+            proc.wait()
+        if self._power_off_procs:
+            sleep(self.DEFAULT_POWER_OFF_WAIT)
+        self._power_off_procs = []
+        self.logging.debug("Finished powering off")
+
+    def wait_for_power_on(self, wait_time=None):
+        for proc in self._power_on_procs:
+            proc.wait()
+        if self._power_on_procs:
+            sleep(wait_time or self.DEFAULT_POWER_ON_WAIT)
+        self._power_on_procs = []
+        self._map_id_path_to_node_uid()
+        self.logging.debug("Finished powering on")
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, value, traceback) -> None:
+        """Power off any unused nodes when exiting the context."""
+        self.wait_for_power_on(wait_time=0)
+        self.power_off_unused()
+        self.wait_for_power_off()
diff --git a/tests/test_location.py b/tests/test_location.py
new file mode 100644
index 0000000..b5cdbc2
--- /dev/null
+++ b/tests/test_location.py
@@ -0,0 +1,53 @@
+import pytest
+
+import inet_nm.location as loc
+
+
+@pytest.mark.parametrize(
+    "caches",
+    [
+        [[{}]],
+        [[{"id_path": "1", "node_uid": "1", "state": "attached"}]],
+        [
+            [
+                {"id_path": "1", "node_uid": "1", "state": "attached"},
+            ],
+            [{"id_path": "1", "node_uid": "1", "state": "missing"}],
+        ],
+        [
+            [
+                {"id_path": "1", "node_uid": "1", "state": "unassigned"},
+                {"id_path": "2", "node_uid": "2", "state": "missing"},
+            ],
+            [{"id_path": "2", "node_uid": "2", "state": "attached"}],
+        ],
+    ],
+)
+def test_merge_location_cache_chunks_no_missing(caches):
+    cache = loc.merge_location_cache_chunks(caches)
+    assert not any(entry["state"] == "missing" for entry in cache), cache
+
+
+@pytest.mark.parametrize(
+    "caches",
+    [
+        [[{"id_path": "1", "node_uid": "1", "state": "missing"}]],
+        [
+            [
+                {"id_path": "1", "node_uid": "1", "state": "attached"},
+                {"id_path": "2", "node_uid": "2", "state": "missing"},
+            ],
+        ],
+        [
+            [
+                {"id_path": "1", "node_uid": "1", "state": "missing"},
+            ],
+            [
+                {"id_path": "1", "node_uid": "1", "state": "missing"},
+            ],
+        ],
+    ],
+)
+def test_merge_location_cache_chunks_missing(caches):
+    cache = loc.merge_location_cache_chunks(caches)
+    assert any(entry["state"] == "missing" for entry in cache), cache
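
Reviewer note: the sketch below is illustrative only and not part of the patch. It shows how the chunked power-control flow introduced above is meant to be driven, mirroring the new cli_update_cache.py logic; the config directory path is a made-up placeholder, everything else uses only APIs that appear in this diff.

import inet_nm.config as cfg
import inet_nm.location as loc
from inet_nm.power_control import DEFAULT_MAX_ALLOWED_NODES, PowerControl

config_dir = "/tmp/inet_nm_config"  # hypothetical path, adjust to your setup
loc_mapping = cfg.LocationConfig(config_dir).load()
nodes = cfg.NodesConfig(config_dir=config_dir).load()

caches = []
# Power on at most DEFAULT_MAX_ALLOWED_NODES boards at a time, scan that
# chunk, then power unused boards back off before the next iteration.
with PowerControl(
    locations=loc_mapping,
    nodes=nodes,
    max_powered_devices=DEFAULT_MAX_ALLOWED_NODES,
) as pc:
    while not pc.power_on_complete:
        pc.power_on_chunk()
        caches.append(loc.get_location_cache(nodes, loc_mapping))
        pc.power_off_unused()

# "missing" entries from one chunk are kept only if no other chunk saw the
# same id_path in a better state.
cache = loc.merge_location_cache_chunks(caches)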