diff --git a/openshift_metrics/metrics_processor.py b/openshift_metrics/metrics_processor.py index 3132018..b8fd1dd 100644 --- a/openshift_metrics/metrics_processor.py +++ b/openshift_metrics/metrics_processor.py @@ -1,6 +1,8 @@ from typing import List, Dict +from collections import namedtuple GPU_UNKNOWN_TYPE = "GPU_UNKNOWN_TYPE" +GPUInfo = namedtuple("GPUInfo", ["gpu_type", "gpu_resource", "node_model"]) class MetricsProcessor: @@ -18,28 +20,20 @@ def merge_metrics(self, metric_name, metric_list): namespace = metric["metric"]["namespace"] node = metric["metric"].get("node") - gpu_type = None - gpu_resource = None - node_model = None - self.merged_data.setdefault(namespace, {}) self.merged_data[namespace].setdefault(pod, {"metrics": {}}) - if metric_name == "gpu_request": - gpu_type = metric["metric"].get( - "label_nvidia_com_gpu_product", GPU_UNKNOWN_TYPE - ) - gpu_resource = metric["metric"].get("resource") - node_model = metric["metric"].get("label_nvidia_com_gpu_machine") + gpu_type, gpu_resource, node_model = self._extract_gpu_info( + metric_name, metric + ) - for value in metric["values"]: - epoch_time = value[0] + for epoch_time, metric_value in metric["values"]: self.merged_data[namespace][pod]["metrics"].setdefault(epoch_time, {}) self.merged_data[namespace][pod]["metrics"][epoch_time][ metric_name - ] = value[1] + ] = metric_value if gpu_type: self.merged_data[namespace][pod]["metrics"][epoch_time][ "gpu_type" @@ -57,6 +51,22 @@ def merge_metrics(self, metric_name, metric_list): "node" ] = node + @staticmethod + def _extract_gpu_info(metric_name: str, metric: Dict) -> GPUInfo: + """Extract GPU related info""" + gpu_type = None + gpu_resource = None + node_model = None + + if metric_name == "gpu_request": + gpu_type = metric["metric"].get( + "label_nvidia_com_gpu_product", GPU_UNKNOWN_TYPE + ) + gpu_resource = metric["metric"].get("resource") + node_model = metric["metric"].get("label_nvidia_com_gpu_machine") + + return GPUInfo(gpu_type, gpu_resource, node_model) + def condense_metrics(self, metrics_to_check: List[str]) -> Dict: """ Checks if the value of metrics is the same, and removes redundant @@ -80,33 +90,59 @@ def condense_metrics(self, metrics_to_check: List[str]) -> Dict: start_metric_dict = metrics_dict[start_epoch_time].copy() - for i in range(len(epoch_times_list)): - epoch_time = epoch_times_list[i] - same_metrics = True - continuous_metrics = True - for metric in metrics_to_check: - # If either cpu, memory or gpu request is diferent. - if metrics_dict[start_epoch_time].get(metric, 0) != metrics_dict[epoch_time].get(metric, 0): # fmt: skip - same_metrics = False - - if i != 0 and epoch_time - epoch_times_list[i - 1] > interval: - # i.e. if the difference between 2 consecutive timestamps - # is more than the expected frequency then the pod was stopped - continuous_metrics = False - - if not same_metrics or not continuous_metrics: - duration = epoch_times_list[i - 1] - start_epoch_time + interval + for i in range(1, len(epoch_times_list)): + current_time = epoch_times_list[i] + previous_time = epoch_times_list[i - 1] + + metrics_changed = self._are_metrics_different( + metrics_dict[start_epoch_time], + metrics_dict[current_time], + metrics_to_check, + ) + + pod_was_stopped = self._was_pod_stopped( + current_time=current_time, + previous_time=previous_time, + interval=interval, + ) + + if metrics_changed or pod_was_stopped: + duration = previous_time - start_epoch_time + interval start_metric_dict["duration"] = duration new_metrics_dict[start_epoch_time] = start_metric_dict - start_epoch_time = epoch_time + + # Reset start_epoch_time and start_metric_dict + start_epoch_time = current_time start_metric_dict = metrics_dict[start_epoch_time].copy() - duration = epoch_time - start_epoch_time + interval + # Final block after the loop + duration = epoch_times_list[-1] - start_epoch_time + interval start_metric_dict["duration"] = duration new_metrics_dict[start_epoch_time] = start_metric_dict + # Update the pod dict with the condensed data new_pod_dict = pod_dict.copy() new_pod_dict["metrics"] = new_metrics_dict condensed_dict[namespace][pod] = new_pod_dict return condensed_dict + + @staticmethod + def _are_metrics_different( + metrics_a: Dict, metrics_b: Dict, metrics_to_check: List[str] + ) -> bool: + """Method that compares all the metrics in metrics_to_check are different in + metrics_a and metrics_b + """ + return any( + metrics_a.get(metric, 0) != metrics_b.get(metric, 0) + for metric in metrics_to_check + ) + + @staticmethod + def _was_pod_stopped(current_time: int, previous_time: int, interval: int) -> bool: + """ + A pod is assumed to be stopped if the the gap between two consecutive timestamps + is more than the frequency of our metric collection + """ + return (current_time - previous_time) > interval