forked from albertz/RandomFtpGrabber
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
executable file
·194 lines (153 loc) · 5.05 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#!/usr/bin/env python3
import sys
from argparse import ArgumentParser
import os
import termios
import re
from typing import List
# Base directory that holds sources.txt, blacklist.txt and file_whitelist.txt.
# setup() overwrites it with the value of --dir.
RootDir = "."
# Non-empty lines of sources.txt that do not start with "#" (filled in by setup_lists()).
Sources = [] # type: List[str]
# Compiled regular expressions built from the lines of blacklist.txt (filled in by setup_lists()).
Blacklist = []
# Compiled regular expressions built from the lines of file_whitelist.txt (filled in by setup_lists()).
FileWhitelist = []
# Set by setup() from the --downloadRemaining flag; when true, setup() does not call setup_lists().
DownloadOnly = False
# argparse namespace produced in setup(); None until setup() has run.
Args = None
# Zero-argument callables which the stdin handler calls after it reloaded the lists (<r> key).
reloadHandlers = []
def prepare_stdin():
    """
    Put the terminal behind stdin into non-canonical, no-echo mode, so that
    single key presses can be read without waiting for Enter.

    If stdin is not a tty, only a notice is printed and nothing is changed.
    Otherwise the previous terminal attributes are restored at interpreter
    exit (via ``atexit``), and the console help is printed.
    """
    stdin_fd = sys.stdin.fileno()
    if not os.isatty(stdin_fd):
        print("Not a tty. No stdin control.")
        return

    # Two separate snapshots: one stays untouched for the restore at exit,
    # the other one gets modified and applied.
    saved_attrs = termios.tcgetattr(stdin_fd)
    key_attrs = termios.tcgetattr(stdin_fd)

    # Index 3 of the attribute list is the local-mode flag word (lflag).
    key_attrs[3] &= ~(termios.ICANON | termios.ECHO)
    # Index 6 is the list of control characters (cc).
    # http://www.unixguide.net/unix/programming/3.6.2.shtml
    control_chars = key_attrs[6]
    control_chars[termios.VMIN] = 0
    control_chars[termios.VTIME] = 1  # timeout

    termios.tcsetattr(stdin_fd, termios.TCSANOW, key_attrs)
    termios.tcsendbreak(stdin_fd, 0)

    def restore_terminal():
        termios.tcsetattr(stdin_fd, termios.TCSANOW, saved_attrs)

    import atexit
    atexit.register(restore_terminal)

    print_stdin_help()
def stdin_get_char():
    """
    Read pending key input directly from the stdin file descriptor.

    :return: at most 7 bytes; empty if nothing was read
    :rtype: bytes
    """
    # NOTE(review): the 7-byte limit presumably leaves room for multi-byte
    # key sequences -- confirm.
    return os.read(sys.stdin.fileno(), 7)
def print_stdin_help():
    """
    Print the overview of the console key commands, one per line.
    """
    help_lines = (
        "Console control:",
        " <h>: print this help",
        " <r>: reload lists (sources, blacklist)",
        " <d>: debug: show all workers",
        " <q>: quit",
    )
    for help_line in help_lines:
        print(help_line)
def stdin_handler_loop():
    """
    Poll stdin for key commands and dispatch them, until <q> is pressed.

    Key commands:

    * ``q``: schedule an ``IssueSystemExit`` action in the main thread and leave the loop
    * ``r``: reload the lists via ``setup_lists()``, then call every entry of ``reloadHandlers``
    * ``d``: print all entries of ``TaskSystem.workers``
    * ``h`` or newline: print the help
    * anything else: report the unknown key and print the help
    """
    while True:
        key = stdin_get_char()
        if not key:
            # Nothing was read in this round; poll again.
            continue

        if key == b"q":
            print("Exit.")
            import Threading
            from Action import IssueSystemExit
            # Do not wait for the action; this thread just ends.
            Threading.do_in_main_thread(IssueSystemExit(), wait=False)
            return

        if key == b"r":
            print("Reload lists.")
            setup_lists()
            for on_reload in reloadHandlers:
                on_reload()
        elif key == b"d":
            print("Workers:")
            import TaskSystem
            for worker in TaskSystem.workers:
                print(" %s" % worker)
        elif key in (b"h", b"\n"):
            print_stdin_help()
        else:
            print("Unknown key command: %r" % key)
            print_stdin_help()
def start_stdin_handler_loop():
    """
    Prepare the terminal and start the daemon thread which handles the
    console key commands.

    If stdin is not a tty, no thread is started. In that case there is no
    terminal read timeout which would pace the polling loop: once stdin is
    at EOF (e.g. redirected from /dev/null), ``os.read()`` returns ``b""``
    immediately, and ``stdin_handler_loop()`` would spin at full CPU.
    """
    prepare_stdin()
    if not os.isatty(sys.stdin.fileno()):
        # prepare_stdin() has already printed that there is no stdin control.
        return
    from threading import Thread
    control_thread = Thread(target=stdin_handler_loop, name="stdin control")
    # Daemon thread: it must not keep the process alive on exit.
    control_thread.daemon = True
    control_thread.start()
def _read_list_file(path):
    """
    Read a list file with one entry per line.

    :param str path: file to read
    :return: the lines of the file, without empty lines and without lines starting with "#"
    :rtype: list[str]
    """
    # The context manager closes the file right away, instead of leaving that to the GC.
    with open(path) as list_file:
        lines = list_file.read().splitlines()
    return [line for line in lines if line and not line.startswith("#")]


def _load_pattern_file(path):
    """
    Compile the entries of an optional pattern file.

    :param str path: file with one regular expression per line
    :return: the compiled patterns; empty list if the file does not exist
    :rtype: list[re.Pattern]
    """
    if not os.path.exists(path):
        return []
    return [re.compile(pattern) for pattern in _read_list_file(path)]


def setup_lists():
    """
    (Re)load sources.txt, blacklist.txt and file_whitelist.txt from RootDir
    into main.Sources, main.Blacklist and main.FileWhitelist.

    sources.txt is required; the two pattern files are optional, and a
    missing one results in an empty list. In all three files, empty lines
    and lines starting with "#" are skipped. This matters for the pattern
    files: an empty line would compile to a regex which matches every URL,
    i.e. a single blank line in blacklist.txt would blacklist everything.

    :raises OSError: if sources.txt cannot be opened
    :raises re.error: if a pattern line is not a valid regular expression
    """
    import Logging
    main.Sources = _read_list_file(RootDir + "/sources.txt")
    main.Blacklist = _load_pattern_file(RootDir + "/blacklist.txt")
    Logging.log("blacklist:", main.Blacklist)
    main.FileWhitelist = _load_pattern_file(RootDir + "/file_whitelist.txt")
    Logging.log("file whitelist:", main.FileWhitelist)
def allowed_by_blacklist(entry):
    """
    Check an URL against the patterns in main.Blacklist.

    :param str entry: URL of either a directory or file
    :return: False if any blacklist pattern matches at the start of entry, True otherwise
    :rtype: bool
    """
    return not any(bad_re.match(entry) for bad_re in main.Blacklist)
def allowed_by_file_whitelist(entry):
    """
    Check a file URL against the patterns in main.FileWhitelist.

    :param str entry: URL of a file
    :return: True if every whitelist pattern matches at the start of entry
        (thus also True for an empty whitelist), False otherwise
    :rtype: bool
    """
    # NOTE(review): the entry has to match *all* patterns, not just one of them.
    # With more than one whitelist line, that might not be what is intended -- confirm.
    return all(allowed_re.match(entry) for allowed_re in main.FileWhitelist)
def setup(*raw_arg_list):
    """
    Initialise the application: install the exception hook, parse the
    command line into the module-level ``Args``, start the stdin control,
    load the lists (unless --downloadRemaining is given), and set up the
    task system.

    :param str raw_arg_list: the command line arguments, without the program name
    """
    global Args

    print("RandomFtpGrabber startup.")

    import better_exchook
    better_exchook.install()
    import Logging
    # Let the exception hook write through the application log.
    better_exchook.output = Logging.log

    parser = ArgumentParser()
    parser.add_argument("--dir", default=os.getcwd())
    parser.add_argument("--numWorkers", type=int)
    for flag in ("--shell", "--downloadRemaining"):
        parser.add_argument(flag, action="store_true")
    Args = parser.parse_args(raw_arg_list)

    if sys.version_info.major != 3:
        Logging.log("Warning: This code was only tested with Python3.")
        import time
        time.sleep(10)  # wait a bit to make sure the user sees this

    start_stdin_handler_loop()

    import main
    main.RootDir = Args.dir
    Logging.log("root dir: %s" % RootDir)
    main.DownloadOnly = Args.downloadRemaining
    if not main.DownloadOnly:
        setup_lists()

    import TaskSystem  # important to be initially imported in the main thread
    num_workers = Args.numWorkers
    if num_workers:
        TaskSystem.kNumWorkers = num_workers
        TaskSystem.kMinQueuedActions = num_workers
        TaskSystem.kSuggestedMaxQueuedActions = num_workers * 2
    if Args.shell:
        # The shell option overrides any --numWorkers value.
        TaskSystem.kNumWorkers = 0
    TaskSystem.setup()
def main_entry():
    """
    Run the task system main loop until it returns.

    A KeyboardInterrupt raised from the loop is logged and not propagated.
    """
    import TaskSystem
    import Logging

    run_main_loop = TaskSystem.main_loop
    try:
        run_main_loop()
    except KeyboardInterrupt:
        Logging.log("KeyboardInterrupt")
# Has the effect that this module is known as 'main' and not just '__main__'.
# The functions above read and write the shared state through `main.<name>`,
# so the script entry below also goes through the `main` module.
import main
if __name__ == "__main__":
    main.setup(*sys.argv[1:])
    if main.Args.shell:
        # --shell: open the interactive debug shell instead of running the main loop, then exit.
        import better_exchook
        better_exchook.debug_shell(globals(), globals())
        sys.exit()
    main.main_entry()