Skip to content

Commit

Permalink
API to add Topology patches in worker manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Olivier Michaud committed Oct 16, 2024
1 parent cdf69ea commit 0240ded
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 0 deletions.
33 changes: 33 additions & 0 deletions src/saturn_engine/models/topology_patches.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import typing as t

import sqlalchemy as sa
from sqlalchemy.orm import Mapped

from saturn_engine.models.compat import mapped_column
from saturn_engine.utils.declarative_config import BaseObject
from saturn_engine.utils.options import asdict

from .base import Base


class TopologyPatch(Base):
__tablename__ = "topology_patches"

name: Mapped[str] = mapped_column(sa.Text, primary_key=True)
data: Mapped[dict[str, t.Any]] = mapped_column(
sa.JSON, server_default="{}", nullable=False
)

def __init__(self, topology: BaseObject) -> None:
self.name = topology.metadata.name
self.data = asdict(topology)

def as_base_object(self) -> BaseObject:
return BaseObject(**self.data)

class InsertValues(t.TypedDict):
name: str
data: dict[str, t.Any]

def get_insert_values(self) -> "TopologyPatch.InsertValues":
return TopologyPatch.InsertValues(name=self.name, data=self.data)
22 changes: 22 additions & 0 deletions src/saturn_engine/stores/topologies_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from saturn_engine.models.topology_patches import TopologyPatch
from saturn_engine.utils.declarative_config import BaseObject
from saturn_engine.utils.sqlalchemy import AnySession
from saturn_engine.utils.sqlalchemy import upsert


def add(*, session: AnySession, topology: BaseObject) -> TopologyPatch:
topology_patch = TopologyPatch(topology=topology)

stmt = (
upsert(session)(TopologyPatch)
.values(topology_patch.get_insert_values())
.execution_options(populate_existing=True)
.on_conflict_do_update(
index_elements=[
TopologyPatch.name,
],
set_={TopologyPatch.data: topology_patch.data}, # type: ignore
)
)
session.execute(statement=stmt) # type: ignore
return topology_patch
3 changes: 3 additions & 0 deletions src/saturn_engine/utils/declarative_config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import typing as t

import os
from dataclasses import field

Expand All @@ -18,6 +20,7 @@ class BaseObject:
metadata: ObjectMetadata
apiVersion: str
kind: str
spec: dict[str, t.Any]


@dataclasses.dataclass
Expand Down
18 changes: 18 additions & 0 deletions src/saturn_engine/worker_manager/api/topologies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from flask import Blueprint

from saturn_engine.database import session_scope
from saturn_engine.stores import topologies_store
from saturn_engine.utils.declarative_config import BaseObject
from saturn_engine.utils.flask import Json
from saturn_engine.utils.flask import jsonify
from saturn_engine.utils.flask import marshall_request

bp = Blueprint("topologies", __name__, url_prefix="/api/topologies")


@bp.route("", methods=("PUT",))
def patch_topology() -> Json[BaseObject]:
patch = marshall_request(BaseObject)
with session_scope() as session:
saved_patch = topologies_store.add(session=session, topology=patch)
return jsonify(saved_patch.as_base_object())
2 changes: 2 additions & 0 deletions src/saturn_engine/worker_manager/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ def get_app(
from .api.lock import bp as bp_lock
from .api.status import bp as bp_status
from .api.topics import bp as bp_topics
from .api.topologies import bp as bp_topologies

app.register_blueprint(bp_status)
app.register_blueprint(bp_jobs)
app.register_blueprint(bp_job_definitions)
app.register_blueprint(bp_topics)
app.register_blueprint(bp_lock)
app.register_blueprint(bp_inventories)
app.register_blueprint(bp_topologies)

@app.teardown_appcontext # type: ignore
def shutdown_session(response_or_exc: Optional[BaseException]) -> None:
Expand Down
38 changes: 38 additions & 0 deletions tests/worker_manager/api/test_topologies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from flask.testing import FlaskClient


def test_put_topology_patch(client: FlaskClient) -> None:
resp = client.put(
f"/api/topologies",
json={
"apiVersion": "saturn.flared.io/v1alpha1",
"kind": "SaturnTopic",
"metadata": {"name": "test-topic"},
"spec": {"type": "RabbitMQTopic", "options": {"queue_name": "queue_1"}},
},
)
assert resp.status_code == 200
assert resp.json == {
"apiVersion": "saturn.flared.io/v1alpha1",
"kind": "SaturnTopic",
"metadata": {"name": "test-topic", "labels": {}},
"spec": {"type": "RabbitMQTopic", "options": {"queue_name": "queue_1"}},
}

# We add a new patch, overriding the last one
resp = client.put(
f"/api/topologies",
json={
"apiVersion": "saturn.flared.io/v1alpha1",
"kind": "SaturnTopic",
"metadata": {"name": "test-topic"},
"spec": {"type": "RabbitMQTopic", "options": {"queue_name": "queue_2"}},
},
)
assert resp.status_code == 200
assert resp.json == {
"apiVersion": "saturn.flared.io/v1alpha1",
"kind": "SaturnTopic",
"metadata": {"name": "test-topic", "labels": {}},
"spec": {"type": "RabbitMQTopic", "options": {"queue_name": "queue_2"}},
}

0 comments on commit 0240ded

Please sign in to comment.