-
Notifications
You must be signed in to change notification settings - Fork 0
/
uptime_kuma.py
84 lines (75 loc) · 2.98 KB
/
uptime_kuma.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from uptime_kuma_api import UptimeKumaApi
import os
import datetime
from datetime import datetime
class UptimeKuma:
    """Wrapper around ``UptimeKumaApi`` that tracks the monitor status per ingress.

    Connection settings are read by :meth:`get_config`; the API client is
    created by :meth:`connect`. :meth:`update` stores, for every ingress
    linked to a monitor, the ``status`` field of that monitor's most recent
    heartbeat in ``uptime_kuma_status``.
    """

    # Heartbeat timestamp layouts, tried in order. The second one covers
    # timestamps that carry no fractional seconds.
    _TIME_FORMATS = ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S")

    # String annotation: the client is None until connect() has succeeded.
    api: "UptimeKumaApi | None"
    url: str
    username: str
    password: str
    # Maps an ingress object to the status of its latest known heartbeat.
    uptime_kuma_status: dict

    def __init__(self):
        """Start with empty settings, no status data and no API client."""
        self.url = ""
        self.username = ""
        self.password = ""
        self.uptime_kuma_status = {}
        self.api = None

    def get_config(self, config) -> None:
        """Load URL and credentials from ``config`` and the environment.

        Args:
            config: Container supporting ``in`` and ``[]`` that may hold an
                ``"uptime-kuma"`` section with ``url``, ``username`` and
                ``password`` entries. Username and password are only taken
                from it when both are present.

        The environment variables ``UPTIME_KUMA_USERNAME``,
        ``UPTIME_KUMA_PASSWORD`` and ``UPTIME_KUMA_URL`` take precedence over
        the config values whenever they are set to a non-empty string.
        """
        # An absent, empty or None section is treated the same way, so the
        # membership tests below never run against None.
        section = config["uptime-kuma"] if "uptime-kuma" in config else None
        section = section or {}

        if "username" in section and "password" in section:
            self.username = section["username"]
            self.password = section["password"]
        self.username = os.getenv("UPTIME_KUMA_USERNAME") or self.username
        self.password = os.getenv("UPTIME_KUMA_PASSWORD") or self.password

        if "url" in section:
            self.url = section["url"]
        self.url = os.getenv("UPTIME_KUMA_URL") or self.url

    def connect(self) -> None:
        """Create the API client, or reconnect the one that already exists.

        Best effort: any exception is printed instead of raised, in which
        case ``self.api`` keeps its previous value (``None`` on first use).
        """
        try:
            print("Getting Uptime Kuma")
            if not self.api:
                self.api = UptimeKumaApi(self.url)
            else:
                self.api.connect()
            print("Connected")
        except Exception as e:
            print("Could not get Uptime Kuma")
            print(e)

    def login(self) -> None:
        """Log in with the configured credentials, connecting first if needed.

        Raises:
            AttributeError: If no client exists and :meth:`connect` could not
                create one (it reports its own failure without raising).
            Exception: Whatever ``api.login`` raises is propagated unchanged.
        """
        if not self.api:
            self.connect()
        self.api.login(self.username, self.password)
        print("Logged in")

    def update(self, ingress) -> None:
        """Refresh ``uptime_kuma_status`` from the latest heartbeats.

        Args:
            ingress: Iterable of hashable objects exposing an ``uptime_kuma``
                attribute holding a monitor id; ``-1`` marks an ingress
                without a monitor and is skipped.

        Best effort: any exception is printed instead of raised. Results are
        collected first and committed at the end, so a failed run leaves the
        previously stored statuses untouched. Entries of ingresses without a
        matching heartbeat are kept as they were.
        """
        print("Uptime Kuma: Update ...")
        try:
            status_list = self.get_current_status()
            tracked = [ing for ing in ingress if ing.uptime_kuma != -1]
            latest = self._latest_heartbeats(
                status_list, {ing.uptime_kuma for ing in tracked}
            )

            refreshed = {}
            for ing in tracked:
                _, heartbeat = latest.get(ing.uptime_kuma, (None, None))
                if heartbeat:
                    refreshed[ing] = heartbeat["status"]

            self.uptime_kuma_status.update(refreshed)
            print("Uptime Kuma: Updated")
        except Exception as e:
            print("Uptime Kuma: Could not update!")
            print(e)

    def get_current_status(self):
        """Return the result of ``api.get_important_heartbeats()``.

        Raises:
            AttributeError: If no API client has been created yet.
        """
        return self.api.get_important_heartbeats()

    def disconnect(self) -> None:
        """Disconnect the API client; does nothing if none was ever created."""
        if self.api is None:
            return
        self.api.disconnect()

    @classmethod
    def _parse_time(cls, raw):
        """Parse a heartbeat timestamp using the first matching known layout."""
        for layout in cls._TIME_FORMATS:
            try:
                return datetime.strptime(raw, layout)
            except ValueError:
                continue
        raise ValueError(f"Unrecognised heartbeat timestamp: {raw!r}")

    def _latest_heartbeats(self, status_list, monitor_ids):
        """Map each requested monitor id to its ``(time, heartbeat)`` with the newest time."""
        latest = {}
        if not monitor_ids:
            # Nothing is tracked, so the heartbeat data is not inspected at all.
            return latest
        for status in status_list:
            for heartbeat in status["data"]:
                monitor_id = int(heartbeat["monitorID"])
                if monitor_id not in monitor_ids:
                    # Timestamps of untracked monitors are never parsed.
                    continue
                seen_at = self._parse_time(heartbeat["time"])
                newest_at, _ = latest.get(monitor_id, (datetime.min, None))
                # Strict comparison: on equal timestamps the first one wins.
                if seen_at > newest_at:
                    latest[monitor_id] = (seen_at, heartbeat)
        return latest