-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy path__init__.py
executable file
·257 lines (228 loc) · 8.95 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import configparser
import inspect
import logging
import os
import re
from AaronTools.const import AARONLIB, AARONTOOLS
default_config = configparser.ConfigParser(interpolation=None, comment_prefixes=("#"))
for filename in [
os.path.join(AARONTOOLS, "config.ini"),
os.path.join(AARONLIB, "config.ini"),
]:
try:
default_config.read(filename)
except FileNotFoundError:
continue
except configparser.MissingSectionHeaderError:
# add global options to default section
with open(filename) as f:
contents = "[DEFAULT]\n" + f.read()
default_config.read_string(contents)
if "log_level" in default_config["DEFAULT"]:
LOGLEVEL = default_config["DEFAULT"]["log_level"].upper()
else:
LOGLEVEL = "WARNING"
if "print_citations" in default_config["DEFAULT"]:
PRINT_CITATIONS = default_config["DEFAULT"].getboolean("print_citations")
else:
PRINT_CITATIONS = False
try:
SAVE_CITATIONS = default_config["DEFAULT"].getboolean("save_citations")
except ValueError:
SAVE_CITATIONS = default_config["DEFAULT"].get("save_citations")
if SAVE_CITATIONS is False:
SAVE_CITATIONS = None
logging.logThreads = 0
logging.logProcesses = 0
logging.captureWarnings(True)
class CustomFilter(logging.Filter):
def __init__(self, name="", level=None, override=None, cite=False):
super().__init__(name=name)
self.level = logging.WARNING
if isinstance(level, str):
level = getattr(logging, level.upper())
if level is not None:
self.level = level
self.override = {}
if override is not None:
self.override = override
self.cite = cite
def filter(self, record):
if record.funcName == "citation":
found = False
for frame in reversed(inspect.stack()):
if found:
record.funcName = frame.function
break
if frame.function == "_callTestMethod":
found = True
else:
record.funcName = inspect.stack()[-2].function
record.levelname = "CITATION"
if not self.cite:
return False
self.parse_message(record)
return True
for level, func_list in self.override.items():
if isinstance(level, str):
level = getattr(logging, level.upper())
if record.funcName not in func_list:
continue
if record.levelno < level:
return False
self.parse_message(record)
return True
if record.levelno < self.level:
return False
self.parse_message(record)
return True
def parse_message(self, record):
"""
Formats message to print prettily to console
"""
if isinstance(record.msg, str):
record.msg = re.sub(
"\n(\S)", lambda x: "\n %s" % x.group(1), record.msg
)
msg = ["\n "]
for word in re.findall("\S+\s*", record.getMessage()):
if len("".join(msg).split("\n")[-1]) + len(word) < 80:
msg.append(word)
else:
msg.append("\n {}".format(word))
record.getMessage = lambda: "".join(msg)
class CitationHandler(logging.FileHandler):
def __init__(self, filename, **kwargs):
filename = os.path.expandvars(filename)
if not os.path.exists(os.path.dirname(filename)):
# might be trying to put citations in $AARONLIB, but user
# didn't bother to set the environment variable and just
# uses the default
from AaronTools.const import AARONLIB
if "$AARONLIB" in filename:
filename = filename.replace("$AARONLIB", AARONLIB)
elif "${AARONLIB}" in filename:
filename = filename.replace("${AARONLIB}", AARONLIB)
elif "%AARONLIB%" in filename:
filename = filename.replace("%AARONLIB%", AARONLIB)
super().__init__(filename, **kwargs)
def emit(self, record):
"""
Adds a record to the citation file if it's not already present
"""
if record.levelname != "CITATION":
return
msg = record.msg.replace("\n ", " ")
record.getMessage = lambda: "".join(msg)
# check for duplicates
dupe = False
with open(self.baseFilename) as f:
for line in f.readlines():
if line.strip() == self.format(record):
dupe = True
break
if not dupe:
super().emit(record)
class ATLogger(logging.Logger):
def __init__(
self, name, level=None, override=None, fmt=None, add_hdlrs=None
):
"""
:level: the log level to use
:override: dict(level=funcName) to override loglevel for certain funcitons
:fmt: formatting string (optional)
:add_hdlrs: list(str(handlerName)) or list(Handler())
"""
super().__init__(name, level=1)
if level is None:
level = LOGLEVEL
if isinstance(level, str):
level = getattr(logging, level.upper())
self.level = level
if fmt is None:
fmt = "%(levelname)s %(name)s.%(funcName)s %(message)s"
formatter = logging.Formatter(fmt=fmt)
handlers = [(logging.StreamHandler(), PRINT_CITATIONS)]
if SAVE_CITATIONS is not None and os.access(SAVE_CITATIONS, os.W_OK):
handlers += [(CitationHandler(SAVE_CITATIONS), True)]
if add_hdlrs is not None:
for hdlr in add_hdlrs:
if isinstance(hdlr, str):
hdlr = getattr(logging, hdlr)
handlers.append((hdlr(), PRINT_CITATIONS))
else:
handlers.append(hdlr, PRINT_CITATIONS)
for hdlr, cite in handlers:
hdlr.setFormatter(formatter)
hdlr.addFilter(
CustomFilter(
name=name, level=self.level, override=override, cite=cite
)
)
self.addHandler(hdlr)
def citation(self, msg, *args, **kwargs):
self.info(msg, *args, **kwargs)
def getlogger(name=None, level=None, override=None, fmt=None):
"""
Get the logger without using the class decorator
:level: the log level to apply, defaults to WARNING
:override: a dictionary of the form {new_level: function_name_list} will apply the
`new_level` to log records produced from functions with names in
`function_name_list`, eg:
override={"DEBUG": ["some_function"]}
will set the log level to DEBUG for any messages produced during the run of
some_function()
"""
if name is None:
package = None
for frame in reversed(inspect.stack()):
res = inspect.getargvalues(frame.frame)
if "__name__" in res.locals and name is None:
name = res.locals["__name__"]
if "__package__" in res.locals and package is None:
package = res.locals["__package__"]
if name is not None and package is not None:
break
name = "{}{}{}".format(
name if name is not None else "",
"." if package is not None else "",
package if package is not None else "",
)
log = ATLogger(name, level=level, override=override, fmt=fmt)
return log
def addlogger(cls):
"""
Import this function and use it as a class decorator.
Log messages using the created LOG class attribute.
Useful class attributes to set that will be picked up by this decorator:
:LOG: Will be set to the logger instance during class initialization
:LOGLEVEL: Set this to use a different log level than what is in your config. Only
do this for testing purposes, and do not include it when pushing commits to the
master AaronTools branch.
:LOGLEVEL_OVERRIDE: Use this dict to override the log level set in the config file
for records originating in particular functions. Keys are log levels, values
are lists of strings corresponding to function names (default: {})
Example:
```
from AaronTools import addlogger
@addlogger
class Someclass:
LOG = None
LOGLEVEL = "WARNING"
LOGLEVEL_OVERRIDE = {"DEBUG": ["some_function"]}
# this won't be printed b/c "INFO" < LOGLEVEL
LOG.info("loading class")
def some_function(self):
# this message will be printed thanks to LOGLEVEL_OVERRIDE
self.LOG.debug("function called")
```
"""
name = "{}.{}".format(cls.__module__, cls.__name__)
level = None
if hasattr(cls, "LOGLEVEL") and cls.LOGLEVEL is not None:
level = cls.LOGLEVEL
override = None
if hasattr(cls, "LOGLEVEL_OVERRIDE") and cls.LOGLEVEL_OVERRIDE is not None:
override = cls.LOGLEVEL_OVERRIDE
cls.LOG = ATLogger(name, level=level, override=override)
return cls