From a8de95fa83887d1b0bfc9d564a8d8383d7b01e72 Mon Sep 17 00:00:00 2001
From: Dustin Bragg
Date: Fri, 16 Aug 2019 17:56:31 -0400
Subject: [PATCH] Reworked system failure collection

Retain failure history.
Annotations for both failure discovery and failure resolution.
---
 ansible/dashboards/Disk View Dashboard.json   |  32 +++-
 ansible/dashboards/System View Dashboard.json |  68 ++++++--
 ansible/dashboards/Volume View Dashboard.json |  32 +++-
 collector/collector.py                        | 153 ++++++++++++------
 collector/tests/test_collector.py             |   8 +-
 5 files changed, 231 insertions(+), 62 deletions(-)

diff --git a/ansible/dashboards/Disk View Dashboard.json b/ansible/dashboards/Disk View Dashboard.json
index cc60194..964c8a0 100755
--- a/ansible/dashboards/Disk View Dashboard.json
+++ b/ansible/dashboards/Disk View Dashboard.json
@@ -1,8 +1,8 @@
 {
   "dashboard": {
     "annotations": {
-      "list": [
-        {
+      "list": [
+        {
           "builtIn": 1,
           "datasource": "-- Grafana --",
           "enable": true,
@@ -10,6 +10,34 @@
           "iconColor": "rgba(0, 211, 255, 1)",
           "name": "Annotations & Alerts",
           "type": "dashboard"
+        },
+        {
+          "datasource": "WSP",
+          "enable": true,
+          "hide": false,
+          "iconColor": "#C4162A",
+          "limit": 100,
+          "name": "Failure Discovered",
+          "query": "SELECT name_of,type_of FROM (SELECT name_of,type_of,active FROM \"failures\" WHERE (\"sys_name\" =~ /^$System$/) AND $timeFilter GROUP BY \"sys_name\", \"failure_type\", \"object_type\" LIMIT 25) WHERE (\"active\" = 'True')",
+          "showIn": 0,
+          "tags": [],
+          "tagsColumn": "type_of",
+          "textColumn": "name_of",
+          "type": "tags"
+        },
+        {
+          "datasource": "WSP",
+          "enable": true,
+          "hide": false,
+          "iconColor": "#56A64B",
+          "limit": 100,
+          "name": "Failure Resolved",
+          "query": "SELECT name_of,type_of FROM (SELECT name_of,type_of,active FROM \"failures\" WHERE (\"sys_name\" =~ /^$System$/) AND $timeFilter GROUP BY \"sys_name\", \"failure_type\", \"object_type\" LIMIT 25) WHERE (\"active\" = 'False')",
+          "showIn": 0,
+          "tags": [],
+          "tagsColumn": "type_of",
+          "textColumn": "name_of",
+          "type": "tags"
         }
       ]
     },
diff --git a/ansible/dashboards/System View Dashboard.json b/ansible/dashboards/System View Dashboard.json
index aec33ef..1efde34 100755
--- a/ansible/dashboards/System View Dashboard.json
+++ b/ansible/dashboards/System View Dashboard.json
@@ -1,8 +1,8 @@
 {
   "dashboard": {
     "annotations": {
-      "list": [
-        {
+      "list": [
+        {
           "builtIn": 1,
           "datasource": "-- Grafana --",
           "enable": true,
@@ -10,6 +10,34 @@
           "iconColor": "rgba(0, 211, 255, 1)",
           "name": "Annotations & Alerts",
           "type": "dashboard"
+        },
+        {
+          "datasource": "WSP",
+          "enable": true,
+          "hide": false,
+          "iconColor": "#C4162A",
+          "limit": 100,
+          "name": "Failure Discovered",
+          "query": "SELECT name_of,type_of FROM (SELECT name_of,type_of,active FROM \"failures\" WHERE (\"sys_name\" =~ /^$System$/) AND $timeFilter GROUP BY \"sys_name\", \"failure_type\", \"object_type\" LIMIT 25) WHERE (\"active\" = 'True')",
+          "showIn": 0,
+          "tags": [],
+          "tagsColumn": "type_of",
+          "textColumn": "name_of",
+          "type": "tags"
+        },
+        {
+          "datasource": "WSP",
+          "enable": true,
+          "hide": false,
+          "iconColor": "#56A64B",
+          "limit": 100,
+          "name": "Failure Resolved",
+          "query": "SELECT name_of,type_of FROM (SELECT name_of,type_of,active FROM \"failures\" WHERE (\"sys_name\" =~ /^$System$/) AND $timeFilter GROUP BY \"sys_name\", \"failure_type\", \"object_type\" LIMIT 25) WHERE (\"active\" = 'False')",
+          "showIn": 0,
+          "tags": [],
+          "tagsColumn": "type_of",
+          "textColumn": "name_of",
+          "type": "tags"
         }
       ]
     },
@@ -1561,7 +1589,7 @@
         "h": 10,
         "w": 9,
         "x": 15,
-        "y": 29
+        "y": 26
       },
       "id": 24,
       "links": [],
@@ -1575,7 +1603,7 @@
       },
       "styles": [
         {
-          "alias": "Pull Time",
+          "alias": "Discovery Time",
           "colorMode": null,
           "colors": [
             "rgba(245, 54, 54, 0.9)",
@@ -1661,6 +1689,22 @@
           "thresholds": [],
           "type": "hidden",
           "unit": "short"
+        },
+        {
+          "alias": "",
+          "colorMode": null,
+          "colors": [
+            "rgba(245, 54, 54, 0.9)",
+            "rgba(237, 129, 40, 0.89)",
+            "rgba(50, 172, 45, 0.97)"
+          ],
+          "dateFormat": "YYYY-MM-DD HH:mm:ss",
+          "decimals": 2,
+          "mappingType": 1,
+          "pattern": "active",
+          "thresholds": [],
+          "type": "hidden",
+          "unit": "short"
         }
       ],
       "targets": [
@@ -1688,15 +1732,15 @@
           "measurement": "failures",
           "orderByTime": "ASC",
           "policy": "default",
-          "query": "SELECT \"id\", \"location\", \"description\" FROM \"major_event_log\" WHERE (\"sys_name\" =~ /^$System$/) GROUP BY \"sys_name\"",
-          "rawQuery": false,
+          "query": "SELECT * FROM (SELECT last(\"name_of\"),active FROM \"failures\" WHERE (\"sys_name\" =~ /^$System$/) AND $timeFilter GROUP BY \"sys_name\", \"failure_type\", \"object_type\") WHERE (\"active\" = 'True')",
+          "rawQuery": true,
           "refId": "A",
           "resultFormat": "table",
           "select": [
             [
               {
                 "params": [
-                  "value"
+                  "name_of"
                 ],
                 "type": "field"
               },
@@ -1711,6 +1755,12 @@
               "key": "sys_name",
               "operator": "=~",
               "value": "/^$System$/"
+            },
+            {
+              "condition": "AND",
+              "key": "active",
+              "operator": "=",
+              "value": "True"
             }
           ]
         }
@@ -1740,14 +1790,14 @@
         ]
       },
       "datasource": "WSP",
-      "definition": "SHOW TAG VALUES FROM \"major_event_log\" WITH KEY = \"sys_name\"",
+      "definition": "SHOW TAG VALUES FROM \"disks\" WITH KEY = \"sys_name\"",
       "hide": 0,
       "includeAll": false,
       "label": null,
       "multi": true,
       "name": "System",
       "options": [],
-      "query": "SHOW TAG VALUES FROM \"major_event_log\" WITH KEY = \"sys_name\"",
+      "query": "SHOW TAG VALUES FROM \"disks\" WITH KEY = \"sys_name\"",
       "refresh": 2,
       "regex": "",
       "skipUrlSync": false,
diff --git a/ansible/dashboards/Volume View Dashboard.json b/ansible/dashboards/Volume View Dashboard.json
index 8fac81d..a0b76e8 100755
--- a/ansible/dashboards/Volume View Dashboard.json
+++ b/ansible/dashboards/Volume View Dashboard.json
@@ -1,8 +1,8 @@
 {
   "dashboard": {
     "annotations": {
-      "list": [
-        {
+      "list": [
+        {
           "builtIn": 1,
           "datasource": "-- Grafana --",
           "enable": true,
@@ -10,6 +10,34 @@
           "iconColor": "rgba(0, 211, 255, 1)",
           "name": "Annotations & Alerts",
           "type": "dashboard"
+        },
+        {
+          "datasource": "WSP",
+          "enable": true,
+          "hide": false,
+          "iconColor": "#C4162A",
+          "limit": 100,
+          "name": "Failure Discovered",
+          "query": "SELECT name_of,type_of FROM (SELECT name_of,type_of,active FROM \"failures\" WHERE (\"sys_name\" =~ /^$System$/) AND $timeFilter GROUP BY \"sys_name\", \"failure_type\", \"object_type\" LIMIT 25) WHERE (\"active\" = 'True')",
+          "showIn": 0,
+          "tags": [],
+          "tagsColumn": "type_of",
+          "textColumn": "name_of",
+          "type": "tags"
+        },
+        {
+          "datasource": "WSP",
+          "enable": true,
+          "hide": false,
+          "iconColor": "#56A64B",
+          "limit": 100,
+          "name": "Failure Resolved",
+          "query": "SELECT name_of,type_of FROM (SELECT name_of,type_of,active FROM \"failures\" WHERE (\"sys_name\" =~ /^$System$/) AND $timeFilter GROUP BY \"sys_name\", \"failure_type\", \"object_type\" LIMIT 25) WHERE (\"active\" = 'False')",
+          "showIn": 0,
+          "tags": [],
+          "tagsColumn": "type_of",
+          "textColumn": "name_of",
+          "type": "tags"
         }
       ]
     },
diff --git a/collector/collector.py b/collector/collector.py
index 2236652..17c62df 100755
--- a/collector/collector.py
+++ b/collector/collector.py
@@ -11,6 +11,7 @@
 import concurrent.futures
 import requests
 import json
+import hashlib
 
 from datetime import datetime
 from influxdb import InfluxDBClient
@@ -306,7 +307,7 @@ def collect_storage_metrics(sys):
                                  PROXY_BASE_URL, sys_id)).json()
         if CMD.showVolumeNames:
             for stats in volume_stats_list:
-                LOG.info(stats["volumeName"]);
+                LOG.info(stats["volumeName"])
         # Add Volume statistics to json body
         for stats in volume_stats_list:
             vol_item = dict(
@@ -356,9 +357,9 @@ def collect_major_event_log(sys):
 
         if query:
             start_from = int(next(query.get_points())["id"]) + 1
-
+
         mel_response = session.get(("{}/{}/mel-events").format(PROXY_BASE_URL, sys_id),
-                                   params = {"count": mel_grab_count, "startSequenceNumber": start_from}).json();
+                                   params = {"count": mel_grab_count, "startSequenceNumber": start_from}, timeout=(6.10, CMD.intervalTime*2)).json()
         if CMD.showMELMetrics:
             LOG.info("Starting from %s", str(start_from))
             LOG.info("Grabbing %s MELs", str(len(mel_response)))
@@ -390,7 +391,26 @@ def collect_major_event_log(sys):
         LOG.error(("Error when attempting to post MEL for {}/{}").format(sys["name"], sys["id"]))
 
 
-def collect_system_state(sys):
+def create_failure_dict_item(sys_id, sys_name, fail_type, obj_ref, obj_type, is_active, the_time):
+    item = dict(
+        measurement = "failures",
+        tags = dict(
+            sys_id = sys_id,
+            sys_name = sys_name,
+            failure_type = fail_type,
+            object_ref = obj_ref,
+            object_type = obj_type,
+            active = is_active
+        ),
+        fields = dict(
+            name_of = sys_name,
+            type_of = fail_type
+        ),
+        time = the_time
+    )
+    return item
+
+def collect_system_state(sys, checksums):
     """
     Collects state information from the storage system and posts it to influxdb
     :param sys: The JSON object of a storage_system
@@ -407,48 +427,87 @@ def collect_system_state(sys):
         # If this storage device still lacks a name, use a default
         if not sys_name or len(sys_name) <= 0:
             sys_name = DEFAULT_SYSTEM_NAME
-
+
+        # query the api and get a list of current failures for this system
+        failure_response = session.get(("{}/{}/failures").format(PROXY_BASE_URL, sys_id)).json()
+
+        # we can skip this system if this is the same response we handled last time
+        old_checksum = checksums.get(str(sys_id))
+        new_checksum = hashlib.md5(str(failure_response).encode("utf-8")).hexdigest()
+        if old_checksum is not None and str(new_checksum) == str(old_checksum):
+            return
+        checksums.update({str(sys_id) : str(new_checksum)})
+
+        # pull the most recent failures for this system from our database, including their active status
+        query_string = ("SELECT last(\"type_of\"),failure_type,object_ref,object_type,active FROM \"failures\" WHERE (\"sys_id\" = '{}') GROUP BY \"sys_name\", \"failure_type\"").format(sys_id)
+        query = client.query(query_string)
+        failure_points = list(query.get_points())
+
         json_body = list()
-        query = client.query("SELECT * FROM failures WHERE sys_id='%s'" % sys_id)
-
-        failure_response = session.get(("{}/{}/failures").format(PROXY_BASE_URL, sys_id)).json();
+
+        # take care of active failures we don't know about
         for failure in failure_response:
-            found = False
-            fail_type = failure["failureType"]
-            obj_ref = failure["objectRef"]
-            obj_type = failure["objectType"]
-
-            # check to see if we've seen this failure before
-            if query:
-                failure_points = (query.get_points(measurement='failures'))
-                for point in failure_points:
-                    if fail_type == point.failure_type and obj_ref == point.object_ref and obj_type == point.object_type:
-                        found = True
-                        break
-            # if this is a new failure, we want to post it to influxdb
-            if not found:
-                item = dict(
-                    measurement = "failures",
-                    tags = dict(
-                        sys_id = sys_id,
-                        sys_name = sys_name,
-                        failure_type = fail_type,
-                        object_ref = obj_ref,
-                        object_type = obj_type
-                    ),
-                    fields = dict(
-                        value = True
-                    ),
-                    time = datetime.utcnow().isoformat()
-                )
+            r_fail_type = failure.get("failureType")
+            r_obj_ref = failure.get("objectRef")
+            r_obj_type = failure.get("objectType")
+
+            # we push if we haven't seen this failure before, or we think it's inactive
+            push = True
+            for point in failure_points:
+                p_fail_type = point["failure_type"]
+                p_obj_ref = point["object_ref"]
+                p_obj_type = point["object_type"]
+                p_active = point["active"]
+                if (r_fail_type == p_fail_type
+                        and r_obj_ref == p_obj_ref
+                        and r_obj_type == p_obj_type):
+                    if p_active == "True":
+                        push = False  # we already know this failure is active, so don't push
+                    break
+
+            if push:
+                item = create_failure_dict_item(sys_id, sys_name,
+                                                r_fail_type, r_obj_ref, r_obj_type,
+                                                True, datetime.utcnow().isoformat())
                 if CMD.showStateMetrics:
-                    LOG.info("Failure payload: %s", item)
+                    LOG.info("Failure payload T1: %s", item)
                 json_body.append(item)
-
-        num = len(json_body)
-        if num > 0:
-            LOG.info("Found %s new failures", str(num))
-            client.write_points(json_body, database=INFLUXDB_DATABASE, time_precision="s")
+
+        # take care of failures that are no longer active
+        for point in failure_points:
+            # we only care about points we think are active; "active" is a tag, so compare as a string
+            p_active = point["active"]
+            if p_active != "True":
+                continue
+
+            p_fail_type = point["failure_type"]
+            p_obj_ref = point["object_ref"]
+            p_obj_type = point["object_type"]
+
+            # we push if the failure is no longer reported, but our database thinks it is still active
+            push = True
+            for failure in failure_response:
+                r_fail_type = failure.get("failureType")
+                r_obj_ref = failure.get("objectRef")
+                r_obj_type = failure.get("objectType")
+                if (r_fail_type == p_fail_type
+                        and r_obj_ref == p_obj_ref
+                        and r_obj_type == p_obj_type):
+                    push = False  # the failure is still active, so don't push
+                    break
+
+            if push:
+                item = create_failure_dict_item(sys_id, sys_name,
+                                                p_fail_type, p_obj_ref, p_obj_type,
+                                                False, datetime.utcnow().isoformat())
+                if CMD.showStateMetrics:
+                    LOG.info("Failure payload T2: %s", item)
+                json_body.append(item)
+
+        # write failures to influxdb
+        if CMD.showStateMetrics:
+            LOG.info("Writing {} failures".format(len(json_body)))
+        client.write_points(json_body, database=INFLUXDB_DATABASE)
 
     except RuntimeError:
         LOG.error(("Error when attempting to post state information for {}/{}").format(sys["name"], sys["id"]))
@@ -458,7 +517,7 @@ def collect_system_state(sys):
 #######################
 
 if __name__ == "__main__":
-    executor = concurrent.futures.ProcessPoolExecutor(NUMBER_OF_THREADS)
+    executor = concurrent.futures.ThreadPoolExecutor(NUMBER_OF_THREADS)
     SESSION = get_session()
     loopIteration = 1
 
@@ -491,6 +550,7 @@ def collect_system_state(sys):
     except json.decoder.JSONDecodeError:
         LOG.exception("Failed to open configuration file due to invalid JSON!")
 
+    checksums = dict()
     while True:
         time_start = time.time()
         try:
@@ -518,9 +578,9 @@ def collect_system_state(sys):
             concurrent.futures.wait(collector)
 
             # Iterate through all storage system and collect state information
-            collector = [executor.submit(collect_system_state, sys) for sys in storageList]
+            collector = [executor.submit(collect_system_state, sys, checksums) for sys in storageList]
             concurrent.futures.wait(collector)
-
+
             # Iterate through all storage system and collect MEL entries
             collector = [executor.submit(collect_major_event_log, sys) for sys in storageList]
             concurrent.futures.wait(collector)
@@ -539,4 +599,5 @@ def collect_system_state(sys):
"Time interval specified: {:07.4f}" .format(time_difference, CMD.intervalTime)) wait_time = time_difference + time.sleep(wait_time) diff --git a/collector/tests/test_collector.py b/collector/tests/test_collector.py index 30489f8..fc10131 100644 --- a/collector/tests/test_collector.py +++ b/collector/tests/test_collector.py @@ -104,7 +104,7 @@ def test_collect_major_event_log(self, mock_requests, mock_get_session, mock_inf } mock_get_session.assert_called() - mock_session.get.assert_called_with(req_url_mel, params=call_params) + mock_session.get.assert_called_with(req_url_mel, params=call_params, timeout=(6.10, 10)) # Test that we properly write points to influxdb when collecting mel events @@ -135,7 +135,8 @@ def test_collect_system_state(self, mock_requests, mock_get_session, mock_influx req_url_fails = ("{}/{}/failures").format(collector.PROXY_BASE_URL, sys_id) mock_session = mock_get_session.return_value - collector.collect_system_state(system) + Checksums = dict() + collector.collect_system_state(system, Checksums) mock_get_session.assert_called() mock_session.get.assert_called_with(req_url_fails) @@ -151,7 +152,8 @@ def test_collect_system_state_writepoints(self, mock_requests, mock_get_session, } mock_session = mock_get_session.return_value - collector.collect_system_state(system) + Checksums = dict() + collector.collect_system_state(system, Checksums) mock_influxclient = mock_influxdb.return_value mock_influxclient.write_points.assert_called()