forked from madwind/flexget_qbittorrent_mod
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauto_sign_in.py
94 lines (82 loc) · 3.38 KB
/
auto_sign_in.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from flexget import plugin
from flexget.event import event
from flexget.task import Task
from loguru import logger
from .ptsites import executor
from .ptsites.base.entry import SignInEntry
from .ptsites.utils.details_report import DetailsReport
class PluginAutoSignIn:
    """FlexGet plugin that creates one sign-in entry per configured site and signs in.

    In the input phase an entry is built for every site (or every sub-config
    when a site maps to a list of configs). In the output phase stale entries
    are rejected, ``executor.sign_in`` is run for each accepted entry on a
    thread pool, and a details report is optionally built.
    """

    # JSON schema for this plugin's configuration block. The per-site
    # properties are supplied by ``executor.build_sign_in_schema()``.
    schema: dict = {
        'type': 'object',
        'properties': {
            'user-agent': {'type': 'string'},
            'max_workers': {'type': 'integer'},
            'get_messages': {'type': 'boolean', 'default': True},
            'get_details': {'type': 'boolean', 'default': True},
            'cookie_backup': {'type': 'boolean', 'default': True},
            'aipocr': {
                'type': 'object',
                'properties': {
                    'app_id': {'type': 'string'},
                    'api_key': {'type': 'string'},
                    'secret_key': {'type': 'string'}
                },
                'additionalProperties': False
            },
            'sites': {
                'type': 'object',
                'properties': executor.build_sign_in_schema()
            }
        },
        'additionalProperties': False
    }

    def prepare_config(self, config: dict) -> dict:
        """Fill in defaults for optional keys.

        The dict is mutated in place and also returned.

        Args:
            config: The plugin configuration.

        Returns:
            The same ``config`` object with missing keys populated.
        """
        config.setdefault('user-agent', '')
        config.setdefault('command_executor', '')
        # The schema declares this an integer and on_task_output hands it to
        # ThreadPoolExecutor; a dict default would raise TypeError there.
        # 1 matches the fallback used in on_task_output.
        config.setdefault('max_workers', 1)
        config.setdefault('aipocr', {})
        config.setdefault('sites', {})
        return config

    def on_task_input(self, task: Task, config: dict) -> list[SignInEntry]:
        """Build a sign-in entry for every configured site.

        Args:
            task: The running FlexGet task (not used directly here).
            config: The plugin configuration; defaults are applied first.

        Returns:
            One ``SignInEntry`` per site config, titled ``'<site_name> <date>'``.
        """
        config = self.prepare_config(config)
        sites: dict = config['sites']
        entries: list[SignInEntry] = []
        # Take the date once so every entry created in this run shares it,
        # even if the run crosses midnight.
        today = datetime.now().date()
        for site_name, site_configs in sites.items():
            # A site may hold a single config or a list of them; normalise to a list.
            if not isinstance(site_configs, list):
                site_configs = [site_configs]
            for sub_site_config in site_configs:
                entry = SignInEntry(
                    title=f'{site_name} {today}',
                    url=''
                )
                entry['site_name'] = site_name
                entry['class_name'] = site_name
                entry['site_config'] = sub_site_config
                entry['result'] = ''
                entry['messages'] = ''
                entry['details'] = ''
                executor.build_sign_in_entry(entry, config)
                entries.append(entry)
        return entries

    def on_task_output(self, task: Task, config: dict) -> None:
        """Reject stale entries, sign in for the accepted ones, optionally build a report.

        Args:
            task: The running FlexGet task whose entries are processed.
            config: The plugin configuration.
        """
        max_workers: int = config.get('max_workers', 1)
        date_now: str = str(datetime.now().date())
        # Entry titles embed their creation date; anything not from today is rejected.
        for entry in task.all_entries:
            if date_now not in entry['title']:
                entry.reject(f'{entry["title"]} out of date!')
        with ThreadPoolExecutor(max_workers=max_workers) as thread_executor:
            # Submit every job before waiting on any result so they can overlap.
            submitted = [
                (entry, thread_executor.submit(executor.sign_in, entry, config))
                for entry in task.accepted
            ]
            for entry, future in submitted:
                try:
                    future.result()
                except Exception as e:
                    # One site's failure must not abort the others: log it and
                    # mark only this entry as failed.
                    logger.exception(e)
                    entry.fail_with_prefix('Exception: ' + str(e))
        if config.get('get_details', True):
            DetailsReport().build(task)
@event('plugin.register')
def register_plugin() -> None:
    """Register ``PluginAutoSignIn`` as the ``auto_sign_in`` plugin (API version 2).

    Hooked to the ``plugin.register`` event via the decorator.
    """
    plugin.register(
        PluginAutoSignIn,
        'auto_sign_in',
        api_ver=2,
    )