diff --git a/src/saturn_engine/models/topology_patches.py b/src/saturn_engine/models/topology_patches.py new file mode 100644 index 00000000..e5a934b3 --- /dev/null +++ b/src/saturn_engine/models/topology_patches.py @@ -0,0 +1,33 @@ +import typing as t + +import sqlalchemy as sa +from sqlalchemy.orm import Mapped + +from saturn_engine.models.compat import mapped_column +from saturn_engine.utils.declarative_config import BaseObject +from saturn_engine.utils.options import asdict + +from .base import Base + + +class TopologyPatch(Base): + __tablename__ = "topology_patches" + + name: Mapped[str] = mapped_column(sa.Text, primary_key=True) + data: Mapped[dict[str, t.Any]] = mapped_column( + sa.JSON, server_default="{}", nullable=False + ) + + def __init__(self, topology: BaseObject) -> None: + self.name = topology.metadata.name + self.data = asdict(topology) + + def as_base_object(self) -> BaseObject: + return BaseObject(**self.data) + + class InsertValues(t.TypedDict): + name: str + data: dict[str, t.Any] + + def get_insert_values(self) -> "TopologyPatch.InsertValues": + return TopologyPatch.InsertValues(name=self.name, data=self.data) diff --git a/src/saturn_engine/stores/topologies_store.py b/src/saturn_engine/stores/topologies_store.py new file mode 100644 index 00000000..631d338b --- /dev/null +++ b/src/saturn_engine/stores/topologies_store.py @@ -0,0 +1,22 @@ +from saturn_engine.models.topology_patches import TopologyPatch +from saturn_engine.utils.declarative_config import BaseObject +from saturn_engine.utils.sqlalchemy import AnySession +from saturn_engine.utils.sqlalchemy import upsert + + +def add(*, session: AnySession, topology: BaseObject) -> TopologyPatch: + topology_patch = TopologyPatch(topology=topology) + + stmt = ( + upsert(session)(TopologyPatch) + .values(topology_patch.get_insert_values()) + .execution_options(populate_existing=True) + .on_conflict_do_update( + index_elements=[ + TopologyPatch.name, + ], + set_={TopologyPatch.data: topology_patch.data}, # type: ignore + ) + ) + session.execute(statement=stmt) # type: ignore + return topology_patch diff --git a/src/saturn_engine/utils/declarative_config.py b/src/saturn_engine/utils/declarative_config.py index 2577d028..dbf052dc 100644 --- a/src/saturn_engine/utils/declarative_config.py +++ b/src/saturn_engine/utils/declarative_config.py @@ -1,3 +1,5 @@ +import typing as t + import os from dataclasses import field @@ -18,6 +20,7 @@ class BaseObject: metadata: ObjectMetadata apiVersion: str kind: str + spec: dict[str, t.Any] @dataclasses.dataclass diff --git a/src/saturn_engine/worker_manager/api/topologies.py b/src/saturn_engine/worker_manager/api/topologies.py new file mode 100644 index 00000000..4bdc96a4 --- /dev/null +++ b/src/saturn_engine/worker_manager/api/topologies.py @@ -0,0 +1,18 @@ +from flask import Blueprint + +from saturn_engine.database import session_scope +from saturn_engine.stores import topologies_store +from saturn_engine.utils.declarative_config import BaseObject +from saturn_engine.utils.flask import Json +from saturn_engine.utils.flask import jsonify +from saturn_engine.utils.flask import marshall_request + +bp = Blueprint("topologies", __name__, url_prefix="/api/topologies") + + +@bp.route("", methods=("PUT",)) +def patch_topology() -> Json[BaseObject]: + patch = marshall_request(BaseObject) + with session_scope() as session: + saved_patch = topologies_store.add(session=session, topology=patch) + return jsonify(saved_patch.as_base_object()) diff --git a/src/saturn_engine/worker_manager/server.py b/src/saturn_engine/worker_manager/server.py index f9b9e5d0..a512dd99 100644 --- a/src/saturn_engine/worker_manager/server.py +++ b/src/saturn_engine/worker_manager/server.py @@ -32,6 +32,7 @@ def get_app( from .api.lock import bp as bp_lock from .api.status import bp as bp_status from .api.topics import bp as bp_topics + from .api.topologies import bp as bp_topologies app.register_blueprint(bp_status) app.register_blueprint(bp_jobs) @@ -39,6 +40,7 @@ def get_app( app.register_blueprint(bp_topics) app.register_blueprint(bp_lock) app.register_blueprint(bp_inventories) + app.register_blueprint(bp_topologies) @app.teardown_appcontext # type: ignore def shutdown_session(response_or_exc: Optional[BaseException]) -> None: diff --git a/tests/worker_manager/api/test_topologies.py b/tests/worker_manager/api/test_topologies.py new file mode 100644 index 00000000..8c058c5f --- /dev/null +++ b/tests/worker_manager/api/test_topologies.py @@ -0,0 +1,38 @@ +from flask.testing import FlaskClient + + +def test_put_topology_patch(client: FlaskClient) -> None: + resp = client.put( + f"/api/topologies", + json={ + "apiVersion": "saturn.flared.io/v1alpha1", + "kind": "SaturnTopic", + "metadata": {"name": "test-topic"}, + "spec": {"type": "RabbitMQTopic", "options": {"queue_name": "queue_1"}}, + }, + ) + assert resp.status_code == 200 + assert resp.json == { + "apiVersion": "saturn.flared.io/v1alpha1", + "kind": "SaturnTopic", + "metadata": {"name": "test-topic", "labels": {}}, + "spec": {"type": "RabbitMQTopic", "options": {"queue_name": "queue_1"}}, + } + + # We add a new patch, overriding the last one + resp = client.put( + f"/api/topologies", + json={ + "apiVersion": "saturn.flared.io/v1alpha1", + "kind": "SaturnTopic", + "metadata": {"name": "test-topic"}, + "spec": {"type": "RabbitMQTopic", "options": {"queue_name": "queue_2"}}, + }, + ) + assert resp.status_code == 200 + assert resp.json == { + "apiVersion": "saturn.flared.io/v1alpha1", + "kind": "SaturnTopic", + "metadata": {"name": "test-topic", "labels": {}}, + "spec": {"type": "RabbitMQTopic", "options": {"queue_name": "queue_2"}}, + }