Refactor test methods in test_main/index.rst and main.py
Showing 4 changed files with 225 additions and 87 deletions.
main.py (after refactor):

import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Union

import pandas as pd
from elasticsearch import Elasticsearch, ElasticsearchException


class MetricsClient:
    def __init__(self, token: Optional[str] = None) -> None:
        """
        Initialize the MetricsClient with an optional token for authentication.

        Args:
            token (Optional[str]): The authentication token. Defaults to None.
        """
        hosts = [f"es{i}.ceda.ac.uk:9200" for i in range(1, 9)]
        kwargs = {"hosts": hosts, "use_ssl": True, "ca_certs": "path/to/CA_ROOT"}
        if token:
            kwargs["headers"] = {"Authorization": f"Bearer {token}"}
        else:
            logging.error("No authentication token provided.")
        try:
            self.es = Elasticsearch(
                hosts=kwargs.get("hosts"),
                use_ssl=kwargs.get("use_ssl"),
                ca_certs=kwargs.get("ca_certs"),
                headers=kwargs.get("headers", {}),
            )
            logging.info("Elasticsearch client initialized successfully.")
        except ElasticsearchException as e:
            logging.error(f"Error initializing Elasticsearch client: {str(e)}")
            raise e
        except Exception as e:
            logging.error(
                f"Unexpected error initializing Elasticsearch client: {str(e)}"
            )
            raise e

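    # Note: the client connects to es1-es8.ceda.ac.uk over SSL; "path/to/CA_ROOT"
    # in __init__ above reads as a placeholder for the real CA bundle path and
    # would need to be replaced in a deployment.
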
    def get_all_metrics(self) -> Optional[List[str]]:
        """
        Retrieve all unique metric names from the Elasticsearch index.

        Returns:
            Optional[List[str]]: A list of unique metric names, or None if an error occurs.
        """
        query = {
            "aggs": {
                "unique_metrics": {
                    "terms": {
                        "field": "prometheus.labels.metric_name.keyword",
                        "size": 1000,
                    }
                }
            },
            "size": 0,
        }
        response = None
        try:
            response = self.es.search(index="jasmin-metrics-production", query=query)
        except ElasticsearchException as e:
            logging.error(f"Error fetching all metrics: {str(e)}")
            return None
        except Exception as e:
            logging.error(f"Unexpected error fetching all metrics: {str(e)}")
            return None
        if (
            "aggregations" not in response
            or "unique_metrics" not in response["aggregations"]
        ):
            logging.error(
                "Unexpected response structure: missing 'aggregations' or 'unique_metrics'"
            )
            return None
        return [
            bucket["key"]
            for bucket in response["aggregations"]["unique_metrics"]["buckets"]
        ]

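    # For reference: each bucket returned by the terms aggregation above has the
    # standard Elasticsearch shape {"key": "<metric_name>", "doc_count": <n>},
    # so the final list comprehension collects the unique metric names.
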
    def get_metric_labels(self, metric_name: str) -> Optional[List[str]]:
        """
        Retrieve all labels associated with a specific metric name.

        Args:
            metric_name (str): The name of the metric.

        Returns:
            Optional[List[str]]: A list of labels for the metric, or None if an error occurs.
        """
        response = None
        query = {
            "query": {"match": {"prometheus.labels.metric_name.keyword": metric_name}},
            "size": 1,
        }
        try:
            response = self.es.search(index="jasmin-metrics-production", query=query)
        except ElasticsearchException as e:
            logging.error(f"Error fetching metric labels for {metric_name}: {str(e)}")
            return None
        if "hits" not in response or "hits" not in response["hits"]:
            logging.error("Unexpected response structure: missing 'hits'")
            return None
        if not response["hits"]["hits"]:
            logging.info(f"No labels found for metric: {metric_name}")
            return []
        labels = set()
        for hit in response["hits"]["hits"]:
            labels.update(hit["_source"]["prometheus"]["labels"].keys())
        return list(labels)

    def get_metric(
        self,
        metric_name: str,
        filters: Optional[Dict[str, Any]] = None,
        size: int = 10000,
    ) -> Optional[pd.DataFrame]:
        """
        Retrieve metric data for a specific metric name, optionally filtered by labels and time range.

        Args:
            metric_name (str): The name of the metric.
            filters (Optional[Dict[str, Any]]): Optional filters for labels and time range
                (see the example shape below). Defaults to None.
            size (int): The number of results to retrieve. Defaults to 10000.

        Returns:
            Optional[pd.DataFrame]: A DataFrame containing the metric data, or None if an error occurs.
        """
        query = self._build_query(metric_name, filters, size)
        response = None
        try:
            response = self.es.search(index="jasmin-metrics-production", query=query)
        except ElasticsearchException as e:
            logging.error(f"Error fetching metric {metric_name}: {str(e)}")
            return None
        except Exception as e:
            logging.error(f"Unexpected error fetching metric {metric_name}: {str(e)}")
            return None
        if "hits" not in response or "hits" not in response["hits"]:
            logging.error("Unexpected response structure: missing 'hits'")
            return None
        data: List[Dict[str, Union[str, float]]] = []
        for hit in response["hits"]["hits"]:
            timestamp = hit["_source"]["@timestamp"]
            value = hit["_source"]["prometheus"]["metrics"].get(metric_name)
            data.append({"timestamp": timestamp, "value": value})
        return pd.DataFrame(data)

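    # Example of the filters shape accepted by get_metric/_build_query
    # (label key and values here are illustrative, not from the commit):
    #   {
    #       "labels": {"host": "host01"},
    #       "time": {"start": "2024-01-01T00:00:00", "end": "2024-01-02T00:00:00"},
    #   }
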
    @staticmethod
    def _build_query(
        metric_name: str, filters: Optional[Dict[str, Any]] = None, size: int = 10000
    ) -> Dict[str, Any]:
        """Helper function to build the Elasticsearch query."""
        query: Dict[str, Any] = {
            "size": size,
            "query": {
                "bool": {
                    "must": [
                        {
                            "match": {
                                "prometheus.labels.metric_name.keyword": metric_name
                            }
                        }
                    ],
                    "filter": [],
                }
            },
        }
        if filters:
            if "labels" in filters:
                for key, value in filters["labels"].items():
                    query["query"]["bool"]["must"].append(
                        {"match": {f"prometheus.labels.{key}.keyword": value}}
                    )
            if "time" in filters:
                time_range = filters["time"]
                if "start" not in time_range or "end" not in time_range:
                    raise ValueError(
                        "Both 'start' and 'end' ISO-formatted dates must be provided in 'time' filter."
                    )
                try:
                    start_date = datetime.fromisoformat(time_range["start"])
                    end_date = datetime.fromisoformat(time_range["end"])
                except ValueError as e:
                    raise ValueError(
                        "Dates must be in ISO format (YYYY-MM-DDTHH:MM:SS) for 'start' and 'end'"
                    ) from e
                if start_date > end_date:
                    raise ValueError(
                        "The 'start' date must be before the 'end' date or be the same."
                    )
                # Treat a naive ISO timestamp as UTC; comparing a naive datetime
                # with the aware datetime.now(timezone.utc) would raise TypeError.
                if end_date.tzinfo is None:
                    end_date = end_date.replace(tzinfo=timezone.utc)
                if end_date > datetime.now(timezone.utc):
                    raise ValueError("The 'end' date cannot be in the future.")
                query["query"]["bool"]["filter"].append(
                    {
                        "range": {
                            "@timestamp": {
                                "gte": time_range["start"],
                                "lte": time_range["end"],
                            }
                        }
                    }
                )
        return query
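
A minimal usage sketch of the refactored client. This is illustrative and not part of the commit: the bearer token, the metric name storage_used_bytes, the label host, and the module name main are placeholder assumptions; the index jasmin-metrics-production and the method signatures come from the code above.

import logging

from main import MetricsClient  # assumes the module is importable as main

logging.basicConfig(level=logging.INFO)

client = MetricsClient(token="my-bearer-token")  # placeholder token

# List every metric name indexed in jasmin-metrics-production.
metrics = client.get_all_metrics()
if metrics:
    print(f"{len(metrics)} metrics available, e.g. {metrics[0]}")

# Inspect the labels attached to one (placeholder) metric.
print(client.get_metric_labels("storage_used_bytes"))

# Fetch a day of values for that metric, filtered by a (placeholder) label.
df = client.get_metric(
    "storage_used_bytes",
    filters={
        "labels": {"host": "host01"},
        "time": {"start": "2024-01-01T00:00:00", "end": "2024-01-02T00:00:00"},
    },
)
if df is not None:
    print(df.head())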