From f50b85c67109c83c312482b095a41ce080a906d8 Mon Sep 17 00:00:00 2001
From: Stefaan Lippens
Date: Wed, 18 Sep 2024 18:21:42 +0200
Subject: [PATCH] Issue #150 add some debug logging

---
 .../partitionedjobs/crossbackend.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/src/openeo_aggregator/partitionedjobs/crossbackend.py b/src/openeo_aggregator/partitionedjobs/crossbackend.py
index 4689994..9136ba1 100644
--- a/src/openeo_aggregator/partitionedjobs/crossbackend.py
+++ b/src/openeo_aggregator/partitionedjobs/crossbackend.py
@@ -502,8 +502,6 @@ class _GraphViewer:

     """

-    # TODO: add more logging of what is happening under the hood
-
     def __init__(self, node_map: dict[NodeId, _GVNode]):
         self._check_consistency(node_map=node_map)
         # Work with a read-only proxy to prevent accidental changes
@@ -532,6 +530,7 @@ def from_flat_graph(cls, flat_graph: FlatPG, supporting_backends: SupportingBack
         """
         Build _GraphViewer from a flat process graph representation
         """
+        _log.debug(f"_GraphViewer.from_flat_graph: {flat_graph.keys()=}")
         # Extract dependency links between nodes
         depends_on = collections.defaultdict(list)
         flows_to = collections.defaultdict(list)
@@ -768,8 +767,11 @@ def produce_split_locations(self, limit: int = 2) -> Iterator[List[NodeId]]:
             forsaken_nodes = sorted(
                 forsaken_nodes, key=lambda n: sum(p in forsaken_nodes for p in self.node(n).depends_on)
             )
+            _log.debug(f"_GraphViewer.produce_split_locations: {forsaken_nodes=}")
+
             # Collect nodes where we could split the graph in disjoint subgraphs
             articulation_points: Set[NodeId] = set(self.find_articulation_points())
+            _log.debug(f"_GraphViewer.produce_split_locations: {articulation_points=}")

             # TODO: allow/deny lists of what openEO processes can be split on? E.g. only split raster cube paths

@@ -779,13 +781,14 @@ def produce_split_locations(self, limit: int = 2) -> Iterator[List[NodeId]]:
                 for n in self.walk_upstream_nodes(seeds=forsaken_nodes, include_seeds=False)
                 if n in articulation_points
             ]
+            _log.debug(f"_GraphViewer.produce_split_locations: {split_options=}")
             if not split_options:
                 raise GraphSplitException("No split options found.")
             # TODO: how to handle limit? will it scale feasibly to iterate over all possibilities at this point?
             # TODO: smarter picking of split node (e.g. one with most upstream nodes)
             assert limit > 0
             for split_node_id in split_options[:limit]:
-                # Split graph at this articulation point
+                _log.debug(f"_GraphViewer.produce_split_locations: splitting at {split_node_id=}")
                 up, down = self.split_at(split_node_id)
                 if down.find_forsaken_nodes():
                     down_splits = list(down.produce_split_locations(limit=max(limit - 1, 1)))
@@ -820,6 +823,7 @@ def split(self, process_graph: FlatPG) -> _PGSplitResult:

         # TODO: make picking "optimal" split location set a bit more deterministic (e.g. sort first)
         (split_nodes,) = graph.produce_split_locations(limit=1)
+        _log.debug(f"DeepGraphSplitter.split: split nodes: {split_nodes=}")
         secondary_graphs: List[_SubGraphData] = []
         graph_to_split = graph

@@ -831,6 +835,7 @@ def split(self, process_graph: FlatPG) -> _PGSplitResult:
             # TODO: better backend selection?
             # TODO handle case where backend_candidates is None?
             backend_id = sorted(backend_candidates)[0]
+            _log.debug(f"DeepGraphSplitter.split: secondary graph: from {split_node_id=}: {backend_id=} {node_ids=}")
             secondary_graphs.append(
                 _SubGraphData(
                     split_node=split_node_id,
@@ -847,6 +852,7 @@ def split(self, process_graph: FlatPG) -> _PGSplitResult:
         primary_node_ids = set(n for n, _ in primary_graph.iter_nodes())
         backend_candidates = primary_graph.get_backend_candidates_for_node_set(primary_node_ids)
         primary_backend_id = sorted(backend_candidates)[0]
+        _log.debug(f"DeepGraphSplitter.split: primary graph: {primary_backend_id=} {primary_node_ids=}")
         return _PGSplitResult(
             primary_node_ids=primary_node_ids,
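
Note (not part of the patch): the _log.debug(...) calls added above only produce output when debug-level logging is enabled for this module. A minimal sketch of how one could switch that on in a test or script, assuming the module's _log is a standard logging.getLogger(__name__) logger (the logger name below is inferred from the file path):

    import logging

    # Attach a basic stderr handler; keep the root logger at INFO to limit noise.
    logging.basicConfig(level=logging.INFO)
    # Lower only the cross-backend splitting logger to DEBUG so the new
    # _log.debug(...) messages become visible.
    logging.getLogger("openeo_aggregator.partitionedjobs.crossbackend").setLevel(logging.DEBUG)

With that in place, exercising DeepGraphSplitter.split() logs the intermediate split decisions (forsaken nodes, articulation points, chosen split nodes and backend assignments) to stderr.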