snmpexporterd.py (forked from dhtech/snmpexporter)
#!/usr/bin/env python3
import argparse
from concurrent import futures
import functools
import logging
import objgraph
import sys
import threading

import snmpexporter
import snmpexporter.config
import snmpexporter.prometheus

from twisted.internet import reactor, task, endpoints
from twisted.python import log
from twisted.web import server, resource
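

# snmpexporterd: Twisted-based web service that polls devices over SNMP and
# exposes the results in Prometheus exposition format. Polling runs in a
# process pool (Net-SNMP), annotation and export in a thread pool.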


# As we're using multiprocessing it's probably not needed for this to be
# thread-local, but why not.
tls = threading.local()
tls.snmpimpl = None


# Used to test health of the executors
def do_nothing():
  pass
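

# Poll stage: runs in a worker process from the poller pool. Each worker
# lazily creates a Net-SNMP implementation, caches it in thread-local
# storage and returns the raw (target, data) tuple for the annotation stage.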
def poll(config, host, layer):
  try:
    if not tls.snmpimpl:
      logging.debug('Initializing Net-SNMP implementation')
      tls.snmpimpl = snmpexporter.snmpimpl.NetsnmpImpl()

    collections = config['collection']
    overrides = config['override']
    snmp_creds = config['snmp']

    logging.debug('Constructing SNMP target')
    target = snmpexporter.target.SnmpTarget(host, layer, snmp_creds)
    target.start('poll')

    logging.debug('Creating SNMP poller')
    poller = snmpexporter.poller.Poller(collections, overrides, tls.snmpimpl)

    logging.debug('Starting poll')
    data, timeouts, errors = poller.poll(target)
    target.add_timeouts(timeouts)
    target.add_errors(errors)
    return target, data
  except Exception:
    logging.exception('Poll exception')
    raise
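

# Annotation stage: runs on a thread from the annotator pool. Takes the
# (target, data) tuple produced by poll(), annotates it using the MIB
# resolver and renders it with snmpexporter.prometheus.Exporter.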
def annotate(config, resolver, f):
  try:
    target, data = f

    annotator_config = config['annotator']
    exporter_config = config['exporter']

    target.start('annotate')
    logging.debug('Creating result annotator')
    annotator = snmpexporter.annotator.Annotator(annotator_config, resolver)

    logging.debug('Starting annotation')
    result = annotator.annotate(data)
    target.done()

    exporter = snmpexporter.prometheus.Exporter(exporter_config)
    return exporter.export(target, result)
  except Exception:
    logging.exception('Annotate exception')
    raise
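

# Twisted web resource implementing the HTTP API:
#   /probe   - run the poll -> annotate pipeline for a single target
#   /healthy - exercise both executor pools as a health check
#   /objects - expose objgraph object counts for debugging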
class PollerResource(resource.Resource):
  isLeaf = True

  def __init__(self, config_file, poller_pool, annotator_pool):
    super().__init__()

    # Use process pollers as netsnmp is not behaving well using just threads
    logging.debug('Starting poller pool ...')
    self.poller_executor = futures.ProcessPoolExecutor(
        max_workers=poller_pool)

    # Start MIB resolver after processes above (or it will fork it as well)
    logging.debug('Initializing MIB resolver ...')
    import mibresolver
    self.resolver = mibresolver

    logging.debug('Starting annotation pool ...')
    # .. but annotators are just CPU, so use lightweight threads.
    self.annotator_executor = futures.ThreadPoolExecutor(
        max_workers=annotator_pool)

    self.config_file = config_file

  def _response_failed(self, err, f):
    logging.debug('Request cancelled, cancelling future %s', f)
    f.cancel()
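
  # The executor futures complete on worker/pool threads, but Twisted request
  # objects must only be touched from the reactor thread, so completions are
  # bounced through reactor.callFromThread() before writing the response.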
  def _reactor_annotate_done(self, request, f):
    reactor.callFromThread(self._annotate_done, request, f)

  def _annotate_done(self, request, f):
    if f.exception():
      logging.error('Annotator failed: %s', repr(f.exception()))
      request.setResponseCode(500, message=(
          'Annotator failed: %s' % repr(f.exception())).encode())
      request.finish()
      return
    for row in f.result():
      request.write(row.encode())
      request.write('\n'.encode())
    request.finish()

  def _reactor_poll_done(self, config, request, f):
    reactor.callFromThread(self._poll_done, config, request, f)

  def _poll_done(self, config, request, f):
    if f.exception():
      logging.error('Poller failed: %s', repr(f.exception()))
      request.setResponseCode(500, message=(
          'Poller failed: %s' % repr(f.exception())).encode())
      request.finish()
      return
    logging.debug('Poller done, starting annotation')
    f = self.annotator_executor.submit(
        annotate, config, self.resolver, f.result())
    f.add_done_callback(functools.partial(self._reactor_annotate_done, request))
    request.notifyFinish().addErrback(self._response_failed, f)

  def render_GET(self, request):
    path = request.path.decode()
    request.setHeader("Content-Type", "text/plain; charset=UTF-8")
    if path == '/probe':
      return self.probe(request)
    elif path == '/healthy':
      return self.healthy(request)
    elif path == '/objects':
      return self.objects(request)
    else:
      logging.info('Not found: %s', path)
      request.setResponseCode(404)
      return '404 Not Found'.encode()
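
  # Debug endpoint: exposes objgraph's per-type object counts as a
  # Prometheus gauge so memory usage can be inspected.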
  def objects(self, request):
    types = objgraph.most_common_types(limit=1000)
    request.write('# HELP objgraph_objects active objects in memory\n'.encode())
    request.write('# TYPE objgraph_objects gauge\n'.encode())
    for name, count in types:
      request.write(
          ('objgraph_objects{name="%s"} %s\n' % (name, count)).encode())
    return bytes()
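
  # Health checking: /healthy submits a no-op to the poller (process) pool
  # and, once that completes, another no-op to the annotator (thread) pool.
  # Only when both round-trips succeed does the request get 'I am healthy'.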
  def _annotator_executor_healthy(self, request, completed_f):
    if completed_f.exception() or completed_f.cancelled():
      request.setResponseCode(500, message=(
          'Annotator health failed: %s' % repr(
              completed_f.exception())).encode())
      request.finish()
      return
    request.write('I am healthy'.encode())
    request.finish()

  def _poller_executor_healthy(self, request, completed_f):
    if completed_f.exception() or completed_f.cancelled():
      request.setResponseCode(500, message=(
          'Poller health failed: %s' % repr(completed_f.exception())).encode())
      request.finish()
      return
    f = self.annotator_executor.submit(do_nothing)
    f.add_done_callback(
        lambda f: reactor.callFromThread(
            self._annotator_executor_healthy, request, f))

  def healthy(self, request):
    # Send the healthy request through the pipeline executors to see
    # that everything works.
    f = self.poller_executor.submit(do_nothing)
    logging.debug('Starting healthy poll')
    f.add_done_callback(
        lambda f: reactor.callFromThread(
            self._poller_executor_healthy, request, f))
    request.notifyFinish().addErrback(self._response_failed, f)
    return server.NOT_DONE_YET
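
  # /probe entry point: expects 'target' and 'layer' query parameters,
  # e.g. GET /probe?target=<hostname>&layer=<layer>. The configuration file
  # is re-read for every request before the poll is scheduled on the
  # process pool.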
  def probe(self, request):
    layer = request.args.get('layer'.encode(), [None])[0]
    target = request.args.get('target'.encode(), [None])[0]
    if not layer or not target:
      request.setResponseCode(400)
      return '400 Missing layer or target parameter'.encode()
    layer = layer.decode()
    target = target.decode()

    config = snmpexporter.config.load(self.config_file)
    f = self.poller_executor.submit(poll, config, target, layer)
    f.add_done_callback(
        functools.partial(self._reactor_poll_done, config, request))
    logging.debug('Starting poll')
    request.notifyFinish().addErrback(self._response_failed, f)
    return server.NOT_DONE_YET
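

# Entrypoint: parse command-line flags, bridge Twisted's logging into the
# standard-library logger, and serve PollerResource on a TCP endpoint.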
if __name__ == '__main__':
  parser = argparse.ArgumentParser(description='One-shot SNMP exporter.')
  parser.add_argument('--config', dest='config_file', type=str,
                      help='config file to load',
                      default='/etc/snmpexporter.yaml')
  parser.add_argument('--log-level', dest='log_level', type=str,
                      help='log level', default='INFO')
  parser.add_argument('--poller-pool', dest='poller_pool', type=int,
                      help='number of simultaneous polls to do', default=10)
  parser.add_argument('--annotator-pool', dest='annotator_pool', type=int,
                      help='number of threads to use to annotate', default=5)
  parser.add_argument('--port', dest='port', type=int,
                      help='port to listen to', default=9190)
  args = parser.parse_args()

  # Logging setup
  observer = log.PythonLoggingObserver()
  observer.start()

  root = logging.getLogger()
  ch = logging.StreamHandler(sys.stderr)
  formatter = logging.Formatter(
      '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  ch.setFormatter(formatter)
  root.addHandler(ch)
  root.setLevel(logging.getLevelName(args.log_level))

  pr = PollerResource(
      args.config_file, args.poller_pool, args.annotator_pool)
  factory = server.Site(pr)

  logging.debug('Starting web server on port %d', args.port)
  endpoint = endpoints.TCP4ServerEndpoint(reactor, args.port)
  endpoint.listen(factory)
  reactor.run()