Skip to content

Commit

Permalink
Implement DriverInterface class (#86)
Browse files Browse the repository at this point in the history
* Implement DriverInterface.

* Two more functions.

* ModelInterface and Attr

* Fork out example_counter.

* ruff

* Fix pages & ruff.

* Refactor into versioned driverinterface

* Tests pass.

* Refactor driver interface + warnings

* Fix 3.9

* Disable autodoc_pydantic for now

* Update to newer model.

* Changes to driverinterface

* Consistency ModelInterface

* Docs

* Final changes.

* Fix failing test.
  • Loading branch information
PeterKraus authored Jul 17, 2024
1 parent 5bd77d2 commit c971e68
Show file tree
Hide file tree
Showing 18 changed files with 728 additions and 345 deletions.
1 change: 0 additions & 1 deletion docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ sustainable batteries of the future.
:maxdepth: 1
:caption: tomato driver library

apidoc/tomato.drivers.example_counter

.. toctree::
:maxdepth: 1
Expand Down
2 changes: 1 addition & 1 deletion docs/source/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,4 @@ The *payload* file contains all information required to enter a *job* into the q
allow its assignment onto a *pipeline*. The overall schema of the *payload* is defined
in the :mod:`dgbowl_schemas.tomato` module, and is parsed using :func:`dgbowl_schemas.tomato.to_payload`:

.. autopydantic_model:: dgbowl_schemas.tomato.payload_0_2.Payload
.. autopydantic_model:: dgbowl_schemas.tomato.payload_1_0.Payload
19 changes: 14 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,24 @@ dependencies = [
"toml >= 0.10",
"pyyaml >= 6.0",
"psutil >= 5.9",
"dgbowl_schemas >= 108",
"dgbowl_schemas @ git+https://github.com/dgbowl/dgbowl-schemas.git@Payload_1.0",
"pyzmq >= 25.1",
"h5netcdf >= 1.3",
"xarray >= 2024.2",
"pydantic ~= 1.0",
"pydantic >= 2.0",

]

[project.optional-dependencies]
testing = ["pytest"]
testing = [
"pytest",
"tomato-example-counter @ git+https://github.com/dgbowl/tomato-example-counter.git",
"tomato-psutil @ git+https://github.com/dgbowl/tomato-psutil.git",
]
docs = [
"sphinx ~= 7.2",
"sphinx-rtd-theme ~= 1.3.0",
"autodoc-pydantic ~= 1.9.0",
"autodoc-pydantic ~= 2.1",
"sphinxcontrib-mermaid ~= 0.9.2",
]

Expand All @@ -62,4 +67,8 @@ enabled = true
dev_template = "{tag}.dev{ccount}"
dirty_template = "{tag}.dev{ccount}"

[tool.ruff]
[tool.ruff]

[tool.pytest.ini_options]
log_cli = false
log_cli_level = "DEBUG"
53 changes: 22 additions & 31 deletions src/tomato/daemon/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import zmq
import psutil

import tomato.drivers
from tomato.driverinterface_1_0 import ModelInterface
from tomato.drivers import driver_to_interface
from tomato.models import Reply

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,7 +57,7 @@ def tomato_driver() -> None:
parser.add_argument(
"--verbosity",
help="Verbosity of the tomato-driver.",
default=logging.INFO,
default=logging.DEBUG,
type=int,
)
parser.add_argument(
Expand Down Expand Up @@ -96,19 +97,18 @@ def tomato_driver() -> None:
logger.debug(f"{daemon=}")

logger.info(f"attempting to spawn driver {args.driver!r}")
if not hasattr(tomato.drivers, args.driver):
Interface = driver_to_interface(args.driver)
if Interface is None:
logger.critical(f"library of driver {args.driver!r} not found")
return

kwargs = dict(settings=daemon.drvs[args.driver].settings)
driver = getattr(tomato.drivers, args.driver).Driver(**kwargs)
interface: ModelInterface = Interface(settings=daemon.drvs[args.driver].settings)

logger.info(f"registering devices in driver {args.driver!r}")
for dev in daemon.devs.values():
if dev.driver == args.driver:
for channel in dev.channels:
driver.dev_register(address=dev.address, channel=channel)
logger.debug(f"{driver.devmap=}")
interface.dev_register(address=dev.address, channel=channel)
logger.debug(f"{interface.devmap=}")

logger.info(f"driver {args.driver!r} bootstrapped successfully")

Expand All @@ -117,7 +117,7 @@ def tomato_driver() -> None:
port=port,
pid=pid,
connected_at=str(datetime.now(timezone.utc)),
settings=driver.settings,
settings=interface.settings,
)
req.send_pyobj(
dict(cmd="driver", params=params, sender=f"{__name__}.tomato_driver")
Expand All @@ -128,7 +128,7 @@ def tomato_driver() -> None:
logger.debug(f"{ret=}")
return

logger.info(f"driver {args.driver!r} is entering main loop")
logger.info("driver '%s' is entering main loop", args.driver)

poller = zmq.Poller()
poller.register(rep, zmq.POLLIN)
Expand All @@ -137,7 +137,7 @@ def tomato_driver() -> None:
socks = dict(poller.poll(100))
if rep in socks:
msg = rep.recv_pyobj()
logger.debug(f"received {msg=}")
logger.debug("received msg=%s", msg)
if "cmd" not in msg:
logger.error(f"received msg without cmd: {msg=}")
ret = Reply(success=False, msg="received msg without cmd", data=msg)
Expand All @@ -155,40 +155,32 @@ def tomato_driver() -> None:
data=dict(status=status, driver=args.driver),
)
elif msg["cmd"] == "settings":
driver.settings = msg["params"]
params["settings"] = driver.settings
interface.settings = msg["params"]
params["settings"] = interface.settings
ret = Reply(
success=True,
msg="settings received",
data=msg.get("params"),
)
elif msg["cmd"] == "dev_register":
driver.dev_register(**msg["params"])
ret = Reply(
success=True,
msg="device registered",
data=msg.get("params"),
)
elif msg["cmd"] == "task_status":
ret = driver.task_status(**msg["params"])
elif msg["cmd"] == "task_start":
ret = driver.task_start(**msg["params"])
elif msg["cmd"] == "task_data":
ret = driver.task_data(**msg["params"])
logger.debug(f"{ret=}")
elif hasattr(interface, msg["cmd"]):
ret = getattr(interface, msg["cmd"])(**msg["params"])
else:
logger.critical("unknown command: '%s'", msg["cmd"])
logger.debug("replying %s", ret)
rep.send_pyobj(ret)
if status == "stop":
break

logger.info(f"driver {args.driver!r} is beginning teardown")
logger.info(f"driver {args.driver!r} is beginning reset")

driver.teardown()
interface.reset()

logger.critical(f"driver {args.driver!r} is quitting")


def spawn_tomato_driver(port: int, driver: str, req: zmq.Socket, verbosity: int):
cmd = ["tomato-driver", "--port", str(port), "--verbosity", str(verbosity), driver]
# cmd = ["tomato-driver", "--port", str(port), "--verbosity", str(verbosity), driver]
cmd = ["tomato-driver", "--port", str(port), driver]
if psutil.WINDOWS:
cfs = subprocess.CREATE_NO_WINDOW
cfs |= subprocess.CREATE_NEW_PROCESS_GROUP
Expand Down Expand Up @@ -277,7 +269,6 @@ def manager(port: int, timeout: int = 1000):
for driver in daemon.drvs.values():
logger.debug(f"stopping driver {driver.name!r} on port {driver.port}")
ret = stop_tomato_driver(driver.port, context)
logger.debug(f"{ret=}")
if ret.success:
logger.info(f"stopped driver {driver.name!r}")
else:
Expand Down
Loading

0 comments on commit c971e68

Please sign in to comment.