diff --git a/config/stat-collector-crontab b/config/stat-collector-crontab new file mode 100755 index 000000000..d8a9cf261 --- /dev/null +++ b/config/stat-collector-crontab @@ -0,0 +1,3 @@ +# Automatically collects stats every 5 minutes for the stat-collector service +# Installed by AmpliPi +*/5 * * * * root /usr/local/bin/stat-collector.py diff --git a/scripts/configure.py b/scripts/configure.py index 69ccb1c36..a2b73a85c 100755 --- a/scripts/configure.py +++ b/scripts/configure.py @@ -207,6 +207,20 @@ 'popd', ] }, + 'stat-collector': { + "copy": [ + { + 'from': 'config/stat-collector-crontab', + 'to': '/etc/cron.d/stat-collector', + 'sudo': 'true', + }, + { + 'from': 'scripts/stat-collector.py', + 'to': '/usr/local/bin/stat-collector.py', + 'sudo': 'true', + }, + ], + }, # streams # TODO: can stream dependencies be aggregated from the streams themselves? 'airplay': { diff --git a/scripts/stat-collector.py b/scripts/stat-collector.py new file mode 100644 index 000000000..6a4a201c9 --- /dev/null +++ b/scripts/stat-collector.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +import os +import json +import subprocess +from datetime import datetime, timezone, timedelta +from shutil import disk_usage +import requests +# pylint: disable=no-name-in-module +from pydantic import BaseModel +from psutil import virtual_memory + + +def find_matches(list1: list, list2: list): + """Takes in two lists, returns a list that only contains data shared by both lists""" + set1 = {json.dumps(item, sort_keys=True) for item in list1} + set2 = {json.dumps(item, sort_keys=True) for item in list2} + + matches = [json.loads(item) for item in set1 & set2] + return matches + + +def average(running_average, weight, new_entry): + """ + Calculates an average using three variables: + running_average, the average so far + weight, the count of numbers that went into that average + new_entry, the newest addition to the dataset + Returns the new average + """ + # Round to nearest whole number, then int() to 
shave off trailing .0 + return int(round(((running_average * weight) + new_entry) / (weight + 1), 0)) + + +class StreamUsageSchema(BaseModel): + """Schema for individual stream type usage statistics""" + # For the purpose of the below comments, a "survey cycle" is defined as how often the device phones home + active_streams: list = [] + + # The average and highest number of concurrent streams running within the survey cycle + average_concurrent_streams: int = 0 + peak_concurrent_streams: int = 0 + + # The length in seconds that a stream of this type has been connected within the survey cycle + # Counts as long as at least one playing stream of the given type persists between polling calls + current_runtime: int = 0 + average_runtime: int = 0 + runtime_weight: int = 0 + peak_runtime: int = 0 + + + def consume(self, source_info: list, poll_count: int, timediff: timedelta): + """Consumes a list of source["info"] json blobs and updates stream statistics tracker with information found within""" + stream_names = [item["name"] for item in source_info] + stream_count = len(stream_names) + + if stream_count == 0 and len(self.active_streams) != 0: # If a stream has just been shut off, record average based on current runtime + new_average = average(self.average_runtime, self.runtime_weight, self.current_runtime) + self.average_runtime = new_average + self.runtime_weight += 1 + + matches = find_matches(self.active_streams, stream_names) + if len(matches) != 0: # If there is at least one playing match, calculate runtime + self.current_runtime = self.current_runtime + timediff.seconds + + self.peak_runtime = max(self.peak_runtime, self.current_runtime) if self.current_runtime < 86400 else 86400 + else: + self.current_runtime = 0 + + self.peak_concurrent_streams = max(self.peak_concurrent_streams, stream_count) + self.average_concurrent_streams = average(self.average_concurrent_streams, poll_count, stream_count) + + # set current data to be next poll's previous data + self.active_streams = 
stream_names + + +class UsageSurveySchema(BaseModel): + """A verification layer for JSON objects to be sent to the Usage Survey server""" + # Auth code that legitimizes the data as coming from a MicroNova AmpliPro and not some random other source + authentication_code: str = os.environ.get('AUTH_CODE', "") + amplipi_version: str = "" + is_streamer: bool = False + + poll_count: int = 0 # Used to calculate averaged values + time_of_first_poll: datetime = datetime.min + time_of_previous_poll: datetime = datetime.min + + # Memory measured in bytes (psutil's virtual_memory().used reports bytes) + peak_memory_usage: int = 0 + average_memory_usage: int = 0 + + # Disk space measured in bytes (shutil.disk_usage reports bytes) + # Disk average not collected as it's less variable over time + disk_total: int = 0 + disk_used: int = 0 + disk_free: int = 0 + + # Used to store notable events, such as anything that occurs above a logging level of warning. See record_logs() function for details. + notable_logs: list[str] = [] + + # These were once in a dict called "streams" that contained the data in the same format, having there not be dicts 3 layers deep seemed preferable + airplay: StreamUsageSchema = StreamUsageSchema() + aux: StreamUsageSchema = StreamUsageSchema() + bluetooth: StreamUsageSchema = StreamUsageSchema() + dlna: StreamUsageSchema = StreamUsageSchema() + fileplayer: StreamUsageSchema = StreamUsageSchema() + internetradio: StreamUsageSchema = StreamUsageSchema() + lms: StreamUsageSchema = StreamUsageSchema() + media_device: StreamUsageSchema = StreamUsageSchema() + pandora: StreamUsageSchema = StreamUsageSchema() + plexamp: StreamUsageSchema = StreamUsageSchema() + rca: StreamUsageSchema = StreamUsageSchema() + spotify: StreamUsageSchema = StreamUsageSchema() + + def save_to_disk(self, path="/var/lib/UsageSurvey.json"): + """Saves contents of UsageSurvey to file""" + tmp = "/tmp/UsageSurvey.json" + with open(tmp, "w", encoding="UTF-8") as file: + file.write(self.json()) + subprocess.run(['sudo', 'mv', tmp, path], check=True) + + @classmethod + def 
load_from_disk(cls, path="/var/lib/UsageSurvey.json"): + """Loads contents of UsageSurvey from saved file""" + if os.path.exists(path): + with open(path, "r", encoding="UTF-8") as file: + file_data = json.load(file) + return cls(**file_data) + else: + return cls() + + def get_disk_usage(self): + """Collects and populates disk usage statistics via shutil""" + self.disk_total, self.disk_used, self.disk_free = disk_usage("/") + + def get_mem_usage(self): + """Collects and populates memory usage statistics via psutil""" + memory_info = virtual_memory() + used_memory = memory_info.used + + self.peak_memory_usage = max(self.peak_memory_usage, used_memory) + self.average_memory_usage = average(self.average_memory_usage, self.poll_count, used_memory) + + def record_logs(self): + """Reads journalctl and searches for a list of keywords, then saves new logs to self.notable_logs""" + keywords = ["WARNING", "ERROR"] + + result = subprocess.run( + ["journalctl", "--no-pager", f"--grep={'|'.join(keywords)}"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True + ) + + # Split logs into individual line items in a list, then add new entries to the notable_logs list + logs = result.stdout.splitlines() + self.notable_logs.extend([log for log in logs if log not in self.notable_logs]) + + + def get_state(self): + """Gets system state, saves to relevant places""" + state = requests.get("http://localhost/api", timeout=1) + if state.status_code == 200: + state_json = state.json() + now = datetime.now(timezone.utc) # in UTC to avoid any issues if we do implement a user-set timezone option + if self.time_of_first_poll == datetime.min: + self.time_of_first_poll = now + + timediff = (now - self.time_of_previous_poll) if self.time_of_previous_poll != datetime.min else timedelta(0) + stream_handlers = { + "airplay": {"object": self.airplay, "list": []}, + "aux": {"object": self.aux, "list": []}, + "bluetooth": {"object": self.bluetooth, "list": []}, + "dlna": 
{"object": self.dlna, "list": []}, + "fileplayer": {"object": self.fileplayer, "list": []}, + "internetradio": {"object": self.internetradio, "list": []}, + "lms": {"object": self.lms, "list": []}, + "media_device": {"object": self.media_device, "list": []}, + "pandora": {"object": self.pandora, "list": []}, + "plexamp": {"object": self.plexamp, "list": []}, + "rca": {"object": self.rca, "list": []}, + "spotify": {"object": self.spotify, "list": []}, + } + + for source in state_json["sources"]: + if source["input"] and source.get("info", {}).get("state") in ["playing", "connected"]: + stream_type = source["info"]["type"] + if stream_type in stream_handlers: + stream_handlers[stream_type]["list"].append(source["info"]) + + for key, handler in stream_handlers.items(): + print(key) + handler["object"].consume(handler["list"], self.poll_count, timediff) + + self.poll_count += 1 + self.time_of_previous_poll = now + self.amplipi_version = state_json["info"]["version"] + self.is_streamer = state_json["info"]["is_streamer"] + self.get_disk_usage() + self.get_mem_usage() + self.record_logs() + + +if __name__ == "__main__": + UsageSurvey = UsageSurveySchema.load_from_disk() + UsageSurvey.get_state() + UsageSurvey.save_to_disk()