-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
510 lines (437 loc) · 17.9 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
import os, time, json, yaml, hashlib, asyncio, aiohttp, uvicorn, urllib.parse
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Query, Header
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from core.gotify import push_gotify
from core.logs import log, log_print
# Module-wide logger object returned by the project's core.logs.log() helper.
logger = log()
# Shared constants: the app key/secret pair used by tvsign() to sign requests,
# and the browser User-Agent string sent with outgoing HTTP requests.
APP_KEY = "4409e2ce8ffd12b8"
APP_SEC = "59b43e04ad6965f34319062b478f83dd"
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/127.0.0.0 Safari/537.36 Edg/128.0.0.0"
)
# Load the YAML configuration file that sits next to this module.
config_file_path = os.path.join(os.path.dirname(__file__), "config.yaml")
with open(config_file_path, "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

# Directory holding one "<DedeUserID>.json" file per account
# (a relative path, so it resolves against the working directory).
COOKIE_FOLDER = os.path.join("data", "cookie")

# Periodic cookie validity check.
# NOTE(review): the key is spelled "check_intlval" (sic); it presumably has to
# match config.yaml, so it is kept as-is.
_check_cfg = config["COOKIE_CHECK"]
COOKIE_CHECK_ENABLE = _check_cfg["enable"]
COOKIE_CHECK_INTERVAL = _check_cfg["check_intlval"]  # seconds (passed to asyncio.sleep)

# Periodic cookie refresh ("refresh_intlval" is likewise spelled as in the config).
_refresh_cfg = config["COOKIE_REFRESH"]
COOKIE_REFRESH_ENABLE = _refresh_cfg["enable"]
COOKIE_REFRESH_INTERVAL = _refresh_cfg["refresh_intlval"]  # days (compared against elapsed days)

# Push notification settings; every level is optional and falls back to "off".
PUSH_CONFIG = config.get("PUSH", {})
## Gotify
GOTIFY_CONFIG = PUSH_CONFIG.get("GOTIFY", {})
GOTIFY_ENABLE = GOTIFY_CONFIG.get("enable", False)
GOTIFY_URL = GOTIFY_CONFIG.get("url", "")
GOTIFY_TOKEN = GOTIFY_CONFIG.get("token", "")
async def ez_push_gotify(title: str, message: str, priority: int = 1):
    """Send a Gotify notification when Gotify is enabled and fully configured.

    Args:
        title: Notification title.
        message: Notification body.
        priority: Notification priority, 1 by default.

    Nothing is sent (only a debug line is logged) unless GOTIFY_ENABLE,
    GOTIFY_URL and GOTIFY_TOKEN are all truthy. A failure while pushing is
    logged and swallowed, so callers never see an exception from here.
    """
    gotify_ready = GOTIFY_ENABLE and GOTIFY_URL and GOTIFY_TOKEN
    if not gotify_ready:
        logger.debug("[Gotify] Gotify 未启用或配置不完整,跳过通知。")
        return

    try:
        await push_gotify(GOTIFY_URL, GOTIFY_TOKEN, title, message, priority=priority)
        logger.info(f"[Gotify] 通知已发送: {title}")
    except Exception as e:
        log_print(f"[Gotify] 推送通知失败: {e}", "ERROR")
# Sign request parameters for the API
def tvsign(params, appkey=APP_KEY, appsec=APP_SEC):
    """Return a signed copy of *params*.

    The app key is added, the parameters are sorted by key and URL-encoded,
    and the MD5 hex digest of ``query + appsec`` is appended under "sign".

    Args:
        params: Mapping of request parameters. It is not modified.
        appkey: App key stored under "appkey" (overrides any existing entry).
        appsec: App secret appended to the query string before hashing.

    Returns:
        A new dict: the key-sorted parameters followed by the "sign" entry.
    """
    # Work on a copy so the caller's dict is not mutated as a side effect.
    signed = dict(params)
    signed["appkey"] = appkey
    signed = dict(sorted(signed.items()))
    query = urllib.parse.urlencode(signed)
    signed["sign"] = hashlib.md5((query + appsec).encode()).hexdigest()
    return signed
# Cookie file path
def get_cookie_file_path(DedeUserID):
    """Return the path of the JSON file that stores the cookie of *DedeUserID*.

    The ID can come straight from an HTTP query parameter, so only the final
    path component of "<DedeUserID>.json" is used. A value containing
    directory parts (e.g. "../../x") therefore cannot point outside
    COOKIE_FOLDER. Plain IDs map to exactly the same path as before.
    """
    filename = os.path.basename(f"{DedeUserID}.json")
    return os.path.join(COOKIE_FOLDER, filename)
# Read a stored cookie
def read_cookie(DedeUserID):
    """Load the stored cookie record of *DedeUserID*.

    Returns:
        The parsed JSON content of the user's cookie file, or None when the
        file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    file_path = get_cookie_file_path(DedeUserID)
    # Open directly instead of testing os.path.exists() first: the file may be
    # removed between the check and the open, which used to surface as an
    # unhandled FileNotFoundError.
    try:
        with open(file_path, "r", encoding="utf-8") as file:
            return json.load(file)
    except (FileNotFoundError, NotADirectoryError):
        return None
# Save a cookie
def save_cookie_info(login_data):
    """Persist the login result of one account to its cookie file.

    The account ID is the value of the first cookie named "DedeUserID" in
    ``login_data["cookie_info"]["cookies"]``. When no such cookie (or an empty
    value) is found, a warning is logged and nothing is written.

    The file receives "update_time" (current time in milliseconds) together
    with the "token_info" and "cookie_info" sections of *login_data*.
    """
    cookie_info = login_data["cookie_info"]
    uid = next(
        (
            item["value"]
            for item in cookie_info["cookies"]
            if item["name"] == "DedeUserID"
        ),
        "",
    )
    if not uid:
        logger.warning("未获取到 DedeUserID")
        return

    record = {
        "update_time": int(time.time() * 1000),
        "token_info": login_data["token_info"],
        "cookie_info": cookie_info,
    }
    target = get_cookie_file_path(uid)
    # The cookie folder may not exist yet on first login.
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, "w", encoding="utf-8") as out:
        json.dump(record, out, ensure_ascii=False, indent=4)
# Refresh a cookie
async def refresh_cookie(DedeUserID):
    """Refresh the stored token/cookie set of one account.

    The stored access and refresh tokens are signed with tvsign() and posted
    to the passport refresh endpoint. On success (response code 0) the user's
    cookie file is overwritten with the new "token_info"/"cookie_info", a
    Gotify notification is pushed, and check_cookie() is run on the new data.

    Returns:
        A dict with "code" and "message". On success "code" is 0 and
        "expire_time" holds the expiry timestamp (milliseconds, assuming the
        upstream "ts" and "expires_in" fields are in seconds). Code -1 means
        the user is unknown or the request/parsing failed. Any other code is
        the upstream error code.
    """
    cookie_data = read_cookie(DedeUserID)
    if not cookie_data:
        return {"code": -1, "message": "指定用户不存在"}

    token_info = cookie_data["token_info"]
    params = tvsign(
        {
            "access_key": token_info["access_token"],
            "refresh_token": token_info["refresh_token"],
            "ts": int(time.time()),
        }
    )
    headers = {
        "content-type": "application/x-www-form-urlencoded",
        "user-agent": USER_AGENT,
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://passport.bilibili.com/api/v2/oauth2/refresh_token",
                params=params,
                headers=headers,
                # Bound the request like check_cookie() does; without a limit a
                # stalled connection would block the whole refresh round.
                timeout=aiohttp.ClientTimeout(total=10),
            ) as rsp:
                rsp_data = await rsp.json()
    except Exception as e:
        logger.error(f"[刷新] 解析失败: {e}")
        return {"code": -1, "message": "请求或解析失败"}

    if rsp_data["code"] != 0:
        logger.error(f"[刷新] 刷新失败: {rsp_data.get('message', '未知错误')}")
        return {
            "code": rsp_data["code"],
            "message": rsp_data.get("message", "刷新失败"),
        }

    expires_in = rsp_data["data"]["token_info"]["expires_in"]
    expire_timestamp = (rsp_data["ts"] + int(expires_in)) * 1000
    save_info = {
        "update_time": rsp_data["ts"] * 1000,
        "token_info": rsp_data["data"]["token_info"],
        "cookie_info": rsp_data["data"]["cookie_info"],
    }
    file_path = get_cookie_file_path(DedeUserID)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(save_info, f, ensure_ascii=False, indent=4)

    expire_time_str = time.strftime(
        "%Y-%m-%d %H:%M:%S", time.localtime(expire_timestamp / 1000)
    )
    logger.info(
        f"[刷新] 用户 {DedeUserID} 的 Cookie 刷新成功,有效期至 {expire_time_str}"
    )
    await ez_push_gotify(
        "[BiliBiliCookieMgmt] Cookie 刷新通知",
        f"用户 {DedeUserID} 的 Cookie 刷新成功,有效期至 {expire_time_str}",
        priority=5,
    )

    # Health check of the freshly stored cookie.
    check_result = await check_cookie(DedeUserID)
    if check_result["code"] == 0:
        logger.info(f"[刷新] 用户 {DedeUserID} 的 Cookie 有效")
    elif check_result["code"] != -2:
        # -2 (cookie invalid) has already been logged and pushed to Gotify by
        # check_cookie(), so repeating it here would notify twice. Any other
        # code means the check itself could not be completed, which is not
        # the same as the cookie being expired.
        log_print(
            f"[刷新] 用户 {DedeUserID} 的 Cookie 刷新后检查未完成: {check_result['message']}",
            "WARN",
        )

    return {
        "code": 0,
        "message": "刷新成功",
        "expire_time": expire_timestamp,
    }
# Cookie health check
async def check_cookie(DedeUserID):
    """Check whether the stored cookie of one account is still logged in.

    The stored cookies are sent to the web "nav" endpoint. The cookie counts
    as valid when the response has code 0 and a truthy ``data.isLogin``. The
    result is written back to the user's file as "check_time" (milliseconds)
    and "cookie_valid". An invalid cookie is also logged and pushed to Gotify.

    Returns:
        {"code": 0, ...} when valid, {"code": -2, ...} when invalid, and
        {"code": -1, ...} when the user is unknown or the check failed.
    """
    cookie_data = read_cookie(DedeUserID)
    if not cookie_data:
        return {"code": -1, "message": "指定用户不存在"}

    cookies = cookie_data["cookie_info"]["cookies"]
    cookie_dict = {cookie["name"]: cookie["value"] for cookie in cookies}
    cookie_str = "; ".join(f"{key}={value}" for key, value in cookie_dict.items())

    # Query the login state.
    url = "https://api.bilibili.com/x/web-interface/nav"
    headers = {
        "User-Agent": USER_AGENT,
        "Referer": "https://www.bilibili.com/",
        "Cookie": cookie_str,
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url,
                headers=headers,
                # Explicit ClientTimeout: passing a bare number is deprecated.
                timeout=aiohttp.ClientTimeout(total=10),
            ) as response:
                data = await response.json()

        is_valid = bool(
            data.get("code") == 0 and data.get("data", {}).get("isLogin")
        )

        # Re-read the record right before writing. A refresh may have stored
        # new tokens while the request above was in flight, and writing back
        # the copy loaded at the top would silently discard them. There is no
        # await between this read and the write below.
        latest = read_cookie(DedeUserID) or cookie_data
        latest["check_time"] = int(time.time() * 1000)
        latest["cookie_valid"] = is_valid
        file_path = get_cookie_file_path(DedeUserID)
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(latest, f, ensure_ascii=False, indent=4)

        if is_valid:
            logger.debug(f"[检查] 用户 {DedeUserID} 的 Cookie 有效")
            return {"code": 0, "message": "Cookie 有效"}

        log_print(f"[检查] 用户 {DedeUserID} 的 Cookie 无效", "WARN")
        await ez_push_gotify(
            "[BiliBiliCookieMgmt] Cookie 失效通知",
            f"用户 {DedeUserID} 的 Cookie 已失效,请尽快处理。",
            priority=5,
        )
        return {"code": -2, "message": "Cookie 无效"}
    except Exception as e:
        log_print(f"[检查] 检查用户 {DedeUserID} 失败: {e}", "ERROR")
        return {"code": -1, "message": "检查失败"}
# Cookie health check for every stored cookie
async def check_all_cookies():
    """Run check_cookie() concurrently for every "*.json" file in COOKIE_FOLDER.

    Does nothing (apart from a debug line) when the folder does not exist.
    A check that raises is logged for that user and does not affect the
    other checks.
    """
    if not os.path.exists(COOKIE_FOLDER):
        logger.debug("[检查] Cookie 文件夹不存在,无需检查。")
        return

    suffix = ".json"
    # Strip only the trailing extension. str.replace() would also remove a
    # ".json" occurring elsewhere in the name and yield a wrong user ID.
    user_ids = [
        filename[: -len(suffix)]
        for filename in os.listdir(COOKIE_FOLDER)
        if filename.endswith(suffix)
    ]
    for DedeUserID in user_ids:
        logger.debug(f"[检查] 正在检查用户 {DedeUserID} 的 Cookie...")

    # return_exceptions=True lets every check run to completion and tells us
    # which user failed, e.g. because of an unreadable cookie file.
    results = await asyncio.gather(
        *(check_cookie(DedeUserID) for DedeUserID in user_ids),
        return_exceptions=True,
    )
    for DedeUserID, result in zip(user_ids, results):
        if isinstance(result, Exception):
            log_print(f"[检查] 检查用户 {DedeUserID} 失败: {result}", "ERROR")
# Cookie age check: refresh every cookie older than the refresh interval
async def refresh_expired_cookies():
    """Refresh every stored cookie whose age reached COOKIE_REFRESH_INTERVAL days.

    The age is measured from the record's "update_time" (milliseconds). A
    record without that field is treated as due. Unreadable cookie files and
    failed refreshes are logged per user and do not stop the others.
    """
    if not os.path.exists(COOKIE_FOLDER):
        logger.debug("[刷新] Cookie 文件夹不存在,无需刷新。")
        return

    suffix = ".json"
    due_ids = []
    for filename in os.listdir(COOKIE_FOLDER):
        if not filename.endswith(suffix):
            continue
        # Strip only the trailing extension (str.replace() would also remove
        # a ".json" found elsewhere in the name).
        DedeUserID = filename[: -len(suffix)]
        try:
            cookie_data = read_cookie(DedeUserID)
        except (OSError, ValueError) as e:
            # One corrupt or unreadable file must not abort the whole round.
            log_print(f"[刷新] 读取用户 {DedeUserID} 的 Cookie 失败: {e}", "ERROR")
            continue
        if not cookie_data:
            continue

        update_time = cookie_data.get("update_time", 0)
        current_time = int(time.time() * 1000)
        elapsed_days = (current_time - update_time) / (1000 * 60 * 60 * 24)
        if elapsed_days >= COOKIE_REFRESH_INTERVAL:
            log_print(
                f"[刷新] 用户 {DedeUserID} 的 Cookie 超过了刷新间隔,正在刷新...",
                "INFO",
            )
            due_ids.append(DedeUserID)

    if not due_ids:
        logger.debug("[刷新] 没有需要刷新的 Cookie。")
        return

    results = await asyncio.gather(
        *(refresh_cookie(DedeUserID) for DedeUserID in due_ids),
        return_exceptions=True,
    )
    for DedeUserID, result in zip(due_ids, results):
        if isinstance(result, Exception):
            log_print(f"[刷新] 刷新用户 {DedeUserID} 失败: {result}", "ERROR")
# Background task: check every cookie at a fixed interval
async def periodic_cookie_check():
    """Run check_all_cookies() forever, sleeping COOKIE_CHECK_INTERVAL seconds between rounds.

    An error raised by a round is logged and the loop keeps going.
    Cancellation is logged and re-raised so the task ends as cancelled.
    """

    async def _one_round():
        # A failing round is reported but must not end the periodic task.
        try:
            await check_all_cookies()
        except Exception as e:
            log_print(f"[检查] 自动检查过程中出现错误: {e}", "ERROR")

    try:
        while True:
            logger.debug("[检查] 开始自动检查所有 Cookie 有效性...")
            await _one_round()
            logger.debug(f"[检查] 下一次检查将在 {COOKIE_CHECK_INTERVAL} 秒后进行。")
            await asyncio.sleep(COOKIE_CHECK_INTERVAL)
    except asyncio.CancelledError:
        logger.debug("[检查] 自动 Cookie 检查已取消。")
        raise
# Background task: refresh the cookies that are due, once a day
async def periodic_cookie_refresh():
    """Run refresh_expired_cookies() forever with a 24-hour pause between rounds.

    An error raised by a round is logged and the loop keeps going.
    Cancellation is logged and re-raised so the task ends as cancelled.
    """

    async def _one_round():
        # A failing round is reported but must not end the periodic task.
        try:
            await refresh_expired_cookies()
        except Exception as e:
            log_print(f"[刷新] 自动刷新过程中出现错误: {e}", "ERROR")

    seconds_per_day = 24 * 60 * 60
    try:
        while True:
            logger.debug("[刷新] 开始自动刷新需要更新的 Cookie...")
            await _one_round()
            await asyncio.sleep(seconds_per_day)
    except asyncio.CancelledError:
        logger.debug("[刷新] 自动 Cookie 刷新已取消。")
        raise
# Application lifespan
@asynccontextmanager
async def run(app: FastAPI):
    """Lifespan handler: start the enabled background tasks, stop them on shutdown.

    Starts periodic_cookie_check() when COOKIE_CHECK_ENABLE is set and
    periodic_cookie_refresh() when COOKIE_REFRESH_ENABLE is set, yields while
    the application runs, then cancels the tasks and waits for them to finish.
    """
    tasks = []
    if COOKIE_CHECK_ENABLE:
        logger.debug(
            f"[检查] 自动 Cookie 检查已启用,每隔 {COOKIE_CHECK_INTERVAL} 秒检查一次。"
        )
        tasks.append(asyncio.create_task(periodic_cookie_check()))
    else:
        logger.debug("[检查] 自动 Cookie 检查未启用。")

    if COOKIE_REFRESH_ENABLE:
        logger.debug(
            f"[刷新] 自动 Cookie 刷新已启用,刷新间隔为 {COOKIE_REFRESH_INTERVAL} 天。"
        )
        tasks.append(asyncio.create_task(periodic_cookie_refresh()))
    else:
        logger.debug("[刷新] 自动 Cookie 刷新未启用。")

    try:
        yield
    finally:
        # try/finally: the tasks are stopped even when an exception is thrown
        # into the lifespan context, instead of being left running.
        for task in tasks:
            task.cancel()
        # Collect the outcomes rather than awaiting each task directly: a task
        # that already died with an error must not make shutdown itself raise.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception) and not isinstance(
                result, asyncio.CancelledError
            ):
                log_print(f"[关闭] 后台任务异常退出: {result}", "ERROR")
# FastAPI application; `run` is passed as its lifespan handler.
app = FastAPI(lifespan=run)
# Expose the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# API token verification
async def verify_api_token(token: str = Header(None)):
    """Reject the request unless *token* matches the configured API token.

    Verification is skipped when ``config["API_TOKEN"]["enable"]`` is falsy.

    Args:
        token: Value of the "token" request header, or None when absent.

    Raises:
        HTTPException: 401 when the token is missing or does not match.
    """
    import hmac  # local import: only needed for the constant-time comparison

    api_token_cfg = config["API_TOKEN"]
    if not api_token_cfg["enable"]:
        return

    expected = str(api_token_cfg["token"]).encode("utf-8")
    # compare_digest takes time independent of where the values differ, so the
    # response time does not leak how much of a guessed token was correct.
    # Bytes are compared because the str form only accepts ASCII.
    if not isinstance(token, str) or not hmac.compare_digest(
        token.encode("utf-8"), expected
    ):
        raise HTTPException(status_code=401, detail="无效或缺失的 API Token")
@app.get("/")
async def read_root():
    """Serve the front-end entry page, static/index.html."""
    index_page = os.path.join("static", "index.html")
    return FileResponse(index_page)
# Generate a login QR code
@app.get("/api/passport-login/web/qrcode/generate")
async def generate_qr(token: str = Header(None)):
    """Request a new login QR code from the passport service.

    Args:
        token: API token header, validated by verify_api_token().

    Returns:
        {"code": <upstream code>, "data": <upstream data or None>}.

    Raises:
        HTTPException: 401 for a bad API token, 500 when the upstream request
            fails or times out.
    """
    await verify_api_token(token)
    url = "https://passport.bilibili.com/x/passport-tv-login/qrcode/auth_code"
    params = tvsign({"local_id": "0", "ts": int(time.time())})
    headers = {"User-Agent": USER_AGENT}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                url,
                params=params,
                headers=headers,
                # Same limit as the cookie check; keeps a stalled upstream
                # from holding the request open indefinitely.
                timeout=aiohttp.ClientTimeout(total=10),
            ) as response:
                data = await response.json()
    except asyncio.TimeoutError as e:
        raise HTTPException(status_code=500, detail="请求超时") from e
    except aiohttp.ClientError as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # .get(): an upstream error payload may carry no "data" field. The caller
    # still gets the error code instead of an unhandled KeyError.
    return {"code": data["code"], "data": data.get("data")}
# Poll the QR code scan status
@app.get("/api/passport-login/web/qrcode/poll")
async def poll_qr(auth_code: str, token: str = Header(None)):
    """Poll the passport service for the state of a QR login.

    On success (upstream code 0) the login data is stored through
    save_cookie_info() and returned to the caller.

    Args:
        auth_code: Auth code of the QR code being polled.
        token: API token header, validated by verify_api_token().

    Returns:
        JSONResponse with "code" and "message" (plus "data" on success) for
        the codes 0, 86038 and 86090.

    Raises:
        HTTPException: 401 for a bad API token, 500 for upstream codes -3 and
            -400, any other code, and for request failures or timeouts.
    """
    await verify_api_token(token)
    url = "https://passport.bilibili.com/x/passport-tv-login/qrcode/poll"
    params = tvsign({"auth_code": auth_code, "local_id": "0", "ts": int(time.time())})
    headers = {"User-Agent": USER_AGENT}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                url,
                params=params,
                headers=headers,
                # The front end polls repeatedly, so a stalled upstream call
                # must not hang a request forever.
                timeout=aiohttp.ClientTimeout(total=10),
            ) as response:
                data = await response.json()
    except asyncio.TimeoutError as e:
        raise HTTPException(status_code=500, detail="请求超时") from e
    except aiohttp.ClientError as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    code = data["code"]
    if code == 0:
        login_data = data["data"]
        save_cookie_info(login_data)
        return JSONResponse(
            content={"code": 0, "message": "扫码成功", "data": login_data}
        )

    # States that are reported to the client as a normal response.
    status_replies = {
        86038: {"code": 86038, "message": "二维码已失效"},
        86090: {"code": 86090, "message": "等待扫码"},
    }
    if code in status_replies:
        return JSONResponse(content=status_replies[code])

    # Everything else is an error.
    # NOTE(review): any further "in progress" code the upstream API may return
    # also ends up here as a 500. Confirm against the API documentation.
    error_details = {-3: "API校验密匙错误", -400: "请求错误"}
    raise HTTPException(status_code=500, detail=error_details.get(code, "未知错误"))
def _cookie_summary(DedeUserID, cookie_data):
    """Build the metadata view of one stored cookie record (no secrets included)."""
    update_time = cookie_data["update_time"]
    expires_in = cookie_data["token_info"]["expires_in"]
    return {
        "DedeUserID": DedeUserID,
        "update_time": update_time,
        "expire_time": update_time + int(expires_in) * 1000,
        "check_time": cookie_data.get("check_time"),
        "cookie_valid": cookie_data.get("cookie_valid"),
    }


# Get cookie information
@app.get("/api/cookie")
async def get_cookies(DedeUserID: str = Query(None), token: str = Header(None)):
    """Return cookie metadata for one account, or for all stored accounts.

    The API token is verified for both variants. Previously only the
    single-account lookup was protected, so the full account list could be
    read without a token.

    Args:
        DedeUserID: Account to look up. When omitted, every "*.json" record in
            COOKIE_FOLDER is listed.
        token: API token header, validated by verify_api_token().

    Returns:
        JSONResponse with one summary dict (single account) or a list of
        summary dicts. Each summary has DedeUserID, update_time, expire_time,
        check_time and cookie_valid.

    Raises:
        HTTPException: 401 for a bad API token, 404 when the requested account
            has no stored cookie.
    """
    await verify_api_token(token)

    if DedeUserID:
        cookie_data = read_cookie(DedeUserID)
        if not cookie_data:
            raise HTTPException(status_code=404, detail="未找到指定的 Cookie 信息")
        return JSONResponse(content=_cookie_summary(DedeUserID, cookie_data))

    cookies = []
    if os.path.exists(COOKIE_FOLDER):
        suffix = ".json"
        for filename in os.listdir(COOKIE_FOLDER):
            if not filename.endswith(suffix):
                continue
            # Strip only the trailing extension (str.replace() would also
            # remove a ".json" found elsewhere in the name).
            user_id = filename[: -len(suffix)]
            try:
                cookie_data = read_cookie(user_id)
                if cookie_data:
                    cookies.append(_cookie_summary(user_id, cookie_data))
            except (OSError, ValueError, KeyError, TypeError) as e:
                # A single unreadable or malformed record is reported and
                # skipped so the rest of the list can still be returned.
                log_print(f"[Cookie] 读取 {filename} 失败: {e}", "ERROR")
    return JSONResponse(content=cookies)
# Cookie health check endpoint
@app.get("/api/cookie/check")
async def check_cookie_api(DedeUserID: str = Query(...), token: str = Header(None)):
    """Run check_cookie() for one account and report the result.

    Args:
        DedeUserID: Account to check (required query parameter).
        token: API token header, validated by verify_api_token().

    Returns:
        JSONResponse with "code", "message" and the boolean "is_valid".
        "is_valid" is True only when the check returned code 0.
    """
    await verify_api_token(token)
    outcome = await check_cookie(DedeUserID)
    if outcome["code"] != 0:
        payload = {
            "code": outcome["code"],
            "message": outcome["message"],
            "is_valid": False,
        }
    else:
        payload = {"code": 0, "message": "Cookie 有效", "is_valid": True}
    return JSONResponse(content=payload)
# Cookie refresh endpoint
@app.get("/api/cookie/refresh")
async def refresh_cookie_api(DedeUserID: str = Query(...), token: str = Header(None)):
    """Run refresh_cookie() for one account and report the result.

    Args:
        DedeUserID: Account to refresh (required query parameter).
        token: API token header, validated by verify_api_token().

    Returns:
        JSONResponse with "code" and "message". On success (code 0) it also
        carries "expire_time" as returned by refresh_cookie().
    """
    await verify_api_token(token)
    outcome = await refresh_cookie(DedeUserID)
    if outcome["code"] != 0:
        payload = {"code": outcome["code"], "message": outcome["message"]}
    else:
        payload = {
            "code": 0,
            "message": "刷新成功",
            "expire_time": outcome["expire_time"],
        }
    return JSONResponse(content=payload)
# Start the application when executed as a script
if __name__ == "__main__":
    # Host and port come from the loaded configuration.
    bind_host = config["HOST"]
    bind_port = config["PORT"]
    uvicorn.run(app, host=bind_host, port=bind_port)