From b5f79550091a43a066db46c68caa7628e203121c Mon Sep 17 00:00:00 2001
From: Shahar Glazner
Date: Mon, 12 Aug 2024 17:38:03 +0300
Subject: [PATCH 01/32] feat: allow provision api key (#1598)
---
keep/api/core/db_on_start.py | 55 ++++++++++++++++++++++++++++++++----
1 file changed, 49 insertions(+), 6 deletions(-)
diff --git a/keep/api/core/db_on_start.py b/keep/api/core/db_on_start.py
index 5e68e9891..e387bf5ae 100644
--- a/keep/api/core/db_on_start.py
+++ b/keep/api/core/db_on_start.py
@@ -19,7 +19,6 @@
import alembic.command
import alembic.config
-
from sqlalchemy.exc import IntegrityError
from sqlmodel import Session, select
@@ -72,6 +71,7 @@ def try_create_single_tenant(tenant_id: str) -> None:
user = session.exec(select(User)).first()
# if no users exist, let's create the default user
if not user:
+ logger.info("Creating default user")
default_username = os.environ.get("KEEP_DEFAULT_USERNAME", "keep")
default_password = hashlib.sha256(
os.environ.get("KEEP_DEFAULT_PASSWORD", "keep").encode()
@@ -82,26 +82,66 @@ def try_create_single_tenant(tenant_id: str) -> None:
role=AdminRole.get_name(),
)
session.add(default_user)
+ logger.info("Default user created")
# else, if the user wants to force a reset of the default user password
elif os.environ.get("KEEP_FORCE_RESET_DEFAULT_PASSWORD", "false") == "true":
# update the password of the default user
+ logger.info("Forcing reset of default user password")
default_password = hashlib.sha256(
os.environ.get("KEEP_DEFAULT_PASSWORD", "keep").encode()
).hexdigest()
user.password_hash = default_password
+ logger.info("Default user password updated")
+
+ # provision default api keys
+ if os.environ.get("KEEP_DEFAULT_API_KEYS", ""):
+ logger.info("Provisioning default api keys")
+ default_api_keys = os.environ.get("KEEP_DEFAULT_API_KEYS").split(",")
+ for default_api_key in default_api_keys:
+ try:
+ api_key_name, api_key_role, api_key_secret = (
+ default_api_key.strip().split(":")
+ )
+ except ValueError:
+ logger.error(
+ "Invalid format for default api key. Expected format: name:role:secret"
+ )
+ continue
+ # Create the default api key for the default user
+ api_key = session.exec(
+ select(TenantApiKey).where(
+ TenantApiKey.reference_id == api_key_name
+ )
+ ).first()
+ if api_key:
+ logger.info(f"Api key {api_key_name} already exists")
+ continue
+ logger.info(f"Provisioning api key {api_key_name}")
+ hashed_api_key = hashlib.sha256(
+ api_key_secret.encode("utf-8")
+ ).hexdigest()
+ new_installation_api_key = TenantApiKey(
+ tenant_id=tenant_id,
+ reference_id=api_key_name,
+ key_hash=hashed_api_key,
+ is_system=True,
+ created_by="system",
+ role=api_key_role,
+ )
+ session.add(new_installation_api_key)
+ logger.info(f"Api key {api_key_name} provisioned")
+ logger.info("Api keys provisioned")
# commit the changes
session.commit()
logger.info("Single tenant created")
except IntegrityError:
# Tenant already exists
- logger.info("Single tenant already exists")
- pass
+ logger.exception("Failed to provision single tenant")
+ raise
except Exception:
logger.exception("Failed to create single tenant")
pass
-
def migrate_db():
"""
Run migrations to make sure the DB is up-to-date.
@@ -109,8 +149,11 @@ def migrate_db():
logger.info("Running migrations...")
config_path = os.path.dirname(os.path.abspath(__file__)) + "/../../" + "alembic.ini"
config = alembic.config.Config(file_=config_path)
- # Re-defined because alembic.ini uses relative paths which doesn't work
+ # Re-defined because alembic.ini uses relative paths which doesn't work
# when running the app as a python package (could happen from any path)
- config.set_main_option("script_location", os.path.dirname(os.path.abspath(__file__)) + "/../models/db/migrations")
+ config.set_main_option(
+ "script_location",
+ os.path.dirname(os.path.abspath(__file__)) + "/../models/db/migrations",
+ )
alembic.command.upgrade(config, "head")
logger.info("Finished migrations")
From c52c451a7b5af20649546baa850997210f5102de Mon Sep 17 00:00:00 2001
From: Shahar Glazner
Date: Mon, 12 Aug 2024 17:50:43 +0300
Subject: [PATCH 02/32] fix: keep the api key on secret manager (#1603)
---
keep/api/core/db_on_start.py | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/keep/api/core/db_on_start.py b/keep/api/core/db_on_start.py
index e387bf5ae..8a2f69c67 100644
--- a/keep/api/core/db_on_start.py
+++ b/keep/api/core/db_on_start.py
@@ -96,6 +96,9 @@ def try_create_single_tenant(tenant_id: str) -> None:
# provision default api keys
if os.environ.get("KEEP_DEFAULT_API_KEYS", ""):
logger.info("Provisioning default api keys")
+ from keep.contextmanager.contextmanager import ContextManager
+ from keep.secretmanager.secretmanagerfactory import SecretManagerFactory
+
default_api_keys = os.environ.get("KEEP_DEFAULT_API_KEYS").split(",")
for default_api_key in default_api_keys:
try:
@@ -128,6 +131,15 @@ def try_create_single_tenant(tenant_id: str) -> None:
role=api_key_role,
)
session.add(new_installation_api_key)
+ # write to the secret manager
+ context_manager = ContextManager(tenant_id=tenant_id)
+ secret_manager = SecretManagerFactory.get_secret_manager(
+ context_manager
+ )
+ secret_manager.write_secret(
+ secret_name=f"{tenant_id}-{api_key_name}",
+ secret_value=api_key_secret,
+ )
logger.info(f"Api key {api_key_name} provisioned")
logger.info("Api keys provisioned")
# commit the changes
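A short sketch of the secret naming convention the hunk above uses when mirroring a provisioned key into the secret manager; the tenant id and key name are illustrative only.

    # Each provisioned key is written under "<tenant_id>-<api_key_name>";
    # "keep" and "webhook-key" are made-up example values.
    tenant_id = "keep"
    api_key_name = "webhook-key"
    secret_name = f"{tenant_id}-{api_key_name}"
    print(secret_name)  # keep-webhook-key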
From de82b32216b095f1a560f52a47f951410734ae97 Mon Sep 17 00:00:00 2001
From: Matvey Kukuy
Date: Mon, 12 Aug 2024 18:48:07 +0300
Subject: [PATCH 03/32] feat: Healthcheck arq task & disabling background task
(#1604)
---
keep/api/api.py | 3 ++-
keep/api/arq_worker.py | 17 +++++++++++++++--
keep/api/tasks/healthcheck_task.py | 7 +++++++
3 files changed, 24 insertions(+), 3 deletions(-)
create mode 100644 keep/api/tasks/healthcheck_task.py
diff --git a/keep/api/api.py b/keep/api/api.py
index 78a11d16a..db4d1f4ac 100644
--- a/keep/api/api.py
+++ b/keep/api/api.py
@@ -58,6 +58,7 @@
SCHEDULER = os.environ.get("SCHEDULER", "true") == "true"
CONSUMER = os.environ.get("CONSUMER", "true") == "true"
REDIS = os.environ.get("REDIS", "false") == "true"
+WORKER_ENABLED = os.environ.get("WORKER_ENABLED", "true") == "true"
AUTH_TYPE = os.environ.get("AUTH_TYPE", AuthenticationType.NO_AUTH.value)
try:
KEEP_VERSION = metadata.version("keep")
@@ -262,7 +263,7 @@ async def on_startup():
# we should add a "wait" here to make sure the server is ready
await event_subscriber.start()
logger.info("Consumer started successfully")
- if REDIS:
+ if REDIS and WORKER_ENABLED:
event_loop = asyncio.get_event_loop()
worker = get_worker()
event_loop.create_task(worker.async_run())
diff --git a/keep/api/arq_worker.py b/keep/api/arq_worker.py
index f71d4ff2e..f62a826d4 100644
--- a/keep/api/arq_worker.py
+++ b/keep/api/arq_worker.py
@@ -2,7 +2,7 @@
from typing import Optional
# third-party
-from arq import Worker, create_pool
+from arq import Worker, create_pool, cron
from arq.connections import RedisSettings
from arq.worker import create_worker
from pydantic.utils import import_string
@@ -10,6 +10,7 @@
# internals
from keep.api.core.config import config
+from keep.api.tasks.healthcheck_task import healthcheck_task
ARQ_BACKGROUND_FUNCTIONS: Optional[CommaSeparatedStrings] = config(
"ARQ_BACKGROUND_FUNCTIONS",
@@ -17,6 +18,7 @@
default=[
"keep.api.tasks.process_event_task.async_process_event",
"keep.api.tasks.process_topology_task.async_process_topology",
+ "keep.api.tasks.healthcheck_task.healthcheck_task",
],
)
FUNCTIONS: list = (
@@ -61,7 +63,8 @@ def get_worker() -> Worker:
WorkerSettings, keep_result=keep_result, expires_extra_ms=expires
)
-
+def at_every_x_minutes(x: int, start: int = 0, end: int = 59):
+ return {*list(range(start, end, x))}
class WorkerSettings:
"""
Settings for the ARQ worker.
@@ -79,3 +82,13 @@ class WorkerSettings:
conn_retry_delay=10,
)
functions: list = FUNCTIONS
+ cron_jobs = [
+ cron(
+ healthcheck_task,
+ minute=at_every_x_minutes(1),
+ unique=True,
+ timeout=30,
+ max_tries=1,
+ run_at_startup=True,
+ ),
+ ]
diff --git a/keep/api/tasks/healthcheck_task.py b/keep/api/tasks/healthcheck_task.py
new file mode 100644
index 000000000..428880356
--- /dev/null
+++ b/keep/api/tasks/healthcheck_task.py
@@ -0,0 +1,7 @@
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+async def healthcheck_task(*args, **kwargs):
+ logger.info("Healthcheck task ran. Just indicating that the background worker is running.")
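As a rough illustration of the scheduling wired up above, at_every_x_minutes builds the set of minutes that arq's cron helper accepts; the sketch below mirrors that helper outside the worker and is not part of the patch.

    def at_every_x_minutes(x: int, start: int = 0, end: int = 59):
        # minutes of the hour at which the cron job fires; end is exclusive,
        # so the default covers minutes 0..58
        return set(range(start, end, x))

    print(sorted(at_every_x_minutes(1)))   # every minute: 0, 1, ..., 58
    print(sorted(at_every_x_minutes(15)))  # minutes 0, 15, 30, 45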
From e0e15ccab32b17db964dff34dbe806fedf14171f Mon Sep 17 00:00:00 2001
From: Matvey Kukuy
Date: Tue, 13 Aug 2024 13:53:10 +0300
Subject: [PATCH 04/32] fix: background execution (#1433)
Signed-off-by: Tal
Signed-off-by: Matvey Kukuy
Signed-off-by: Vladimir Filonov
Co-authored-by: GlebBerjoskin
Co-authored-by: Shahar Glazner
Co-authored-by: Tal
Co-authored-by: Vladimir Filonov
---
.github/workflows/test-pr-e2e.yml | 2 +
.gitignore | 3 +
docker/Dockerfile.api | 2 +
ee/experimental/graph_utils.py | 99 +++++++
ee/experimental/incident_utils.py | 247 +++++++++++++++++-
ee/experimental/node_utils.py | 77 ++++++
ee/experimental/statistical_utils.py | 127 +++++++++
keep-ui/app/ai/ai.tsx | 127 ++++++---
keep-ui/app/ai/model.ts | 5 +
keep-ui/app/incidents/[id]/incident-info.tsx | 1 +
keep-ui/app/workflows/mockworkflows.tsx | 2 +-
keep-ui/components/navbar/AILink.tsx | 8 +-
keep-ui/components/navbar/IncidentLinks.tsx | 1 +
keep-ui/utils/hooks/useAI.ts | 41 +++
keep-ui/utils/hooks/useAIStats.ts | 20 --
keep/api/arq_worker.py | 17 +-
keep/api/core/db.py | 127 ++++++++-
keep/api/core/db_on_start.py | 1 +
keep/api/models/alert.py | 5 +-
keep/api/models/db/alert.py | 4 +-
keep/api/models/db/migrations/env.py | 1 +
.../versions/2024-07-24-13-39_9ba0aeecd4d0.py | 40 +++
.../versions/2024-07-25-17-13_67f1efb93c99.py | 68 +++--
.../versions/2024-07-28-16-24_8e5942040de6.py | 39 +++
.../versions/2024-07-29-12-51_c91b348b94f2.py | 63 +++++
.../versions/2024-08-09-10-53_6e353161f5a8.py | 21 ++
.../versions/2024-08-11-19-45_005efc57cc1c.py | 21 ++
keep/api/models/db/statistics.py | 9 +
keep/api/routes/ai.py | 4 +-
keep/api/routes/alerts.py | 3 +-
keep/api/routes/incidents.py | 75 +++---
keep/api/tasks/process_background_ai_task.py | 71 +++++
keep/api/tasks/process_event_task.py | 23 +-
keep/api/utils/import_ee.py | 21 ++
poetry.lock | 36 ++-
pyproject.toml | 2 +-
scripts/shoot_alerts_from_dump.py | 1 +
37 files changed, 1262 insertions(+), 152 deletions(-)
create mode 100644 ee/experimental/graph_utils.py
create mode 100644 ee/experimental/node_utils.py
create mode 100644 ee/experimental/statistical_utils.py
create mode 100644 keep-ui/utils/hooks/useAI.ts
delete mode 100644 keep-ui/utils/hooks/useAIStats.ts
create mode 100644 keep/api/models/db/migrations/versions/2024-07-24-13-39_9ba0aeecd4d0.py
create mode 100644 keep/api/models/db/migrations/versions/2024-07-28-16-24_8e5942040de6.py
create mode 100644 keep/api/models/db/migrations/versions/2024-07-29-12-51_c91b348b94f2.py
create mode 100644 keep/api/models/db/migrations/versions/2024-08-09-10-53_6e353161f5a8.py
create mode 100644 keep/api/models/db/migrations/versions/2024-08-11-19-45_005efc57cc1c.py
create mode 100644 keep/api/models/db/statistics.py
create mode 100644 keep/api/tasks/process_background_ai_task.py
create mode 100644 keep/api/utils/import_ee.py
diff --git a/.github/workflows/test-pr-e2e.yml b/.github/workflows/test-pr-e2e.yml
index 9d390af14..6638ac00d 100644
--- a/.github/workflows/test-pr-e2e.yml
+++ b/.github/workflows/test-pr-e2e.yml
@@ -22,6 +22,8 @@ env:
POSTGRES_USER: keepuser
POSTGRES_PASSWORD: keeppassword
POSTGRES_DB: keepdb
+ # To test if imports are working properly
+ EE_ENABLED: true
jobs:
tests:
diff --git a/.gitignore b/.gitignore
index a4c1a8531..592e55b10 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,6 +10,9 @@ __pycache__/
# C extensions
*.so
+# .csv files
+*.csv
+
# Distribution / packaging
.Python
build/
diff --git a/docker/Dockerfile.api b/docker/Dockerfile.api
index 488877461..291210d25 100644
--- a/docker/Dockerfile.api
+++ b/docker/Dockerfile.api
@@ -19,6 +19,7 @@ RUN python -m venv /venv
COPY pyproject.toml poetry.lock ./
RUN poetry export -f requirements.txt --output requirements.txt --without-hashes && /venv/bin/python -m pip install --upgrade -r requirements.txt
COPY keep keep
+COPY ee keep/ee
COPY examples examples
COPY README.md README.md
RUN poetry build && /venv/bin/pip install --use-deprecated=legacy-resolver dist/*.whl
@@ -26,6 +27,7 @@ RUN poetry build && /venv/bin/pip install --use-deprecated=legacy-resolver dist/
FROM base as final
ENV PATH="/venv/bin:${PATH}"
ENV VIRTUAL_ENV="/venv"
+ENV EE_PATH="ee"
COPY --from=builder /venv /venv
COPY --from=builder /app/examples /examples
# as per Openshift guidelines, https://docs.openshift.com/container-platform/4.11/openshift_images/create-images.html#use-uid_create-images
diff --git a/ee/experimental/graph_utils.py b/ee/experimental/graph_utils.py
new file mode 100644
index 000000000..d8e058104
--- /dev/null
+++ b/ee/experimental/graph_utils.py
@@ -0,0 +1,99 @@
+import numpy as np
+import networkx as nx
+
+from typing import List, Tuple
+
+from keep.api.core.db import get_pmi_values
+
+
+def detect_knee_1d_auto_increasing(y: List[float]) -> Tuple[int, float]:
+ """
+ This function detects the knee point in an increasing 1D curve. Knee point is the point where a curve
+ starts to flatten out (https://en.wikipedia.org/wiki/Knee_of_a_curve).
+
+ Parameters:
+ y (List[float]): a list of float values
+
+ Returns:
+ tuple: knee_index, knee_y
+ """
+
+ def detect_knee_1d(y: List[float], curve: str, direction: str = 'increasing') -> Tuple[int, float, List[float]]:
+ x = np.arange(len(y))
+
+ x_norm = (x - np.min(x)) / (np.max(x) - np.min(x))
+ y_norm = (y - np.min(y)) / (np.max(y) - np.min(y))
+
+ diff_curve = y_norm - x_norm
+
+ if curve == 'concave':
+ knee_index = np.argmax(diff_curve)
+ else:
+ knee_index = np.argmin(diff_curve)
+
+ knee_y = y[knee_index]
+
+ return knee_index, knee_y, diff_curve
+
+ knee_index_concave, knee_y_concave, diff_curve_concave = detect_knee_1d(y, 'concave')
+ knee_index_convex, knee_y_convex, diff_curve_convex = detect_knee_1d(y, 'convex')
+ max_diff_concave = np.max(np.abs(diff_curve_concave))
+ max_diff_convex = np.max(np.abs(diff_curve_convex))
+
+ if max_diff_concave > max_diff_convex:
+ return knee_index_concave, knee_y_concave
+ else:
+ return knee_index_convex, knee_y_convex
+
+
+def create_graph(tenant_id: str, fingerprints: List[str], pmi_threshold: float = 0., knee_threshold: float = 0.8) -> nx.Graph:
+ """
+ This function creates a graph from a list of fingerprints. The graph is created based on the PMI values between
+ the fingerprints. The edges are created between the fingerprints that have a PMI value greater than the threshold.
+ The nodes are removed if the knee point of the PMI values of the edges connected to the node is less than the threshold.
+
+ Parameters:
+ tenant_id (str): tenant id
+ fingerprints (List[str]): a list of fingerprints
+ pmi_threshold (float): PMI threshold
+ knee_threshold (float): knee threshold
+
+ Returns:
+ nx.Graph: a graph
+ """
+
+ graph = nx.Graph()
+
+ if len(fingerprints) == 1:
+ graph.add_node(fingerprints[0])
+ return graph
+
+ # Load all PMI values at once
+ pmi_values = get_pmi_values(tenant_id, fingerprints)
+
+ for idx_i, fingerprint_i in enumerate(fingerprints):
+ if not isinstance(pmi_values[(fingerprint_i, fingerprint_i)], float):
+ continue
+
+ for idx_j in range(idx_i + 1, len(fingerprints)):
+ fingerprint_j = fingerprints[idx_j]
+ weight = pmi_values[(fingerprint_i, fingerprint_j)]
+ if not isinstance(weight, float):
+ continue
+
+ if weight > pmi_threshold:
+ graph.add_edge(fingerprint_i, fingerprint_j, weight=weight)
+
+ nodes_to_delete = []
+
+ for node in graph.nodes:
+ weights = sorted([edge['weight'] for edge in graph[node].values()])
+
+ knee_index, knee_statistic = detect_knee_1d_auto_increasing(weights)
+
+ if knee_statistic < knee_threshold:
+ nodes_to_delete.append(node)
+
+ graph.remove_nodes_from(nodes_to_delete)
+
+ return graph
\ No newline at end of file
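A small worked example of the knee criterion used in detect_knee_1d above, with an invented curve: both axes are normalized to [0, 1], and the knee of a concave, increasing curve is where the curve departs most from the diagonal.

    import numpy as np

    # toy increasing curve that flattens after the fourth point (made-up values)
    y = np.array([0.0, 0.4, 0.7, 0.9, 0.95, 0.97, 0.98, 0.99])
    x = np.arange(len(y))

    # normalize both axes to [0, 1] and measure the vertical distance from the diagonal
    x_norm = (x - x.min()) / (x.max() - x.min())
    y_norm = (y - y.min()) / (y.max() - y.min())
    diff_curve = y_norm - x_norm

    knee_index = int(np.argmax(diff_curve))  # concave, increasing case
    print(knee_index, y[knee_index])         # knee at index 3, y = 0.9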
diff --git a/ee/experimental/incident_utils.py b/ee/experimental/incident_utils.py
index 975e64ff2..66b734755 100644
--- a/ee/experimental/incident_utils.py
+++ b/ee/experimental/incident_utils.py
@@ -1,10 +1,197 @@
+import os
+import logging
+
import numpy as np
import pandas as pd
import networkx as nx
-from typing import List
-
-from keep.api.models.db.alert import Alert
+from typing import List, Dict
+from openai import OpenAI
+
+from datetime import datetime, timedelta
+
+from fastapi import Depends
+
+from ee.experimental.node_utils import NodeCandidateQueue, NodeCandidate
+from ee.experimental.graph_utils import create_graph
+from ee.experimental.statistical_utils import get_alert_pmi_matrix
+
+from pusher import Pusher
+
+from keep.api.models.db.alert import Alert, Incident
+from keep.api.core.db import (
+ assign_alert_to_incident,
+ is_alert_assigned_to_incident,
+ add_alerts_to_incident_by_incident_id,
+ get_last_alerts,
+ get_last_incidents,
+ get_incident_by_id,
+ write_pmi_matrix_to_db,
+ create_incident_from_dict,
+ update_incident_summary,
+)
+
+from keep.api.core.dependencies import (
+ AuthenticatedEntity,
+ AuthVerifier,
+ get_pusher_client,
+)
+
+logger = logging.getLogger(__name__)
+
+ALGORITHM_VERBOSE_NAME = "Basic correlation algorithm v0.2"
+USE_N_HISTORICAL_ALERTS = 10e10
+USE_N_HISTORICAL_INCIDENTS = 10e10
+
+
+def calculate_pmi_matrix(
+ ctx: dict | None, # arq context
+ tenant_id: str,
+ upper_timestamp: datetime = None,
+ use_n_historical_alerts: int = USE_N_HISTORICAL_ALERTS,
+ sliding_window: int = None,
+ stride: int = None,
+) -> dict:
+ logger.info(
+ "Calculating PMI coefficients for alerts",
+ extra={
+ "tenant_id": tenant_id,
+ },
+ )
+
+ if not upper_timestamp:
+ upper_timestamp = os.environ.get('PMI_ALERT_UPPER_TIMESTAMP', datetime.now())
+
+ if not sliding_window:
+ sliding_window = os.environ.get('PMI_SLIDING_WINDOW', 4 * 60 * 60)
+
+ if not stride:
+ stride = os.environ.get('PMI_STRIDE', 60 * 60)
+
+ alerts=get_last_alerts(tenant_id, limit=use_n_historical_alerts, upper_timestamp=upper_timestamp)
+ pmi_matrix = get_alert_pmi_matrix(alerts, 'fingerprint', sliding_window, stride)
+ write_pmi_matrix_to_db(tenant_id, pmi_matrix)
+
+ return {"status": "success"}
+
+
+async def mine_incidents_and_create_objects(
+ ctx: dict | None, # arq context
+ tenant_id: str,
+ alert_lower_timestamp: datetime = None,
+ alert_upper_timestamp: datetime = None,
+ use_n_historical_alerts: int = USE_N_HISTORICAL_ALERTS,
+ incident_lower_timestamp: datetime = None,
+ incident_upper_timestamp: datetime = None,
+ use_n_hist_incidents: int = USE_N_HISTORICAL_INCIDENTS,
+ pmi_threshold: float = None,
+ knee_threshold: float = None,
+ min_incident_size: int = None,
+ incident_similarity_threshold: float = None,
+ ) -> Dict[str, List[Incident]]:
+
+ """
+ This function mines incidents from alerts and creates incidents in the database.
+
+ Parameters:
+ tenant_id (str): tenant id
+ alert_lower_timestamp (datetime): lower timestamp for alerts
+ alert_upper_timestamp (datetime): upper timestamp for alerts
+ use_n_historical_alerts (int): number of historical alerts to use
+ incident_lower_timestamp (datetime): lower timestamp for incidents
+ incident_upper_timestamp (datetime): upper timestamp for incidents
+ use_n_hist_incidents (int): number of historical incidents to use
+ pmi_threshold (float): PMI threshold used for incident graph edges creation
+ knee_threshold (float): knee threshold used for incident graph nodes creation
+ min_incident_size (int): minimum incident size
+ incident_similarity_threshold (float): incident similarity threshold
+
+ Returns:
+ Dict[str, List[Incident]]: a dictionary containing the created incidents
+ """
+
+ if not incident_upper_timestamp:
+ incident_upper_timestamp = os.environ.get('MINE_INCIDENT_UPPER_TIMESTAMP', datetime.now())
+
+ if not incident_lower_timestamp:
+ incident_validity = os.environ.get('MINE_INCIDENT_VALIDITY', timedelta(days=1))
+ incident_lower_timestamp = incident_upper_timestamp - incident_validity
+
+ if not alert_upper_timestamp:
+ alert_upper_timestamp = os.environ.get('MINE_ALERT_UPPER_TIMESTAMP', datetime.now())
+
+ if not alert_lower_timestamp:
+ alert_window = os.environ.get('MINE_ALERT_WINDOW', timedelta(hours=12))
+ alert_lower_timestamp = alert_upper_timestamp - alert_window
+
+ if not pmi_threshold:
+ pmi_threshold = os.environ.get('PMI_THRESHOLD', 0.0)
+
+ if not knee_threshold:
+ knee_threshold = os.environ.get('KNEE_THRESHOLD', 0.8)
+
+ if not min_incident_size:
+ min_incident_size = os.environ.get('MIN_INCIDENT_SIZE', 5)
+
+ if not incident_similarity_threshold:
+ incident_similarity_threshold = os.environ.get('INCIDENT_SIMILARITY_THRESHOLD', 0.8)
+
+ calculate_pmi_matrix(ctx, tenant_id)
+
+ alerts = get_last_alerts(tenant_id, limit=use_n_historical_alerts, upper_timestamp=alert_upper_timestamp, lower_timestamp=alert_lower_timestamp)
+ incidents, _ = get_last_incidents(tenant_id, limit=use_n_hist_incidents, upper_timestamp=incident_upper_timestamp, lower_timestamp=incident_lower_timestamp)
+ nc_queue = NodeCandidateQueue()
+
+ for candidate in [NodeCandidate(alert.fingerprint, alert.timestamp) for alert in alerts]:
+ nc_queue.push_candidate(candidate)
+ candidates = nc_queue.get_candidates()
+
+ graph = create_graph(tenant_id, [candidate.fingerprint for candidate in candidates], pmi_threshold, knee_threshold)
+ ids = []
+
+ for component in nx.connected_components(graph):
+ if len(component) > min_incident_size:
+ alerts_appended = False
+ for incident in incidents:
+ incident_fingerprints = set([alert.fingerprint for alert in incident.Incident.alerts])
+ intersection = incident_fingerprints.intersection(component)
+
+ if len(intersection) / len(component) >= incident_similarity_threshold:
+ alerts_appended = True
+
+ add_alerts_to_incident_by_incident_id(tenant_id, incident.Incident.id, [alert.id for alert in alerts if alert.fingerprint in component])
+
+ summary = generate_incident_summary(incident.Incident)
+ update_incident_summary(incident.Incident.id, summary)
+
+ if not alerts_appended:
+ incident_start_time = min([alert.timestamp for alert in alerts if alert.fingerprint in component])
+ incident_start_time = incident_start_time.replace(microsecond=0)
+
+ incident = create_incident_from_dict(tenant_id,
+ {"name": f"Incident started at {incident_start_time}",
+ "description": "Summarization is Disabled", "is_predicted": True})
+ ids.append(incident.id)
+
+ add_alerts_to_incident_by_incident_id(tenant_id, incident.id, [alert.id for alert in alerts if alert.fingerprint in component])
+
+ summary = generate_incident_summary(incident)
+ update_incident_summary(incident.id, summary)
+
+ pusher_client = get_pusher_client()
+ if pusher_client:
+ pusher_client.trigger(
+ f"private-{tenant_id}",
+ "ai-logs-change",
+ {"log": ALGORITHM_VERBOSE_NAME + " successfully executed."},
+ )
+ logger.info(
+ "Client notified on new AI log",
+ extra={"tenant_id": tenant_id},
+ )
+
+
+ return {"incidents": [get_incident_by_id(tenant_id, incident_id) for incident_id in ids]}
def mine_incidents(alerts: List[Alert], incident_sliding_window_size: int=6*24*60*60, statistic_sliding_window_size: int=60*60,
@@ -145,4 +332,56 @@ def shape_incidents(alerts: pd.DataFrame, unique_alert_identifier: str, incident
'alert_fingerprints': local_alerts[unique_alert_identifier].unique().tolist(),
})
- return incidents
\ No newline at end of file
+ return incidents
+
+
+def generate_incident_summary(incident: Incident, use_n_alerts_for_summary: int = -1) -> str:
+ if "OPENAI_API_KEY" not in os.environ:
+ logger.error("OpenAI API key is not set. Incident summary generation is not available.")
+ return "Summarization is Disabled"
+
+ try:
+ client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
+
+ prompt_addition = ''
+ if incident.user_summary:
+ prompt_addition = f'When generating, you must rely on the summary provided by human: {incident.user_summary}'
+
+ description_strings = np.unique([f'{alert.event["name"]}' for alert in incident.alerts]).tolist()
+
+ if use_n_alerts_for_summary > 0:
+ incident_description = "\n".join(description_strings[:use_n_alerts_for_summary])
+ else:
+ incident_description = "\n".join(description_strings)
+
+ timestamps = [alert.timestamp for alert in incident.alerts]
+ incident_start = min(timestamps).replace(microsecond=0)
+ incident_end = max(timestamps).replace(microsecond=0)
+
+ model = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
+
+ summary = client.chat.completions.create(model=model, messages=[
+ {
+ "role": "system",
+ "content": """You are a very skilled DevOps specialist who can summarize any incident based on alert descriptions.
+ When provided with information, summarize it in a 2-3 sentences explaining what happened and when.
+ ONLY SUMMARIZE WHAT YOU SEE. In the end add information about potential scenario of the incident.
+
+ EXAMPLE:
+ An incident occurred between 2022-11-17 14:11:04.955070 and 2022-11-22 22:19:04.837526, involving a
+ total of 200 alerts. The alerts indicated critical and warning issues such as high CPU and memory
+ usage in pods and nodes, as well as stuck Kubernetes Daemonset rollout. Potential incident scenario:
+ Kubernetes Daemonset rollout stuck due to high CPU and memory usage in pods and nodes. This caused a
+ long tail of alerts on various topics."""
+ },
+ {
+ "role": "user",
+ "content": f"""Here are alerts of an incident for summarization:\n{incident_description}\n This incident started on
+ {incident_start}, ended on {incident_end}, included {len(description_strings)} alerts. {prompt_addition}"""
+ }
+ ]).choices[0].message.content
+
+ return summary
+ except Exception as e:
+ logger.error(f"Error in generating incident summary: {e}")
+ return "Summarization is Disabled"
\ No newline at end of file
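Leaving the OpenAI call aside, the summary prompt above is assembled from the unique alert names and the incident's time span; a rough standalone sketch with made-up alerts:

    from datetime import datetime

    import numpy as np

    # made-up alert events standing in for incident.alerts
    alerts = [
        {"name": "High CPU on node-1", "timestamp": datetime(2024, 8, 13, 10, 0, 12, 500)},
        {"name": "High CPU on node-1", "timestamp": datetime(2024, 8, 13, 10, 5, 3, 900)},
        {"name": "Pod OOMKilled", "timestamp": datetime(2024, 8, 13, 10, 7, 44, 100)},
    ]

    description_strings = np.unique([a["name"] for a in alerts]).tolist()
    incident_description = "\n".join(description_strings)

    timestamps = [a["timestamp"] for a in alerts]
    incident_start = min(timestamps).replace(microsecond=0)
    incident_end = max(timestamps).replace(microsecond=0)

    print(incident_description)
    print(incident_start, incident_end, len(description_strings))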
diff --git a/ee/experimental/node_utils.py b/ee/experimental/node_utils.py
new file mode 100644
index 000000000..c61c98ace
--- /dev/null
+++ b/ee/experimental/node_utils.py
@@ -0,0 +1,77 @@
+import heapq
+
+from datetime import timedelta, datetime
+from typing import List, Dict, Tuple
+
+
+class NodeCandidate:
+ def __init__(self, fingerprint: str, timestamps: datetime):
+ self.fingerprint = fingerprint
+ self.timestamps = set([timestamps])
+
+ @property
+ def first_timestamp(self):
+ if self.timestamps:
+ return min(self.timestamps)
+ return None
+
+ @property
+ def last_timestamp(self):
+ if self.timestamps:
+ return max(self.timestamps)
+ return None
+
+ def __lt__(self, other):
+ return self.last_timestamp < other.last_timestamp
+
+ def __str__(self):
+ return f'NodeCandidate(fingerprint={self.fingerprint}, first_timestamp={self.first_timestamp}, last_timestamp={self.last_timestamp}, timestamps={self.timestamps})'
+
+
+class NodeCandidateQueue:
+ def __init__(self, candidate_validity_window: int = None):
+ self.queue = []
+ self.candidate_validity_window = candidate_validity_window
+
+ def push_candidate(self, candidate: NodeCandidate):
+ for c in self.queue:
+ if c.fingerprint == candidate.fingerprint:
+ c.timestamps.update(candidate.timestamps)
+ heapq.heapify(self.queue)
+ return
+ heapq.heappush(self.queue, candidate)
+
+ def push_candidates(self, candidates: List[NodeCandidate]):
+ for candidate in candidates:
+ self.push_candidate(candidate)
+
+ def pop_invalid_candidates(self, current_timestamp: datetime):
+ # check incident-wise consistency
+ validity_threshold = current_timestamp - \
+ timedelta(seconds=self.candidate_validity_window)
+
+ while self.queue and self.queue[0].last_timestamp <= validity_threshold:
+ heapq.heappop(self.queue)
+
+ for c in self.queue:
+ c.timestamps = {
+ ts for ts in c.timestamps if ts > validity_threshold}
+ heapq.heapify(self.queue)
+
+ def get_candidates(self):
+ return self.queue
+
+ def copy(self):
+ new_queue = NodeCandidateQueue(self.candidate_validity_window)
+ new_queue.queue = self.queue.copy()
+ return new_queue
+
+ def __str__(self):
+ candidates_str = "\n".join(str(candidate) for candidate in self.queue)
+ return f'NodeCandidateQueue(\ncandidate_validity_window={self.candidate_validity_window}, \nqueue=[\n{candidates_str}\n])'
+
+ def __iter__(self):
+ return iter(self.queue)
+
+ def __len__(self):
+ return len(self.queue)
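A quick usage sketch for the candidate queue above, assuming the ee package is importable from the repo root; the fingerprints, timestamps, and the one-hour validity window are invented.

    from datetime import datetime, timedelta

    from ee.experimental.node_utils import NodeCandidate, NodeCandidateQueue

    now = datetime(2024, 8, 13, 12, 0, 0)
    queue = NodeCandidateQueue(candidate_validity_window=3600)  # one hour, in seconds

    # candidates sharing a fingerprint are merged, keeping all of their timestamps
    queue.push_candidates([
        NodeCandidate("cpu-high", now - timedelta(hours=2)),
        NodeCandidate("cpu-high", now - timedelta(minutes=10)),
        NodeCandidate("disk-full", now - timedelta(hours=3)),
    ])

    # drop candidates (and individual timestamps) that fall outside the validity window
    queue.pop_invalid_candidates(now)
    print([c.fingerprint for c in queue])  # ['cpu-high']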
diff --git a/ee/experimental/statistical_utils.py b/ee/experimental/statistical_utils.py
new file mode 100644
index 000000000..0aa45477a
--- /dev/null
+++ b/ee/experimental/statistical_utils.py
@@ -0,0 +1,127 @@
+import numpy as np
+import pandas as pd
+
+from typing import List, Tuple
+
+def get_batched_alert_counts(alerts: pd.DataFrame, unique_alert_identifier: str, sliding_window_size: int, step_size: int) -> pd.DataFrame:
+ """
+ This function calculates the number of alerts per sliding window.
+
+ Parameters:
+ alerts (pd.DataFrame): a DataFrame containing alerts
+ unique_alert_identifier (str): a unique identifier for alerts
+ sliding_window_size (int): sliding window size in seconds
+ step_size (int): step size in seconds
+
+ Returns:
+ rolling_counts (pd.DataFrame): a DataFrame containing the number of alerts per sliding window
+ """
+
+ resampled_alert_counts = alerts.set_index('starts_at').resample(f'{step_size}s')[unique_alert_identifier].value_counts().unstack(fill_value=0)
+ rolling_counts = resampled_alert_counts.rolling(window=f'{sliding_window_size}s', min_periods=1).sum()
+
+ return rolling_counts
+
+
+def get_batched_alert_occurrences(alerts: pd.DataFrame, unique_alert_identifier: str, sliding_window_size: int, step_size: int) -> pd.DataFrame:
+ """
+ This function calculates occurrences of alerts per sliding window.
+
+ Parameters:
+ alerts (pd.DataFrame): a DataFrame containing alerts
+ unique_alert_identifier (str): a unique identifier for alerts
+ sliding_window_size (int): sliding window size in seconds
+ step_size (int): step size in seconds
+
+ Returns:
+ alert_occurences (pd.DataFrame): a DataFrame containing the occurrences of alerts per sliding window
+ """
+
+ alert_counts = get_batched_alert_counts(alerts, unique_alert_identifier, sliding_window_size, step_size)
+ alert_occurences = pd.DataFrame(np.where(alert_counts > 0, 1, 0), index=alert_counts.index, columns=alert_counts.columns)
+
+ return alert_occurences
+
+def get_jaccard_scores(P_a: np.array, P_aa: np.array) -> np.array:
+ """
+ This function calculates the Jaccard similarity scores between recurring events.
+
+ Parameters:
+ P_a (np.array): a 1D array containing the probabilities of events
+ P_aa (np.array): a 2D array containing the probabilities of joint events
+
+ Returns:
+ jaccard_matrix (np.array): a 2D array containing the Jaccard similarity scores between events
+ """
+
+ P_a_matrix = P_a[:, None] + P_a
+ union_matrix = P_a_matrix - P_aa
+
+ with np.errstate(divide='ignore', invalid='ignore'):
+ jaccard_matrix = np.where(union_matrix != 0, P_aa / union_matrix, 0)
+
+ np.fill_diagonal(jaccard_matrix, 1)
+
+ return jaccard_matrix
+
+
+def get_alert_jaccard_matrix(alerts: pd.DataFrame, unique_alert_identifier: str, sliding_window_size: int, step_size: int) -> pd.DataFrame:
+ """
+ This function calculates Jaccard similarity scores between alert groups (fingerprints).
+
+ Parameters:
+ alerts (pd.DataFrame): a DataFrame containing alerts
+ unique_alert_identifier (str): a unique identifier for alerts
+ sliding_window_size (int): sliding window size in seconds
+ step_size (int): step size in seconds
+
+ Returns:
+ jaccard_scores_df (pd.DataFrame): a DataFrame containing the Jaccard similarity scores between alert groups
+ """
+
+ alert_occurrences_df = get_batched_alert_occurrences(alerts, unique_alert_identifier, sliding_window_size, step_size)
+ alert_occurrences = alert_occurrences_df.to_numpy()
+
+ alert_probabilities = np.mean(alert_occurrences, axis=0)
+ joint_alert_occurrences = np.dot(alert_occurrences.T, alert_occurrences)
+ pairwise_alert_probabilities = joint_alert_occurrences / alert_occurrences.shape[0]
+
+ jaccard_scores = get_jaccard_scores(alert_probabilities, pairwise_alert_probabilities)
+ jaccard_scores_df = pd.DataFrame(jaccard_scores, index=alert_occurrences_df.columns, columns=alert_occurrences_df.columns)
+
+ return jaccard_scores_df
+
+
+def get_alert_pmi_matrix(alerts: pd.DataFrame, unique_alert_identifier: str, sliding_window_size: int, step_size: int) -> pd.DataFrame:
+ """
+ This function calculates PMI scores between alert groups (fingerprints).
+
+ Parameters:
+ alerts (pd.DataFrame): a DataFrame containing alerts
+ unique_alert_identifier (str): a unique identifier for alerts
+ sliding_window_size (int): sliding window size in seconds
+ step_size (int): step size in seconds
+
+ Returns:
+ pmi_matrix_df (pd.DataFrame): a DataFrame containing the PMI scores between alert groups (fingerprints)
+ """
+
+ alert_dict = {
+ 'fingerprint': [alert.fingerprint for alert in alerts],
+ 'starts_at': [alert.timestamp for alert in alerts],
+ }
+
+ alert_df = pd.DataFrame(alert_dict)
+ alert_occurences_df = get_batched_alert_occurrences(alert_df, unique_alert_identifier, sliding_window_size, step_size)
+ alert_occurrences = alert_occurences_df.to_numpy()
+ alert_probabilities = np.mean(alert_occurrences, axis=0)
+ joint_alert_occurrences = np.dot(alert_occurrences.T, alert_occurrences)
+ pairwise_alert_probabilities = joint_alert_occurrences / alert_occurrences.shape[0]
+
+ pmi_matrix = np.log(pairwise_alert_probabilities / (alert_probabilities[:, None] * alert_probabilities))
+ pmi_matrix[np.isnan(pmi_matrix)] = 0
+ np.fill_diagonal(pmi_matrix, 0)
+
+ pmi_matrix_df = pd.DataFrame(pmi_matrix, index=alert_occurences_df.columns, columns=alert_occurences_df.columns)
+
+ return pmi_matrix_df
\ No newline at end of file
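A compact numeric illustration of the PMI computation above, PMI(a, b) = log(P(a, b) / (P(a) * P(b))), on a made-up occurrence matrix with two fingerprints and four sliding windows:

    import numpy as np

    # rows are sliding windows, columns are fingerprints; 1 means the alert fired in that window
    alert_occurrences = np.array([
        [1, 1],
        [1, 1],
        [1, 0],
        [0, 0],
    ])

    alert_probabilities = np.mean(alert_occurrences, axis=0)  # P(a) per fingerprint
    joint = np.dot(alert_occurrences.T, alert_occurrences) / alert_occurrences.shape[0]  # P(a, b)

    pmi_matrix = np.log(joint / (alert_probabilities[:, None] * alert_probabilities))
    pmi_matrix[np.isnan(pmi_matrix)] = 0
    np.fill_diagonal(pmi_matrix, 0)

    # a positive off-diagonal value means the two fingerprints co-occur more often than chance
    print(pmi_matrix)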
diff --git a/keep-ui/app/ai/ai.tsx b/keep-ui/app/ai/ai.tsx
index 632facaab..f0006e6d2 100644
--- a/keep-ui/app/ai/ai.tsx
+++ b/keep-ui/app/ai/ai.tsx
@@ -1,19 +1,26 @@
"use client";
import { Card, List, ListItem, Title, Subtitle } from "@tremor/react";
-import { useAIStats } from "utils/hooks/useAIStats";
+import { useAIStats, usePollAILogs } from "utils/hooks/useAI";
import { useSession } from "next-auth/react";
import { getApiURL } from "utils/apiUrl";
import { toast } from "react-toastify";
import { useEffect, useState, useRef, FormEvent } from "react";
+import { AILogs } from "./model";
export default function Ai() {
const { data: aistats, isLoading } = useAIStats();
const { data: session } = useSession();
const [text, setText] = useState("");
+ const [basicAlgorithmLog, setBasicAlgorithmLog] = useState("");
const [newText, setNewText] = useState("Mine incidents");
const [animate, setAnimate] = useState(false);
const onlyOnce = useRef(false);
+ const mutateAILogs = (logs: AILogs) => {
+ setBasicAlgorithmLog(logs.log);
+ };
+ usePollAILogs(mutateAILogs);
+
useEffect(() => {
let index = 0;
@@ -42,14 +49,14 @@ export default function Ai() {
Authorization: `Bearer ${session?.accessToken}`,
"Content-Type": "application/json",
},
- body: JSON.stringify({
- }),
+ body: JSON.stringify({}),
});
if (!response.ok) {
toast.error(
"Failed to mine incidents, please contact us if this issue persists."
);
}
+
setAnimate(false);
setNewText("Mine incidents");
};
@@ -68,7 +75,8 @@ export default function Ai() {
👋 You are almost there!
- AI Correlation is coming soon. Make sure you have enough data collected to prepare.
+ AI Correlation is coming soon. Make sure you have enough data
+ collected to prepare.
@@ -98,7 +106,9 @@ export default function Ai() {
Collect alerts for more than 3 days
- {aistats?.first_alert_datetime && new Date(aistats.first_alert_datetime) < new Date(Date.now() - 3 * 24 * 60 * 60 * 1000) ? (
+ {aistats?.first_alert_datetime &&
+ new Date(aistats.first_alert_datetime) <
+ new Date(Date.now() - 3 * 24 * 60 * 60 * 1000) ? (
✅
) : (
⏳
@@ -107,41 +117,84 @@ export default function Ai() {
- {(aistats?.is_mining_enabled &&
{incident.name}
Summary: {incident.user_summary}
-
Started at: {incident.start_time?.toISOString() ?? "N/A"}
+ {!!incident.start_time &&
Started at: {new Date(incident.start_time + "Z").toLocaleString()}
}
+ {!!incident.last_seen_time &&
Last seen at: {new Date(incident.last_seen_time + "Z").toLocaleString()}
}
{/* = lower_timestamp
+ Incident.last_seen_time >= lower_timestamp
)
total_count = query.count()
@@ -2153,6 +2144,7 @@ def get_incident_alerts_by_incident_id(
AlertToIncident.tenant_id == tenant_id,
Incident.id == incident_id,
)
+ .order_by(col(Alert.timestamp).desc())
)
total_count = query.count()
@@ -2224,13 +2216,15 @@ def add_alerts_to_incident_by_incident_id(
return None
existed_alert_ids = session.exec(
- select(AlertToIncident.alert_id).where(
+ select(AlertToIncident.alert_id)
+ .where(
AlertToIncident.tenant_id == tenant_id,
AlertToIncident.incident_id == incident.id,
col(AlertToIncident.alert_id).in_(alert_ids),
)
).all()
+
new_alert_ids = [
alert_id for alert_id in alert_ids if alert_id not in existed_alert_ids
]
@@ -2253,6 +2247,18 @@ def add_alerts_to_incident_by_incident_id(
]
session.bulk_save_objects(alert_to_incident_entries)
+
+ started_at, last_seen_at = session.exec(
+ select(func.min(Alert.timestamp), func.max(Alert.timestamp))
+ .join(AlertToIncident, AlertToIncident.alert_id == Alert.id)
+ .where(
+ AlertToIncident.tenant_id == tenant_id,
+ AlertToIncident.incident_id == incident.id,
+ )
+ ).one()
+ incident.start_time = started_at
+ incident.last_seen_time = last_seen_at
+
session.add(incident)
session.commit()
return True
@@ -2323,6 +2329,15 @@ def remove_alerts_to_incident_by_incident_id(
if source not in sources_existed
]
+ started_at, last_seen_at = session.exec(
+ select(func.min(Alert.timestamp), func.max(Alert.timestamp))
+ .join(AlertToIncident, AlertToIncident.alert_id == Alert.id)
+ .where(
+ AlertToIncident.tenant_id == tenant_id,
+ AlertToIncident.incident_id == incident.id,
+ )
+ ).one()
+
# filtering removed entities from affected services and sources in the incident
incident.affected_services = [
service
@@ -2334,6 +2349,9 @@ def remove_alerts_to_incident_by_incident_id(
]
incident.alerts_count -= alerts_data_for_incident["count"]
+ incident.start_time = started_at
+ incident.last_seen_time = last_seen_at
+
session.add(incident)
session.commit()
diff --git a/keep/api/models/alert.py b/keep/api/models/alert.py
index 1d80b7844..4128c6c0e 100644
--- a/keep/api/models/alert.py
+++ b/keep/api/models/alert.py
@@ -354,6 +354,7 @@ class IncidentDto(IncidentDtoIn):
id: UUID
start_time: datetime.datetime | None
+ last_seen_time: datetime.datetime | None
end_time: datetime.datetime | None
number_of_alerts: int
@@ -393,6 +394,7 @@ def from_db_incident(cls, db_incident):
is_confirmed=db_incident.is_confirmed,
creation_time=db_incident.creation_time,
start_time=db_incident.start_time,
+ last_seen_time=db_incident.last_seen_time,
end_time=db_incident.end_time,
number_of_alerts=db_incident.alerts_count,
alert_sources=db_incident.sources,
diff --git a/keep/api/models/db/alert.py b/keep/api/models/db/alert.py
index 08e037959..5dc45810b 100644
--- a/keep/api/models/db/alert.py
+++ b/keep/api/models/db/alert.py
@@ -111,6 +111,7 @@ class Incident(SQLModel, table=True):
# But I suppose to have this fields as cache, to prevent extra requests
start_time: datetime | None
end_time: datetime | None
+ last_seen_time: datetime | None
# map of attributes to values
alerts: List["Alert"] = Relationship(
diff --git a/keep/api/models/db/migrations/versions/2024-08-13-19-22_0832e0d9889a.py b/keep/api/models/db/migrations/versions/2024-08-13-19-22_0832e0d9889a.py
new file mode 100644
index 000000000..9be27e3b6
--- /dev/null
+++ b/keep/api/models/db/migrations/versions/2024-08-13-19-22_0832e0d9889a.py
@@ -0,0 +1,84 @@
+"""add last_seen_time field to incident
+
+Revision ID: 0832e0d9889a
+Revises: 005efc57cc1c
+Create Date: 2024-08-13 19:22:35.873850
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects.postgresql import UUID
+from sqlalchemy.orm import Session
+
+# revision identifiers, used by Alembic.
+revision = "0832e0d9889a"
+down_revision = "005efc57cc1c"
+branch_labels = None
+depends_on = None
+
+
+# Define a completely separate metadata for the migration
+migration_metadata = sa.MetaData()
+
+# Direct table definition for AlertToIncident
+alert_to_incident_table = sa.Table(
+ 'alerttoincident',
+ migration_metadata,
+ sa.Column('alert_id', UUID(as_uuid=False), sa.ForeignKey('alert.id', ondelete='CASCADE'), primary_key=True),
+ sa.Column('incident_id', UUID(as_uuid=False), sa.ForeignKey('incident.id', ondelete='CASCADE'), primary_key=True)
+)
+
+# Direct table definition for Incident
+incident_table = sa.Table(
+ 'incident',
+ migration_metadata,
+ sa.Column('id', UUID(as_uuid=False), primary_key=True),
+ sa.Column('start_time', sa.DateTime, nullable=True),
+ sa.Column('last_seen_time', sa.DateTime, nullable=True),
+)
+
+# Direct table definition for Alert
+alert_table = sa.Table(
+ 'alert',
+ migration_metadata,
+ sa.Column('id', UUID(as_uuid=False), primary_key=True),
+ sa.Column('timestamp', sa.DateTime),
+)
+
+
+def populate_db():
+ session = Session(op.get_bind())
+
+ incidents = session.execute(sa.select(incident_table)).fetchall()
+
+ for incident in incidents:
+ stmt = (
+ sa.select([sa.func.min(alert_table.c.timestamp), sa.func.max(alert_table.c.timestamp)])
+ .select_from(alert_table)
+ .join(alert_to_incident_table, alert_table.c.id == alert_to_incident_table.c.alert_id)
+ .where(alert_to_incident_table.c.incident_id == str(incident.id))
+ )
+
+ started_at, last_seen_at = session.execute(stmt).one()
+
+ stmt = (
+ sa.update(incident_table).where(incident_table.c.id == incident.id).values(
+ start_time=started_at,
+ last_seen_time=last_seen_at
+ )
+ )
+ session.execute(stmt)
+ session.commit()
+
+
+def upgrade() -> None:
+ op.add_column("incident", sa.Column("last_seen_time", sa.DateTime(), nullable=True))
+
+ populate_db()
+
+
+def downgrade() -> None:
+ # ### commands auto generated by Alembic - please adjust! ###
+ op.drop_column("incident", "last_seen_time")
+ # ### end Alembic commands ###
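The backfill above boils down to taking the earliest and latest alert timestamp per incident; a toy version of that aggregation with invented timestamps:

    from datetime import datetime

    # made-up alert timestamps attached to a single incident
    alert_timestamps = [
        datetime(2024, 8, 13, 9, 15),
        datetime(2024, 8, 13, 9, 42),
        datetime(2024, 8, 13, 11, 3),
    ]

    start_time = min(alert_timestamps)      # earliest alert -> incident.start_time
    last_seen_time = max(alert_timestamps)  # latest alert  -> incident.last_seen_time
    print(start_time, last_seen_time)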
From 9d67e905efb7e7e4723ec80cc0470f89cc726f08 Mon Sep 17 00:00:00 2001
From: Shahar Glazner
Date: Wed, 14 Aug 2024 18:00:42 +0300
Subject: [PATCH 08/32] feat: Keep provider v2 (#1617)
---
examples/workflows/resolve_old_alerts.yml | 26 ++++
.../workflows/simple_http_request_ntfy.yml | 4 +-
keep/api/core/db.py | 106 ++++++++++------
keep/api/core/db_on_start.py | 2 +-
keep/api/core/db_utils.py | 11 +-
keep/functions/__init__.py | 16 ++-
keep/providers/base/base_provider.py | 28 ++++-
.../grafana_provider/grafana_provider.py | 5 +-
keep/providers/keep_provider/keep_provider.py | 63 ++++++----
tests/test_functions.py | 4 +-
tests/test_workflow_execution.py | 114 ++++++++++++++++++
11 files changed, 302 insertions(+), 77 deletions(-)
create mode 100644 examples/workflows/resolve_old_alerts.yml
diff --git a/examples/workflows/resolve_old_alerts.yml b/examples/workflows/resolve_old_alerts.yml
new file mode 100644
index 000000000..f299cfa3a
--- /dev/null
+++ b/examples/workflows/resolve_old_alerts.yml
@@ -0,0 +1,26 @@
+workflow:
+ id: resolve-old-alerts
+ description:
+ triggers:
+ - type: manual
+ - type: interval
+ value: 60
+ steps:
+ # get the alerts from keep
+ - name: get-alerts
+ provider:
+ type: keep
+ with:
+ version: 2
+ filter: "status == 'firing'"
+ actions:
+ - name: resolve-alerts
+ foreach: " {{ steps.get-alerts.results }} "
+ if: "keep.to_timestamp('{{ foreach.value.lastReceived }}') < keep.utcnowtimestamp() - 3600"
+ provider:
+ type: mock
+ with:
+ enrich_alert:
+ - key: status
+ value: resolved
+ disposable: true
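The if condition above resolves alerts older than one hour; roughly the equivalent of what keep.to_timestamp and keep.utcnowtimestamp (added to keep/functions in this same patch) end up evaluating, with an invented lastReceived value:

    from datetime import datetime, timezone

    from dateutil import parser

    # invented lastReceived value; in the workflow it comes from the foreach alert
    last_received = "2024-08-14 16:10:00+00:00"

    to_timestamp = int(parser.parse(last_received).timestamp())
    utcnow_timestamp = int(datetime.now(timezone.utc).timestamp())

    # same shape as the workflow condition: only alerts older than one hour qualify
    should_resolve = to_timestamp < utcnow_timestamp - 3600
    print(should_resolve)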
diff --git a/examples/workflows/simple_http_request_ntfy.yml b/examples/workflows/simple_http_request_ntfy.yml
index 988ffbf91..866875e76 100644
--- a/examples/workflows/simple_http_request_ntfy.yml
+++ b/examples/workflows/simple_http_request_ntfy.yml
@@ -26,5 +26,7 @@ alert:
with:
method: POST
body:
- message: "Time difference: {{ steps.get-max-datetime.conditions.threshold.0.value }}"
+ alert: {{ alert }}
+ fingerprint: {{ alert.fingerprint }}
+ some_customized_field: {{ keep.strip(alert.some_attribute) }}
url: "https://ntfy.sh/MoRen5UlPEQr8s4Y"
diff --git a/keep/api/core/db.py b/keep/api/core/db.py
index 3f3b24a6f..6a8b9fd7e 100644
--- a/keep/api/core/db.py
+++ b/keep/api/core/db.py
@@ -9,13 +9,11 @@
import logging
import random
import uuid
-
-import pandas as pd
-
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple, Union
from uuid import uuid4
+import pandas as pd
import validators
from dotenv import find_dotenv, load_dotenv
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
@@ -39,10 +37,10 @@
from keep.api.models.db.preset import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.provider import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.rule import * # pylint: disable=unused-wildcard-import
+from keep.api.models.db.statistics import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.tenant import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.topology import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.workflow import * # pylint: disable=unused-wildcard-import
-from keep.api.models.db.statistics import * # pylint: disable=unused-wildcard-import
logger = logging.getLogger(__name__)
@@ -613,17 +611,35 @@ def get_workflow_id(tenant_id, workflow_name):
def push_logs_to_db(log_entries):
- db_log_entries = [
- WorkflowExecutionLog(
- workflow_execution_id=log_entry["workflow_execution_id"],
- timestamp=datetime.strptime(log_entry["asctime"], "%Y-%m-%d %H:%M:%S,%f"),
- message=log_entry["message"][0:255], # limit the message to 255 chars
- context=json.loads(
- json.dumps(log_entry.get("context", {}), default=str)
- ), # workaround to serialize any object
- )
- for log_entry in log_entries
- ]
+ # avoid circular import
+ from keep.api.logging import LOG_FORMAT, LOG_FORMAT_OPEN_TELEMETRY
+
+ if LOG_FORMAT == LOG_FORMAT_OPEN_TELEMETRY:
+ db_log_entries = [
+ WorkflowExecutionLog(
+ workflow_execution_id=log_entry["workflow_execution_id"],
+ timestamp=datetime.strptime(
+ log_entry["asctime"], "%Y-%m-%d %H:%M:%S,%f"
+ ),
+ message=log_entry["message"][0:255], # limit the message to 255 chars
+ context=json.loads(
+ json.dumps(log_entry.get("context", {}), default=str)
+ ), # workaround to serialize any object
+ )
+ for log_entry in log_entries
+ ]
+ else:
+ db_log_entries = [
+ WorkflowExecutionLog(
+ workflow_execution_id=log_entry["workflow_execution_id"],
+ timestamp=log_entry["created"],
+ message=log_entry["message"][0:255], # limit the message to 255 chars
+ context=json.loads(
+ json.dumps(log_entry.get("context", {}), default=str)
+ ), # workaround to serialize any object
+ )
+ for log_entry in log_entries
+ ]
# Add the LogEntry instances to the database session
with Session(engine) as session:
@@ -926,7 +942,12 @@ def get_alerts_with_filters(
def get_last_alerts(
- tenant_id, provider_id=None, limit=1000, timeframe=None, upper_timestamp=None, lower_timestamp=None
+ tenant_id,
+ provider_id=None,
+ limit=1000,
+ timeframe=None,
+ upper_timestamp=None,
+ lower_timestamp=None,
) -> list[Alert]:
"""
Get the last alert for each fingerprint along with the first time the alert was triggered.
@@ -962,7 +983,7 @@ def get_last_alerts(
)
.subquery()
)
-
+
filter_conditions = []
if upper_timestamp is not None:
@@ -1866,7 +1887,10 @@ def update_preset_options(tenant_id: str, preset_id: str, options: dict) -> Pres
def assign_alert_to_incident(alert_id: UUID, incident_id: UUID, tenant_id: str):
return add_alerts_to_incident_by_incident_id(tenant_id, incident_id, [alert_id])
-def is_alert_assigned_to_incident(alert_id: UUID, incident_id: UUID, tenant_id: str) -> bool:
+
+def is_alert_assigned_to_incident(
+ alert_id: UUID, incident_id: UUID, tenant_id: str
+) -> bool:
with Session(engine) as session:
assigned = session.exec(
select(AlertToIncident)
@@ -1984,8 +2008,7 @@ def get_last_incidents(
Incident,
)
.filter(
- Incident.tenant_id == tenant_id,
- Incident.is_confirmed == is_confirmed
+ Incident.tenant_id == tenant_id, Incident.is_confirmed == is_confirmed
)
.order_by(desc(Incident.creation_time))
)
@@ -2001,13 +2024,9 @@ def get_last_incidents(
col(Incident.last_seen_time).between(lower_timestamp, upper_timestamp)
)
elif upper_timestamp:
- query = query.filter(
- Incident.last_seen_time <= upper_timestamp
- )
+ query = query.filter(Incident.last_seen_time <= upper_timestamp)
elif lower_timestamp:
- query = query.filter(
- Incident.last_seen_time >= lower_timestamp
- )
+ query = query.filter(Incident.last_seen_time >= lower_timestamp)
total_count = query.count()
@@ -2216,15 +2235,13 @@ def add_alerts_to_incident_by_incident_id(
return None
existed_alert_ids = session.exec(
- select(AlertToIncident.alert_id)
- .where(
+ select(AlertToIncident.alert_id).where(
AlertToIncident.tenant_id == tenant_id,
AlertToIncident.incident_id == incident.id,
col(AlertToIncident.alert_id).in_(alert_ids),
)
).all()
-
new_alert_ids = [
alert_id for alert_id in alert_ids if alert_id not in existed_alert_ids
]
@@ -2418,8 +2435,8 @@ def confirm_predicted_incident_by_id(
session.refresh(incident)
return incident
-
-
+
+
def write_pmi_matrix_to_db(tenant_id: str, pmi_matrix_df: pd.DataFrame) -> bool:
# TODO: add handlers for sequential launches
with Session(engine) as session:
@@ -2431,15 +2448,18 @@ def write_pmi_matrix_to_db(tenant_id: str, pmi_matrix_df: pd.DataFrame) -> bool:
tenant_id=tenant_id,
fingerprint_i=fingerprint_i,
fingerprint_j=fingerprint_j,
- pmi=pmi
+ pmi=pmi,
)
session.merge(pmi_entry)
-
+
session.commit()
-
+
return True
-def get_pmi_value(tenant_id: str, fingerprint_i: str, fingerprint_j: str) -> Optional[float]:
+
+def get_pmi_value(
+ tenant_id: str, fingerprint_i: str, fingerprint_j: str
+) -> Optional[float]:
with Session(engine) as session:
pmi_entry = session.exec(
select(PMIMatrix)
@@ -2447,10 +2467,13 @@ def get_pmi_value(tenant_id: str, fingerprint_i: str, fingerprint_j: str) -> Opt
.where(PMIMatrix.fingerprint_i == fingerprint_i)
.where(PMIMatrix.fingerprint_j == fingerprint_j)
).first()
-
+
return pmi_entry.pmi if pmi_entry else None
-def get_pmi_values(tenant_id: str, fingerprints: List[str]) -> Dict[Tuple[str, str], Optional[float]]:
+
+def get_pmi_values(
+ tenant_id: str, fingerprints: List[str]
+) -> Dict[Tuple[str, str], Optional[float]]:
pmi_values = {}
with Session(engine) as session:
for idx_i, fingerprint_i in enumerate(fingerprints):
@@ -2462,7 +2485,9 @@ def get_pmi_values(tenant_id: str, fingerprints: List[str]) -> Dict[Tuple[str, s
.where(PMIMatrix.fingerprint_i == fingerprint_i)
.where(PMIMatrix.fingerprint_j == fingerprint_j)
).first()
- pmi_values[(fingerprint_i, fingerprint_j)] = pmi_entry.pmi if pmi_entry else None
+ pmi_values[(fingerprint_i, fingerprint_j)] = (
+ pmi_entry.pmi if pmi_entry else None
+ )
return pmi_values
@@ -2528,11 +2553,11 @@ def get_alert_firing_time(tenant_id: str, fingerprint: str) -> timedelta:
tzinfo=timezone.utc
)
+
def update_incident_summary(incident_id: UUID, summary: str) -> Incident:
with Session(engine) as session:
incident = session.exec(
- select(Incident)
- .where(Incident.id == incident_id)
+ select(Incident).where(Incident.id == incident_id)
).first()
if not incident:
@@ -2544,6 +2569,7 @@ def update_incident_summary(incident_id: UUID, summary: str) -> Incident:
return incident
+
# Fetch all topology data
def get_all_topology_data(
tenant_id: str,
diff --git a/keep/api/core/db_on_start.py b/keep/api/core/db_on_start.py
index cff23b493..44c04305e 100644
--- a/keep/api/core/db_on_start.py
+++ b/keep/api/core/db_on_start.py
@@ -33,9 +33,9 @@
from keep.api.models.db.preset import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.provider import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.rule import * # pylint: disable=unused-wildcard-import
+from keep.api.models.db.statistics import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.tenant import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.workflow import * # pylint: disable=unused-wildcard-import
-from keep.api.models.db.statistics import * # pylint: disable=unused-wildcard-import
logger = logging.getLogger(__name__)
diff --git a/keep/api/core/db_utils.py b/keep/api/core/db_utils.py
index a7a5ebb91..03630940e 100644
--- a/keep/api/core/db_utils.py
+++ b/keep/api/core/db_utils.py
@@ -124,12 +124,14 @@ def create_db_engine():
"mysql+pymysql://",
creator=__get_conn,
echo=DB_ECHO,
+ json_serializer=dumps,
)
elif DB_CONNECTION_STRING == "impersonate":
engine = create_engine(
"mysql+pymysql://",
creator=__get_conn_impersonate,
echo=DB_ECHO,
+ json_serializer=dumps,
)
elif DB_CONNECTION_STRING:
try:
@@ -143,12 +145,15 @@ def create_db_engine():
)
# SQLite does not support pool_size
except TypeError:
- engine = create_engine(DB_CONNECTION_STRING)
+ engine = create_engine(
+ DB_CONNECTION_STRING, json_serializer=dumps, echo=DB_ECHO
+ )
else:
engine = create_engine(
"sqlite:///./keep.db",
connect_args={"check_same_thread": False},
echo=DB_ECHO,
+ json_serializer=dumps,
)
return engine
@@ -158,6 +163,6 @@ def get_json_extract_field(session, base_field, key):
if session.bind.dialect.name == "postgresql":
return func.json_extract_path_text(base_field, key)
elif session.bind.dialect.name == "mysql":
- return func.json_unquote(func.json_extract(base_field, '$.{}'.format(key)))
+ return func.json_unquote(func.json_extract(base_field, "$.{}".format(key)))
else:
- return func.json_extract(base_field, '$.{}'.format(key))
+ return func.json_extract(base_field, "$.{}".format(key))
diff --git a/keep/functions/__init__.py b/keep/functions/__init__.py
index f27696d05..a4b05c843 100644
--- a/keep/functions/__init__.py
+++ b/keep/functions/__init__.py
@@ -69,6 +69,10 @@ def utcnow() -> datetime.datetime:
return dt
+def utcnowtimestamp() -> int:
+ return int(utcnow().timestamp())
+
+
def utcnowiso() -> str:
return utcnow().isoformat()
@@ -98,6 +102,16 @@ def to_utc(dt: datetime.datetime | str = "") -> datetime.datetime:
return utc_dt
+def to_timestamp(dt: datetime.datetime | str = "") -> int:
+ if isinstance(dt, str):
+ try:
+ dt = parser.parse(dt.strip())
+ except ParserError:
+ # Failed to parse the date
+ return 0
+ return int(dt.timestamp())
+
+
def datetime_compare(t1: datetime = None, t2: datetime = None) -> float:
if t1 is None or t2 is None:
return 0
@@ -260,7 +274,7 @@ def get_firing_time(alert: dict, time_unit: str, **kwargs) -> str:
"Invalid time_unit. Use 'minutes', 'hours', 'seconds', 'm', 'h', or 's'."
)
- return f"{result:.1f}"
+ return f"{result:.2f}"
def is_first_time(fingerprint: str, since: str = None, **kwargs) -> str:
diff --git a/keep/providers/base/base_provider.py b/keep/providers/base/base_provider.py
index c004fc475..3d948fa3b 100644
--- a/keep/providers/base/base_provider.py
+++ b/keep/providers/base/base_provider.py
@@ -147,7 +147,11 @@ def _enrich_alert(self, enrichments, results):
# This is when we are in a foreach context that is zipped
foreach_context: dict = foreach_context[0]
event = foreach_context
- fingerprint = foreach_context.get("fingerprint")
+
+ if isinstance(foreach_context, AlertDto):
+ fingerprint = foreach_context.fingerprint
+ else:
+ fingerprint = foreach_context.get("fingerprint")
# else, if we are in an event context, use the event fingerprint
elif self.context_manager.event_context:
# TODO: map all cases where event_context is a dict and update them to the DTO
@@ -170,10 +174,12 @@ def _enrich_alert(self, enrichments, results):
self.logger.debug("Fingerprint extracted", extra={"fingerprint": fingerprint})
_enrichments = {}
+ disposable_enrichments = {}
# enrich only the requested fields
for enrichment in enrichments:
try:
value = enrichment["value"]
+ disposable = bool(enrichment.get("disposable", "false"))
if value.startswith("results."):
val = enrichment["value"].replace("results.", "")
parts = val.split(".")
@@ -181,7 +187,10 @@ def _enrich_alert(self, enrichments, results):
for part in parts:
r = r[part]
value = r
- _enrichments[enrichment["key"]] = value
+ if disposable:
+ disposable_enrichments[enrichment["key"]] = value
+ else:
+ _enrichments[enrichment["key"]] = value
if event is not None:
if isinstance(event, dict):
event[enrichment["key"]] = value
@@ -201,6 +210,7 @@ def _enrich_alert(self, enrichments, results):
enrichment_string += f"{key}={value}, "
# remove the last comma
enrichment_string = enrichment_string[:-2]
+ # enrich the alert with _enrichments
enrichments_bl.enrich_alert(
fingerprint,
_enrichments,
@@ -208,6 +218,20 @@ def _enrich_alert(self, enrichments, results):
action_callee="system",
action_description=f"Workflow enriched the alert with {enrichment_string}",
)
+ # enrich with disposable enrichments
+ enrichment_string = ""
+ for key, value in disposable_enrichments.items():
+ enrichment_string += f"{key}={value}, "
+ # remove the last comma
+ enrichment_string = enrichment_string[:-2]
+ enrichments_bl.enrich_alert(
+ fingerprint,
+ disposable_enrichments,
+ action_type=AlertActionType.WORKFLOW_ENRICH,
+ action_callee="system",
+ action_description=f"Workflow enriched the alert with {enrichment_string}",
+ dispose_on_new_alert=True,
+ )
except Exception as e:
self.logger.error(
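For reference, a hedged sketch of what an enrichments list with the new per-entry disposable flag could look like when _enrich_alert runs (the keys, values and results payload are illustrative, not taken from a real workflow):

    results = {"ticket": {"url": "https://example.com/T-123"}, "status": "ok"}
    enrichments = [
        # kept on the alert: collected into _enrichments and the first enrich_alert call
        {"key": "ticket_url", "value": "results.ticket.url"},
        # routed to the second enrich_alert call with dispose_on_new_alert=True
        {"key": "last_check_status", "value": "results.status", "disposable": "true"},
    ]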
diff --git a/keep/providers/grafana_provider/grafana_provider.py b/keep/providers/grafana_provider/grafana_provider.py
index 02441279a..86c0a4104 100644
--- a/keep/providers/grafana_provider/grafana_provider.py
+++ b/keep/providers/grafana_provider/grafana_provider.py
@@ -509,7 +509,10 @@ def simulate_alert(cls, **kwargs) -> dict:
if not alert_type:
alert_type = random.choice(list(ALERTS.keys()))
- alert_payload = ALERTS[alert_type]["payload"]
+ if "payload" in ALERTS[alert_type]:
+ alert_payload = ALERTS[alert_type]["payload"]
+ else:
+ alert_payload = ALERTS[alert_type]["alerts"][0]
alert_parameters = ALERTS[alert_type].get("parameters", {})
# Generate random data for parameters
for parameter, parameter_options in alert_parameters.items():
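The fallback above lets simulate_alert handle mock definitions that keep a list under "alerts" instead of a single "payload". A sketch of the two shapes it now accepts (contents are hypothetical):

    ALERTS = {
        "high_cpu": {  # classic shape: a single payload dict
            "payload": {"title": "High CPU usage", "severity": "critical"},
            "parameters": {},
        },
        "connection_errors": {  # alternative shape: a list of alert dicts
            "alerts": [{"title": "Connection errors", "severity": "warning"}],
        },
    }

    alert_type = "connection_errors"
    alert_payload = (
        ALERTS[alert_type]["payload"]
        if "payload" in ALERTS[alert_type]
        else ALERTS[alert_type]["alerts"][0]
    )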
diff --git a/keep/providers/keep_provider/keep_provider.py b/keep/providers/keep_provider/keep_provider.py
index 8864812f4..ec7835232 100644
--- a/keep/providers/keep_provider/keep_provider.py
+++ b/keep/providers/keep_provider/keep_provider.py
@@ -10,6 +10,7 @@
from keep.contextmanager.contextmanager import ContextManager
from keep.providers.base.base_provider import BaseProvider
from keep.providers.models.provider_config import ProviderConfig
+from keep.searchengine.searchengine import SearchEngine
class KeepProvider(BaseProvider):
@@ -28,7 +29,7 @@ def dispose(self):
"""
pass
- def _query(self, filters, distinct=True, time_delta=1, **kwargs):
+ def _query(self, filters=None, version=1, distinct=True, time_delta=1, **kwargs):
"""
Query Keep for alerts.
"""
@@ -40,31 +41,41 @@ def _query(self, filters, distinct=True, time_delta=1, **kwargs):
"time_delta": time_delta,
},
)
- db_alerts = get_alerts_with_filters(
- self.context_manager.tenant_id, filters=filters, time_delta=time_delta
- )
- self.logger.info(
- "Got alerts from Keep", extra={"num_of_alerts": len(db_alerts)}
- )
-
- fingerprints = {}
- alerts = []
- if db_alerts:
- for alert in db_alerts:
- if fingerprints.get(alert.fingerprint) and distinct is True:
- continue
- alert_event = alert.event
- if alert.alert_enrichment:
- alert_event["enrichments"] = alert.alert_enrichment.enrichments
- alerts.append(alert_event)
- fingerprints[alert.fingerprint] = True
- self.logger.info(
- "Returning alerts",
- extra={
- "num_of_alerts": len(alerts),
- "fingerprints": list(fingerprints.keys()),
- },
- )
+ if version == 1:
+ # filters are mandatory for version 1
+ if not filters:
+ raise ValueError("Filters are required for version")
+ db_alerts = get_alerts_with_filters(
+ self.context_manager.tenant_id, filters=filters, time_delta=time_delta
+ )
+ fingerprints = {}
+ # distinct if needed
+ alerts = []
+ if db_alerts:
+ for alert in db_alerts:
+ if fingerprints.get(alert.fingerprint) and distinct is True:
+ continue
+ alert_event = alert.event
+ if alert.alert_enrichment:
+ alert_event["enrichments"] = alert.alert_enrichment.enrichments
+ alerts.append(alert_event)
+ fingerprints[alert.fingerprint] = True
+ else:
+ search_engine = SearchEngine(tenant_id=self.context_manager.tenant_id)
+ _filter = kwargs.get("filter")
+ if not _filter:
+ raise ValueError("Filter is required for version 2")
+ try:
+ alerts = search_engine.search_alerts_by_cel(
+ cel_query=_filter, limit=kwargs.get("limit"), timeframe=time_delta
+ )
+ except Exception as e:
+ self.logger.exception(
+ "Failed to search alerts by CEL: %s",
+ str(e),
+ )
+ raise
+ self.logger.info("Got alerts from Keep", extra={"num_of_alerts": len(alerts)})
return alerts
def validate_config(self):
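With the branch above, _query supports both the legacy filter list and a CEL search. A hedged usage sketch; the filter shape for version 1 and the CEL expression are assumptions for illustration:

    def query_both_versions(provider):
        """Illustrative only: exercise both query flavours of KeepProvider."""
        # version 1: SQL-style filters are mandatory
        v1 = provider._query(
            version=1,
            filters=[{"key": "source", "value": "grafana"}],  # hypothetical filter shape
            time_delta=1,
        )
        # any other version: CEL expression passed as `filter`, resolved via SearchEngine
        v2 = provider._query(
            version=2,
            filter='status == "firing" && severity == "critical"',  # hypothetical CEL
            limit=100,
            time_delta=1,
        )
        return v1, v2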
diff --git a/tests/test_functions.py b/tests/test_functions.py
index f6f92eb76..00e924ba8 100644
--- a/tests/test_functions.py
+++ b/tests/test_functions.py
@@ -364,7 +364,7 @@ def test_get_firing_time_case2(create_alert):
create_alert(fingerprint, AlertStatus.RESOLVED, base_time - timedelta(minutes=90))
alert = {"fingerprint": fingerprint}
- assert functions.get_firing_time(alert, "m", tenant_id=SINGLE_TENANT_UUID) == "0.0"
+ assert functions.get_firing_time(alert, "m", tenant_id=SINGLE_TENANT_UUID) == "0.00"
def test_get_firing_time_case3(create_alert):
@@ -406,7 +406,7 @@ def test_get_firing_time_no_firing(create_alert):
create_alert(fingerprint, AlertStatus.RESOLVED, base_time - timedelta(minutes=60))
alert = {"fingerprint": fingerprint}
- assert functions.get_firing_time(alert, "m", tenant_id=SINGLE_TENANT_UUID) == "0.0"
+ assert functions.get_firing_time(alert, "m", tenant_id=SINGLE_TENANT_UUID) == "0.00"
def test_get_firing_time_other_statuses(create_alert):
diff --git a/tests/test_workflow_execution.py b/tests/test_workflow_execution.py
index b646a8eb8..f5ac6bc44 100644
--- a/tests/test_workflow_execution.py
+++ b/tests/test_workflow_execution.py
@@ -348,3 +348,117 @@ def test_workflow_execution_2(
assert "Tier 1 Alert" in workflow_execution.results["send-slack-message"][0]
else:
assert workflow_execution.results["send-slack-message"] == []
+
+
+workflow_definition_3 = """workflow:
+id: alert-time-check
+description: Handle alerts based on startedAt timestamp
+triggers:
+- type: alert
+ filters:
+ - key: name
+ value: "server-is-down"
+actions:
+- name: send-slack-message-tier-0
+ if: keep.get_firing_time('{{ alert }}', 'minutes') > 0 and keep.get_firing_time('{{ alert }}', 'minutes') < 10
+ provider:
+ type: console
+ with:
+ message: |
+ "Tier 0 Alert: {{ alert.name }} - {{ alert.description }}
+ Alert details: {{ alert }}"
+- name: send-slack-message-tier-1
+ if: "keep.get_firing_time('{{ alert }}', 'minutes') >= 10 and keep.get_firing_time('{{ alert }}', 'minutes') < 30"
+ provider:
+ type: console
+ with:
+ message: |
+ "Tier 1 Alert: {{ alert.name }} - {{ alert.description }}
+ Alert details: {{ alert }}"
+"""
+
+
+@pytest.mark.parametrize(
+ "test_case, alert_statuses, expected_tier, db_session",
+ [
+ ("Tier 0", [[0, "firing"]], 0, None),
+ ("Tier 1", [[10, "firing"], [0, "firing"]], 1, None),
+ ("Resolved", [[15, "firing"], [5, "firing"], [0, "resolved"]], None, None),
+ (
+ "Tier 0 again",
+ [[20, "firing"], [10, "firing"], [5, "resolved"], [0, "firing"]],
+ 0,
+ None,
+ ),
+ ],
+ indirect=["db_session"],
+)
+def test_workflow_execution3(
+ db_session,
+ create_alert,
+ workflow_manager,
+ test_case,
+ alert_statuses,
+ expected_tier,
+):
+ workflow = Workflow(
+ id="alert-first-time",
+ name="alert-first-time",
+ tenant_id=SINGLE_TENANT_UUID,
+ description="Send slack message only the first time an alert fires",
+ created_by="test@keephq.dev",
+ interval=0,
+ workflow_raw=workflow_definition_3,
+ )
+ db_session.add(workflow)
+ db_session.commit()
+ base_time = datetime.now(tz=pytz.utc)
+
+ # Create alerts with specified statuses and timestamps
+ for time_diff, status in alert_statuses:
+ alert_status = (
+ AlertStatus.FIRING if status == "firing" else AlertStatus.RESOLVED
+ )
+ create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff))
+
+ # Create the current alert
+ current_alert = AlertDto(
+ id="grafana-1",
+ source=["grafana"],
+ name="server-is-down",
+ status=AlertStatus.FIRING,
+ severity="critical",
+ fingerprint="fp1",
+ )
+
+ # Insert the current alert into the workflow manager
+ workflow_manager.insert_events(SINGLE_TENANT_UUID, [current_alert])
+
+ # Wait for the workflow execution to complete
+ workflow_execution = None
+ count = 0
+ status = None
+ while (workflow_execution is None or status != "success") and count < 30:
+ workflow_execution = get_last_workflow_execution_by_workflow_id(
+ SINGLE_TENANT_UUID, "alert-first-time"
+ )
+ if workflow_execution is not None:
+ status = workflow_execution.status
+ time.sleep(1)
+ count += 1
+
+ # Check if the workflow execution was successful
+
+ assert workflow_execution is not None
+ assert workflow_execution.status == "success"
+
+ # Verify if the correct tier action was triggered
+ if expected_tier is None:
+ assert workflow_execution.results["send-slack-message-tier-0"] == []
+ assert workflow_execution.results["send-slack-message-tier-1"] == []
+ elif expected_tier == 0:
+ assert workflow_execution.results["send-slack-message-tier-1"] == []
+ assert "Tier 0" in workflow_execution.results["send-slack-message-tier-0"][0]
+ elif expected_tier == 1:
+ assert workflow_execution.results["send-slack-message-tier-0"] == []
+ assert "Tier 1" in workflow_execution.results["send-slack-message-tier-1"][0]
From bab73405147d2f0ec6c3fe83976aa657984995eb Mon Sep 17 00:00:00 2001
From: Ezhil Shanmugham
Date: Thu, 15 Aug 2024 14:08:43 +0530
Subject: [PATCH 09/32] feat: coralogix provider (#1618)
---
README.md | 2 +
docs/images/coralogix-provider_1.png | Bin 0 -> 141936 bytes
docs/images/coralogix-provider_2.png | Bin 0 -> 79367 bytes
docs/images/coralogix-provider_3.png | Bin 0 -> 90844 bytes
docs/images/coralogix-provider_4.png | Bin 0 -> 102511 bytes
docs/images/coralogix-provider_5.png | Bin 0 -> 82045 bytes
docs/images/coralogix-provider_6.png | Bin 0 -> 116944 bytes
docs/mint.json | 1 +
.../documentation/coralogix-provider.mdx | 70 ++++++
keep-ui/public/icons/coralogix-icon.png | Bin 0 -> 6772 bytes
keep/providers/coralogix_provider/__init__.py | 0
.../coralogix_provider/alerts_mock.py | 225 ++++++++++++++++++
.../coralogix_provider/coralogix_provider.py | 100 ++++++++
13 files changed, 398 insertions(+)
create mode 100644 docs/images/coralogix-provider_1.png
create mode 100644 docs/images/coralogix-provider_2.png
create mode 100644 docs/images/coralogix-provider_3.png
create mode 100644 docs/images/coralogix-provider_4.png
create mode 100644 docs/images/coralogix-provider_5.png
create mode 100644 docs/images/coralogix-provider_6.png
create mode 100644 docs/providers/documentation/coralogix-provider.mdx
create mode 100644 keep-ui/public/icons/coralogix-icon.png
create mode 100644 keep/providers/coralogix_provider/__init__.py
create mode 100644 keep/providers/coralogix_provider/alerts_mock.py
create mode 100644 keep/providers/coralogix_provider/coralogix_provider.py
diff --git a/README.md b/README.md
index d06b1ec80..0a595a2f9 100644
--- a/README.md
+++ b/README.md
@@ -199,6 +199,8 @@ Workflow triggers can either be executed manually when an alert is activated or
+
+