Skip to content

Commit

Permalink
Merge pull request #13 from inetrg/pr/powerctrl
Browse files Browse the repository at this point in the history
Initial power control for hubs
  • Loading branch information
MrKevinWeiss authored Feb 22, 2024
2 parents 4bbcc22 + 6a9a4af commit ac348f1
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 30 deletions.
81 changes: 57 additions & 24 deletions src/inet_nm/cli_commission.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import inet_nm.commissioner as cmr
import inet_nm.config as cfg
from inet_nm.data_types import NmNode
from inet_nm.power_control import DEFAULT_MAX_ALLOWED_NODES, PowerControl
from inet_nm.usb_ctrl import TtyNotPresent, get_devices_from_tty


Expand Down Expand Up @@ -36,33 +38,64 @@ def _main():
bi_cfg.check_file(writable=False)

saved_nodes = nodes_cfg.load()
nm_nodes = (
get_devices_from_tty() if args.no_cache else get_devices_from_tty(saved_nodes)
)
print(f"Found {len(saved_nodes)} saved nodes in {args.config}")
if args.mock_dev:
nm_nodes.append(cmr.mock_device())
nm_nodes = []
with PowerControl(
locations=cfg.LocationConfig(args.config).load(),
nodes=saved_nodes,
max_powered_devices=DEFAULT_MAX_ALLOWED_NODES,
) as pc:
while not pc.power_on_complete:
pc.power_on_chunk()
nm_nodes.extend(
get_devices_from_tty()
if args.no_cache
else get_devices_from_tty(saved_nodes)
)
pc.power_off_unused()

try:
selected_node = cmr.select_available_node(nm_nodes)
except ValueError:
print("No available nodes found")
sys.exit(1)
except TtyNotPresent as exc:
# filter out duplicate nodes
nm_node: NmNode
found_ids = set()
filtered_nodes = []
for nm_node in nm_nodes:
if nm_node.uid in found_ids:
continue
found_ids.add(nm_node.uid)
filtered_nodes.append(nm_node)
nm_nodes = filtered_nodes

print(f"Found {len(saved_nodes)} saved nodes in {args.config}")
if args.mock_dev:
selected_node = nm_nodes[-1]
nm_nodes.append(cmr.mock_device())

try:
selected_node = cmr.select_available_node(nm_nodes)
except ValueError:
print("No available nodes found")
sys.exit(1)
except TtyNotPresent as exc:
if args.mock_dev:
selected_node = nm_nodes[-1]
else:
raise exc
if args.ignore:
selected_node.ignore = True
else:
raise exc
if args.ignore:
selected_node.ignore = True
else:
binfo = bi_cfg.load()
selected_node.board = args.board or cmr.select_board(
list(binfo.keys()), selected_node
)
if selected_node.board in binfo:
selected_node.features_provided = binfo[selected_node.board]
cmr.check_and_set_uninitialized_sn(selected_node)
binfo = bi_cfg.load()
selected_node.board = args.board or cmr.select_board(
list(binfo.keys()), selected_node
)
if selected_node.board in binfo:
selected_node.features_provided = binfo[selected_node.board]
if cmr.is_uninitialized_sn(selected_node):
uid = selected_node.uid
# Normally we would not need to power this as it would only get powered
# down if it was already in the nodes list... If the no-cache is used
# then it may be powered down and we need to power it up.
# Slow but safe.
pc.power_on_uid(uid)
cmr.set_uninitialized_sn(selected_node)
pc.power_off_uid(uid)
nodes = cmr.add_node_to_nodes(saved_nodes, selected_node)
nodes_cfg.save(nodes)
print(f"Updated {nodes_cfg.file_path}")
Expand Down
15 changes: 12 additions & 3 deletions src/inet_nm/cli_update_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import inet_nm.config as cfg
import inet_nm.location as loc
from inet_nm._helpers import nm_print
from inet_nm.power_control import DEFAULT_MAX_ALLOWED_NODES, PowerControl


def _main():
Expand All @@ -14,9 +15,17 @@ def _main():
nodes = cfg.NodesConfig(config_dir=args.config).load()
loc_cache = cfg.LocationCache(config_dir=args.config)
loc_cache.check_file(writable=True)

cache = loc.get_location_cache(nodes, loc_mapping)

caches = []
with PowerControl(
locations=loc_mapping,
nodes=nodes,
max_powered_devices=DEFAULT_MAX_ALLOWED_NODES,
) as pc:
while not pc.power_on_complete:
pc.power_on_chunk()
caches.append(loc.get_location_cache(nodes, loc_mapping))
pc.power_off_unused()
cache = loc.merge_location_cache_chunks(caches)
loc_cache.save(cache)
nm_print(f"Updated {loc_cache.file_path}")

Expand Down
22 changes: 19 additions & 3 deletions src/inet_nm/commissioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,36 @@
from inet_nm.usb_ctrl import get_ttys_from_nm_node


def check_and_set_uninitialized_sn(node: NmNode, sns: List = None):
def is_uninitialized_sn(node: "NmNode", sns: List = None) -> bool:
    """
    Check if a given NmNode has an uninitialized serial number.

    Args:
        node: An NmNode object.
        sns: Serial numbers treated as uninitialized; defaults to ["0001"].

    Returns:
        True if the node's serial matches an uninitialized serial number,
        False otherwise (always False when the cp210x module is unavailable).
    """
    # We cannot do anything with the sns if we don't have the cp210x module
    if cp210x is None:
        return False
    sns = sns or ["0001"]
    return node.serial in sns


def set_uninitialized_sn(node: NmNode):
"""
Set the serial number of a given NmNode.
Args:
node: An NmNode object.
"""

if cp210x is None:
return

pid_vid_sn = {
Expand Down
33 changes: 33 additions & 0 deletions src/inet_nm/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,39 @@
from inet_nm.data_types import NmNode


def merge_location_cache_chunks(caches: List[List[Dict]]) -> List[Dict]:
    """
    Merge location cache chunks into a single cache.

    Due to only being able to power on a chunk at a time we need to sort
    through each of the location caches and look through all id_paths that
    have a missing state and see if they are available in another chunk.
    If they are then probably they were just powered off.

    Args:
        caches: List of location cache chunks.

    Returns:
        The merged location cache, sorted by id_path.
    """
    # TODO: Also check if all id_paths that are attached have the same node_uid
    merged = {}
    for chunk in caches:
        for entry in chunk:
            # If entry is empty, skip it
            if not entry:
                continue
            # A non-missing state always wins; a "missing" entry is only
            # recorded when no chunk has reported that id_path yet.
            if entry["state"] != "missing" or entry["id_path"] not in merged:
                merged[entry["id_path"]] = entry
    return sorted(merged.values(), key=lambda item: item["id_path"])


def get_location_cache(nodes: List[NmNode], id_paths: Dict):
"""
Get the location cache for a list of NmNode objects.
Expand Down
190 changes: 190 additions & 0 deletions src/inet_nm/power_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import logging
import subprocess
from time import sleep
from typing import List

import inet_nm.locking as lck
import inet_nm.usb_ctrl as ucl
from inet_nm.data_types import NmNode

DEFAULT_MAX_ALLOWED_NODES = 14


class PowerControl:
    """Context manager that powers USB hub ports on and off in chunks.

    Power switching is delegated to the external ``uhubctl`` tool (invoked
    through ``sudo``), so only locations flagged with a truthy
    ``power_control`` entry take part; those entries must carry ``hub`` and
    ``port`` keys. Devices are powered on in batches of at most
    ``max_powered_devices`` so that a large board farm never exceeds the
    hub's power budget.
    """

    # Seconds to wait after uhubctl finishes, for devices to enumerate.
    DEFAULT_POWER_ON_WAIT = 10
    DEFAULT_POWER_OFF_WAIT = 1
    # Effective "no limit" used when max_powered_devices is None.
    MAX_ALLOWED_NODES = 256

    def __init__(self, locations, nodes: List["NmNode"], max_powered_devices=None):
        """Initialize the power controller.

        Args:
            locations: Mapping of usb id_path to location info dicts; each
                dict must have a ``power_control`` key, and power-controlled
                entries must also provide ``hub`` and ``port``.
            nodes: Known nodes; nodes flagged ``ignore`` are excluded from
                the power-off bookkeeping.
            max_powered_devices: Upper bound of simultaneously powered
                devices, or None for no explicit limit.
        """
        self.logging = logging.getLogger(__name__)
        # Populated lazily after each power-on round, see
        # _map_id_path_to_node_uid().
        self.id_path_to_node_uid = {}
        self.powered_locations = {}
        self.powered_id_paths = set()
        self.node_uids = {node.uid for node in nodes if not node.ignore}
        for id_path, location in locations.items():
            if location["power_control"]:
                self.powered_locations[id_path] = location
                self.powered_id_paths.add(id_path)
        self.max_powered_devices = max_powered_devices
        self._running = False
        self._power_on_procs = []
        self._power_off_procs = []
        self._powered_on = set()

    def _available(self, powered_devs) -> int:
        """Return how many more devices may be powered on right now."""
        if self.max_powered_devices is not None:
            return self.max_powered_devices - len(powered_devs)
        return self.MAX_ALLOWED_NODES

    def _id_paths_for_uid(self, uid: str):
        """Yield the power-controllable id_paths hosting the node *uid*.

        Raises:
            ValueError: If *uid* was never observed during power iterations.
        """
        if uid not in self.id_path_to_node_uid.values():
            raise ValueError(
                f"Node with uid {uid} not found, "
                "must have been collected during power iterations."
            )
        for id_path, node_uid in self.id_path_to_node_uid.items():
            if node_uid == uid and id_path in self.powered_locations:
                yield id_path

    def power_on_uid(self, uid: str):
        """Queue power-on for every controllable port hosting *uid*.

        Raises:
            ValueError: If *uid* was never observed during power iterations.
        """
        for id_path in self._id_paths_for_uid(uid):
            self._power_on(id_path)

    def power_off_uid(self, uid: str):
        """Queue power-off for every controllable port hosting *uid*.

        Raises:
            ValueError: If *uid* was never observed during power iterations.
        """
        for id_path in self._id_paths_for_uid(uid):
            self._power_off(id_path)

    def _spawn_uhubctl(self, id_path, action, procs):
        """Start a non-blocking `sudo uhubctl` call setting *id_path* to *action*.

        The Popen handle is appended to *procs* so the matching
        wait_for_power_on()/wait_for_power_off() can reap it later.
        """
        usb_info = self.powered_locations[id_path]
        procs.append(
            subprocess.Popen(
                [
                    "sudo",
                    "uhubctl",
                    "-l",
                    usb_info["hub"],
                    "-p",
                    usb_info["port"],
                    "-a",
                    action,
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
        )

    def _power_off(self, id_path):
        """Queue a power-off of the hub port behind *id_path*."""
        self.logging.debug("Powering off %s", id_path)
        self._spawn_uhubctl(id_path, "off", self._power_off_procs)

    def _power_on(self, id_path):
        """Queue a power-on of the hub port behind *id_path*."""
        self.logging.debug("Powering on %s", id_path)
        self._spawn_uhubctl(id_path, "on", self._power_on_procs)

    def power_on_chunk(self):
        """Power on the next batch of controllable ports, up to the limit.

        Marks the power-on session as running; call repeatedly until
        ``power_on_complete`` becomes True.

        Raises:
            ValueError: If more devices are already powered on than the
                configured maximum allows.
        """
        self._running = True
        powered_devs = ucl.get_connected_id_paths()
        available = self._available(powered_devs)

        if available < 0:
            raise ValueError(
                f"More than {self.max_powered_devices} nodes are "
                f"already powered on, {-available} over."
            )
        self.logging.debug(
            "%s nodes powered, %s of %s available",
            len(powered_devs),
            available,
            self.max_powered_devices or self.MAX_ALLOWED_NODES,
        )
        for id_path in self.powered_locations:
            if id_path in powered_devs:
                self._powered_on.add(id_path)
                continue
            if id_path in self._powered_on:
                continue
            self._power_on(id_path)
            # it takes a while to actually show up as a tty device
            # so we just manually add it
            self._powered_on.add(id_path)
            powered_devs.add(id_path)
            if self._available(powered_devs) == 0:
                break
        self.wait_for_power_on()

    @property
    def power_on_complete(self) -> bool:
        """True once every controllable id_path has been powered on.

        Returns False until power_on_chunk() has been called at least once.
        """
        if not self._running:
            return False
        # check if all powered_id_paths are powered on
        if self._powered_on == self.powered_id_paths:
            self._running = False
            return True
        return False

    def _map_id_path_to_node_uid(self):
        """Record the node uid for every currently connected id_path."""
        for id_path in ucl.get_connected_id_paths():
            if id_path in self.id_path_to_node_uid:
                continue
            self.id_path_to_node_uid[id_path] = ucl.get_uid_from_id_path(id_path)

    def power_off_unused(self) -> None:
        """Power off every controllable port whose node is not locked."""
        self.logging.debug("Powering off")
        # check locked devices from lockfiles
        locked_uids = lck.get_locked_uids()
        unused_uids = self.node_uids - set(locked_uids)

        for id_path in self.powered_locations:
            uid = ucl.get_uid_from_id_path(id_path)
            if uid is None:
                continue
            if uid in unused_uids:
                self._power_off(id_path)
        self.wait_for_power_off()

    def wait_for_power_off(self):
        """Block until all queued power-off subprocesses finish, then settle."""
        for proc in self._power_off_procs:
            proc.wait()
        if self._power_off_procs:
            sleep(self.DEFAULT_POWER_OFF_WAIT)
        self._power_off_procs = []
        self.logging.debug("Finished powering off")

    def wait_for_power_on(self, wait_time=None):
        """Block until all queued power-on subprocesses finish.

        Args:
            wait_time: Settle time in seconds after the uhubctl calls return;
                defaults to DEFAULT_POWER_ON_WAIT so devices can enumerate.
        """
        for proc in self._power_on_procs:
            proc.wait()
        if self._power_on_procs:
            sleep(wait_time or self.DEFAULT_POWER_ON_WAIT)
        self._power_on_procs = []
        self._map_id_path_to_node_uid()
        self.logging.debug("Finished powering on")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback) -> None:
        """Flush pending power-ons and power off unused nodes on exit."""
        self.wait_for_power_on(wait_time=0)
        self.power_off_unused()
        self.wait_for_power_off()
Loading

0 comments on commit ac348f1

Please sign in to comment.