Stat Collector #1007

Draft · wants to merge 7 commits into main
2 changes: 1 addition & 1 deletion amplipi/__init__.py
@@ -1,4 +1,4 @@
""" AmpliPi """
# TODO: remove "rt"
__all__ = ["app", "asgi", "auth", "ctrl", "defaults", "display", "eeprom",
"extras", "hw", "models", "mpris", "rt", "streams", "utils", "zeroconf"]
"extras", "hw", "models", "mpris", "rt", "statcollector", "streams", "utils", "zeroconf"]
17 changes: 17 additions & 0 deletions amplipi/app.py
@@ -60,6 +60,7 @@
import amplipi.utils as utils
import amplipi.models as models
import amplipi.defaults as defaults
import amplipi.statcollector as statcollector
from amplipi.ctrl import Api, ApiResponse, ApiCode # we don't import ctrl here to avoid naming ambiguity with a ctrl variable
from amplipi.auth import CookieOrParamAPIKey, router as auth_router, NotAuthenticatedException, not_authenticated_exception_handler

@@ -163,6 +164,22 @@ class params(SimpleNamespace):
api = SimplifyingRouter(dependencies=[Depends(CookieOrParamAPIKey)])


@api.get('/collected_stats')
@api.get('/collected_stats/')
def get_collected_stats():
"""Get the current contents of statcollector output file after filtering out any empty stream data"""
survey = dict(statcollector.UsageSurveySchema.load_from_disk())
return {header: data for header, data in survey.items() if data != statcollector.StreamUsageSchema()}


@api.post('/collected_stats')
@api.post('/collected_stats/')
def post_collected_stats():
"""Send the contents of the statcollector output file to AmpliPi devs"""
survey = statcollector.UsageSurveySchema.load_from_disk()
survey.phone_home()


Comment on lines +167 to +182
Contributor Author

These do not belong at the top of this file.
I do not know where they do belong, but it isn't at the top of the file.
They stay here in the meantime because it keeps the imports within easy scrolling distance while they are still being edited during dev work.

Contributor

These stats are effectively metadata about the API, so putting them in the API itself feels weird. That said, keeping them outside the API may make accessing them a little more awkward.
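
One possible arrangement, sketched below, is a dedicated router that keeps the same auth dependency but stays off the main `api` router; this is not code from the PR, and the `stats_router` name and `/debug` prefix are placeholders.

```python
# Sketch only: serve the collected stats from their own router instead of the main `api` router.
# Assumes the existing CookieOrParamAPIKey dependency; the router name and prefix are hypothetical.
from fastapi import APIRouter, Depends, FastAPI

import amplipi.statcollector as statcollector
from amplipi.auth import CookieOrParamAPIKey

stats_router = APIRouter(prefix='/debug', dependencies=[Depends(CookieOrParamAPIKey)])


@stats_router.get('/collected_stats')
def get_collected_stats():
  """Return the on-disk survey, dropping stream types with no recorded usage."""
  survey = dict(statcollector.UsageSurveySchema.load_from_disk())
  return {k: v for k, v in survey.items() if v != statcollector.StreamUsageSchema()}


@stats_router.post('/collected_stats')
def post_collected_stats():
  """Send the on-disk survey to the AmpliPi devs."""
  statcollector.UsageSurveySchema.load_from_disk().phone_home()


app = FastAPI()                   # in app.py this would be the existing application object
app.include_router(stats_router)  # reachable at /debug/collected_stats, outside the /api surface
```

That would keep /api limited to control of the device itself while the survey data stays reachable over HTTP for debugging.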

@api.get('/api', tags=['status'])
@api.get('/api/', tags=['status'])
def get_status(ctrl: Api = Depends(get_ctrl)) -> models.Status:
224 changes: 224 additions & 0 deletions amplipi/statcollector.py
@@ -0,0 +1,224 @@
#!/usr/bin/env python3
"""Stat Collector for AmpliPi; collects data, saves it to a json file, then phones home with user consent every X days/weeks"""
import os
import json
import re
import subprocess
from datetime import datetime, timezone, timedelta
from shutil import disk_usage
import requests
# pylint: disable=no-name-in-module
from pydantic import BaseModel
from psutil import virtual_memory

path = "/var/lib/UsageSurvey.json"


def find_matches(list1: list, list2: list):
"""Takes in two lists, returns a list that only contains data shared by both lists"""
set1 = {json.dumps(item, sort_keys=True) for item in list1}
set2 = {json.dumps(item, sort_keys=True) for item in list2}

matches = [json.loads(item) for item in set1 & set2]
return matches


def average(running_average, weight, new_entry):
"""
Calculates an average using three variables:
running_average, the average so far
weight, the count of numbers that went into that average
new_entry, the newest addition to the dataset
Returns the new average
"""
# Round to the nearest whole number, then int() to shave off the trailing .0
return int(round(((running_average * weight) + new_entry) / (weight + 1), 0))


class StreamUsageSchema(BaseModel):
"""Schema for individual stream type usage statistics"""
# For the purpose of the below comments, a "survey cycle" is defined as how often the device phones home
active_streams: list = []

# The average and peak number of concurrent streams of this type running within the survey cycle
average_concurrent_streams: int = 0
peak_concurrent_streams: int = 0

# The length in seconds that a stream of this type has been connected within the survey cycle
# Counts as long as at least one playing stream of the given type persists between polling calls
current_runtime: int = 0
average_runtime: int = 0
runtime_weight: int = 0
peak_runtime: int = 0

def consume(self, source_info: list, poll_count: int, timediff: timedelta):
"""Consumes a list of source["info"] json blobs and updates stream statistics tracker with information found within"""
stream_names = [item["name"] for item in source_info]
stream_count = len(stream_names)

if stream_count == 0 and len(self.active_streams) != 0: # If a stream has just been shut off, record average based on current runtime
new_average = average(self.average_runtime, self.runtime_weight, self.current_runtime)
self.average_runtime = new_average
self.runtime_weight += 1

matches = find_matches(self.active_streams, stream_names)
if len(matches) != 0: # If there is at least one playing match, calculate runtime
self.current_runtime = self.current_runtime + timediff.seconds

self.peak_runtime = max(self.peak_runtime, self.current_runtime) if self.current_runtime < 86400 else 86400
Contributor Author

There's no real reason to limit this to 24 hours; I just had a feeling that runtimes over 24 hours would be outliers, and the cap is a way of negating that. It might be worthwhile to remove the limit just so we can see what sort of lifespans are involved when long-lived stream errors do occur.
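
If the cap stays, one lightweight option, sketched below, is to pull the 24-hour limit into a named constant so it is easy to raise or drop once real data shows how long these streams actually run; this is not code from the PR, and the constant and helper names are made up.

```python
# Sketch only: same behaviour as the current expression, but with the cap named and optional.
from typing import Optional

PEAK_RUNTIME_CAP_S: Optional[int] = 86400  # 24 hours; set to None to disable the cap


def capped_peak(previous_peak: int, current_runtime: int,
                cap: Optional[int] = PEAK_RUNTIME_CAP_S) -> int:
  """Return the new peak runtime in seconds, clamped to `cap` when a cap is set."""
  peak = max(previous_peak, current_runtime)
  return peak if cap is None else min(peak, cap)


# consume() would then read:
#   self.peak_runtime = capped_peak(self.peak_runtime, self.current_runtime)
```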

else:
self.current_runtime = 0

self.peak_concurrent_streams = max(self.peak_concurrent_streams, stream_count)
self.average_concurrent_streams = average(self.average_concurrent_streams, poll_count, stream_count)

# set current data to be next poll's previous data
self.active_streams = stream_names


class UsageSurveySchema(BaseModel):
"""A verification layer for JSON objects to be sent to the Usage Survey server"""
# Auth code that legitimizes the data as coming from a MicroNova AmpliPro and not some random other source
authentication_code: str = os.environ.get('AUTH_CODE', "")
amplipi_version: str = ""
is_streamer: bool = False

poll_count: int = 0 # Used to calculate averaged values
time_of_first_poll: datetime = datetime.min
time_of_previous_poll: datetime = datetime.min

# Memory usage, in bytes as reported by psutil
peak_memory_usage: int = 0
average_memory_usage: int = 0

# Disk space, in bytes as reported by shutil.disk_usage
# Disk average not collected as it's less variable over time
disk_total: int = 0
disk_used: int = 0
disk_free: int = 0

# Used to store notable events, such as anything that occurs above a logging level of warning. See record_logs() function for details.
notable_logs: list = []

# These were once in a dict called "streams" that held the same data in the same format; avoiding dicts nested three layers deep seemed preferable
airplay: StreamUsageSchema = StreamUsageSchema()
aux: StreamUsageSchema = StreamUsageSchema()
bluetooth: StreamUsageSchema = StreamUsageSchema()
dlna: StreamUsageSchema = StreamUsageSchema()
fileplayer: StreamUsageSchema = StreamUsageSchema()
internetradio: StreamUsageSchema = StreamUsageSchema()
lms: StreamUsageSchema = StreamUsageSchema()
media_device: StreamUsageSchema = StreamUsageSchema()
pandora: StreamUsageSchema = StreamUsageSchema()
plexamp: StreamUsageSchema = StreamUsageSchema()
rca: StreamUsageSchema = StreamUsageSchema()
spotify: StreamUsageSchema = StreamUsageSchema()

def save_to_disk(self):
"""Saves contents of UsageSurvey to file"""
tmp = "/tmp/UsageSurvey.json"
with open(tmp, "w", encoding="UTF-8") as file:
file.write(self.json())
subprocess.run(['sudo', 'mv', tmp, path], check=True)

@classmethod
def load_from_disk(cls):
"""Loads contents of UsageSurvey from saved file"""
if os.path.exists(path):
with open(path, "r", encoding="UTF-8") as file:
file_data = json.load(file)
return cls(**file_data)
else:
return cls()

def phone_home(self):
"""Send contents back to amplipi devs, and delete current state to refresh the data cycle"""
url = "Currently unknown" # TODO: Update this url after finishing the hosted home base side of the stat collector
# Serialize via pydantic's .json() so datetimes and nested models are JSON-safe
response = requests.post(url, json=json.loads(self.json()), timeout=10)
if os.path.exists(path) and response.status_code == 200:
subprocess.run(['sudo', 'rm', path], check=True)

def get_disk_usage(self):
"""Collects and populates disk usage statistics via shutil"""
self.disk_total, self.disk_used, self.disk_free = disk_usage("/")

def get_mem_usage(self):
"""Collects and populates memory usage statistics via psutil"""
memory_info = virtual_memory()
used_memory = memory_info.used

self.peak_memory_usage = max(self.peak_memory_usage, used_memory)
self.average_memory_usage = average(self.average_memory_usage, self.poll_count, used_memory)

def record_logs(self):
"""Reads journalctl and searches for a list of keywords, then saves new logs to self.notable_logs"""
keywords = ["WARNING", "ERROR"]

result = subprocess.run(
["journalctl", "--no-pager", f"--grep={'|'.join(keywords)}"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True
)

# Split logs into individual line items in a list, then add new entries to the notable_logs list
logs = result.stdout.splitlines()
# Removes the timestamp and hostname of logs, helps sort out logs that are the same but would be seen as different due to a different timestamp
pattern = r"^[A-Za-z]{3} \d{1,2} \d{2}:\d{2}:\d{2} \S+ "
logs = [re.sub(pattern, "", log) for log in logs]

# This was once a single line: self.notable_logs.extend([log for log in logs if log not in self.notable_logs])
# That didn't work: entries duplicated within logs (but not yet in self.notable_logs) weren't filtered
# against each other, so every copy got flushed into self.notable_logs at the end of the loop
for log in logs:
if log not in self.notable_logs:
self.notable_logs.append(log)

def get_state(self):
"""Gets system state, saves to relevant places"""
state = requests.get("http://localhost/api", timeout=1)
if state.status_code == 200:
state_json = state.json()
now = datetime.now(timezone.utc) # in UTC to avoid any issues if we do implement a user-set timezone option
if self.time_of_first_poll == datetime.min:
self.time_of_first_poll = now

timediff = (now - self.time_of_previous_poll) if self.time_of_previous_poll != datetime.min else timedelta(0)
stream_handlers = {
"airplay": {"object": self.airplay, "list": []},
"aux": {"object": self.aux, "list": []},
"bluetooth": {"object": self.bluetooth, "list": []},
"dlna": {"object": self.dlna, "list": []},
"fileplayer": {"object": self.fileplayer, "list": []},
"internetradio": {"object": self.internetradio, "list": []},
"lms": {"object": self.lms, "list": []},
"media_device": {"object": self.media_device, "list": []},
"pandora": {"object": self.pandora, "list": []},
"plexamp": {"object": self.plexamp, "list": []},
"rca": {"object": self.rca, "list": []},
"spotify": {"object": self.spotify, "list": []},
}

for source in state_json["sources"]:
if source["input"] and source.get("info", {}).get("state") in ["playing", "connected"]:
stream_type = source["info"]["type"]
if stream_type in stream_handlers:
stream_handlers[stream_type]["list"].append(source["info"])

for _, handler in stream_handlers.items():
handler["object"].consume(handler["list"], self.poll_count, timediff)

self.poll_count += 1
self.time_of_previous_poll = now
self.amplipi_version = state_json["info"]["version"]
self.is_streamer = state_json["info"]["is_streamer"]
self.get_disk_usage()
self.get_mem_usage()
self.record_logs()


if __name__ == "__main__":
UsageSurvey = UsageSurveySchema.load_from_disk()
UsageSurvey.get_state()
UsageSurvey.save_to_disk()
3 changes: 3 additions & 0 deletions config/stat_collector_crontab
@@ -0,0 +1,3 @@
# Automatically collects stats every 5 minutes for the stat-collector service
# Installed by AmpliPi
*/5 * * * * root /usr/local/bin/collect_stats.sh
56 changes: 56 additions & 0 deletions docs/amplipi_api.yaml
@@ -105,6 +105,62 @@ paths:
apmHost: null
version: null
environment: null
/collected_stats/:
get:
summary: Get Collected Stats
description: Get the current contents of statcollector output file after filtering
out any empty stream data
operationId: get_collected_stats_collected_stats__get
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
security:
- APIKeyCookie: []
- APIKeyQuery: []
post:
summary: Post Collected Stats
description: Send the contents of the statcollector output file to AmpliPi devs
operationId: post_collected_stats_collected_stats__post
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
security:
- APIKeyCookie: []
- APIKeyQuery: []
/collected_stats:
get:
summary: Get Collected Stats
description: Get the current contents of statcollector output file after filtering
out any empty stream data
operationId: get_collected_stats_collected_stats__get
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
security:
- APIKeyCookie: []
- APIKeyQuery: []
post:
summary: Post Collected Stats
description: Send the contents of the statcollector output file to AmpliPi devs
operationId: post_collected_stats_collected_stats__post
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
security:
- APIKeyCookie: []
- APIKeyQuery: []
/api/:
get:
tags:
4 changes: 4 additions & 0 deletions scripts/collect_stats.sh
@@ -0,0 +1,4 @@
#!/bin/bash
# Installed to /usr/local/bin by configure.py
source /home/pi/amplipi-dev/venv/bin/activate;
python3 /home/pi/amplipi-dev/amplipi/statcollector.py;
14 changes: 14 additions & 0 deletions scripts/configure.py
@@ -207,6 +207,20 @@
'popd',
]
},
'stat-collector': {
"copy": [
{
'from': 'config/stat_collector_crontab',
'to': '/etc/cron.d/statcollector',
'sudo': 'true',
},
{
'from': 'scripts/collect_stats.sh',
'to': '/usr/local/bin/collect_stats.sh',
'sudo': 'true',
},
],
},
# streams
# TODO: can stream dependencies be aggregated from the streams themselves?
'airplay': {
14 changes: 7 additions & 7 deletions tests/test_rest.py
@@ -1414,18 +1414,18 @@ def test_api_doc_has_examples(client):
if method in ['post', 'put', 'patch']:
try:
req_spec = m['requestBody']['content']['application/json']
assert 'example' in req_spec or 'examples' in req_spec, f'{path_desc}: At least one exmaple request required'
if 'exmaples' in req_spec:
assert len(req_spec['examples']) > 0, f'{path_desc}: At least one exmaple request required'
assert 'example' in req_spec or 'examples' in req_spec, f'{path_desc}: At least one example request required'
if 'examples' in req_spec:
assert len(req_spec['examples']) > 0, f'{path_desc}: At least one example request required'
except KeyError:
pass # request could be different type or non-existent
try:
resp_spec = m['responses']['200']['content']['application/json']
assert 'example' in resp_spec or 'examples' in resp_spec, f'{path_desc}: At least one exmaple response required'
if 'exmaples' in resp_spec:
assert len(resp_spec['examples']) > 0, f'{path_desc}: At least one exmaple response required'
assert 'example' in resp_spec or 'examples' in resp_spec, f'{path_desc}: At least one example response required'
if 'examples' in resp_spec:
assert len(resp_spec['examples']) > 0, f'{path_desc}: At least one example response required'
except KeyError:
pass # reposnse could not be json
pass # reponse could not be json

# TODO: this test will fail until we come up with a good scheme for specifying folder locations in a global config
# The test below fails since the test and the app are run in different directories