Skip to content

Commit

Permalink
NAS-130905 / 25.04 / Remove REST from test_050_alert.py (#14415)
Browse files Browse the repository at this point in the history
* remove numbers from test and module names

* remove rest and simplify

* replace global variables with pytest cache

* address @yocalebo

* change quotes

* fix `get_alert_by_id` and account for cache dependency

* use `alert.run_source`

* use fixtures

* fix indexing issues

* can't mark a fixture

* fixture timeout

* allow parametrization

* process alerts before waiting for alert to disappear

* Revert "allow parametrization"

This reverts commit 8b7ffe0.

* remove `autouse`
  • Loading branch information
creatorcary authored Sep 11, 2024
1 parent 923ac36 commit b907a48
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 131 deletions.
131 changes: 0 additions & 131 deletions tests/api2/test_050_alert.py

This file was deleted.

74 changes: 74 additions & 0 deletions tests/api2/test_alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from time import sleep

import pytest

from auto_config import pool_name
from middlewared.test.integration.utils import call, ssh


ID_PATH = "/dev/disk/by-partuuid/"


def get_alert_by_id(alert_id):
return next(filter(lambda alert: alert["id"] == alert_id, call("alert.list")), None)


def wait_for_alert(timeout=120):
for _ in range(timeout):
for alert in call("alert.list"):
if (
alert["source"] == "VolumeStatus" and
alert["args"]["volume"] == pool_name and
alert["args"]["state"] == "DEGRADED"
):
return alert["id"]
sleep(1)


@pytest.fixture(scope="module")
def degraded_pool_gptid():
get_pool = call("pool.query", [["name", "=", pool_name]], {"get": True})
gptid = get_pool["topology"]["data"][0]["path"].replace(ID_PATH, "")
ssh(f"zinject -d {gptid} -A fault {pool_name}")
return gptid


@pytest.fixture(scope="module")
def alert_id(degraded_pool_gptid):
call("alert.process_alerts")
result = wait_for_alert()
if result is None:
pytest.fail("Timed out while waiting for alert.")
return result


def test_verify_the_pool_is_degraded(degraded_pool_gptid):
status = call("zpool.status", {"name": pool_name})
disk_status = status[pool_name]["data"][ID_PATH + degraded_pool_gptid]["disk_status"]
assert disk_status == "DEGRADED"


def test_dismiss_alert(alert_id):
call("alert.dismiss", alert_id)
alert = get_alert_by_id(alert_id)
assert alert["dismissed"] is True, alert


def test_restore_alert(alert_id):
call("alert.restore", alert_id)
alert = get_alert_by_id(alert_id)
assert alert["dismissed"] is False, alert


def test_clear_the_pool_degradation(degraded_pool_gptid):
ssh(f"zpool clear {pool_name}")
status = call("zpool.status", {"name": pool_name})
disk_status = status[pool_name]["data"][ID_PATH + degraded_pool_gptid]["disk_status"]
assert disk_status != "DEGRADED"


@pytest.mark.timeout(120)
def test_wait_for_the_alert_to_disappear(alert_id):
call("alert.process_alerts")
while get_alert_by_id(alert_id) is not None:
sleep(1)

0 comments on commit b907a48

Please sign in to comment.