Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NAS-130905 / 25.04 / Remove REST from test_050_alert.py #14415

Merged
merged 16 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 0 additions & 131 deletions tests/api2/test_050_alert.py

This file was deleted.

74 changes: 74 additions & 0 deletions tests/api2/test_alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from time import sleep

import pytest

from auto_config import pool_name
from middlewared.test.integration.utils import call, ssh


ID_PATH = "/dev/disk/by-partuuid/"


def get_alert_by_id(alert_id):
return next(filter(lambda alert: alert["id"] == alert_id, call("alert.list")), None)


def wait_for_alert(timeout=120):
for _ in range(timeout):
for alert in call("alert.list"):
if (
alert["source"] == "VolumeStatus" and
alert["args"]["volume"] == pool_name and
alert["args"]["state"] == "DEGRADED"
):
return alert["id"]
sleep(1)


creatorcary marked this conversation as resolved.
Show resolved Hide resolved
@pytest.fixture(scope="module")
def degraded_pool_gptid():
get_pool = call("pool.query", [["name", "=", pool_name]], {"get": True})
gptid = get_pool["topology"]["data"][0]["path"].replace(ID_PATH, "")
ssh(f"zinject -d {gptid} -A fault {pool_name}")
return gptid


@pytest.fixture(scope="module")
def alert_id(degraded_pool_gptid):
call("alert.process_alerts")
result = wait_for_alert()
if result is None:
pytest.fail("Timed out while waiting for alert.")
return result


def test_verify_the_pool_is_degraded(degraded_pool_gptid):
status = call("zpool.status", {"name": pool_name})
disk_status = status[pool_name]["data"][ID_PATH + degraded_pool_gptid]["disk_status"]
assert disk_status == "DEGRADED"


def test_dismiss_alert(alert_id):
call("alert.dismiss", alert_id)
alert = get_alert_by_id(alert_id)
assert alert["dismissed"] is True, alert


def test_restore_alert(alert_id):
call("alert.restore", alert_id)
alert = get_alert_by_id(alert_id)
assert alert["dismissed"] is False, alert


def test_clear_the_pool_degradation(degraded_pool_gptid):
ssh(f"zpool clear {pool_name}")
status = call("zpool.status", {"name": pool_name})
disk_status = status[pool_name]["data"][ID_PATH + degraded_pool_gptid]["disk_status"]
assert disk_status != "DEGRADED"


@pytest.mark.timeout(120)
def test_wait_for_the_alert_to_disappear(alert_id):
call("alert.process_alerts")
while get_alert_by_id(alert_id) is not None:
creatorcary marked this conversation as resolved.
Show resolved Hide resolved
sleep(1)
Loading