diff --git a/src/saturn_engine/models/topology_patches.py b/src/saturn_engine/models/topology_patches.py new file mode 100644 index 00000000..58f2e357 --- /dev/null +++ b/src/saturn_engine/models/topology_patches.py @@ -0,0 +1,43 @@ +import typing as t + +import sqlalchemy as sa +from sqlalchemy.orm import Mapped + +from saturn_engine.models.compat import mapped_column +from saturn_engine.utils.declarative_config import BaseObject +from saturn_engine.utils.options import asdict + +from .base import Base + + +class TopologyPatch(Base): + __tablename__ = "topology_patches" + + kind: Mapped[str] = mapped_column(sa.Text, primary_key=True) + name: Mapped[str] = mapped_column(sa.Text, primary_key=True) + + data: Mapped[dict[str, t.Any]] = mapped_column(sa.JSON, nullable=False) + + def __init__(self, kind: str, name: str, data: dict[str, t.Any]) -> None: + self.kind = kind + self.name = name + self.data = data + + @classmethod + def from_topology(cls, topology: BaseObject) -> "TopologyPatch": + return cls( + kind=topology.kind, name=topology.metadata.name, data=asdict(topology) + ) + + def as_base_object(self) -> BaseObject: + return BaseObject(**self.data) + + class InsertValues(t.TypedDict): + kind: str + name: str + data: dict[str, t.Any] + + def get_insert_values(self) -> "TopologyPatch.InsertValues": + return TopologyPatch.InsertValues( + kind=self.kind, name=self.name, data=self.data + ) diff --git a/src/saturn_engine/stores/topologies_store.py b/src/saturn_engine/stores/topologies_store.py new file mode 100644 index 00000000..7888fc48 --- /dev/null +++ b/src/saturn_engine/stores/topologies_store.py @@ -0,0 +1,25 @@ +from saturn_engine.models.topology_patches import TopologyPatch +from saturn_engine.utils.declarative_config import BaseObject +from saturn_engine.utils.sqlalchemy import AnySession +from saturn_engine.utils.sqlalchemy import upsert + + +def patch(*, session: AnySession, patch: BaseObject) -> TopologyPatch: + topology_patch = TopologyPatch.from_topology(topology=patch) + + stmt = ( + upsert(session)(TopologyPatch) + .values(topology_patch.get_insert_values()) + .execution_options(populate_existing=True) + .on_conflict_do_update( + index_elements=[ + TopologyPatch.kind, + TopologyPatch.name, + ], + set_={ + TopologyPatch.data: topology_patch.data, # type: ignore + }, + ) + ) + session.execute(stmt) # type: ignore + return topology_patch diff --git a/src/saturn_engine/utils/declarative_config.py b/src/saturn_engine/utils/declarative_config.py index 2577d028..dbf052dc 100644 --- a/src/saturn_engine/utils/declarative_config.py +++ b/src/saturn_engine/utils/declarative_config.py @@ -1,3 +1,5 @@ +import typing as t + import os from dataclasses import field @@ -18,6 +20,7 @@ class BaseObject: metadata: ObjectMetadata apiVersion: str kind: str + spec: dict[str, t.Any] @dataclasses.dataclass diff --git a/src/saturn_engine/worker_manager/api/topologies.py b/src/saturn_engine/worker_manager/api/topologies.py new file mode 100644 index 00000000..35206131 --- /dev/null +++ b/src/saturn_engine/worker_manager/api/topologies.py @@ -0,0 +1,18 @@ +from flask import Blueprint + +from saturn_engine.database import session_scope +from saturn_engine.stores import topologies_store +from saturn_engine.utils.declarative_config import BaseObject +from saturn_engine.utils.flask import Json +from saturn_engine.utils.flask import jsonify +from saturn_engine.utils.flask import marshall_request + +bp = Blueprint("topologies", __name__, url_prefix="/api/topologies") + + +@bp.route("/patch", methods=("PUT",)) +def put_patch() -> Json[BaseObject]: + patch = marshall_request(BaseObject) + with session_scope() as session: + saved_patch = topologies_store.patch(session=session, patch=patch) + return jsonify(saved_patch.as_base_object()) diff --git a/src/saturn_engine/worker_manager/server.py b/src/saturn_engine/worker_manager/server.py index 8e574883..135d0bf1 100644 --- a/src/saturn_engine/worker_manager/server.py +++ b/src/saturn_engine/worker_manager/server.py @@ -32,6 +32,7 @@ def get_app( from .api.lock import bp as bp_lock from .api.status import bp as bp_status from .api.topics import bp as bp_topics + from .api.topologies import bp as bp_topologies app.register_blueprint(bp_status) app.register_blueprint(bp_jobs) @@ -39,6 +40,7 @@ def get_app( app.register_blueprint(bp_topics) app.register_blueprint(bp_lock) app.register_blueprint(bp_inventories) + app.register_blueprint(bp_topologies) @app.teardown_appcontext # type: ignore def shutdown_session(response_or_exc: Optional[BaseException]) -> None: diff --git a/tests/worker_manager/api/test_topologies.py b/tests/worker_manager/api/test_topologies.py new file mode 100644 index 00000000..6175871c --- /dev/null +++ b/tests/worker_manager/api/test_topologies.py @@ -0,0 +1,38 @@ +from flask.testing import FlaskClient + + +def test_put_topology_patch(client: FlaskClient) -> None: + resp = client.put( + "/api/topologies/patch", + json={ + "apiVersion": "saturn.flared.io/v1alpha1", + "kind": "SaturnTopic", + "metadata": {"name": "test-topic"}, + "spec": {"type": "RabbitMQTopic", "options": {"queue_name": "queue_1"}}, + }, + ) + assert resp.status_code == 200 + assert resp.json == { + "apiVersion": "saturn.flared.io/v1alpha1", + "kind": "SaturnTopic", + "metadata": {"name": "test-topic", "labels": {}}, + "spec": {"type": "RabbitMQTopic", "options": {"queue_name": "queue_1"}}, + } + + # We add a new patch, overriding the last one + resp = client.put( + "/api/topologies/patch", + json={ + "apiVersion": "saturn.flared.io/v1alpha1", + "kind": "SaturnTopic", + "metadata": {"name": "test-topic"}, + "spec": {"type": "RabbitMQTopic", "options": {"queue_name": "queue_2"}}, + }, + ) + assert resp.status_code == 200 + assert resp.json == { + "apiVersion": "saturn.flared.io/v1alpha1", + "kind": "SaturnTopic", + "metadata": {"name": "test-topic", "labels": {}}, + "spec": {"type": "RabbitMQTopic", "options": {"queue_name": "queue_2"}}, + }