-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbot.py
executable file
·76 lines (61 loc) · 2.26 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import socket
import re
import yaml
from time import sleep
config = yaml.safe_load(open('config.yml', 'rb'))
HOST = config['HOST']
PORT = config['PORT']
NICK = config['NICK']
PASS = config['PASS']
class Bot(object):
""""""
def __init__(self, channel, n_msg_per_sec=100):
super(Bot, self).__init__()
self._nickname = NICK
self.channel = channel
self.connect(channel)
# print(NICK, channel, '\n', '-' * (len(NICK + channel) + 1))
print("{} {}\n{}".format(NICK, channel, '-' * (len(NICK + channel) + 1)))
self._msg_count = 0
self.n_msg_per_sec = n_msg_per_sec
def connect(self, channel):
self._socket = socket.socket()
self._socket.connect((HOST, PORT))
self._socket.send("PASS {}\r\n".format(PASS).encode("utf-8"))
self._socket.send("NICK {}\r\n".format(NICK).encode("utf-8"))
self._socket.send("JOIN {}\r\n".format(channel).encode("utf-8"))
def chat(self, msg):
self._socket.send("PRIVMSG {} :{}\r\n".format(self.channel, msg))
def _ping_pong(self, response):
if response == "PING :tmi.twitch.tv\r\n":
# send pong back to prevent timeout
self._socket.send("PONG :tmi.twitch.tv\r\n".encode("utf-8"))
return True
else:
return False
def _get_response(self):
try:
response = self._socket.recv(1024).decode("utf-8")
except UnicodeDecodeError as e:
print('\n\n%s\n\n' % e)
return False
if self._ping_pong(response):
return False
elif ':tmi.twitch.tv' in response:
return False
else:
return response
def _process_msg(self, response):
username = re.search(r"\w+", response).group(0)
mask = re.compile(r"^:\w+!\w+@\w+\.tmi\.twitch\.tv PRIVMSG #\w+ :")
message = mask.sub("", response).strip('\r\n')
return username, message
def action(self, username, msg):
return NotImplementedError()
def run(self):
while True:
response = self._get_response()
if response:
username, msg = self._process_msg(response)
self.action(username, msg)
sleep(1 / float(self.n_msg_per_sec))