diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index 8f9f2655..f01884c9 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -77,13 +77,11 @@ process for managing all devices of a certain driver type. ````````````````````````````` The following concepts are used in **tomato**: -- *devices*, which represent individually-addressable and optionally multichannel - instruments in the lab, -- *pipelines*, which represent the real world organisation of *devices* into independent - experimental set-ups, optionally containing one or more *device* (or their channels), -- *jobs*, which are processes that carry out a *payload* (i.e. a set of experimental - instructions) on a *pipeline*, and -- *drivers*, which are processes managing all devices of a certain driver type: +- *devices*, which represent separately addressable and optionally multichannel instruments in the lab, +- *components*, which represent the individual channels of each *device*, describing the role of this *component* and its associated capabilities, +- *pipelines*, which represent the real world organisation of device *components* into independent experimental set-ups, optionally containing *components* from one or more *device*, +- *jobs*, which are processes that carry out a *payload* (i.e. a set of experimental instructions) on a *pipeline*, and +- *drivers*, which are processes managing all *devices* of a certain driver type and their *component*: .. mermaid:: @@ -210,14 +208,10 @@ The ``devices`` section of the default *devices file* is shown below: address: "example-addr" channels: [1] pollrate: 1 - capabilities: - - count Here, we define a single device using the :mod:`~tomato.drivers.example_counter` driver. The definition includes the ``address`` of the device (:class:`str` type) as well as an enumeration of individually-addressable channels the device has (:class:`list[int]`). -The ``capabilities`` list enumerates all "techniques" or "methods" the device can -perform (:class:`list[str]`). For example, the devices shown in the :ref:`concepts flowchart ` above would be defined as: @@ -231,19 +225,16 @@ be defined as: address: "192.168.1.1" channels: [1, 2, 3] pollrate: 1 - capabilities: [...] - name: device a driver: "driver abc" address: "COM1" channels: [100] pollrate: 5 - capabilities: [...] - name: device b driver: "driver abc" address: "COM2" channels: [100] pollrate: 5 - capabilities: [...] .. note:: @@ -262,9 +253,9 @@ The default ``pipelines`` section looks as follows: pipelines: - name: pip-counter - devices: - - tag: counter - name: dev-counter + components: + - role: counter + device: dev-counter channel: 1 Here, a single *pipeline* called ``pip-counter`` is defined to contain the one available @@ -278,9 +269,9 @@ For example, with the following definition: pipelines: - name: pip-counter-* - devices: - - tag: counter - name: dev-counter + components: + - role: counter + device: dev-counter channel: each a set of pipelines would be created using each of the available channels in @@ -294,27 +285,27 @@ above can be defined as: .. code-block:: yaml :linenos: - devices: + pipelines: - name: pipeline a1 - devices: - - tag: dev 123 - name: device 1 + components: + - role: dev 123 + device: device 1 channel: 1 - - tag: dev abc - name: device a + - role: dev abc + device: device a channel: 100 - name: pipeline b2 - devices: - - tag: dev 123 - name: device 1 + components: + - role: dev 123 + device: device 1 channel: 2 - - tag: dev abc - name: device b + - role: dev abc + device: device b channel: 100 - name: pipeline 3 - devices: - - tag: dev 123 - name: device 1 + components: + - role: dev 123 + device: device 1 channel: 3 .. _payfile: diff --git a/src/tomato/__init__.py b/src/tomato/__init__.py index fcf3db26..42216152 100644 --- a/src/tomato/__init__.py +++ b/src/tomato/__init__.py @@ -95,6 +95,7 @@ def run_tomato(): "-p", help="Port number of tomato's reply socket", default=DEFAULT_TOMATO_PORT, + type=int, ) p.add_argument( "--timeout", diff --git a/src/tomato/daemon/cmd.py b/src/tomato/daemon/cmd.py index 9e3c2188..0b805088 100644 --- a/src/tomato/daemon/cmd.py +++ b/src/tomato/daemon/cmd.py @@ -12,7 +12,9 @@ """ -from tomato.models import Daemon, Driver, Device, Reply, Pipeline, Job +from tomato.models import Daemon, Driver, Device, Reply, Pipeline, Job, Component +from pydantic import BaseModel +from typing import Any import logging import tomato.daemon.io as io @@ -52,6 +54,7 @@ def status(msg: dict, daemon: Daemon) -> Reply: def stop(msg: dict, daemon: Daemon) -> Reply: logger = logging.getLogger(f"{__name__}.stop") + logger.debug("%s", msg) io.store(daemon) if any([pip.jobid is not None for pip in daemon.pips.values()]): logger.error("cannot stop tomato-daemon as jobs are running") @@ -64,40 +67,49 @@ def stop(msg: dict, daemon: Daemon) -> Reply: def setup(msg: dict, daemon: Daemon) -> Reply: logger = logging.getLogger(f"{__name__}.setup") + logger.debug("%s", msg) if daemon.status == "bootstrap": - for key in ["drvs", "devs", "pips"]: + for key in ["drvs", "devs", "pips", "cmps"]: if key in msg: setattr(daemon, key, msg[key]) - logger.info(f"setup successful with pipelines: {list(daemon.pips.keys())}") + logger.info("setup successful with pipelines: '%s'", daemon.pips.keys()) daemon.status = "running" else: - logger.info(f"reload successful with pipelines: {list(daemon.pips.keys())}") + logger.info("reload successful with pipelines: '%s'", daemon.pips.keys()) return Reply(success=True, msg=daemon.status, data=daemon) def pipeline(msg: dict, daemon: Daemon) -> Reply: logger = logging.getLogger(f"{__name__}.pipeline") + logger.debug("%s", msg) pip = msg["params"] if pip["name"] is None: logger.error() return Reply(success=False, msg="no pipeline name supplied", data=msg) if pip["name"] not in daemon.pips: - daemon.pips[pip["name"]] = Pipeline(**pip) - elif pip.get("delete", False) and daemon.pips[pip["name"]].jobid is None: - logger.warning(f"deleting pipeline {pip['name']!r}") + dest = Pipeline(**pip) + daemon.pips[pip["name"]] = dest + return Reply(success=True, msg="pipeline created", data=dest) + + dest = daemon.pips[pip["name"]] + if pip.get("delete", False) and dest.jobid is None: + logger.warning("deleting pipeline '%s'", dest.name) del daemon.pips[pip["name"]] + return Reply(success=True, msg="pipeline deleted") + elif pip.get("delete", False): - logger.error(f"cannot delete pipeline {pip['name']!r} as a job is running") - return Reply(success=False, msg=daemon.status, data=daemon.pips[pip["name"]]) + logger.error("cannot delete pipeline '%s' as a job is running", dest.name) + return Reply(success=False, msg="pipeline cannot be deleted", data=dest) else: for k, v in pip.items(): - logger.debug(f"setting pipeline '{pip['name']}.{k}' to {v}") - setattr(daemon.pips[pip["name"]], k, v) - return Reply(success=True, msg=daemon.status, data=daemon.pips.get(pip["name"])) + logger.debug("setting pipeline '%s.%s' to '%s'", dest.name, k, v) + setattr(dest, k, v) + return Reply(success=True, msg="pipeline updated", data=dest) def job(msg: dict, daemon: Daemon) -> Reply: logger = logging.getLogger(f"{__name__}.job") + logger.debug("%s", msg) jobid = msg.get("id", None) if jobid is None: jobid = daemon.nextjob @@ -108,34 +120,57 @@ def job(msg: dict, daemon: Daemon) -> Reply: for k, v in msg.get("params", {}).items(): logger.debug(f"setting job {jobid}.{k} to {v}") setattr(daemon.jobs[jobid], k, v) - return Reply(success=True, msg=daemon.status, data=daemon.jobs[jobid]) + return Reply(success=True, msg="job updated", data=daemon.jobs[jobid]) def driver(msg: dict, daemon: Daemon) -> Reply: - logger = logging.getLogger(f"{__name__}.driver") - drv = msg["params"] - if drv["name"] is None: - logger.error() - return Reply(success=False, msg="no driver name supplied", data=msg) - if drv["name"] not in daemon.drvs: - daemon.drvs[drv["name"]] = Driver(**drv) - else: - for k, v in drv.items(): - logger.debug(f"setting driver '{drv['name']}.{k}' to {v}") - setattr(daemon.drvs[drv["name"]], k, v) - return Reply(success=True, msg=daemon.status, data=daemon.drvs[drv["name"]]) + return _api( + otype="driver", + msg=msg, + ddict=daemon.drvs, + Cls=Driver, + ) def device(msg: dict, daemon: Daemon) -> Reply: - logger = logging.getLogger(f"{__name__}.device") - dev = msg["params"] - if dev["name"] is None: - logger.error() - return Reply(success=False, msg="no device name supplied", data=msg) - if dev["name"] not in daemon.devs: - daemon.devs[dev["name"]] = Device(**dev) + return _api( + otype="device", + msg=msg, + ddict=daemon.devs, + Cls=Device, + ) + + +def component(msg: dict, daemon: Daemon) -> Reply: + return _api( + otype="component", + msg=msg, + ddict=daemon.cmps, + Cls=Component, + ) + + +def _api(otype: str, msg: dict, ddict: dict[str, Any], Cls: BaseModel) -> Reply: + logger = logging.getLogger(f"{__name__}.{otype}") + logger.debug("%s", msg) + obj = msg["params"] + if obj["name"] is None: + logger.error("no %s name supplied", otype) + return Reply(success=False, msg=f"no {otype} name supplied", data=msg) + + if obj["name"] not in ddict: + ddict[obj["name"]] = Cls(**obj) + return Reply( + success=True, + msg=f"{otype} {obj['name']!r} created", + data=ddict[obj["name"]], + ) else: - for k, v in dev.items(): - logger.debug(f"setting device '{dev['name']}.{k}' to {v}") - setattr(daemon.devs[dev["name"]], k, v) - return Reply(success=True, msg=daemon.status, data=daemon.devs[dev["name"]]) + for k, v in obj.items(): + logger.debug("setting %s '%s.%s' to '%s'", otype, obj["name"], k, v) + setattr(ddict[obj["name"]], k, v) + return Reply( + success=True, + msg=f"{otype} {obj['name']!r} updated", + data=ddict[obj["name"]], + ) diff --git a/src/tomato/daemon/driver.py b/src/tomato/daemon/driver.py index 2dd3e90f..b27b3d95 100644 --- a/src/tomato/daemon/driver.py +++ b/src/tomato/daemon/driver.py @@ -101,16 +101,21 @@ def tomato_driver() -> None: if Interface is None: logger.critical(f"library of driver {args.driver!r} not found") return - interface: ModelInterface = Interface(settings=daemon.drvs[args.driver].settings) - - logger.info(f"registering devices in driver {args.driver!r}") - for dev in daemon.devs.values(): - if dev.driver == args.driver: - for channel in dev.channels: - interface.dev_register(address=dev.address, channel=channel) - logger.debug(f"{interface.devmap=}") - - logger.info(f"driver {args.driver!r} bootstrapped successfully") + drv = daemon.drvs[args.driver] + interface: ModelInterface = Interface(settings=drv.settings) + + logger.info("registering components for driver '%s'", args.driver) + for comp in daemon.cmps.values(): + if comp.driver == args.driver: + logger.info("registering component '%s'", comp.name) + ret = interface.dev_register(address=comp.address, channel=comp.channel) + logger.debug(f"iface {ret=}") + params = dict(name=comp.name, capabilities=ret.data) + req.send_pyobj(dict(cmd="component", params=params)) + ret = req.recv_pyobj() + logger.debug(f"daemon {ret=}") + + logger.info("driver '%s' bootstrapped successfully", args.driver) params = dict( name=args.driver, @@ -124,7 +129,7 @@ def tomato_driver() -> None: ) ret = req.recv_pyobj() if not ret.success: - logger.error(f"could not push driver {args.driver!r} state to tomato-daemon") + logger.error("could not push driver '%s' state to tomato-daemon", args.driver) logger.debug(f"{ret=}") return @@ -171,11 +176,10 @@ def tomato_driver() -> None: if status == "stop": break - logger.info(f"driver {args.driver!r} is beginning reset") - + logger.info("driver '%s' is beginning reset", args.driver) interface.reset() - logger.critical(f"driver {args.driver!r} is quitting") + logger.info("driver '%s' is quitting", args.driver) def spawn_tomato_driver(port: int, driver: str, req: zmq.Socket, verbosity: int): @@ -237,11 +241,10 @@ def manager(port: int, timeout: int = 1000): elif to > timeout: to = timeout daemon = req.recv_pyobj().data - drivers_needed = {v.driver for v in daemon.devs.values()} action_counter = 0 - for driver in drivers_needed: + for driver in daemon.drvs.keys(): if driver not in daemon.drvs: - logger.debug(f"spawning driver {driver!r}") + logger.debug("spawning driver '%s'", driver) spawn_tomato_driver(daemon.port, driver, req, daemon.verbosity) action_counter += 1 else: @@ -267,9 +270,9 @@ def manager(port: int, timeout: int = 1000): req.send_pyobj(dict(cmd="status", with_data=True, sender=f"{__name__}.manager")) daemon = req.recv_pyobj().data for driver in daemon.drvs.values(): - logger.debug(f"stopping driver {driver.name!r} on port {driver.port}") + logger.debug("stopping driver '%s' on port %d", driver.name, driver.port) ret = stop_tomato_driver(driver.port, context) if ret.success: - logger.info(f"stopped driver {driver.name!r}") + logger.info("stopped driver '%s'", driver.name) else: - logger.warning(f"could not stop driver {driver.name!r}") + logger.warning("could not stop driver '%s'", driver.name) diff --git a/src/tomato/daemon/io.py b/src/tomato/daemon/io.py index 774360ce..a11994db 100644 --- a/src/tomato/daemon/io.py +++ b/src/tomato/daemon/io.py @@ -35,6 +35,7 @@ def load(daemon: Daemon): daemon.pips = loaded.pips daemon.devs = loaded.devs daemon.drvs = loaded.drvs + daemon.cmps = loaded.cmps daemon.nextjob = loaded.nextjob daemon.status = "running" diff --git a/src/tomato/daemon/job.py b/src/tomato/daemon/job.py index a7c4a016..b046a5ec 100644 --- a/src/tomato/daemon/job.py +++ b/src/tomato/daemon/job.py @@ -29,29 +29,29 @@ from tomato.daemon.io import merge_netcdfs, data_to_pickle from tomato.models import Pipeline, Daemon, Component, Device, Driver from dgbowl_schemas.tomato import to_payload -from dgbowl_schemas.tomato.payload import Payload +from dgbowl_schemas.tomato.payload import Payload, Task logger = logging.getLogger(__name__) -def find_matching_pipelines(daemon: Daemon, method: list[dict]) -> list[str]: +def find_matching_pipelines( + pips: dict[str, Pipeline], cmps: dict[str, Component], method: list[Task] +) -> list[Pipeline]: req_tags = set([item.component_tag for item in method]) req_capabs = set([item.technique_name for item in method]) candidates = [] - for pip in daemon.pips.values(): - dnames = set([comp.role for comp in pip.devs.values()]) - if req_tags.intersection(dnames) == req_tags: - candidates.append(pip) - - matched = [] - for cd in candidates: - capabs = [] - for dev in cd.devs.values(): - capabs += daemon.devs[dev.name].capabilities - if req_capabs.intersection(set(capabs)) == req_capabs: - matched.append(cd) - return matched + for pip in pips.values(): + roles = set() + capabs = set() + for comp in pip.components: + c = cmps[comp] + roles.add(c.role) + capabs.update(c.capabilities) + if req_tags.intersection(roles) == req_tags: + if req_capabs.intersection(capabs) == req_capabs: + candidates.append(pip) + return candidates def kill_tomato_job(process: psutil.Process): @@ -146,7 +146,9 @@ def check_queued_jobs(daemon: Daemon, req) -> dict[int, list[Pipeline]]: matched = {} queue = [job for job in daemon.jobs.values() if job.status in {"q", "qw"}] for job in queue: - matched[job.id] = find_matching_pipelines(daemon, job.payload.method) + matched[job.id] = find_matching_pipelines( + daemon.pips, daemon.cmps, job.payload.method + ) if len(matched[job.id]) > 0 and job.status == "q": logger.info( f"job {job.id} can queue on pips: {[p.name for p in matched[job.id]]}" @@ -523,19 +525,23 @@ def job_main_loop( # distribute plan into threads threads = {} - for role, tasks in plan.items(): - component = pipeline.devs[role] - logger.debug(" component=%s", component) - device = daemon.devs[component.name] + for cmpk in pipeline.components: + component = daemon.cmps[cmpk] + logger.debug(f"{component=}") + if component.role not in plan: + continue + tasks = plan[component.role] + logger.debug(" tasks=%s", tasks) + device = daemon.devs[component.device] logger.debug(" device=%s", device) - driver = daemon.drvs[device.driver] + driver = daemon.drvs[component.driver] logger.debug(" driver=%s", driver) - threads[role] = Thread( + threads[component.role] = Thread( target=job_thread, args=(tasks, component, device, driver, jobpath, logpath), name="job-thread", ) - threads[role].start() + threads[component.role].start() # wait until threads join or we're killed snapshot = payload.settings.snapshot diff --git a/src/tomato/data/default_devices.json b/src/tomato/data/default_devices.json index 722267be..5c7047fb 100644 --- a/src/tomato/data/default_devices.json +++ b/src/tomato/data/default_devices.json @@ -5,7 +5,6 @@ "driver": "example_counter", "address": "example-addr", "channels": [1], - "capabilities": ["count"], "pollrate": 1 } ], @@ -13,7 +12,7 @@ { "name": "pip-counter", "devices": [ - {"tag": "counter", "name": "dev-counter", "channel": 1} + {"role": "counter", "device": "dev-counter", "channel": 1} ] } ] diff --git a/src/tomato/driverinterface_1_0/__init__.py b/src/tomato/driverinterface_1_0/__init__.py index cffb1a51..62989a4a 100644 --- a/src/tomato/driverinterface_1_0/__init__.py +++ b/src/tomato/driverinterface_1_0/__init__.py @@ -217,13 +217,17 @@ def dev_register(self, address: str, channel: int, **kwargs: dict) -> Reply: Creates a :class:`DeviceManager` representing a device component, storing it in the :obj:`self.devmap` using the provided `address` and `channel`. + + The returned :class:`Reply` should contain the capabilities of the registered + component in the ``data`` slot. """ key = (address, channel) self.devmap[key] = self.CreateDeviceManager(key, **kwargs) + capabs = self.devmap[key].capabilities() return Reply( success=True, msg=f"device {key!r} registered", - data=key, + data=capabs, ) @in_devmap diff --git a/src/tomato/ketchup/__init__.py b/src/tomato/ketchup/__init__.py index 1ab996e8..78062dd1 100644 --- a/src/tomato/ketchup/__init__.py +++ b/src/tomato/ketchup/__init__.py @@ -105,7 +105,6 @@ def submit( if Version(temp.version) > maxver: break payload = temp - print(f"{payload=}") if payload.settings.output.path is None: cwd = str(Path().resolve()) diff --git a/src/tomato/models.py b/src/tomato/models.py index 61d1d43e..4166cb07 100644 --- a/src/tomato/models.py +++ b/src/tomato/models.py @@ -8,6 +8,9 @@ from pydantic import BaseModel, Field from typing import Optional, Any, Mapping, Sequence, Literal from pathlib import Path +import logging + +logger = logging.getLogger(__name__) class Driver(BaseModel): @@ -24,15 +27,17 @@ class Device(BaseModel): driver: str address: str channels: Sequence[int] - capabilities: Sequence[str] pollrate: int = 1 class Component(BaseModel): name: str + driver: str + device: str address: str channel: int role: str + capabilities: Optional[set[str]] = None class Pipeline(BaseModel): @@ -40,7 +45,7 @@ class Pipeline(BaseModel): ready: bool = False jobid: Optional[int] = None sampleid: Optional[str] = None - devs: Mapping[str, Component] = Field(default_factory=dict) + components: Sequence[str] = Field(default_factory=list) class Job(BaseModel): @@ -67,6 +72,7 @@ class Daemon(BaseModel, arbitrary_types_allowed=True): pips: Mapping[str, Pipeline] = Field(default_factory=dict) devs: Mapping[str, Device] = Field(default_factory=dict) drvs: Mapping[str, Driver] = Field(default_factory=dict) + cmps: Mapping[str, Component] = Field(default_factory=dict) jobs: Mapping[int, Job] = Field(default_factory=dict) nextjob: int = 1 diff --git a/src/tomato/tomato/__init__.py b/src/tomato/tomato/__init__.py index b3aaa768..a38e4cdf 100644 --- a/src/tomato/tomato/__init__.py +++ b/src/tomato/tomato/__init__.py @@ -35,7 +35,7 @@ import yaml import toml -from tomato.models import Reply, Pipeline, Device, Driver +from tomato.models import Reply, Pipeline, Device, Driver, Component logger = logging.getLogger(__name__) VERSION = metadata.version("tomato") @@ -61,8 +61,11 @@ def load_device_file(yamlpath: Path) -> dict: return jsdata -def get_pipelines(devs: dict[str, Device], pipelines: list) -> dict[str, Pipeline]: +def get_pipelines( + devs: dict[str, Device], pipelines: list +) -> tuple[dict[str, Pipeline], dict[str, Component]]: pips = {} + cmps = {} for pip in pipelines: if "*" in pip["name"]: data = {"name": pip["name"], "devs": {}} @@ -70,40 +73,51 @@ def get_pipelines(devs: dict[str, Device], pipelines: list) -> dict[str, Pipelin logger.error("more than one component in a wildcard pipeline") continue for comp in pip["devices"]: - if comp["name"] not in devs: - logger.error(f"component {comp['name']} not found among devices") + if comp["device"] not in devs: + logger.error("device '%s' not found", comp["device"]) break - dev = devs[comp["name"]] + dev = devs[comp["device"]] for ch in dev.channels: name = pip["name"].replace("*", f"{ch}") - d = { - comp["tag"]: dict( - name=dev.name, - role=comp["tag"], - channel=ch, - address=dev.address, - ) - } - p = dict(name=name, devs=d) - pips[name] = Pipeline(**p) + h = "/".join((dev.driver, dev.address, str(ch))) + c = Component( + name=h, + driver=dev.driver, + device=dev.name, + address=dev.address, + channel=ch, + role=comp["role"], + ) + cmps[h] = c + p = Pipeline(name=name, components=[h]) + pips[p.name] = p else: - data = {"name": pip["name"], "devs": {}} + data = {"name": pip["name"], "components": []} for comp in pip["devices"]: - if comp["name"] not in devs: - logger.error(f"component {comp['name']} not found among devices") + if comp["device"] not in devs: + logger.error("device '%s' not found", comp["device"]) break - dev = devs[comp["name"]] + dev = devs[comp["device"]] if comp["channel"] not in dev.channels: - logger.error(f"channel {comp['channel']} not found among channels") + logger.error( + "channel %d not found on device '%s'", + comp["channel"], + comp["device"], + ) break - data["devs"][comp["tag"]] = dict( - name=dev.name, - role=comp["tag"], - channel=comp["channel"], + h = "/".join((dev.driver, dev.address, str(comp["channel"]))) + c = Component( + name=h, + driver=dev.driver, + device=dev.name, address=dev.address, + channel=comp["channel"], + role=comp["role"], ) - pips[pip["name"]] = Pipeline(**data) - return pips + data["components"].append(h) + cmps[h] = c + pips[data["name"]] = Pipeline(**data) + return pips, cmps def _updater(context, port, cmd, params): @@ -318,11 +332,13 @@ def reload( devicefile = load_device_file(Path(settings["devices"]["config"])) devs = {dev["name"]: Device(**dev) for dev in devicefile["devices"]} - pips = get_pipelines(devs, devicefile["pipelines"]) + pips, cmps = get_pipelines(devs, devicefile["pipelines"]) + logger.debug(f"{pips=}") + logger.debug(f"{cmps=}") drvs = {dev.driver: Driver(name=dev.driver) for dev in devs.values()} - for drv in drvs.values(): - if drv.name in settings["drivers"]: - drv.settings.update(settings["drivers"][drv.name]) + for drv in drvs.keys(): + if drv in settings["drivers"]: + drvs[drv].settings.update(settings["drivers"][drv]) stat = status(**kwargs, with_data=True) if not stat.success: @@ -338,6 +354,7 @@ def reload( pips=pips, devs=devs, drvs=drvs, + cmps=cmps, sender=f"{__name__}.reload", ) ) @@ -345,52 +362,47 @@ def reload( elif daemon.status == "running": retries = 0 while True: - if any([drv.port is None for drv in daemon.drvs.values()]): + if retries == MAX_RETRIES: + return Reply( + success=False, msg="tomato-drivers are not online", data=daemon + ) + elif any(drv.port is None for drv in daemon.drvs.values()): retries += 1 logger.warning("not all tomato-drivers are online yet, waiting") logger.debug("retry number %d / %d", retries, MAX_RETRIES) time.sleep(timeout / 1000) daemon = status(**kwargs, with_data=True).data - elif retries == MAX_RETRIES: - return Reply( - success=False, msg="tomato-drivers are not online", data=daemon - ) else: break # check changes in driver settings for drv in drvs.values(): logger.debug(f"{drv=}") - if drv.settings != daemon.drvs[drv.name].settings: - ret = _updater( - context, daemon.drvs[drv.name].port, "settings", drv.settings - ) + ddrv = daemon.drvs[drv.name] + if drv.settings != ddrv.settings: + ret = _updater(context, ddrv.port, "settings", drv.settings) if ret.success is False: return ret - ret = _updater( - context, port, "driver", dict(name=drv.name, settings=drv.settings) - ) + msg = dict(name=drv.name, settings=drv.settings) + ret = _updater(context, port, "driver", msg) if ret.success is False: return ret # check changes in devices for dev in devs.values(): logger.debug(f"{dev=}") - if ( - dev.name not in daemon.devs - or dev.channels != daemon.devs[dev.name].channels - ): + ddev = daemon.devs[dev.name] + if dev.channels != ddev.channels: for channel in dev.channels: params = dict( address=dev.address, channel=channel, - capabilities=dev.capabilities, ) + drv = daemon.drvs[dev.driver] logger.debug(f"{params=}") - logger.debug(f"{daemon.drvs[drv.name]}=") - ret = _updater( - context, daemon.drvs[drv.name].port, "dev_register", params - ) + logger.debug(f"{ddev=}") + logger.debug(f"{drv=}") + ret = _updater(context, drv.port, "dev_register", params) logger.debug(f"{ret=}") if ret.success is False: return ret @@ -398,16 +410,18 @@ def reload( ret = _updater(context, port, "device", params) if ret.success is False: return ret - elif dev != daemon.devs[dev.name]: + elif dev != ddev.name: logger.error("updating devices not yet implemented") - for devname in daemon.devs: - if devname not in devs: + for ddev in daemon.devs.values(): + if ddev.name not in devs: logger.error("removing devices not yet implemented") # check changes in pipelines for pip in pips.values(): logger.debug(f"{pip=}") if pip.name not in daemon.pips: + logger.debug(f"{daemon.pips=}") ret = _updater(context, port, "pipeline", pip.model_dump()) + logger.debug(f"{ret=}") if ret.success is False: return ret else: diff --git a/tests/common/devices_counter.json b/tests/common/devices_counter.json index 38b0fd61..fb7c15ab 100644 --- a/tests/common/devices_counter.json +++ b/tests/common/devices_counter.json @@ -5,7 +5,6 @@ "driver": "example_counter", "address": "example-addr", "channels": [1, 2, 3, 4], - "capabilities": ["count"], "pollrate": 2 } ], @@ -13,7 +12,7 @@ { "name": "pip-counter-*", "devices": [ - {"tag": "counter", "name": "dev-counter", "channel": "each"} + {"role": "counter", "device": "dev-counter", "channel": "each"} ] } ] diff --git a/tests/common/devices_multidev.json b/tests/common/devices_multidev.json index 85b2702f..ea2dd4f2 100644 --- a/tests/common/devices_multidev.json +++ b/tests/common/devices_multidev.json @@ -5,7 +5,6 @@ "driver": "example_counter", "address": "example-addr-1", "channels": [1, 2, 3, 4], - "capabilities": ["count"], "pollrate": 2 }, { @@ -13,7 +12,6 @@ "driver": "example_counter", "address": "example-addr-2", "channels": [5, 6, 7, 8], - "capabilities": ["count"], "pollrate": 1 } ], @@ -21,14 +19,14 @@ { "name": "pip-counter", "devices": [ - {"tag": "counter", "name": "dev-counter-1", "channel": 1} + {"role": "counter", "device": "dev-counter-1", "channel": 1} ] }, { "name": "pip-multidev", "devices": [ - {"tag": "counter-1", "name": "dev-counter-1", "channel": 4}, - {"tag": "counter-2", "name": "dev-counter-2", "channel": 5} + {"role": "counter-1", "device": "dev-counter-1", "channel": 4}, + {"role": "counter-2", "device": "dev-counter-2", "channel": 5} ] } ] diff --git a/tests/common/devices_psutil.json b/tests/common/devices_psutil.json index 615b0e3d..94044142 100644 --- a/tests/common/devices_psutil.json +++ b/tests/common/devices_psutil.json @@ -5,7 +5,6 @@ "driver": "example_counter", "address": "counter-addr", "channels": [1], - "capabilities": ["count", "random"], "pollrate": 1 }, { @@ -13,7 +12,6 @@ "driver": "psutil", "address": "psutil-addr", "channels": [10], - "capabilities": ["all_info", "cpu_info", "mem_info"], "pollrate": 1 } ], @@ -21,8 +19,8 @@ { "name": "pip-multidev", "devices": [ - {"tag": "counter", "name": "dev-counter", "channel": 1}, - {"tag": "psutil", "name": "dev-psutil", "channel": 10} + {"role": "counter", "device": "dev-counter", "channel": 1}, + {"role": "psutil", "device": "dev-psutil", "channel": 10} ] } ] diff --git a/tests/test_99_example_counter.py b/tests/test_99_example_counter.py index c8eed3e9..b2933085 100644 --- a/tests/test_99_example_counter.py +++ b/tests/test_99_example_counter.py @@ -95,16 +95,15 @@ def test_counter_snapshot( ("counter_multistep_multidev", {"counter-1": 10, "counter-2": 20}), ], ) -def test_counter_multidev( - casename, npoints, datadir, start_tomato_daemon, stop_tomato_daemon -): +def test_counter_multidev(casename, npoints, datadir, stop_tomato_daemon): os.chdir(datadir) with open("devices_multidev.json", "r") as inf: jsdata = json.load(inf) with open("devices.yml", "w") as ouf: yaml.dump(jsdata, ouf) + subprocess.run(["tomato", "init", "-p", f"{PORT}", "-A", ".", "-D", "."]) + subprocess.run(["tomato", "start", "-p", f"{PORT}", "-A", ".", "-L", ".", "-vv"]) utils.wait_until_tomato_running(port=PORT, timeout=3000) - subprocess.run(["tomato", "reload", "-p", f"{PORT}", "-A", "."]) utils.run_casenames([casename], [None], ["pip-multidev"]) utils.wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=10000)