From c9a8f410f813022e1581a76c695aa5c87a17cf28 Mon Sep 17 00:00:00 2001 From: Naved Ansari Date: Wed, 25 Sep 2024 20:19:19 -0400 Subject: [PATCH 1/4] Move the code to extract gpu info into its own function --- openshift_metrics/metrics_processor.py | 29 +++++++++++++++++--------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/openshift_metrics/metrics_processor.py b/openshift_metrics/metrics_processor.py index 3132018..ee78794 100644 --- a/openshift_metrics/metrics_processor.py +++ b/openshift_metrics/metrics_processor.py @@ -18,19 +18,12 @@ def merge_metrics(self, metric_name, metric_list): namespace = metric["metric"]["namespace"] node = metric["metric"].get("node") - gpu_type = None - gpu_resource = None - node_model = None - self.merged_data.setdefault(namespace, {}) self.merged_data[namespace].setdefault(pod, {"metrics": {}}) - if metric_name == "gpu_request": - gpu_type = metric["metric"].get( - "label_nvidia_com_gpu_product", GPU_UNKNOWN_TYPE - ) - gpu_resource = metric["metric"].get("resource") - node_model = metric["metric"].get("label_nvidia_com_gpu_machine") + gpu_type, gpu_resource, node_model = self._extract_gpu_info( + metric_name, metric + ) for value in metric["values"]: epoch_time = value[0] @@ -57,6 +50,22 @@ def merge_metrics(self, metric_name, metric_list): "node" ] = node + @staticmethod + def _extract_gpu_info(metric_name: str, metric: Dict) -> tuple: + """Extract GPU related info""" + gpu_type = None + gpu_resource = None + node_model = None + + if metric_name == "gpu_request": + gpu_type = metric["metric"].get( + "label_nvidia_com_gpu_product", GPU_UNKNOWN_TYPE + ) + gpu_resource = metric["metric"].get("resource") + node_model = metric["metric"].get("label_nvidia_com_gpu_machine") + + return gpu_type, gpu_resource, node_model + def condense_metrics(self, metrics_to_check: List[str]) -> Dict: """ Checks if the value of metrics is the same, and removes redundant From a59157446cf0861c97f8a9a3ce1900e6396e03c9 Mon Sep 17 00:00:00 
2001 From: Naved Ansari Date: Wed, 25 Sep 2024 20:23:12 -0400 Subject: [PATCH 2/4] Slightly improve how we iterate over the timestamp and value pairs --- openshift_metrics/metrics_processor.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/openshift_metrics/metrics_processor.py b/openshift_metrics/metrics_processor.py index ee78794..a9fbb36 100644 --- a/openshift_metrics/metrics_processor.py +++ b/openshift_metrics/metrics_processor.py @@ -25,14 +25,13 @@ def merge_metrics(self, metric_name, metric_list): metric_name, metric ) - for value in metric["values"]: - epoch_time = value[0] + for epoch_time, metric_value in metric["values"]: self.merged_data[namespace][pod]["metrics"].setdefault(epoch_time, {}) self.merged_data[namespace][pod]["metrics"][epoch_time][ metric_name - ] = value[1] + ] = metric_value if gpu_type: self.merged_data[namespace][pod]["metrics"][epoch_time][ "gpu_type" From 4d16ae63ad0149850b258d9884248891c6a492d4 Mon Sep 17 00:00:00 2001 From: Naved Ansari Date: Thu, 26 Sep 2024 12:24:20 -0400 Subject: [PATCH 3/4] Refactor condense_metrics This is an attempt to make it easier to follow the logic of condense_metrics. 
Notable changes are: * move the logic to check if pod was stopped or if the metrics are different to separate functions * start the loop with the index at 1 since all the comparisons need to be with a previous timestamp or previous metric --- openshift_metrics/metrics_processor.py | 62 ++++++++++++++++++-------- 1 file changed, 44 insertions(+), 18 deletions(-) diff --git a/openshift_metrics/metrics_processor.py b/openshift_metrics/metrics_processor.py index a9fbb36..e0844f0 100644 --- a/openshift_metrics/metrics_processor.py +++ b/openshift_metrics/metrics_processor.py @@ -88,33 +88,59 @@ def condense_metrics(self, metrics_to_check: List[str]) -> Dict: start_metric_dict = metrics_dict[start_epoch_time].copy() - for i in range(len(epoch_times_list)): - epoch_time = epoch_times_list[i] - same_metrics = True - continuous_metrics = True - for metric in metrics_to_check: - # If either cpu, memory or gpu request is diferent. - if metrics_dict[start_epoch_time].get(metric, 0) != metrics_dict[epoch_time].get(metric, 0): # fmt: skip - same_metrics = False - - if i != 0 and epoch_time - epoch_times_list[i - 1] > interval: - # i.e. 
if the difference between 2 consecutive timestamps - # is more than the expected frequency then the pod was stopped - continuous_metrics = False - - if not same_metrics or not continuous_metrics: - duration = epoch_times_list[i - 1] - start_epoch_time + interval + for i in range(1, len(epoch_times_list)): + current_time = epoch_times_list[i] + previous_time = epoch_times_list[i - 1] + + metrics_changed = self._are_metrics_different( + metrics_dict[start_epoch_time], + metrics_dict[current_time], + metrics_to_check, + ) + + pod_was_stopped = self._was_pod_stopped( + current_time=current_time, + previous_time=previous_time, + interval=interval, + ) + + if metrics_changed or pod_was_stopped: + duration = previous_time - start_epoch_time + interval start_metric_dict["duration"] = duration new_metrics_dict[start_epoch_time] = start_metric_dict - start_epoch_time = epoch_time + + # Reset start_epoch_time and start_metric_dict + start_epoch_time = current_time start_metric_dict = metrics_dict[start_epoch_time].copy() - duration = epoch_time - start_epoch_time + interval + # Final block after the loop + duration = epoch_times_list[-1] - start_epoch_time + interval start_metric_dict["duration"] = duration new_metrics_dict[start_epoch_time] = start_metric_dict + # Update the pod dict with the condensed data new_pod_dict = pod_dict.copy() new_pod_dict["metrics"] = new_metrics_dict condensed_dict[namespace][pod] = new_pod_dict return condensed_dict + + @staticmethod + def _are_metrics_different( + metrics_a: Dict, metrics_b: Dict, metrics_to_check: List[str] + ) -> bool: + """Check whether any of the metrics in metrics_to_check differ between + metrics_a and metrics_b + """ + return any( + metrics_a.get(metric, 0) != metrics_b.get(metric, 0) + for metric in metrics_to_check + ) + + @staticmethod + def _was_pod_stopped(current_time: int, previous_time: int, interval: int) -> bool: + """ + A pod is assumed to be stopped if the gap between two consecutive timestamps + 
is more than the frequency of our metric collection + """ + return (current_time - previous_time) > interval From 4155bacbdfa189828269885c28118ecfbe43233c Mon Sep 17 00:00:00 2001 From: Naved Ansari Date: Thu, 26 Sep 2024 13:54:50 -0400 Subject: [PATCH 4/4] Use a namedtuple for GPUInfo --- openshift_metrics/metrics_processor.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/openshift_metrics/metrics_processor.py b/openshift_metrics/metrics_processor.py index e0844f0..b8fd1dd 100644 --- a/openshift_metrics/metrics_processor.py +++ b/openshift_metrics/metrics_processor.py @@ -1,6 +1,8 @@ from typing import List, Dict +from collections import namedtuple GPU_UNKNOWN_TYPE = "GPU_UNKNOWN_TYPE" +GPUInfo = namedtuple("GPUInfo", ["gpu_type", "gpu_resource", "node_model"]) class MetricsProcessor: @@ -50,7 +52,7 @@ def merge_metrics(self, metric_name, metric_list): ] = node @staticmethod - def _extract_gpu_info(metric_name: str, metric: Dict) -> tuple: + def _extract_gpu_info(metric_name: str, metric: Dict) -> GPUInfo: """Extract GPU related info""" gpu_type = None gpu_resource = None @@ -63,7 +65,7 @@ def _extract_gpu_info(metric_name: str, metric: Dict) -> tuple: gpu_resource = metric["metric"].get("resource") node_model = metric["metric"].get("label_nvidia_com_gpu_machine") - return gpu_type, gpu_resource, node_model + return GPUInfo(gpu_type, gpu_resource, node_model) def condense_metrics(self, metrics_to_check: List[str]) -> Dict: """