From 1954be442d5b1e5f4a237a0aa0cf0b9eb047df30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Moura?= Date: Sat, 13 Jan 2024 23:25:35 -0300 Subject: [PATCH] creating Logger and RPMController --- src/crewai/utilities/logger.py | 11 +++++ src/crewai/utilities/rpm_controller.py | 57 ++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 src/crewai/utilities/logger.py create mode 100644 src/crewai/utilities/rpm_controller.py diff --git a/src/crewai/utilities/logger.py b/src/crewai/utilities/logger.py new file mode 100644 index 0000000000..533dd7a759 --- /dev/null +++ b/src/crewai/utilities/logger.py @@ -0,0 +1,11 @@ +class Logger: + def __init__(self, verbose_level=0): + verbose_level = ( + 2 if isinstance(verbose_level, bool) and verbose_level else verbose_level + ) + self.verbose_level = verbose_level + + def log(self, level, message): + level_map = {"debug": 1, "info": 2} + if self.verbose_level and level_map.get(level, 0) <= self.verbose_level: + print(f"\n[{level.upper()}]: {message}") diff --git a/src/crewai/utilities/rpm_controller.py b/src/crewai/utilities/rpm_controller.py new file mode 100644 index 0000000000..1bf5f27453 --- /dev/null +++ b/src/crewai/utilities/rpm_controller.py @@ -0,0 +1,57 @@ +import threading +import time +from typing import Union + +from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, model_validator + +from crewai.utilities.logger import Logger + + +class RPMController(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + max_rpm: Union[int, None] = Field(default=None) + logger: Logger = Field(default=None) + _current_rpm: int = PrivateAttr(default=0) + _timer: threading.Timer = PrivateAttr(default=None) + _lock: threading.Lock = PrivateAttr(default=None) + + @model_validator(mode="after") + def reset_counter(self): + if self.max_rpm: + self._lock = threading.Lock() + self._reset_request_count() + return self + + def check_or_wait(self): + if not self.max_rpm: + return True + + with self._lock: + if self._current_rpm < self.max_rpm: + self._current_rpm += 1 + return True + else: + self.logger.log( + "info", "Max RPM reached, waiting for next minute to start." + ) + self._wait_for_next_minute() + self._current_rpm = 1 + return True + + def stop_rpm_counter(self): + if self._timer: + self._timer.cancel() + self._timer = None + + def _wait_for_next_minute(self): + time.sleep(60) + with self._lock: + self._current_rpm = 0 + + def _reset_request_count(self): + with self._lock: + self._current_rpm = 0 + if self._timer: + self._timer.cancel() + self._timer = threading.Timer(60.0, self._reset_request_count) + self._timer.start()