Skip to content

Commit

Permalink
Capabilities in driver (#95)
Browse files Browse the repository at this point in the history
* Remove capabilities from data/default_devices.json

* ketchup tests pass

* Fix state reload.

* Documentation.

* fix tests

* docs & tidy
  • Loading branch information
PeterKraus authored Jul 21, 2024
1 parent c971e68 commit 414eadd
Show file tree
Hide file tree
Showing 15 changed files with 241 additions and 188 deletions.
59 changes: 25 additions & 34 deletions docs/source/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,11 @@ process for managing all devices of a certain driver type.
`````````````````````````````
The following concepts are used in **tomato**:

- *devices*, which represent individually-addressable and optionally multichannel
instruments in the lab,
- *pipelines*, which represent the real world organisation of *devices* into independent
experimental set-ups, optionally containing one or more *device* (or their channels),
- *jobs*, which are processes that carry out a *payload* (i.e. a set of experimental
instructions) on a *pipeline*, and
- *drivers*, which are processes managing all devices of a certain driver type:
- *devices*, which represent separately addressable and optionally multichannel instruments in the lab,
- *components*, which represent the individual channels of each *device*, describing the role of this *component* and its associated capabilities,
- *pipelines*, which represent the real world organisation of device *components* into independent experimental set-ups, optionally containing *components* from one or more *device*,
- *jobs*, which are processes that carry out a *payload* (i.e. a set of experimental instructions) on a *pipeline*, and
- *drivers*, which are processes managing all *devices* of a certain driver type and their *component*:

.. mermaid::

Expand Down Expand Up @@ -210,14 +208,10 @@ The ``devices`` section of the default *devices file* is shown below:
address: "example-addr"
channels: [1]
pollrate: 1
capabilities:
- count
Here, we define a single device using the :mod:`~tomato.drivers.example_counter` driver.
The definition includes the ``address`` of the device (:class:`str` type) as well as an
enumeration of individually-addressable channels the device has (:class:`list[int]`).
The ``capabilities`` list enumerates all "techniques" or "methods" the device can
perform (:class:`list[str]`).

For example, the devices shown in the :ref:`concepts flowchart <concepts>` above would
be defined as:
Expand All @@ -231,19 +225,16 @@ be defined as:
address: "192.168.1.1"
channels: [1, 2, 3]
pollrate: 1
capabilities: [...]
- name: device a
driver: "driver abc"
address: "COM1"
channels: [100]
pollrate: 5
capabilities: [...]
- name: device b
driver: "driver abc"
address: "COM2"
channels: [100]
pollrate: 5
capabilities: [...]
.. note::
Expand All @@ -262,9 +253,9 @@ The default ``pipelines`` section looks as follows:
pipelines:
- name: pip-counter
devices:
- tag: counter
name: dev-counter
components:
- role: counter
device: dev-counter
channel: 1
Here, a single *pipeline* called ``pip-counter`` is defined to contain the one available
Expand All @@ -278,9 +269,9 @@ For example, with the following definition:
pipelines:
- name: pip-counter-*
devices:
- tag: counter
name: dev-counter
components:
- role: counter
device: dev-counter
channel: each
a set of pipelines would be created using each of the available channels in
Expand All @@ -294,27 +285,27 @@ above can be defined as:
.. code-block:: yaml
:linenos:
devices:
pipelines:
- name: pipeline a1
devices:
- tag: dev 123
name: device 1
components:
- role: dev 123
device: device 1
channel: 1
- tag: dev abc
name: device a
- role: dev abc
device: device a
channel: 100
- name: pipeline b2
devices:
- tag: dev 123
name: device 1
components:
- role: dev 123
device: device 1
channel: 2
- tag: dev abc
name: device b
- role: dev abc
device: device b
channel: 100
- name: pipeline 3
devices:
- tag: dev 123
name: device 1
components:
- role: dev 123
device: device 1
channel: 3
.. _payfile:
Expand Down
1 change: 1 addition & 0 deletions src/tomato/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def run_tomato():
"-p",
help="Port number of tomato's reply socket",
default=DEFAULT_TOMATO_PORT,
type=int,
)
p.add_argument(
"--timeout",
Expand Down
107 changes: 71 additions & 36 deletions src/tomato/daemon/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
"""

from tomato.models import Daemon, Driver, Device, Reply, Pipeline, Job
from tomato.models import Daemon, Driver, Device, Reply, Pipeline, Job, Component
from pydantic import BaseModel
from typing import Any
import logging

import tomato.daemon.io as io
Expand Down Expand Up @@ -52,6 +54,7 @@ def status(msg: dict, daemon: Daemon) -> Reply:

def stop(msg: dict, daemon: Daemon) -> Reply:
logger = logging.getLogger(f"{__name__}.stop")
logger.debug("%s", msg)
io.store(daemon)
if any([pip.jobid is not None for pip in daemon.pips.values()]):
logger.error("cannot stop tomato-daemon as jobs are running")
Expand All @@ -64,40 +67,49 @@ def stop(msg: dict, daemon: Daemon) -> Reply:

def setup(msg: dict, daemon: Daemon) -> Reply:
logger = logging.getLogger(f"{__name__}.setup")
logger.debug("%s", msg)
if daemon.status == "bootstrap":
for key in ["drvs", "devs", "pips"]:
for key in ["drvs", "devs", "pips", "cmps"]:
if key in msg:
setattr(daemon, key, msg[key])
logger.info(f"setup successful with pipelines: {list(daemon.pips.keys())}")
logger.info("setup successful with pipelines: '%s'", daemon.pips.keys())
daemon.status = "running"
else:
logger.info(f"reload successful with pipelines: {list(daemon.pips.keys())}")
logger.info("reload successful with pipelines: '%s'", daemon.pips.keys())
return Reply(success=True, msg=daemon.status, data=daemon)


def pipeline(msg: dict, daemon: Daemon) -> Reply:
logger = logging.getLogger(f"{__name__}.pipeline")
logger.debug("%s", msg)
pip = msg["params"]
if pip["name"] is None:
logger.error()
return Reply(success=False, msg="no pipeline name supplied", data=msg)
if pip["name"] not in daemon.pips:
daemon.pips[pip["name"]] = Pipeline(**pip)
elif pip.get("delete", False) and daemon.pips[pip["name"]].jobid is None:
logger.warning(f"deleting pipeline {pip['name']!r}")
dest = Pipeline(**pip)
daemon.pips[pip["name"]] = dest
return Reply(success=True, msg="pipeline created", data=dest)

dest = daemon.pips[pip["name"]]
if pip.get("delete", False) and dest.jobid is None:
logger.warning("deleting pipeline '%s'", dest.name)
del daemon.pips[pip["name"]]
return Reply(success=True, msg="pipeline deleted")

elif pip.get("delete", False):
logger.error(f"cannot delete pipeline {pip['name']!r} as a job is running")
return Reply(success=False, msg=daemon.status, data=daemon.pips[pip["name"]])
logger.error("cannot delete pipeline '%s' as a job is running", dest.name)
return Reply(success=False, msg="pipeline cannot be deleted", data=dest)
else:
for k, v in pip.items():
logger.debug(f"setting pipeline '{pip['name']}.{k}' to {v}")
setattr(daemon.pips[pip["name"]], k, v)
return Reply(success=True, msg=daemon.status, data=daemon.pips.get(pip["name"]))
logger.debug("setting pipeline '%s.%s' to '%s'", dest.name, k, v)
setattr(dest, k, v)
return Reply(success=True, msg="pipeline updated", data=dest)


def job(msg: dict, daemon: Daemon) -> Reply:
logger = logging.getLogger(f"{__name__}.job")
logger.debug("%s", msg)
jobid = msg.get("id", None)
if jobid is None:
jobid = daemon.nextjob
Expand All @@ -108,34 +120,57 @@ def job(msg: dict, daemon: Daemon) -> Reply:
for k, v in msg.get("params", {}).items():
logger.debug(f"setting job {jobid}.{k} to {v}")
setattr(daemon.jobs[jobid], k, v)
return Reply(success=True, msg=daemon.status, data=daemon.jobs[jobid])
return Reply(success=True, msg="job updated", data=daemon.jobs[jobid])


def driver(msg: dict, daemon: Daemon) -> Reply:
logger = logging.getLogger(f"{__name__}.driver")
drv = msg["params"]
if drv["name"] is None:
logger.error()
return Reply(success=False, msg="no driver name supplied", data=msg)
if drv["name"] not in daemon.drvs:
daemon.drvs[drv["name"]] = Driver(**drv)
else:
for k, v in drv.items():
logger.debug(f"setting driver '{drv['name']}.{k}' to {v}")
setattr(daemon.drvs[drv["name"]], k, v)
return Reply(success=True, msg=daemon.status, data=daemon.drvs[drv["name"]])
return _api(
otype="driver",
msg=msg,
ddict=daemon.drvs,
Cls=Driver,
)


def device(msg: dict, daemon: Daemon) -> Reply:
logger = logging.getLogger(f"{__name__}.device")
dev = msg["params"]
if dev["name"] is None:
logger.error()
return Reply(success=False, msg="no device name supplied", data=msg)
if dev["name"] not in daemon.devs:
daemon.devs[dev["name"]] = Device(**dev)
return _api(
otype="device",
msg=msg,
ddict=daemon.devs,
Cls=Device,
)


def component(msg: dict, daemon: Daemon) -> Reply:
return _api(
otype="component",
msg=msg,
ddict=daemon.cmps,
Cls=Component,
)


def _api(otype: str, msg: dict, ddict: dict[str, Any], Cls: BaseModel) -> Reply:
logger = logging.getLogger(f"{__name__}.{otype}")
logger.debug("%s", msg)
obj = msg["params"]
if obj["name"] is None:
logger.error("no %s name supplied", otype)
return Reply(success=False, msg=f"no {otype} name supplied", data=msg)

if obj["name"] not in ddict:
ddict[obj["name"]] = Cls(**obj)
return Reply(
success=True,
msg=f"{otype} {obj['name']!r} created",
data=ddict[obj["name"]],
)
else:
for k, v in dev.items():
logger.debug(f"setting device '{dev['name']}.{k}' to {v}")
setattr(daemon.devs[dev["name"]], k, v)
return Reply(success=True, msg=daemon.status, data=daemon.devs[dev["name"]])
for k, v in obj.items():
logger.debug("setting %s '%s.%s' to '%s'", otype, obj["name"], k, v)
setattr(ddict[obj["name"]], k, v)
return Reply(
success=True,
msg=f"{otype} {obj['name']!r} updated",
data=ddict[obj["name"]],
)
43 changes: 23 additions & 20 deletions src/tomato/daemon/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,21 @@ def tomato_driver() -> None:
if Interface is None:
logger.critical(f"library of driver {args.driver!r} not found")
return
interface: ModelInterface = Interface(settings=daemon.drvs[args.driver].settings)

logger.info(f"registering devices in driver {args.driver!r}")
for dev in daemon.devs.values():
if dev.driver == args.driver:
for channel in dev.channels:
interface.dev_register(address=dev.address, channel=channel)
logger.debug(f"{interface.devmap=}")

logger.info(f"driver {args.driver!r} bootstrapped successfully")
drv = daemon.drvs[args.driver]
interface: ModelInterface = Interface(settings=drv.settings)

logger.info("registering components for driver '%s'", args.driver)
for comp in daemon.cmps.values():
if comp.driver == args.driver:
logger.info("registering component '%s'", comp.name)
ret = interface.dev_register(address=comp.address, channel=comp.channel)
logger.debug(f"iface {ret=}")
params = dict(name=comp.name, capabilities=ret.data)
req.send_pyobj(dict(cmd="component", params=params))
ret = req.recv_pyobj()
logger.debug(f"daemon {ret=}")

logger.info("driver '%s' bootstrapped successfully", args.driver)

params = dict(
name=args.driver,
Expand All @@ -124,7 +129,7 @@ def tomato_driver() -> None:
)
ret = req.recv_pyobj()
if not ret.success:
logger.error(f"could not push driver {args.driver!r} state to tomato-daemon")
logger.error("could not push driver '%s' state to tomato-daemon", args.driver)
logger.debug(f"{ret=}")
return

Expand Down Expand Up @@ -171,11 +176,10 @@ def tomato_driver() -> None:
if status == "stop":
break

logger.info(f"driver {args.driver!r} is beginning reset")

logger.info("driver '%s' is beginning reset", args.driver)
interface.reset()

logger.critical(f"driver {args.driver!r} is quitting")
logger.info("driver '%s' is quitting", args.driver)


def spawn_tomato_driver(port: int, driver: str, req: zmq.Socket, verbosity: int):
Expand Down Expand Up @@ -237,11 +241,10 @@ def manager(port: int, timeout: int = 1000):
elif to > timeout:
to = timeout
daemon = req.recv_pyobj().data
drivers_needed = {v.driver for v in daemon.devs.values()}
action_counter = 0
for driver in drivers_needed:
for driver in daemon.drvs.keys():
if driver not in daemon.drvs:
logger.debug(f"spawning driver {driver!r}")
logger.debug("spawning driver '%s'", driver)
spawn_tomato_driver(daemon.port, driver, req, daemon.verbosity)
action_counter += 1
else:
Expand All @@ -267,9 +270,9 @@ def manager(port: int, timeout: int = 1000):
req.send_pyobj(dict(cmd="status", with_data=True, sender=f"{__name__}.manager"))
daemon = req.recv_pyobj().data
for driver in daemon.drvs.values():
logger.debug(f"stopping driver {driver.name!r} on port {driver.port}")
logger.debug("stopping driver '%s' on port %d", driver.name, driver.port)
ret = stop_tomato_driver(driver.port, context)
if ret.success:
logger.info(f"stopped driver {driver.name!r}")
logger.info("stopped driver '%s'", driver.name)
else:
logger.warning(f"could not stop driver {driver.name!r}")
logger.warning("could not stop driver '%s'", driver.name)
1 change: 1 addition & 0 deletions src/tomato/daemon/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def load(daemon: Daemon):
daemon.pips = loaded.pips
daemon.devs = loaded.devs
daemon.drvs = loaded.drvs
daemon.cmps = loaded.cmps
daemon.nextjob = loaded.nextjob
daemon.status = "running"

Expand Down
Loading

0 comments on commit 414eadd

Please sign in to comment.