From d4a7b7542c59df7941350b72935c3aad9bfc94e9 Mon Sep 17 00:00:00 2001 From: Andreas Gebhardt Date: Fri, 1 Sep 2023 20:32:12 +0200 Subject: [PATCH] [WIP] --- graph/planner/planner.py | 92 ++++++++++++++++++++++++++++++++++ graph/planner/planner_test.py | 0 graph/planner/requirements.txt | 0 3 files changed, 92 insertions(+) create mode 100644 graph/planner/planner.py create mode 100644 graph/planner/planner_test.py create mode 100644 graph/planner/requirements.txt diff --git a/graph/planner/planner.py b/graph/planner/planner.py new file mode 100644 index 0000000..8643db8 --- /dev/null +++ b/graph/planner/planner.py @@ -0,0 +1,92 @@ +from dataclasses import dataclass, field +from enum import Enum + + +class EdgeType(Enum): + DEPENDENCY = 0 + TOOL = 1 + + +@dataclass +class Edge: + u: str + v: str + type: EdgeType = field(default=EdgeType.DEPENDENCY) + + +class CycleException(Exception): + pass + + +class DirectAcyclicGraph: + _g: dict[str, list[(str, EdgeType)]] + _topological_order: list[str] + + def __init__(self, edges: list[Edge]): + self._g = {} + for e in edges: # e = (u, v) + if self._g.get(e.u) is None: + self._g[e.u] = [(e.v, e.type)] + else: + if (e.v, e.type) not in self._g[e.u]: + self._g[e.u].append((e.v, e.type)) + if self._g.get(e.v) is None: + self._g[e.v] = [] + + self._topsort() + + def _topsort(self): + edges = {u: iter([v for (v, _) in value]) for (u, value) in self._g.items()} + result = [] + + while len(edges): + # print(f"edges: {edges}") + dfs_stack = [list(edges.keys())[0]] + while dfs_stack: + # print("-" * 25) + # print(f"stack: {stack}") + # print(f"edges: {edges}") + u = dfs_stack.pop() + # print(f"node: {u}") + if u not in result: + # print(f"node: {u} not visited") + adjacent = edges[u] + try: + v = next(adjacent) + if v in dfs_stack: # cycle + raise CycleException(f"{dfs_stack} + {u} -> {v}") + dfs_stack.append(u) # recursive return + if v not in result: + dfs_stack.append(v) # recursive call + except StopIteration: + # print(f"finished {u}") + del edges[u] + result.append(u) + + result.reverse() + self._topological_order = result + + def print(self): + print(self._g) + print(f"topological order => {self._topological_order}") + + +g = DirectAcyclicGraph( + # [Edge("a", "b"), Edge("b", "c"), Edge("c", "d"), Edge("d", "e"), Edge("d", "e")] + # [Edge("a", "b"), Edge("b", "c"), Edge("c", "d"), Edge("d", "a")] + [ + Edge("a", "b"), + Edge("b", "c"), + Edge("b", "d"), + Edge("b", "e"), + Edge("c", "c1"), + Edge("c", "c2"), + Edge("d", "d1"), + Edge("d", "d2"), + Edge("c1", "c1d1"), + Edge("d1", "c1d1"), + Edge("c2", "c2d2"), + Edge("d2", "c2d2"), + ] +) +g.print() diff --git a/graph/planner/planner_test.py b/graph/planner/planner_test.py new file mode 100644 index 0000000..e69de29 diff --git a/graph/planner/requirements.txt b/graph/planner/requirements.txt new file mode 100644 index 0000000..e69de29