Skip to content

Commit

Permalink
fix: WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
huyenngn committed Jan 30, 2025
1 parent f0fdc74 commit 57e0c2f
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 706 deletions.
29 changes: 0 additions & 29 deletions src/capellambse_context_diagrams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ def init() -> None:
register_realization_view()
register_data_flow_view()
register_cable_tree_view()
register_custom_diagram()
# register_functional_context() XXX: Future


Expand Down Expand Up @@ -334,31 +333,3 @@ def register_cable_tree_view() -> None:
{},
),
)


def register_custom_diagram() -> None:
"""Add the `custom_diagram` attribute to `ModelObject`s."""
supported_classes: list[tuple[type[m.ModelElement], DiagramType]] = [
(oa.Entity, DiagramType.OAB),
(oa.OperationalActivity, DiagramType.OAB),
(oa.OperationalCapability, DiagramType.OCB),
(oa.CommunicationMean, DiagramType.OAB),
(sa.Mission, DiagramType.MCB),
(sa.Capability, DiagramType.MCB),
(sa.SystemComponent, DiagramType.SAB),
(sa.SystemFunction, DiagramType.SAB),
(la.LogicalComponent, DiagramType.LAB),
(la.LogicalFunction, DiagramType.LAB),
(pa.PhysicalComponent, DiagramType.PAB),
(pa.PhysicalFunction, DiagramType.PAB),
(cs.PhysicalLink, DiagramType.PAB),
(cs.PhysicalPort, DiagramType.PAB),
(fa.ComponentExchange, DiagramType.SAB),
(information.Class, DiagramType.CDB),
]
for class_, dgcls in supported_classes:
m.set_accessor(
class_,
"custom_diagram",
context.CustomAccessor(dgcls.value, {}),
)
6 changes: 3 additions & 3 deletions src/capellambse_context_diagrams/collectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import typing as t

from .. import _elkjs, context
from . import default, generic, portless
from . import custom, generic, portless

__all__ = ["get_elkdata"]
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -41,14 +41,14 @@ def get_elkdata(
"""
try:
if generic.DIAGRAM_TYPE_TO_CONNECTOR_NAMES[diagram.type]:
collector = default.collector
collector = custom.collector
else:
collector = portless.collector
except KeyError:
logger.error(
"Handling unknown diagram type %r. Default collector is used.",
diagram.type,
)
collector = default.collector
collector = custom.collector

return collector(diagram, params)
155 changes: 115 additions & 40 deletions src/capellambse_context_diagrams/collectors/custom.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
# SPDX-FileCopyrightText: Copyright DB InfraGO AG and the capellambse-context-diagrams contributors
# SPDX-License-Identifier: Apache-2.0
"""Collector for the CustomDiagram."""

"""Build an ELK diagram from collected capellambse context.
This submodule provides a collector that transforms capellambse data to an ELK-
layouted diagram
[_elkjs.ELKInputData][capellambse_context_diagrams._elkjs.ELKInputData].
The data was collected with the functions from
[collectors][capellambse_context_diagrams.collectors].
"""

from __future__ import annotations

import collections.abc as cabc
import copy
import enum
import typing as t

import capellambse.model as m
from capellambse.metamodel import cs, fa, la, sa

from .. import _elkjs, context
from . import generic, makers
from . import default, generic, makers

if t.TYPE_CHECKING:
from .. import context

DerivatorFunction: t.TypeAlias = cabc.Callable[
[
context.CustomDiagram,
context.ContextDiagram,
dict[str, _elkjs.ELKInputChild],
dict[str, _elkjs.ELKInputEdge],
],
Expand All @@ -32,6 +42,30 @@
]


class EDGE_DIRECTION(enum.Enum):
"""Reroute direction of edges.
Attributes
----------
NONE
No rerouting of edges.
SMART
Reroute edges to follow the primary direction of data flow.
LEFT
Edges are always placed on the left side.
RIGHT
Edges are always placed on the right side.
TREE
Reroute edges to follow a tree-like structure.
"""

NONE = enum.auto()
SMART = enum.auto()
LEFT = enum.auto()
RIGHT = enum.auto()
TREE = enum.auto()


def _is_edge(obj: m.ModelElement) -> bool:
return hasattr(obj, "source") and hasattr(obj, "target")

Expand All @@ -40,12 +74,19 @@ def _is_port(obj: m.ModelElement) -> bool:
return obj.xtype.endswith("Port")


def get_uncommon_owner(
owners: list[str],
opposite_owners: list[str],
) -> str:
return [owner for owner in owners if owner not in opposite_owners][-1]


class CustomCollector:
"""Collect the context for a custom diagram."""

def __init__(
self,
diagram: context.CustomDiagram,
diagram: context.ContextDiagram,
params: dict[str, t.Any],
) -> None:
self.diagram = diagram
Expand All @@ -72,28 +113,42 @@ def __init__(
self.edge_owners: dict[str, str] = {}
self.common_owners: set[str] = set()

if self.diagram._display_parent_relation or self.diagram._blackbox:
if (
self.diagram._display_parent_relation
or self.diagram._mode == default.MODE.GRAYBOX.name
):
self.diagram_target_owners = list(
generic.get_all_owners(self.boxable_target)
)

if self.diagram._unify_edge_direction != "NONE":
if self.diagram._edge_direction != EDGE_DIRECTION.NONE.name:
self.directions: dict[str, bool] = {}

if self.diagram._unify_edge_direction == "UNIFORM":
if self.diagram._edge_direction == EDGE_DIRECTION.RIGHT.name:
self.data.layoutOptions["layered.nodePlacement.strategy"] = (
"NETWORK_SIMPLEX"
)
self.directions[self.boxable_target.uuid] = False
elif self.diagram._edge_direction == EDGE_DIRECTION.LEFT.name:
self.data.layoutOptions["layered.nodePlacement.strategy"] = (
"NETWORK_SIMPLEX"
)
self.directions[self.boxable_target.uuid] = True

self.min_heights: dict[str, dict[str, float]] = {}

def __call__(self) -> _elkjs.ELKInputData:
if _is_port(self.target):
port = self._make_port_and_owner(self.target, "right")
self._update_min_heights(self.boxable_target.uuid, "left", port)
else:
self._make_target(self.target)

if target_edge := self.edges.get(self.target.uuid):
target_edge.layoutOptions = copy.deepcopy(
elif not _is_edge(self.target):
self._make_box(
self.target, slim_width=self.diagram._slim_center_box
)
elif self.diagram._include_interface or self.diagram._hide_functions:
edge = self._make_edge_and_ports(self.target)
assert edge is not None
edge.layoutOptions = copy.deepcopy(
_elkjs.EDGE_STRAIGHTENING_LAYOUT_OPTIONS
)

Expand Down Expand Up @@ -124,17 +179,27 @@ def __call__(self) -> _elkjs.ELKInputData:
derivator(self.diagram, self.boxes, self.edges)

self._fix_box_heights()

for uuid in self.boxes_to_delete:
del self.boxes[uuid]

return self._get_data()

def _get_data(self) -> t.Any:
self.data.children = list(self.boxes.values())
self.data.edges = list(self.edges.values())
if (
self.diagram._hide_context_owner
and len(self.boxes.values()) == 1
and next(iter(self.boxes.values())) != self.boxable_target
):
self.data.children = next(iter(self.boxes.values())).children
self.data.edges = next(iter(self.boxes.values())).edges
else:
self.data.children = list(self.boxes.values())
self.data.edges = list(self.edges.values())
return self.data

def _flip_edges(self) -> None:
if self.diagram._unify_edge_direction == "NONE":
if self.diagram._edge_direction == EDGE_DIRECTION.NONE.name:
return

def flip(edge_uuid: str) -> None:
Expand All @@ -144,16 +209,20 @@ def flip(edge_uuid: str) -> None:
edge.sources[-1],
)

def flip_small_side(edges: dict[bool, set[str]]) -> None:
side = len(edges[True]) < len(edges[False])
def flip_side(edges: dict[bool, set[str]], side: bool) -> None:
for edge_uuid in edges[side]:
flip(edge_uuid)

for edges in self.edges_to_flip.values():
flip_small_side(edges)
if self.diagram._edge_direction == EDGE_DIRECTION.SMART.name:
for edges in self.edges_to_flip.values():
side = len(edges[True]) < len(edges[False])
flip_side(edges, side)
else:
for edges in self.edges_to_flip.values():
flip_side(edges, True)

def _fix_box_heights(self) -> None:
if self.diagram._unify_edge_direction != "NONE":
if self.diagram._edge_direction != EDGE_DIRECTION.NONE.name:
for uuid, min_heights in self.min_heights.items():
box = self.boxes[uuid]
box.height = max(box.height, sum(min_heights.values()))
Expand Down Expand Up @@ -219,10 +288,12 @@ def _make_edge_and_ports(
tgt_owner = tgt_obj.owner
src_owners = list(generic.get_all_owners(src_obj))
tgt_owners = list(generic.get_all_owners(tgt_obj))
is_src = self.boxable_target.uuid in src_owners
is_tgt = self.boxable_target.uuid in tgt_owners
src_uncommon_owner = get_uncommon_owner(src_owners, tgt_owners)
tgt_uncommon_owner = get_uncommon_owner(tgt_owners, src_owners)

if self.diagram._blackbox:
if self.diagram._mode == default.MODE.GRAYBOX.name:
is_src = self.boxable_target.uuid in src_owners
is_tgt = self.boxable_target.uuid in tgt_owners
if is_src and is_tgt:
return None
if is_src and src_owner.uuid != self.boxable_target.uuid:
Expand All @@ -248,15 +319,17 @@ def _make_edge_and_ports(
self.edge_owners[edge_obj.uuid] = common_owner

flip_needed, unc = self._need_flip(
src_owners, tgt_owners, src_owner.uuid, tgt_owner.uuid
src_uncommon_owner,
tgt_uncommon_owner,
src_owner.uuid,
tgt_owner.uuid,
)
self.edges_to_flip.setdefault(unc, {True: set(), False: set()})[
flip_needed
].add(edge_obj.uuid)
if flip_needed:
src_obj, tgt_obj = tgt_obj, src_obj
src_owner, tgt_owner = tgt_owner, src_owner
is_src, is_tgt = is_tgt, is_src

if not self.ports.get(src_obj.uuid):
self._make_port_and_owner(src_obj, "right", src_owner)
Expand All @@ -278,22 +351,18 @@ def _update_min_heights(

def _need_flip(
self,
src_owners: list[str],
tgt_owners: list[str],
src_uncommon_owner: str,
tgt_uncommon_owner: str,
src_uuid: str,
tgt_uuid: str,
) -> tuple[bool, str]:
def _get_direction(
uuid: str,
owners: list[str],
opposite_owners: list[str],
uncommon_owner: str,
default: bool,
) -> tuple[bool | None, str]:
if uuid == self.boxable_target.uuid:
return None, ""
uncommon_owner = [
owner for owner in owners if owner not in opposite_owners
][-1]
return (
self.directions.setdefault(uncommon_owner, default),
uncommon_owner,
Expand All @@ -314,23 +383,29 @@ def _initialize_directions(

return src_dir, tgt_dir

edge_direction: str = self.diagram._unify_edge_direction
if edge_direction == "SMART":
edge_direction = self.diagram._edge_direction
if edge_direction == EDGE_DIRECTION.SMART.name:
src_dir, src_unc = _get_direction(
src_uuid, src_owners, tgt_owners, False
src_uuid, src_uncommon_owner, False
)
tgt_dir, tgt_unc = _get_direction(
tgt_uuid, tgt_owners, src_owners, True
tgt_uuid, tgt_uncommon_owner, True
)
return src_dir is True or tgt_dir is False, (src_unc or tgt_unc)

if edge_direction == "UNIFORM":
if edge_direction == EDGE_DIRECTION.RIGHT.name:
src_dir, _ = _initialize_directions(
src_uuid, tgt_uuid, False, True
)
return self.directions[src_uuid], self.boxable_target.uuid

if edge_direction == "TREE":
if edge_direction == EDGE_DIRECTION.LEFT.name:
_, tgt_dir = _initialize_directions(
src_uuid, tgt_uuid, True, False
)
return self.directions[src_uuid], self.boxable_target.uuid

if edge_direction == EDGE_DIRECTION.TREE.name:
src_dir, tgt_dir = _initialize_directions(
src_uuid, tgt_uuid, True, True
)
Expand Down Expand Up @@ -367,14 +442,14 @@ def _make_port_and_owner(


def collector(
diagram: context.CustomDiagram, params: dict[str, t.Any]
diagram: context.ContextDiagram, params: dict[str, t.Any]
) -> _elkjs.ELKInputData:
"""Collect data for rendering a custom diagram."""
return CustomCollector(diagram, params)()


def derive_from_functions(
diagram: context.CustomDiagram,
diagram: context.ContextDiagram,
boxes: dict[str, _elkjs.ELKInputChild],
edges: dict[str, _elkjs.ELKInputEdge],
) -> None:
Expand Down
Loading

0 comments on commit 57e0c2f

Please sign in to comment.