Skip to content

Commit

Permalink
feat(sync clock): Adding signalling between Manager and Worker to obt…
Browse files Browse the repository at this point in the history
…ain the timedelta between different computers. A global clock module then provides a shared clock within Node services.
  • Loading branch information
edavalosanaya committed Aug 23, 2023
1 parent 73b6076 commit e424a0f
Show file tree
Hide file tree
Showing 16 changed files with 98 additions and 39 deletions.
16 changes: 16 additions & 0 deletions chimerapy/engine/clock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from datetime import datetime, timedelta, timezone

# Offset subtracted from the local UTC clock by ``utcnow()``. It stays at
# zero until ``update_reference_time()`` installs a different value.
time_delta = timedelta(0)


def update_reference_time(delta: timedelta):
    """Replace the module-level clock offset.

    Args:
        delta: New value for ``time_delta``; every later call to
            ``utcnow()``/``now()`` subtracts it from the local UTC time.
    """
    global time_delta
    time_delta = delta


def utcnow() -> datetime:
    """Return the current naive UTC time shifted back by ``time_delta``.

    Returns:
        A timezone-naive ``datetime`` equal to the local machine's UTC time
        minus the currently stored offset.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Taking an aware
    # UTC timestamp and dropping its tzinfo yields the same naive value
    # without triggering a DeprecationWarning.
    local_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return local_utc - time_delta


def now() -> datetime:
    """Return the shared-clock time; this simply delegates to ``utcnow()``."""
    return utcnow()
5 changes: 3 additions & 2 deletions chimerapy/engine/data_protocols.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import datetime
from typing import Dict
from dataclasses import dataclass, field

from dataclasses_json import DataClassJsonMixin

from chimerapy.engine import clock


@dataclass
class NodePubEntry(DataClassJsonMixin):
Expand All @@ -19,7 +20,7 @@ class NodePubTable(DataClassJsonMixin):
@dataclass
class NodeDiagnostics(DataClassJsonMixin):
timestamp: str = field(
default_factory=lambda: str(datetime.datetime.now().isoformat())
default_factory=lambda: str(clock.now().isoformat())
) # ISO str
latency: float = 0 # ms
payload_size: float = 0 # KB
Expand Down
4 changes: 2 additions & 2 deletions chimerapy/engine/eventbus/eventbus.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import uuid
import asyncio
from datetime import datetime
from collections import deque
from concurrent.futures import Future
from typing import Any, Generic, Type, Callable, Awaitable, Optional, Literal, TypeVar

from aioreactive import AsyncObservable, AsyncObserver, AsyncSubject
from dataclasses import dataclass, field

from chimerapy.engine import clock
from .. import _logger
from ..networking.async_loop_thread import AsyncLoopThread

Expand All @@ -21,7 +21,7 @@ class Event:
type: str
data: Optional[Any] = None
id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
timestamp: str = field(default_factory=lambda: clock.now().isoformat())


class EventBus(AsyncObservable):
Expand Down
5 changes: 3 additions & 2 deletions chimerapy/engine/logger/common.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
from datetime import datetime
from logging import Filter, Formatter, Handler, StreamHandler
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Union, Dict

from chimerapy.engine import clock

MAX_BYTES_PER_FILE = 100 * 1024 * 1024 # 100MB


Expand Down Expand Up @@ -117,7 +118,7 @@ def emit(self, record: logging.LogRecord):
@staticmethod
def timestamp() -> str:
"""Return the current timestamp in the format YYYY-MM-DD_HH-MM-SS."""
return datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
return clock.now().strftime("%Y-%m-%d_%H-%M-%S")


class MultiplexedRotatingFileHandler(MultiplexedEntityHandler):
Expand Down
3 changes: 2 additions & 1 deletion chimerapy/engine/manager/http_server_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import traceback
import datetime
from concurrent.futures import Future
from typing import List, Dict

Expand Down Expand Up @@ -143,6 +144,7 @@ async def _register_worker_route(self, request: web.Request):
"port": self.port,
},
"config": config.config,
"manager_datetime_now": datetime.datetime.utcnow().isoformat(),
}

# Broadcast changes
Expand All @@ -168,7 +170,6 @@ async def _update_nodes_status(self, request: web.Request):
update_dataclass(self.state.workers[worker_state.id], worker_state)
else:
logger.error(f"{self}: non-registered Worker update: {worker_state.id}")
# logger.debug(f"{self}: Nodes status update to: {self.state.workers}")

return web.HTTPOk()

Expand Down
4 changes: 0 additions & 4 deletions chimerapy/engine/manager/worker_handler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,9 @@ def _get_worker_ip(self, worker_id: str) -> str:

async def _register_worker(self, worker_state: WorkerState) -> bool:

logger.debug(f"{self}: worker_state: {worker_state} BEFORE")
evented_worker_state = make_evented(
worker_state, event_bus=self.eventbus, event_name="ManagerState.changed"
)
logger.debug(
f"{self}: worker_state: {worker_state}: {evented_worker_state} AFTER"
)
self.state.workers[worker_state.id] = evented_worker_state
logger.debug(
f"Manager registered <Worker id={worker_state.id}"
Expand Down
4 changes: 2 additions & 2 deletions chimerapy/engine/networking/data_chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
import pickle
import uuid
import blosc
import datetime
from typing import Any, Literal, Dict, List

# Third-party Imports
import numpy as np
import simplejpeg

# Internal Imports
from chimerapy.engine import clock
from chimerapy.engine._logger import getLogger

logger = getLogger("chimerapy-engine")
Expand Down Expand Up @@ -40,7 +40,7 @@ def __init__(self):
self._container["meta"] = {
"value": {
"ownership": [],
"created": datetime.datetime.now(),
"created": clock.now(),
"transmitted": None,
"received": None,
},
Expand Down
15 changes: 6 additions & 9 deletions chimerapy/engine/node/node.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pathlib
import logging
import uuid
import datetime
import os
import tempfile
import asyncio
Expand All @@ -15,7 +14,7 @@

# Internal Imports
from chimerapy.engine import _logger
from chimerapy.engine import config
from chimerapy.engine import clock
from ..states import NodeState
from ..networking import DataChunk
from ..networking.async_loop_thread import AsyncLoopThread
Expand Down Expand Up @@ -89,7 +88,7 @@ def __init__(
# Generic Node needs
self.logger: logging.Logger = logging.getLogger("chimerapy-engine-node")
self.logging_level: int = logging.DEBUG
self.start_time = datetime.datetime.now()
self.start_time = clock.now()

# Default values
self.node_config = NodeConfig()
Expand Down Expand Up @@ -200,7 +199,7 @@ def save_video(self, name: str, data: np.ndarray, fps: int):
return False

if self.recorder.enabled:
timestamp = datetime.datetime.now()
timestamp = clock.now()
video_entry = {
"uuid": uuid.uuid4(),
"name": name,
Expand Down Expand Up @@ -251,7 +250,7 @@ def save_audio(
"channels": channels,
"format": format,
"rate": rate,
"timestamp": datetime.datetime.now(),
"timestamp": clock.now(),
}
self.recorder.submit(audio_entry)

Expand All @@ -271,7 +270,7 @@ def save_tabular(
"name": name,
"data": data,
"dtype": "tabular",
"timestamp": datetime.datetime.now(),
"timestamp": clock.now(),
}
self.recorder.submit(tabular_entry)

Expand All @@ -289,7 +288,7 @@ def save_image(self, name: str, data: np.ndarray):
"name": name,
"data": data,
"dtype": "image",
"timestamp": datetime.datetime.now(),
"timestamp": clock.now(),
}
self.recorder.submit(image_entry)

Expand Down Expand Up @@ -422,8 +421,6 @@ def run(
self.worker_comms.in_node_config(
state=self.state, eventbus=self.eventbus, logger=self.logger
)
if self.worker_comms.worker_config:
config.update_defaults(self.worker_comms.worker_config)
elif not self.state.logdir:
self.state.logdir = pathlib.Path(tempfile.mktemp())

Expand Down
4 changes: 2 additions & 2 deletions chimerapy/engine/node/poller_service.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import threading
import logging
import datetime
from typing import Optional, Dict, Tuple, List

import zmq

from chimerapy.engine import clock
from chimerapy.engine import _logger
from ..states import NodeState
from ..networking import Subscriber, DataChunk
Expand Down Expand Up @@ -149,7 +149,7 @@ def poll_inputs(self):
serial_data_chunk = s.recv()
data_chunk = DataChunk.from_bytes(serial_data_chunk)
meta = data_chunk.get("meta")
meta["value"]["received"] = datetime.datetime.now()
meta["value"]["received"] = clock.now()
data_chunk.update("meta", meta)

# Update the latest value
Expand Down
4 changes: 2 additions & 2 deletions chimerapy/engine/node/processor_service.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import threading
import traceback
import datetime
import logging
import asyncio
from typing import Dict, List, Optional, Callable, Coroutine, Any, Literal

from chimerapy.engine import _logger
from chimerapy.engine import clock
from ..networking.client import Client
from ..networking.enums import NODE_MESSAGE
from ..networking import DataChunk
Expand Down Expand Up @@ -285,7 +285,7 @@ async def safe_step(self, data_chunks: Dict[str, DataChunk] = {}):

# Add timestamp and step id to the DataChunk
meta = output_data_chunk.get("meta")
meta["value"]["transmitted"] = datetime.datetime.now()
meta["value"]["transmitted"] = clock.now()
output_data_chunk.update("meta", meta)

# Send out the output to the OutputsHandler
Expand Down
4 changes: 2 additions & 2 deletions chimerapy/engine/node/profiler_service.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import os
import pickle
import logging
import datetime
from collections import deque
from typing import Dict, Optional, Any, List

import pandas as pd
from psutil import Process

from chimerapy.engine import config
from chimerapy.engine import clock
from ..data_protocols import NodeDiagnostics
from ..async_timer import AsyncTimer
from ..networking.data_chunk import DataChunk
Expand Down Expand Up @@ -79,7 +79,7 @@ async def diagnostics_report(self):
return None

# Get the timestamp
timestamp = datetime.datetime.now().isoformat()
timestamp = clock.now().isoformat()

# Get process-wide information
memory = self.process.memory_info()
Expand Down
13 changes: 13 additions & 0 deletions chimerapy/engine/node/worker_comms_service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import pathlib
import logging
import tempfile
import datetime
from typing import Dict, Optional

from chimerapy.engine import config
from chimerapy.engine import clock
from ..networking import Client
from ..states import NodeState
from ..networking.enums import GENERAL_MESSAGE, WORKER_MESSAGE, NODE_MESSAGE
Expand All @@ -25,6 +28,7 @@ def __init__(
host: str,
port: int,
node_config: NodeConfig,
manager_worker_timedelta: Optional[datetime.timedelta] = None,
worker_logdir: Optional[pathlib.Path] = None,
worker_config: Optional[Dict] = None,
logging_level: int = logging.INFO,
Expand All @@ -42,6 +46,7 @@ def __init__(
self.worker_config = worker_config
self.logging_level = logging_level
self.node_config = node_config
self.manager_worker_timedelta = manager_worker_timedelta

# Optional
self.state = state
Expand Down Expand Up @@ -77,6 +82,14 @@ def in_node_config(
# Then add observers
self.add_observers()

# Update config if possible
if self.worker_config:
config.update_defaults(self.worker_config)

# Set clock if possible
if self.manager_worker_timedelta:
clock.update_reference_time(self.manager_worker_timedelta)

def add_observers(self):
assert self.state and self.eventbus and self.logger

Expand Down
6 changes: 6 additions & 0 deletions chimerapy/engine/worker/events.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pathlib
import datetime
from dataclasses import dataclass, field
from typing import Dict, Any
from enum import Enum
Expand All @@ -13,6 +14,11 @@ class BroadcastEvent:
data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ManagerWorkerTimeDeltaEvent:
    """Event payload carrying the clock offset between Worker and Manager.

    The HTTP client service computes the value as the Worker's UTC "now"
    minus the Manager's reported UTC "now" when the Worker registers.
    """

    # Worker time minus Manager time, as measured at registration.
    manager_worker_timedelta: datetime.timedelta


@dataclass
class SendMessageEvent:
client_id: str
Expand Down
23 changes: 21 additions & 2 deletions chimerapy/engine/worker/http_client_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import asyncio
import pathlib
import logging
import datetime
from typing import Optional, Literal, Union, Tuple, Dict

import aiohttp
Expand All @@ -18,8 +19,8 @@
from ..networking import Client
from ..utils import get_ip_address
from ..service import Service
from ..eventbus import EventBus, TypedObserver
from .events import SendArchiveEvent
from ..eventbus import EventBus, TypedObserver, Event
from .events import SendArchiveEvent, ManagerWorkerTimeDeltaEvent
from .zeroconf_listener import ZeroconfListener


Expand All @@ -46,6 +47,7 @@ def __init__(
self.manager_host = "0.0.0.0"
self.manager_port = -1
self.manager_url = ""
self.manager_datetime_now: Optional[datetime.datetime] = None

# Specify observers
self.observers: Dict[str, TypedObserver] = {
Expand Down Expand Up @@ -169,9 +171,26 @@ async def _async_connect_via_ip(

if resp.ok:

# Track time
worker_datetime_now = datetime.datetime.utcnow()

# Get JSON
data = await resp.json()

# Compute the time difference
manager_datetime_now = datetime.datetime.fromisoformat(
data.get("manager_datetime_now", None)
)
manager_worker_timedelta = (
worker_datetime_now - manager_datetime_now
)

event_data = ManagerWorkerTimeDeltaEvent(manager_worker_timedelta)
await self.eventbus.asend(
Event("manager_worker_timedelta", event_data)
)

# Update data
config.update_defaults(data.get("config", {}))
logs_push_info = data.get("logs_push_info", {})

Expand Down
Loading

0 comments on commit e424a0f

Please sign in to comment.