Skip to content

Commit

Permalink
Merge pull request #92 from kaulketh/develop
Browse files Browse the repository at this point in the history
Update main class and minor changes
  • Loading branch information
kaulketh authored Jan 6, 2024
2 parents 2434308 + 4c0f7de commit c5690df
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 125 deletions.
155 changes: 85 additions & 70 deletions app/framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
ReplyKeyboardRemove

# noinspection PyUnresolvedReferences
from config import auto_reboot_enabled, auto_reboot_time, ID_CHAT_THK, \
from config import auto_reboot, auto_reboot_time, ID_CHAT_THK, \
running, TOKEN_TELEGRAM_BOT, commands, m_not_allowed, m_pls_select, \
m_rebooted, m_restarted, m_started, m_stopped, m_updated, m_wrong_id, \
auto_start, auto_reboot_msg, auto_start_msg
Expand Down Expand Up @@ -51,20 +51,19 @@ def __init__(self, t, ids):
self.__admins = ids
self.__bot = telepot.Bot(self.__token)

self.__remove_keyboard = ReplyKeyboardRemove()
# keys order (refer config)
self.__keyboard = []
# keyboards and buttons arrangement (refer config)
self.__rm_kb = ReplyKeyboardRemove()
self.__kb = []
for i in range(len(indices_of_functions)):
self.__keyboard.append(self.__btn_grp(indices_of_functions[i]))
self.__keyboard_markup = ReplyKeyboardMarkup(keyboard=self.__keyboard)
self.__kb.append(self.__btn_grp(indices_of_functions[i]))
self.__kbm = ReplyKeyboardMarkup(keyboard=self.__kb)
self.__log.debug(f"Done, keyboards and buttons built.")
self.__func_thread = None

@property
def kb_stop(self) -> ReplyKeyboardMarkup:
r = ReplyKeyboardMarkup(keyboard=[[self.__btn(STOP, 0)]])
self.__log.debug(f"Stop keyboard markup: {r}")
return r
self.__log.debug("Init Stop keyboard")
return ReplyKeyboardMarkup(keyboard=[[self.__btn(STOP, 0)]])

@classmethod
def external_request(cls, msg, chat_id=None, reply_markup=None, bot=None):
Expand Down Expand Up @@ -93,22 +92,27 @@ def __send(self, ch_id, text, reply_markup, parse_mode='Markdown'):
self.__bot.sendMessage(ch_id, text, reply_markup=reply_markup,
parse_mode=parse_mode)

# noinspection PyMethodMayBeStatic
def __user(self, msg):
user_id = msg['from']['id']
first_name = msg['from']['first_name']
last_name = msg['from']['last_name']
username = msg['from']['username'] if (
'username' in msg['from'].keys()) else "'user name unknown'"
return user_id, username, first_name, last_name

def __reply_wrong_id(self, ch_id, msg):
try:
user_id = msg['from']['id']
first_name = msg['from']['first_name']
last_name = msg['from']['last_name']
username = msg['from']['username']
uid, nick, first, last = self.__user(msg)
log_msg = f"Unauthorized access: ID " \
f"{ch_id} User:{username}, {first_name} {last_name}"
self.__send(ch_id, m_wrong_id.format(user_id, username, first_name,
last_name),
reply_markup=self.__remove_keyboard)
f"{ch_id} User:{nick}, {first} {last}"
self.__send(ch_id, m_wrong_id.format(uid, nick, first, last),
reply_markup=self.__rm_kb)
raise Exception(log_msg)
except Exception as ex:
self.__log.warning(f"{ex}")

def __reply_wrong_command(self, ch_id, content):
def __reply_wrong_content(self, ch_id, content):
try:
got = str(codecs.encode(content, 'utf-8')).replace('b', '')
raise Exception(f'Not allowed input: {got}')
Expand All @@ -120,87 +124,90 @@ def __reply_wrong_command(self, ch_id, content):

def __stop_function(self, ch_id, msg) -> bool:
if msg is not None:
self.__send(ch_id, msg, reply_markup=self.__remove_keyboard)
self.__send(ch_id, msg, reply_markup=self.__rm_kb)
return stop_threads()

def __handle(self, msg):
content_type, chat_type, chat_id = telepot.glance(msg)
command = msg['text']
self.__log.debug(msg)

def answer(txt):
self.__send(chat_id, txt, reply_markup=self.__remove_keyboard)
# helper
def respond(txt):
self.__send(chat_id, txt, reply_markup=self.__rm_kb)

def execution_possible(txt) -> bool:
if command == txt:
if self.__stop_function(chat_id, msg=None):
return True
def selection_ok(selection) -> bool:
return command == selection and self.__stop_function(chat_id,
msg=None)

def selection_request():
self.__send(chat_id,
m_pls_select.format(msg['from']['first_name']),
reply_markup=self.__keyboard_markup)
def request_selection():
self.__send(chat_id, m_pls_select.format(self.__user(msg)[2]),
reply_markup=self.__kbm)

def help_requested():
return execution_possible(
service.Service.c_help) or execution_possible(
service.Service.c_help.lower())
help_requested = (selection_ok(service.Service.c_help) or
selection_ok(service.Service.c_help.lower()))

def sos() -> bool:
"""start or stop w/o leading '/'"""
return (command.startswith(STOP)) or (
command.startswith(STOP.lower())) or (
command.startswith(START)) or (
command.startswith(START.lower()))
sos = (command.startswith(STOP) or
command.startswith(STOP.lower()) or
command.startswith(START) or
command.startswith(START.lower()))

# check user
if chat_id not in self.__admins:
self.__reply_wrong_id(chat_id, msg)
uid, nick, first, last = self.__user(msg)
m = f"{msg['text']} -> {chat_id}, {uid} {nick}, {first} {last}"
self.external_request(msg=m, chat_id=ID_CHAT_THK, bot=self)
return None

# check content
if content_type == 'text':
command = msg['text']
self.__log.info(command)

# Bot menu respectively Telegram-in-app-commands
if execution_possible("/start"):
selection_request()
elif execution_possible("/stop"):
answer(m_stopped)
elif execution_possible('/' + service.NAME.lower()):
answer(service.menu)
elif execution_possible(service.Service.c_reboot):
answer(m_rebooted)
if selection_ok("/start"):
request_selection()
elif selection_ok("/stop"):
respond(m_stopped)
elif selection_ok('/' + service.NAME.lower()):
respond(service.menu)
elif selection_ok(service.Service.c_reboot):
respond(m_rebooted)
service.reboot_device(m_rebooted)
elif execution_possible(service.Service.c_restart):
answer(m_restarted)
elif selection_ok(service.Service.c_restart):
respond(m_restarted)
service.restart_service(m_restarted)
elif execution_possible(service.Service.c_info):
elif selection_ok(service.Service.c_info):
info = service.system_info()
answer(info)
respond(info)
self.__log.info(info.replace("\n", "").replace(" ", ""))
elif execution_possible(service.Service.c_update):
answer(m_updated)
elif selection_ok(service.Service.c_update):
respond(m_updated)
update_bot(m_updated)
elif help_requested():
answer(service.get_help_text())
# start or stop
elif sos():
elif help_requested:
respond(service.get_help_text())
# start or stop w/o leading '/'
elif sos:
if self.__stop_function(chat_id, msg=None):
selection_request()
request_selection()
# all other commands
elif any(c for c in commands if (execution_possible(c))):
elif any(c for c in commands if (selection_ok(c))):
self.__func_thread = run_thread(command, chat_id, self)
self.__send(chat_id, text=command, reply_markup=self.kb_stop)
else:
self.__reply_wrong_command(chat_id, command)
# wrong command
self.__reply_wrong_content(chat_id, command)
else:
self.__reply_wrong_command(chat_id, content_type)
# wrong type
self.__reply_wrong_content(chat_id, content_type)

def start(self):
# welcome and run
self.__log.debug(running)
for a in self.__admins:
self.__send(a, m_started, reply_markup=self.__remove_keyboard)
self.__send(a, m_started, reply_markup=self.__rm_kb)
MessageLoop(self.__bot, {'chat': self.__handle}).run_as_thread()

# history check
try:
with open(HISTORY, "r") as f:
lines = f.readlines()
Expand All @@ -214,11 +221,15 @@ def start(self):
with open(HISTORY, "r") as f:
lines = f.readlines()
line = lines[-1]
self.__log.debug(f"History: {line.strip()}")
# TODO: implement considering of translation of stored command after language change
# - search key of value/stored string and gather translations with this key
# - depending of set language execute/set command text
self.__log.debug(line.strip())
# TODO: implement considering of translation of
# stored command after language change
# e.g. search key of value/stored string and gather
# translations with this key, depending of set language
# execute/set command text
cmd = line.partition(" HISTORY ")[2].rstrip()

# auto start check and run stored function
if auto_start:
self.__log.info(auto_start_msg)
if not (cmd == STOP):
Expand All @@ -229,13 +240,17 @@ def start(self):
else:
open(HISTORY, "w").close()
self.__stop_function(ID_CHAT_THK, msg=None)
if auto_reboot_enabled:

# auto reboot check and initializing
if auto_reboot:
self.__log.info(auto_reboot_msg)
for a in self.__admins:
kb = self.kb_stop if auto_start else self.__remove_keyboard
kb = self.kb_stop if auto_start else self.__rm_kb
self.__send(a, f"{auto_reboot_msg}: {auto_reboot_time} CET",
reply_markup=kb)
AutoReboot(reboot_time=auto_reboot_time, bot=self).start()

# main loop
while True:
try:
signal.pause()
Expand Down
7 changes: 4 additions & 3 deletions bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@
__status__ = "Production"
__doc__ = "Call instance of framework class."

from urllib3.exceptions import HTTPError, ProtocolError
# from urllib3.exceptions import HTTPError, ProtocolError
from urllib3 import exceptions

from app import *
from app import framework
from control import peripheral_functions
from logger import LOGGER

if __name__ == '__main__':

try:
framework.main()
except (ConnectionResetError, ProtocolError, HTTPError) as e:
except (ConnectionResetError, exceptions) as e:
LOGGER.error(f"Connection error occurs: {e}")
exit()
finally:
Expand Down
Loading

0 comments on commit c5690df

Please sign in to comment.