-
Notifications
You must be signed in to change notification settings - Fork 4
/
bot.py
executable file
·284 lines (266 loc) · 9.99 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
from slackclient import SlackClient
from piazza_api import Piazza
import time
import re
import humanize
import json
from datetime import datetime
import html2text
import traceback
def process_bot_call(channel, user, text, thread=None):
    """Post a Piazza link for every direct mention of the bot in a message.

    The text is split on the bot's mention token (``<@BOT_ID>``). For each
    piece that follows a mention, the first run of digits is taken as the
    post number and passed to ``post_link`` for the default class
    (module-level ``piazza_id``).

    Args:
        channel: Slack channel id to post into.
        user: Id of the user who wrote the message (forwarded to post_link).
        text: Raw message text.
        thread: Optional thread timestamp to reply into.
    """
    mention = "<@" + bot_id + ">"
    # Everything before the first mention is irrelevant, hence the [1:].
    for call in text.split(mention)[1:]:
        match = re.search(r'\d+', call)
        if match is None:
            # A mention with no number after it has nothing to link to;
            # skipping it avoids calling .group() on None.
            continue
        post_link(channel, user, match.group(), piazza_id, thread)
def make_attachment(post, network, followup, url):
    """Build the JSON-encoded Slack attachment list describing a Piazza post.

    Fetching details is best effort: if anything goes wrong while reading the
    post, the attachment is still produced with whatever was collected so far
    (at minimum the placeholder title and content).

    Args:
        post: Post dict; ``post['history'][0]`` supplies subject, content and
            creation time, ``post['children']`` the followups.
        network: Object handed to the author lookup helpers.
        followup: 1-based followup number (string or int), or a falsy value
            to describe the post itself.
        url: Link used as the attachment's ``title_link``.

    Returns:
        A JSON string holding a one-element list with the attachment dict.
    """
    title = "Piazza Post"
    content = "Could not fetch post content"
    age = None
    image = None
    a_name, a_photo = None, None
    try:
        title = post['history'][0]['subject']
        if followup:
            title = 'Followup #{} to {}'.format(followup, title)
            index = int(followup) - 1
            if index < 0:
                # Followup numbers start at 1; without this check "#0" would
                # silently pick the last followup through negative indexing.
                raise IndexError("followup numbers start at 1")
            child = post['children'][index]
            content, image = format_html(child['subject'])
            timestamp = child['created']
            a_name, a_photo = find_followup_author(child, network)
        else:
            content, image = format_html(post['history'][0]['content'])
            timestamp = post['history'][0]['created']
            a_name, a_photo = find_authors(post['history'], network)
        post_time = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
        age = humanize.naturaltime(datetime.utcnow() - post_time)
    except Exception:
        # Keep going with the partial data, but leave a trace of what failed.
        traceback.print_exc()
    # Join only the pieces that exist, so a failed lookup above can never
    # lead to concatenating a string with None.
    footer_parts = [part for part in (a_name, age) if part]
    footer = " - ".join(footer_parts) if footer_parts else None
    msg = {
        "fallback": content if followup else title + ", " + url,
        "title": title,
        "title_link": url,
        "text": content,
        "footer": footer,
        "footer_icon": a_photo,
        "image_url": image,
        "color": "#3e7aab",
        "mrkdwn_in": ["text"]
    }
    return json.dumps([msg])
def make_reply(reply, network):
    """Return the Slack attachment dict for one reply beneath a followup.

    The reply's ``subject`` is converted with ``format_html``, its
    ``created`` timestamp is rendered as a relative time, and the author (if
    one can be resolved) is put in front of that time in the footer.

    Args:
        reply: Reply dict providing ``subject`` and ``created``.
        network: Object handed to ``find_followup_author``.

    Returns:
        A dict with fallback, text, footer, footer_icon, image_url and
        mrkdwn_in entries.
    """
    body, picture = format_html(reply['subject'])
    created = datetime.strptime(reply['created'], "%Y-%m-%dT%H:%M:%SZ")
    age = humanize.naturaltime(datetime.utcnow() - created)
    author, avatar = find_followup_author(reply, network)
    # Without a resolvable author the footer is just the relative time.
    footer = age if author is None else author + " - " + age
    return {
        "fallback": body,
        "text": body,
        "footer": footer,
        "footer_icon": avatar,
        "image_url": picture,
        "mrkdwn_in": ["text"]
    }
def post_link(channel, user, post_id, piazza_id, thread=None, followup=None):
    """Post a Slack message describing a Piazza post (or one of its followups).

    The post is fetched through the module-level Piazza client ``p`` and sent
    through the module-level Slack client ``sc``. When a followup is requested
    outside of a thread, the followup's replies are posted as a thread under
    the new message.

    Args:
        channel: Slack channel id to post into.
        user: Id of the requesting user (currently unused).
        post_id: Piazza post number, as a string.
        piazza_id: Piazza class id, as a string.
        thread: Optional thread timestamp; when given the message is posted
            into that thread and no replies are expanded.
        followup: Optional 1-based followup number.
    """
    try:
        network = p.network(piazza_id)
        post = network.get_post(post_id)
    except Exception:
        # Best effort: a post that cannot be fetched is simply not linked,
        # but the reason is logged instead of vanishing.
        traceback.print_exc()
        return
    url = "https://piazza.com/class/" + piazza_id + "?cid=" + post_id
    attach = make_attachment(post, network, followup, url)
    message = {"channel": channel, "attachments": attach, "as_user": True}
    if thread:
        message["thread_ts"] = thread
    response = sc.api_call('chat.postMessage', **message)
    if not followup or thread:
        return
    index = int(followup) - 1
    try:
        if index < 0:
            # Followup numbers start at 1; refuse negative indexing.
            raise IndexError("followup numbers start at 1")
        followup_post = post['children'][index]
    except (IndexError, KeyError):
        # The attachment above already reports that the content could not be
        # fetched; there are no replies to expand.
        return
    replies = [make_reply(reply, network)
               for reply in followup_post.get('children', [])]
    # The reply thread needs the timestamp of the message just posted; it is
    # missing when that call did not succeed.
    parent_ts = response.get('ts') if response else None
    if replies and parent_ts:
        sc.api_call('chat.postMessage', channel=channel, as_user=True,
                    attachments=json.dumps(replies), thread_ts=parent_ts)
# Base URL that '<uid>/<photo file>' is appended to when building photo links.
PHOTO_SERVER = "https://d1b10bmlvqabco.cloudfront.net/photos"


def _author_label(anon, uid, users):
    """Return (display name, photo URL or None) for one author with a uid."""
    user = users[uid]
    name = user['name']
    if anon:
        name += " (anon)"
    photo = None
    if user.get('photo'):
        photo = PHOTO_SERVER + '/' + uid + '/' + user['photo']
    return name, photo


def find_authors(history, network):
    """Summarise who wrote a post, based on its edit history.

    Args:
        history: List of history entries; each has an ``anon`` field
            (``'no'`` means not anonymous) and optionally a ``uid``.
        network: Object whose ``get_users(uids)`` returns user dicts with
            ``id``, ``name`` and ``photo`` entries.

    Returns:
        ``(name, photo_url)``. With one author, the name and photo (or
        ``(None, None)`` if that author has no uid). With two distinct
        authors, ``"A and B"`` and no photo. With more, the first author
        followed by ``"and N others"``. ``(None, None)`` for an empty history
        or when the lookup fails.
    """
    try:
        authors = [(entry['anon'] != 'no', entry.get('uid')) for entry in history]
        # Keep the first appearance of every distinct uid, in history order.
        # Indexing the raw history instead would report "A and A" when the
        # same person made the first two edits. Entries without a uid all
        # count as one anonymous author.
        distinct = []
        seen = set()
        for anon, uid in authors:
            if uid not in seen:
                seen.add(uid)
                distinct.append((anon, uid))
        if not distinct:
            return None, None
        known = [uid for _, uid in distinct if uid is not None]
        users = {u['id']: u for u in network.get_users(known)} if known else {}

        first_anon, first_uid = distinct[0]
        if len(distinct) == 1:
            if first_uid is None:
                return None, None
            return _author_label(first_anon, first_uid, users)
        if len(distinct) == 2:
            names = [
                _author_label(anon, uid, users)[0] if uid is not None else "Anonymous"
                for anon, uid in distinct
            ]
            return names[0] + " and " + names[1], None
        name, photo = "Anonymous", None
        if first_uid is not None:
            name, photo = _author_label(first_anon, first_uid, users)
        name += " and " + str(len(distinct) - 1) + " others"
        return name, photo
    except Exception:
        traceback.print_exc()
        return None, None
def find_followup_author(child, network):
    """Look up the author of a followup or reply.

    Args:
        child: Followup/reply dict with an ``anon`` field (``'no'`` means not
            anonymous) and optionally a ``uid``.
        network: Object whose ``get_users([uid])`` returns a list of user
            dicts with ``name`` and ``photo`` entries.

    Returns:
        ``(name, photo_url)``; the name gets an ``" (anon)"`` suffix for
        anonymous authors and the photo is None when the user has none.
        ``(None, None)`` when there is no uid or the lookup fails.
    """
    try:
        anon = child['anon'] != 'no'
        uid = child.get('uid')
        if not uid:
            return None, None
        user = network.get_users([uid])[0]
        name = user['name']
        if anon:
            name += " (anon)"
        photo = None
        if user['photo']:
            photo = PHOTO_SERVER + '/' + uid + '/' + user['photo']
        return name, photo
    except Exception:
        # Exception rather than a bare except, so that KeyboardInterrupt and
        # SystemExit raised during the lookup still stop the bot.
        traceback.print_exc()
        return None, None
def format_html(html):
    """Convert Piazza post HTML into Slack message markup.

    ``<pre>`` sections become triple-backtick code blocks, the rest goes
    through html2text, and the resulting Markdown emphasis, links and images
    are rewritten in Slack's ``*bold*`` / ``_italic_`` / ``<url|label>``
    forms. Site-relative URLs are prefixed with ``https://piazza.com``.

    Args:
        html: HTML source of the post body.

    Returns:
        ``(text, first_image_url)`` where the second item is None when the
        text contains no image. If anything fails, the traceback is printed
        and ``(html, None)`` is returned with the HTML as processed so far.
    """
    def absolute(link):
        # Slack cannot resolve site-relative links.
        return 'https://piazza.com' + link if link.startswith('/') else link

    try:
        h = html2text.HTML2Text()
        h.body_width = 0  # 0 turns off html2text's line wrapping
        html = html.replace('target="_blank"', "").replace("\n\n", "<div></div>")
        html = html.replace('<br />', '<br>')
        while "<pre>" in html:
            start = html.find('<pre>')
            # Look for the closing tag only after the opening one. Searching
            # from the beginning let an unclosed <pre> (or a stray </pre>
            # earlier in the text) re-insert the same <pre> on every pass,
            # so the loop never finished.
            end = html.find('</pre>', start + 5)
            if end == -1:
                body, rest = html[start + 5:], ''
            else:
                body, rest = html[start + 5:end], html[end + 6:]
            # NOTE(review): the original replacement character renders as
            # whitespace in the listing; a non-breaking space is assumed
            # because a plain space would make the substitution a no-op.
            # Confirm against the real file.
            body = body.replace(' ', '\u00a0').replace('\n', '<br>')
            html = html[:start] + '```' + body + '<br>```' + rest
        text = h.handle(html)
        # Markdown **bold** -> Slack *bold*, Markdown *italic* -> Slack
        # _italic_; the placeholder keeps the two rewrites from colliding.
        text = text.replace('**', '__bold__').replace('*', '_').replace('__bold__', '*')
        text = text.replace('\n\\- ', '\n- ')
        links, images = find_md_links(text)
        for name, link in links:
            text = text.replace('[%s](%s)' % (name, link),
                                '<%s|%s>' % (absolute(link), name))
        for image in images:
            text = text.replace('![](%s)' % image, '<%s|Image>' % absolute(image))
        first_image = absolute(images[0]) if images else None
        return text, first_image
    except Exception:
        traceback.print_exc()
        return html, None
# [label](target) with a non-empty label; captures (label, target).
INLINE_LINK_RE = re.compile(r'\[([^\]]+)\]\(([^)]+)\)')
# ![](target) with an empty alt text; captures target.
INLINE_IMAGE_RE = re.compile(r'!\[\]\(([^)]+)\)')


def find_md_links(md):
    """Return ``(links, images)`` found in a Markdown string.

    ``links`` is a list of ``(label, target)`` tuples for inline links and
    ``images`` is a list of targets of inline images without alt text.
    """
    return INLINE_LINK_RE.findall(md), INLINE_IMAGE_RE.findall(md)
def handle_message(result):
    """React to one Slack message event by linking the Piazza posts it names.

    Three kinds of reference are recognised in the message text:
    full ``https://piazza.com/class/<id>?cid=<n>`` URLs, ``@<n>`` and
    ``@<n>#<followup>``. The ``@`` forms may be prefixed with one of the
    names in ``other_piazza_names`` to select another class; otherwise the
    default ``piazza_id`` is used. If no reference is found but the bot is
    mentioned, the message is handed to ``process_bot_call``.

    Args:
        result: Slack event dict for a message.
    """
    channel = result.get('channel')
    user = result.get('user')
    text = result.get('text')
    # Events lacking one of these fields are ignored rather than raising
    # KeyError; the bot's own messages are skipped to avoid answering itself.
    if not channel or not text or user is None or user == bot_id:
        return

    urls = re.findall(r'https://piazza\.com/class/([\w]+)\?cid=([\d]+)', text)

    # A reference must start the text, follow whitespace, or follow a known
    # course name. Names are escaped so they match literally, and empty names
    # are dropped: an empty alternative would let "@N" match after any text.
    prefixes = [r'\A', r'\s'] + [re.escape(name) for name in other_piazza_names if name]
    prefix = '(' + '|'.join(prefixes) + ')'
    # The delimiter after the number is a lookahead so it is not consumed;
    # otherwise the space in "@1 @2" is used up by the first match and the
    # second reference is missed.
    tail = r'(?=\s|\Z|,|\?|;|:|\.)'
    at_nums = re.findall(prefix + r'@(\d+)' + tail, text)
    at_nums_followup = re.findall(prefix + r'@(\d+)#(\d+)' + tail, text)

    thread = None
    if 'thread_ts' in result and result['thread_ts'] != result['ts']:
        thread = result['thread_ts']

    def class_for(course):
        # The captured prefix is either a course name or start/whitespace.
        course = course.strip()
        if not course:
            return piazza_id
        return other_piazza_ids[other_piazza_names.index(course)]

    # A set, so a post named twice in one message is linked only once.
    posts = set()
    for pid, post in urls:
        posts.add((pid, post, None))
    for course, post in at_nums:
        posts.add((class_for(course), post, None))
    for course, post, followup in at_nums_followup:
        posts.add((class_for(course), post, followup))

    if posts:
        for pid, post, followup in posts:
            post_link(channel, user, post, pid, thread, followup)
    elif bot_id in text:
        process_bot_call(channel, user, text, thread)
if __name__ == "__main__":
    # bot_config is expected to provide token, piazza_email, piazza_password,
    # piazza_id, other_piazza_names and other_piazza_ids, which are the names
    # this file reads without defining them.
    from bot_config import *
    bot_id = None
    sc = SlackClient(token)
    p = Piazza()
    p.user_login(email=piazza_email, password=piazza_password)
    if sc.rtm_connect():
        # Find the bot's own user id, used to recognise mentions and to
        # ignore the bot's own messages.
        username = sc.server.username
        for member in sc.server.login_data['users']:
            if member['name'] == username:
                bot_id = member['id']
                break
        if not bot_id:
            # RuntimeError replaces the undefined name ``Error``, which
            # raised NameError instead of reporting this message.
            raise RuntimeError("Could not find bot")
        while True:
            results = sc.rtm_read()
            for result in results:
                try:
                    if result['type'] == 'message':
                        handle_message(result)
                except Exception as e:
                    print(e)
                    # NOTE(review): reconnecting after a handler error is
                    # presumably meant to recover a dropped session. Nesting
                    # here is reconstructed from an unindented listing;
                    # confirm against the real file.
                    sc.rtm_connect()
            # Pause between polls of the RTM stream.
            time.sleep(1)
    else:
        print("Connection Failed, invalid token?")