Skip to content

Commit

Permalink
Merge pull request #85 from kaulketh/develop
Browse files Browse the repository at this point in the history
v9.2 Develop
  • Loading branch information
kaulketh authored Dec 30, 2023
2 parents 5b67e6b + af4f9a2 commit 0832b37
Show file tree
Hide file tree
Showing 14 changed files with 239 additions and 226 deletions.
150 changes: 73 additions & 77 deletions bot_framework/telepot_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
stop_threads
from control.reboot import AutoReboot
from control.update import update_bot
from functions import STOP_CMD
from functions import indices_of_functions, STOP, START
from logger import LOGGER, HISTORY

admins = [ID_CHAT_THK]
Expand All @@ -49,29 +49,18 @@ def __init__(self, t, ids):
self.__admins = ids
self.__bot = telepot.Bot(self.__token)

self._remove_keyboard = ReplyKeyboardRemove()
self.__remove_keyboard = ReplyKeyboardRemove()
# keys order (refer config)
self.__keyboard_markup = ReplyKeyboardMarkup(keyboard=[
self.__button_group([4, 5, 17, 18, 19, 21, 22]),
self.__button_group([8, 9, 10, 13, 11, 12, 14]),
self.__button_group([15, 20, 6, 23, 7, 24]),
self.__button_group([2, 3, 16])
])
self.__keyboard = []
for i in range(len(indices_of_functions)):
self.__keyboard.append(self.__btn_grp(indices_of_functions[i]))
self.__keyboard_markup = ReplyKeyboardMarkup(keyboard=self.__keyboard)
self.__log.debug(f"Done, keyboards and buttons built.")
self.__func_thread = None

@property
def rm_kb(self):
return self._remove_keyboard

@property
def kb_markup(self):
return self.__keyboard_markup

@property
def kb_stop(self):
r = ReplyKeyboardMarkup(
keyboard=[[self.__button(commands[0], 0)]])
def kb_stop(self) -> ReplyKeyboardMarkup:
r = ReplyKeyboardMarkup(keyboard=[[self.__btn(STOP, 0)]])
self.__log.debug(f"Stop keyboard markup: {r}")
return r

Expand All @@ -84,17 +73,17 @@ def external_request(cls, msg, chat_id=None, reply_markup=None, bot=None):
bot.__send(ch_id=chat_id, text=msg, reply_markup=reply_markup)

# noinspection PyMethodMayBeStatic
def __button(self, text, i) -> KeyboardButton:
def __btn(self, text, i) -> KeyboardButton:
self.__log.debug(f"[{i:02d}] {text}")
return KeyboardButton(text=text)

# noinspection PyMethodMayBeStatic
def __button_group(self, choices: list) -> list:
def __btn_grp(self, choices: list) -> list:
btn_list, il = [], []
for i in choices:
btn_list.append(self.__button(commands[i], i))
btn_list.append(self.__btn(commands[i], i))
il.append(i)
self.__log.debug(f"arranged {il}")
self.__log.debug(f"{il} arranged")
return btn_list

def __send(self, ch_id, text, reply_markup, parse_mode='Markdown'):
Expand All @@ -114,7 +103,7 @@ def __reply_wrong_id(self, ch_id, msg):
f"{ch_id} User:{username}, {first_name} {last_name}"
self.__send(ch_id, m_wrong_id.format(user_id, username, first_name,
last_name),
reply_markup=self.rm_kb)
reply_markup=self.__remove_keyboard)
raise Exception(log_msg)
except Exception as ex:
self.__log.warning(f"{ex}")
Expand All @@ -129,16 +118,40 @@ def __reply_wrong_command(self, ch_id, content):
self.__log.warning(str(ex))
return

def __stop_function(self, ch_id, msg):
def __stop_function(self, ch_id, msg) -> bool:
if msg is not None:
self.__send(ch_id, msg, reply_markup=self.rm_kb)
# return True if stop_threads() else False
self.__send(ch_id, msg, reply_markup=self.__remove_keyboard)
return stop_threads()

def __handle(self, msg):
content_type, chat_type, chat_id = telepot.glance(msg)
self.__log.debug(msg)

def answer(txt):
self.__send(chat_id, txt, reply_markup=self.__remove_keyboard)

def execution_possible(txt) -> bool:
if command == txt:
if self.__stop_function(chat_id, msg=None):
return True

def selection_request():
self.__send(chat_id,
m_pls_select.format(msg['from']['first_name']),
reply_markup=self.__keyboard_markup)

def help_requested():
return execution_possible(
service.Service.c_help) or execution_possible(
service.Service.c_help.lower())

def sos() -> bool:
"""start or stop w/o leading '/'"""
return (command.startswith(STOP)) or (
command.startswith(STOP.lower())) or (
command.startswith(START)) or (
command.startswith(START.lower()))

# check user
if chat_id not in self.__admins:
self.__reply_wrong_id(chat_id, msg)
Expand All @@ -147,57 +160,37 @@ def __handle(self, msg):
if content_type == 'text':
command = msg['text']
self.__log.info(f"Got command '{command}'")
# /start
if command == "/start":
if self.__stop_function(chat_id, msg=None):
self.__send(chat_id,
m_pls_select.format(msg['from']['first_name']),
reply_markup=self.kb_markup)

# /stop
elif command == "/stop":
if self.__stop_function(chat_id, msg=None):
self.__send(chat_id, m_stopped, reply_markup=self.rm_kb)

# stop function
elif (command.startswith(commands[0])) \
or (command.startswith(commands[0].lower())):
if self.__stop_function(chat_id, msg=None):
self.__send(chat_id,
m_pls_select.format(msg['from']['first_name']),
reply_markup=self.kb_markup)

# /service
elif command == ('/' + service.NAME.lower()):
if self.__stop_function(chat_id, msg=None):
self.__send(chat_id, service.menu, reply_markup=self.rm_kb)
elif command == service.Service.c_reboot:
self.__send(chat_id, m_rebooted, reply_markup=self.rm_kb)
# Bot menu respectively Telegram-in-app-commands
if execution_possible("/start"):
selection_request()
elif execution_possible("/stop"):
answer(m_stopped)
elif execution_possible('/' + service.NAME.lower()):
answer(service.menu)
elif execution_possible(service.Service.c_reboot):
answer(m_rebooted)
service.reboot_device(m_rebooted)
elif command == service.Service.c_restart:
self.__send(chat_id, m_restarted, reply_markup=self.rm_kb)
elif execution_possible(service.Service.c_restart):
answer(m_restarted)
service.restart_service(m_restarted)
elif command == service.Service.c_info:
elif execution_possible(service.Service.c_info):
info = service.system_info()
answer(info)
self.__log.info(info.replace("\n", "").replace(" ", ""))
elif execution_possible(service.Service.c_update):
answer(m_updated)
update_bot(m_updated)
elif help_requested():
answer(service.get_help_text())
# start or stop
elif sos():
if self.__stop_function(chat_id, msg=None):
info = service.system_info()
self.__send(chat_id, info, reply_markup=self.rm_kb)
self.__log.info(info.replace("\n", "").replace(" ", ""))
elif command == service.Service.c_update:
if self.__stop_function(chat_id, msg=None):
self.__send(chat_id, m_updated, reply_markup=self.rm_kb)
update_bot(m_updated)
elif command == service.Service.c_help \
or command == service.Service.c_help.lower():
if self.__stop_function(chat_id, msg=None):
self.__send(chat_id, service.get_help_text(),
reply_markup=self.rm_kb)

selection_request()
# all other commands
elif any(c for c in commands if (command == c)):
if self.__stop_function(chat_id, msg=None):
self.__func_thread = run_thread(command, chat_id, self)
self.__send(chat_id, text=command,
reply_markup=self.kb_stop)
elif any(c for c in commands if (execution_possible(c))):
self.__func_thread = run_thread(command, chat_id, self)
self.__send(chat_id, text=command, reply_markup=self.kb_stop)
else:
self.__reply_wrong_command(chat_id, command)
else:
Expand All @@ -206,21 +199,24 @@ def __handle(self, msg):
def start(self):
self.__log.info(RUNNING)
for a in self.__admins:
self.__send(a, m_started, reply_markup=self.rm_kb)
self.__send(a, m_started, reply_markup=self.__remove_keyboard)

MessageLoop(self.__bot,
{'chat': self.__handle}).run_as_thread()
# TODO: nfo string/text as constant w/ translations (II)
as_nfo = f"Autostart"
self.__log.info(f"{as_nfo} = {AUTO_START}")
with open(HISTORY, "r") as f:
line = f.readlines()[-1]
# FIXME: if no HISTORY, impossible to find line in file
lines = f.readlines()
# if len(f.readlines()) > 0 else ["new file\n"]
line = lines[-1]
self.__log.warning(line.replace("\n", ""))
# TODO: implement considering of translation of stored command after language change
# - search key of value/stored string and gather translations with this key
# - depending of set language execute/set command text
cmd = line.partition(" HISTORY ")[2].replace("\n", "")
_stop = (cmd == STOP_CMD)
_stop = (cmd == STOP)
self.__log.warning(_stop)
if AUTO_START:
if not _stop:
Expand All @@ -237,7 +233,7 @@ def start(self):
self.__log.info(f"{ar_nfo} = {AUTO_REBOOT_ENABLED}")
if AUTO_REBOOT_ENABLED:
for a in self.__admins:
kb = self.kb_stop if AUTO_START else self.rm_kb
kb = self.kb_stop if AUTO_START else self.__remove_keyboard
self.__send(a, f"{ar_nfo}: {AUTO_REBOOT_TIME} CET",
reply_markup=kb)
AutoReboot(reboot_time=AUTO_REBOOT_TIME, bot=self).start()
Expand Down
8 changes: 4 additions & 4 deletions config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@
translations = json.load(file)

commands = []
# not really needed, but better to avoid error messages in the IDE
# Init not really needed, but better to avoid error messages in the IDE
(m_wrong_id, m_not_allowed, m_pls_select, m_called, m_started, m_rebooted,
m_restarted, m_stopped, m_standby, m_stop_f, m_killed, m_updated) \
= 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
= [None] * 12

for item in translations.items():
_type = item[1].get('type')
_name = item[1].get('name')
_value = item[1].get(LANGUAGE)
_n = int(item[0])
globals()[_name] = _value
_value_hr = _value.replace("\n", "")
globals()[_name] = _value # variables creation dynamically
_value_hr = _value.replace("\n", "") # human readable
if _type == "btn_txt":
# commands
_n = len(commands)
Expand Down
2 changes: 1 addition & 1 deletion config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
AUTO_START = False
LANGUAGE = "German" # language keys: "German", "English", "French"
RUNNING = "Bot is running..."
TRANSLATIONS = 'dictionary.json'
TRANSLATIONS = 'ui_translations.json'

if __name__ == '__main__':
pass
File renamed without changes.
8 changes: 4 additions & 4 deletions control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
__maintainer__ = "Thomas Kaulke"
__status__ = "Production"

from functions import dictionary_of_functions, STOP_CMD
from functions import dictionary_of_functions, STOP
from functions.effects import clear
from logger import LOGGER
from .light import LightFunction
from .wreath import WREATH, wreath_setup

ERROR = "Any error occurred: "
flag = None
flag = False

clear(WREATH)

Expand Down Expand Up @@ -52,13 +52,13 @@ def run_thread(func_name, request_id, bot):
LOGGER.error(f"{ERROR}{e}")


def stop_threads():
def stop_threads() -> bool:
try:
for t in LightFunction.threads:
if t is not None and t.is_running:
t.stop()
set_stop_flag(True)
LOGGER.history(STOP_CMD)
LOGGER.history(STOP)
except Exception as e:
LOGGER.error(f"{ERROR}{e}")
set_stop_flag(False)
Expand Down
4 changes: 2 additions & 2 deletions control/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
Candles: Each LED simulates candlelight.
Strobe: Emitting brief and rapid flashes of white light in random frequency.
- *Service menu:*
- *Service* menu:
/Reboot device
/Restart bot application (running as service)
/Info Information commit/release versions on GitHub, host name, IP, memory usage, disk usage, cpu load.
Expand All @@ -70,7 +70,7 @@ def __init__(self, command: str = None, log_msg: str = None):
self.__log_msg = log_msg
self.__command = command
self.__help_txt = HELP_TEXT
self.__menu_header = f"{NAME} functions:"
self.__menu_header = f"*{NAME}*:"
self.__menu_dictionary = {
0: self.c_reboot,
1: self.c_restart,
Expand Down
2 changes: 1 addition & 1 deletion control/wreath.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import datetime

from rpi_ws281x import *
from rpi_ws281x import Adafruit_NeoPixel

from config import LED_BRIGHTNESS, LED_BRIGHTNESS_DAY, LED_BRIGHTNESS_NIGHT, \
LED_COUNT, LED_CUT_OFF_MORNING, LED_CUT_OFF_NIGHT, LED_DMA, LED_FREQ_HZ, \
Expand Down
23 changes: 16 additions & 7 deletions functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@
from .clocks import \
run_clock1, run_clock2, run_clock3, \
run_clock4, run_clock5, run_clock6, run_clock7
from .colors import run_blue, run_demo, run_demo2, run_green, run_orange, \
from .color import OwnColors
from .colorant import run_blue, run_demo, run_demo2, run_green, run_orange, \
run_red, run_stroboscope, run_violet, run_white, run_yellow
from .effects import run_rainbow, run_rainbow_chaser, run_rainbow_cycle, \
run_theater

STOP_CMD = commands[0]
STOP = commands[0]
START = commands[1]


def light_animations():
def assigned():
try:
"""
Ensure right order of functions,
depends on the command order
in config/dictionary.json
in config/ui_translations.json
"""
dic = {}
funcs = [None, None,
Expand All @@ -38,13 +40,20 @@ def light_animations():
run_clock5, run_demo2, run_clock6, run_clock7,
run_rainbow, run_rainbow_chaser,
None]

i_clocks = 4, 5, 17, 18, 19, 21, 22
i_colors = 8, 9, 10, 13, 11, 12, 14
i_effects = 15, 20, 6, 23, 7, 24
i_specials = 2, 3, 16
indices = i_clocks, i_colors, i_effects, i_specials

for i in range(len(commands)):
f = funcs[i]
dic[commands[i]] = f
LOGGER.debug(f"Dictionary function[{i:02d}] {commands[i]}: {f}")
return dic
LOGGER.debug(f"function[{i:02d}] {commands[i]}: {f}")
return dic, indices
except Exception as ex:
LOGGER.error(f"{ex}")


dictionary_of_functions = light_animations()
dictionary_of_functions, indices_of_functions = assigned()
Loading

0 comments on commit 0832b37

Please sign in to comment.