-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathroutes.py
65 lines (44 loc) · 2.15 KB
/
routes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# pylint: disable=unused-import
# flake8: noqa: F401
import inspect
import sys
from resource.webapp.get_index import GetIndex
from resource.account.login import Login
from resource.engine.add_worker import AddWorker
from resource.engine.get_thread_count import GetThreadCount
from resource.engine.get_worker_count import GetWorkerCount
from resource.healthz import Healthz
from resource.log.get_cron_log import GetCronLog
from resource.module.get_modules import GetModules
from resource.module.run_module import RunModule
def set_routes(*args):
    """Register the resource classes visible in this module as API routes.

    The first positional argument is a mapping of plugins. This function
    reads the keys ``"api"``, ``"db"``, ``"engine"`` and ``"docs"`` from it,
    plus one key per constructor argument of each resource class.

    ``GetIndex`` is mounted on ``"/"``. Every other class found in this
    module's namespace (except one named ``GetStatic``) gets an endpoint
    derived from the module it was defined in: the first ``"resource."`` is
    replaced by ``"."`` and every dot then becomes a slash, so a class from
    ``resource.account.login`` is served under ``/account/login``. When the
    HTTP handler takes one argument besides ``self``/``kwargs``, that
    argument is appended to the endpoint as a ``/<name>`` URL variable.

    Raises:
        Exception: when a resource class has no ``get``/``post`` function,
            has both of them, or its handler takes more than one argument
            (not counting ``self`` and ``kwargs``).
    """
    plugins = args[0]

    # The index page is mounted explicitly on the root URL
    plugins["api"].add_resource(
        GetIndex,
        "/",
        resource_class_kwargs={"db": plugins["db"], "engine": plugins["engine"]},
    )

    # Walk through every class reachable from this module's namespace
    this_module = sys.modules[__name__]
    for class_name, resource_cls in inspect.getmembers(this_module, inspect.isclass):
        # These two are not auto-routed (the index is handled above)
        if class_name in ("GetIndex", "GetStatic"):
            continue

        module_path = resource_cls.__module__

        # Turn the dotted module path into a URL path
        endpoint = module_path.replace("resource.", ".", 1).replace(".", "/")

        # Exactly one of get/post must be implemented by the resource
        handlers = [
            func
            for attr_name, func in inspect.getmembers(resource_cls, predicate=inspect.isfunction)
            if attr_name in ("get", "post")
        ]
        if not handlers:
            raise Exception(f"No http function found on resource {module_path}")
        if len(handlers) > 1:
            raise Exception(f"Too much http functions found on resource {module_path}")

        # At most one handler argument is allowed; it becomes a URL variable
        url_params = [
            param
            for param in inspect.signature(handlers[0]).parameters
            if param not in ("self", "kwargs")
        ]
        if len(url_params) > 1:
            raise Exception(f"Too much args for http function on resource {module_path}")
        if url_params:
            endpoint = f"{endpoint}/<{url_params[0]}>"

        # Each constructor argument is filled with the plugin of the same name
        class_args = {}
        for arg_name in inspect.getfullargspec(resource_cls).args:
            if arg_name != "self":
                class_args[arg_name] = plugins[arg_name]

        # Expose the resource on the API, then declare it to the docs plugin
        plugins["api"].add_resource(resource_cls, endpoint, resource_class_kwargs=class_args)
        plugins["docs"].register(resource_cls, resource_class_kwargs=class_args)