
Commit

Merge pull request #76 from naved001/simplify_metrics_processor
Simplify metrics processor
naved001 authored Sep 26, 2024
2 parents 0bb508e + 4155bac commit f159566
Showing 1 changed file with 67 additions and 31 deletions.
98 changes: 67 additions & 31 deletions openshift_metrics/metrics_processor.py
@@ -1,6 +1,8 @@
 from typing import List, Dict
+from collections import namedtuple

 GPU_UNKNOWN_TYPE = "GPU_UNKNOWN_TYPE"
+GPUInfo = namedtuple("GPUInfo", ["gpu_type", "gpu_resource", "node_model"])


 class MetricsProcessor:
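
The new GPUInfo namedtuple bundles the three GPU attributes that the _extract_gpu_info helper (added below) returns, so callers can unpack them in one assignment. A minimal sketch of how such a namedtuple behaves; the GPU values here are invented for illustration and are not taken from the repository:

    from collections import namedtuple

    GPUInfo = namedtuple("GPUInfo", ["gpu_type", "gpu_resource", "node_model"])

    # Construct it like a tuple, then read fields by name, by index, or by unpacking.
    info = GPUInfo("NVIDIA-A100-SXM4-40GB", "nvidia.com/gpu", "DGXA100")
    gpu_type, gpu_resource, node_model = info
    assert info.gpu_type == info[0] == gpu_type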
@@ -18,28 +20,20 @@ def merge_metrics(self, metric_name, metric_list):
             namespace = metric["metric"]["namespace"]
             node = metric["metric"].get("node")

-            gpu_type = None
-            gpu_resource = None
-            node_model = None
-
             self.merged_data.setdefault(namespace, {})
             self.merged_data[namespace].setdefault(pod, {"metrics": {}})

-            if metric_name == "gpu_request":
-                gpu_type = metric["metric"].get(
-                    "label_nvidia_com_gpu_product", GPU_UNKNOWN_TYPE
-                )
-                gpu_resource = metric["metric"].get("resource")
-                node_model = metric["metric"].get("label_nvidia_com_gpu_machine")
+            gpu_type, gpu_resource, node_model = self._extract_gpu_info(
+                metric_name, metric
+            )

-            for value in metric["values"]:
-                epoch_time = value[0]
+            for epoch_time, metric_value in metric["values"]:

                 self.merged_data[namespace][pod]["metrics"].setdefault(epoch_time, {})

                 self.merged_data[namespace][pod]["metrics"][epoch_time][
                     metric_name
-                ] = value[1]
+                ] = metric_value
                 if gpu_type:
                     self.merged_data[namespace][pod]["metrics"][epoch_time][
                         "gpu_type"
@@ -57,6 +51,22 @@ def merge_metrics(self, metric_name, metric_list):
                         "node"
                     ] = node

+    @staticmethod
+    def _extract_gpu_info(metric_name: str, metric: Dict) -> GPUInfo:
+        """Extract GPU related info"""
+        gpu_type = None
+        gpu_resource = None
+        node_model = None
+
+        if metric_name == "gpu_request":
+            gpu_type = metric["metric"].get(
+                "label_nvidia_com_gpu_product", GPU_UNKNOWN_TYPE
+            )
+            gpu_resource = metric["metric"].get("resource")
+            node_model = metric["metric"].get("label_nvidia_com_gpu_machine")
+
+        return GPUInfo(gpu_type, gpu_resource, node_model)
+
     def condense_metrics(self, metrics_to_check: List[str]) -> Dict:
         """
         Checks if the value of metrics is the same, and removes redundant
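
The extracted _extract_gpu_info helper keeps the GPU-label lookups out of merge_metrics. A short usage sketch; the metric labels are hypothetical and the import path is assumed to mirror the changed file openshift_metrics/metrics_processor.py:

    from openshift_metrics.metrics_processor import MetricsProcessor

    gpu_metric = {
        "metric": {
            "label_nvidia_com_gpu_product": "Tesla-V100-PCIE-32GB",
            "resource": "nvidia.com/gpu",
            "label_nvidia_com_gpu_machine": "PowerEdge-R740xd",
        }
    }

    # For a gpu_request metric the labels are read, with GPU_UNKNOWN_TYPE used
    # when the product label is missing.
    print(MetricsProcessor._extract_gpu_info("gpu_request", gpu_metric))
    # GPUInfo(gpu_type='Tesla-V100-PCIE-32GB', gpu_resource='nvidia.com/gpu',
    #         node_model='PowerEdge-R740xd')

    # For any other metric name all three fields stay None.
    print(MetricsProcessor._extract_gpu_info("cpu_request", {"metric": {}}))
    # GPUInfo(gpu_type=None, gpu_resource=None, node_model=None)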
@@ -80,33 +90,59 @@ def condense_metrics(self, metrics_to_check: List[str]) -> Dict:

                 start_metric_dict = metrics_dict[start_epoch_time].copy()

-                for i in range(len(epoch_times_list)):
-                    epoch_time = epoch_times_list[i]
-                    same_metrics = True
-                    continuous_metrics = True
-                    for metric in metrics_to_check:
-                        # If either cpu, memory or gpu request is diferent.
-                        if metrics_dict[start_epoch_time].get(metric, 0) != metrics_dict[epoch_time].get(metric, 0):  # fmt: skip
-                            same_metrics = False
-
-                    if i != 0 and epoch_time - epoch_times_list[i - 1] > interval:
-                        # i.e. if the difference between 2 consecutive timestamps
-                        # is more than the expected frequency then the pod was stopped
-                        continuous_metrics = False
-
-                    if not same_metrics or not continuous_metrics:
-                        duration = epoch_times_list[i - 1] - start_epoch_time + interval
+                for i in range(1, len(epoch_times_list)):
+                    current_time = epoch_times_list[i]
+                    previous_time = epoch_times_list[i - 1]
+
+                    metrics_changed = self._are_metrics_different(
+                        metrics_dict[start_epoch_time],
+                        metrics_dict[current_time],
+                        metrics_to_check,
+                    )
+
+                    pod_was_stopped = self._was_pod_stopped(
+                        current_time=current_time,
+                        previous_time=previous_time,
+                        interval=interval,
+                    )
+
+                    if metrics_changed or pod_was_stopped:
+                        duration = previous_time - start_epoch_time + interval
                         start_metric_dict["duration"] = duration
                         new_metrics_dict[start_epoch_time] = start_metric_dict
-                        start_epoch_time = epoch_time
+
+                        # Reset start_epoch_time and start_metric_dict
+                        start_epoch_time = current_time
                         start_metric_dict = metrics_dict[start_epoch_time].copy()

-                duration = epoch_time - start_epoch_time + interval
+                # Final block after the loop
+                duration = epoch_times_list[-1] - start_epoch_time + interval
                 start_metric_dict["duration"] = duration
                 new_metrics_dict[start_epoch_time] = start_metric_dict

+                # Update the pod dict with the condensed data
                 new_pod_dict = pod_dict.copy()
                 new_pod_dict["metrics"] = new_metrics_dict
                 condensed_dict[namespace][pod] = new_pod_dict

         return condensed_dict
+
+    @staticmethod
+    def _are_metrics_different(
+        metrics_a: Dict, metrics_b: Dict, metrics_to_check: List[str]
+    ) -> bool:
+        """Return True if any metric in metrics_to_check differs between
+        metrics_a and metrics_b
+        """
+        return any(
+            metrics_a.get(metric, 0) != metrics_b.get(metric, 0)
+            for metric in metrics_to_check
+        )
+
+    @staticmethod
+    def _was_pod_stopped(current_time: int, previous_time: int, interval: int) -> bool:
+        """
+        A pod is assumed to be stopped if the gap between two consecutive timestamps
+        is more than the frequency of our metric collection
+        """
+        return (current_time - previous_time) > interval
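
Together, the two new helpers spell out when condense_metrics starts a new entry: whenever a tracked metric changes, or a gap longer than the collection interval separates consecutive samples. A hypothetical walk-through; the values, the 900-second interval, and the import path are assumptions for illustration:

    from openshift_metrics.metrics_processor import MetricsProcessor

    interval = 900  # assumed metric-collection interval in seconds

    # cpu_request differs between the two samples, so they cannot be merged;
    # metrics absent from both sides compare as 0 and are ignored.
    print(MetricsProcessor._are_metrics_different(
        {"cpu_request": "1"}, {"cpu_request": "2"}, ["cpu_request", "memory_request"]
    ))  # True

    # An 1800-second gap between consecutive samples exceeds the interval,
    # so the pod is treated as stopped in between.
    print(MetricsProcessor._was_pod_stopped(
        current_time=1800, previous_time=0, interval=interval
    ))  # True

    # With that rule, three samples at 0, 900, and 1800 seconds carrying
    # cpu_request "1", "1", "2" would condense to:
    # {0:    {"cpu_request": "1", "duration": 1800},
    #  1800: {"cpu_request": "2", "duration": 900}}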
