-
Notifications
You must be signed in to change notification settings - Fork 101
/
hpilo.py
2199 lines (1920 loc) · 99.5 KB
/
hpilo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# (c) 2011-2021 Dennis Kaarsemaker <[email protected]>
# see COPYING for license details
__version__ = "4.4.3"
import codecs
import io
import os
import errno
import platform
import random
import re
import socket
import ssl
import subprocess
import sys
import types
import xml.etree.ElementTree as etree
import warnings
import hpilo_fw
PY3 = sys.version_info[0] >= 3
if PY3:
import urllib.request as urllib2
class Bogus(Exception): pass
socket.sslerror = Bogus
basestring = str
from os import fsencode
else:
import urllib2
fsencode = lambda x: x
# Python 2.7.13 renamed PROTOCOL_SSLv23 to PROTOCOL_TLS
if not hasattr(ssl, 'PROTOCOL_TLS'):
ssl.PROTOCOL_TLS = ssl.PROTOCOL_SSLv23
# Oh the joys of monkeypatching...
# - We need a CDATA element in set_security_msg, but ElementTree doesn't support it
# - We need to disable escaping of the PASSWORD attribute, because iLO doesn't
# unescape it properly
def CDATA(text=None):
element = etree.Element('![CDATA[')
element.text = text
return element
# Adding this tag to RIBCL scripts should make this hack unnecessary in newer
# iLO firmware versions. TODO: Check compatibility.
# <?ilo entity-processing="standard"?>
class DoNotEscapeMe(str):
pass
etree._original_escape_attrib = etree._escape_attrib
def _escape_attrib(text, *args, **kwargs):
if isinstance(text, DoNotEscapeMe):
return str(text)
else:
return etree._original_escape_attrib(text, *args, **kwargs)
etree._escape_attrib = _escape_attrib
# Python 2.7 and 3
if hasattr(etree, '_serialize_xml'):
etree._original_serialize_xml = etree._serialize_xml
def _serialize_xml(write, elem, *args, **kwargs):
if elem.tag == '![CDATA[':
write("\n<%s%s]]>\n" % (elem.tag, elem.text))
return
return etree._original_serialize_xml(write, elem, *args, **kwargs)
etree._serialize_xml = etree._serialize['xml'] = _serialize_xml
# Python 2.6, and non-stdlib ElementTree
elif hasattr(etree.ElementTree, '_write'):
etree.ElementTree._orig_write = etree.ElementTree._write
def _write(self, file, node, encoding, namespaces):
if node.tag == '![CDATA[':
file.write("\n<![CDATA[%s]]>\n" % node.text.encode(encoding))
else:
self._orig_write(file, node, encoding, namespaces)
etree.ElementTree._write = _write
else:
raise RuntimeError("Don't know how to monkeypatch XML serializer workarounds. Please report a bug at https://github.com/seveas/python-hpilo")
# We handle non-ascii characters in the returned XML by replacing them with XML
# character references. This likely results in bogus data, but avoids crashes.
# The iLO should never do this, but firmware bugs may cause it to do so.
def iloxml_replace(error):
ret = ""
for pos in range(error.start, len(error.object)):
b = error.object[pos]
if not isinstance(b, int):
b = ord(b)
if b < 128:
break
ret += u'?'
warnings.warn("Invalid ascii data found: %s, replaced with %s" % (repr(error.object[error.start:pos]), ret), IloWarning)
return (ret, pos)
codecs.register_error('iloxml_replace', iloxml_replace)
# Which protocol to use
ILO_RAW = 1
ILO_HTTP = 2
ILO_LOCAL = 3
class IloErrorMeta(type):
def __new__(cls, name, parents, attrs):
if 'possible_messages' not in attrs:
attrs['possible_messages'] = []
if 'possible_codes' not in attrs:
attrs['possible_codes'] = []
klass = super(IloErrorMeta, cls).__new__(cls, name, parents, attrs)
if name != 'IloError':
IloError.known_subclasses.append(klass)
return klass
class IloError(Exception):
__metaclass__ = IloErrorMeta
def __init__(self, message, errorcode=None):
if issubclass(IloError, object):
super(IloError, self).__init__(message)
else:
Exception.__init__(self, message)
self.errorcode = errorcode
known_subclasses = []
if PY3:
# Python 3 ignores __metaclass__ but wants class foo(metaclass=bar) But
# that syntax is an error on older python, so recreate IloError properly
# the manual way.
IloError = IloErrorMeta('IloError', (Exception,), {'known_subclasses': [], '__init__': IloError.__init__})
class IloCommunicationError(IloError):
pass
class IloGeneratingCSR(IloError):
possible_messages = ['The iLO subsystem is currently generating a Certificate Signing Request(CSR), run script after 10 minutes or more to receive the CSR.']
possible_codes = [0x0088]
class IloLoginFailed(IloError):
possible_messages = ['Login failed', 'Login credentials rejected']
possible_codes = [0x005f]
class IloUserNotFound(IloError):
possible_codes = [0x000a]
class IloPermissionError(IloError):
possible_codes = [0x0023]
class IloNotARackServer(IloError):
possible_codes = [0x002a]
class IloLicenseKeyError(IloError):
possible_codes = [0x002e]
class IloFeatureNotSupported(IloError):
possible_codes = [0x003c]
class IloNotConfigured(IloError):
possible_codes = [0x006d]
class IloWarning(Warning):
pass
class IloXMLWarning(Warning):
pass
class IloTestWarning(Warning):
pass
class Ilo(object):
"""Represents an iLO/iLO2/iLO3/iLO4/RILOE II management interface on a
specific host. A new connection using the specified login, password and
timeout will be made for each API call. The library will detect which
protocol to use, but you can override this by setting protocol to
ILO_RAW or ILO_HTTP. Use ILO_LOCAL to avoid using a network connection
and use hponcfg instead. Username and password are ignored for ILO_LOCAL
connections. Set delayed to True to make python-hpilo not send requests
immediately, but group them together. See :func:`call_delayed`"""
XML_HEADER = b'<?xml version="1.0"?>\r\n'
HTTP_HEADER = b"POST /ribcl HTTP/1.1\r\nHost: localhost\r\nContent-Length: %d\r\nConnection: Close%s\r\n\r\n"
HTTP_UPLOAD_HEADER = b"POST /cgi-bin/uploadRibclFiles HTTP/1.1\r\nHost: localhost\r\nConnection: Close\r\nContent-Length: %d\r\nContent-Type: multipart/form-data; boundary=%s\r\n\r\n"
BLOCK_SIZE = 64 * 1024
def __init__(self, hostname, login=None, password=None, timeout=60, port=443, protocol=None, delayed=False, ssl_verify=False, ssl_context=None, ssl_version=None):
self.hostname = hostname
self.login = login or 'Administrator'
self.password = password or 'Password'
self.timeout = timeout
self.debug = 0
self.port = port
self.protocol = protocol
self.ssl_verify = False
self.ssl_context = ssl_context
self.cookie = None
self.delayed = delayed
self._elements = None
self._processors = []
self.save_response = None
self.read_response = None
self.save_request = None
self._protect_passwords = os.environ.get('HPILO_DONT_PROTECT_PASSWORDS', None) != 'YesPlease'
self.firmware_mirror = None
self.hponcfg = "/sbin/hponcfg"
hponcfg = 'hponcfg'
if platform.system() == 'Windows':
self.hponcfg = r'C:\Program Files\HP Lights-Out Configuration Utility\cpqlocfg.exe'
hponcfg = 'cpqlocfg.exe'
for path in os.environ.get('PATH','').split(os.pathsep):
maybe = os.path.join(path, hponcfg)
if os.access(maybe, os.X_OK):
self.hponcfg = maybe
break
if self.ssl_verify:
if sys.version_info < (2,7,9):
raise EnvironmentError("SSL verification only works with python 2.7.9 or newer")
if not self.ssl_context:
self.ssl_context = ssl.create_default_context()
# Sadly, ancient iLO's aren't dead yet, so let's enable sslv3 by default
self.ssl_context.options &= ~ssl.OP_NO_SSLv3
def __str__(self):
return "iLO interface of %s" % self.hostname
def _debug(self, level, message):
if message.__class__.__name__ == 'bytes':
message = message.decode('ascii')
if self.debug >= level:
if self._protect_passwords:
message = re.sub(r'PASSWORD=".*?"', 'PASSWORD="********"', message)
sys.stderr.write(message)
if message.startswith('\r'):
sys.stderr.flush()
else:
sys.stderr.write('\n')
def _request(self, xml, progress=None):
"""Given an ElementTree.Element, serialize it and do the request.
Returns an ElementTree.Element containing the response"""
if not self.protocol and not self.read_response:
self._detect_protocol()
# Serialize the XML
if hasattr(etree, 'tostringlist'):
xml = b"\r\n".join(etree.tostringlist(xml)) + b'\r\n'
else:
xml = etree.tostring(xml)
header, data = self._communicate(xml, self.protocol, progress=progress)
# This thing usually contains multiple XML messages
messages = []
while data:
pos = data.find('<?xml', 5)
if pos == -1:
message = self._parse_message(data)
data = None
else:
message = self._parse_message(data[:pos])
data = data[pos:]
# _parse_message returns None if a message has no useful content
if message is not None:
messages.append(message)
if not messages:
return header, None
elif len(messages) == 1:
return header, messages[0]
else:
return header, messages
def _detect_protocol(self):
# Use hponcfg when 'connecting' to localhost
if self.hostname == 'localhost':
self.protocol = ILO_LOCAL
return
# Do a bogus request, using the HTTP protocol. If there is no
# header (see special case in communicate(), we should be using the
# raw protocol
header, data = self._communicate(b'<RIBCL VERSION="2.0"></RIBCL>', ILO_HTTP, save=False)
if header:
self.protocol = ILO_HTTP
else:
self.protocol = ILO_RAW
def _upload_file(self, filename, progress):
with open(filename, 'rb') as fd:
firmware = fd.read()
boundary = b'------hpiLO3t%dz' % random.randint(100000,1000000)
while boundary in firmware:
boundary = b'------hpiLO3t%dz' % str(random.randint(100000,1000000))
parts = [
b"""--%s\r\nContent-Disposition: form-data; name="fileType"\r\n\r\n""" % boundary,
b"""\r\n--%s\r\nContent-Disposition: form-data; name="fwimgfile"; filename="%s"\r\nContent-Type: application/octet-stream\r\n\r\n""" % (boundary, fsencode(filename)),
firmware,
b"\r\n--%s--\r\n" % boundary,
]
total_bytes = sum([len(x) for x in parts])
sock = self._get_socket()
self._debug(2, self.HTTP_UPLOAD_HEADER % (total_bytes, boundary))
sock.write(self.HTTP_UPLOAD_HEADER % (total_bytes, boundary))
for part in parts:
if len(part) < self.BLOCK_SIZE:
self._debug(2, part)
sock.write(part)
else:
sent = 0
fwlen = len(part)
while sent < fwlen:
written = sock.write(part[sent:sent+self.BLOCK_SIZE])
if written is None:
plen = len(part[sent:sent+self.BLOCK_SIZE])
raise IloCommunicationError("Unexpected EOF while sending %d bytes (%d of %d sent before)" % (plen, sent, fwlen))
sent += written
if callable(progress):
progress("Sending request %d/%d bytes (%d%%)" % (sent, fwlen, 100.0*sent/fwlen))
data = ''
try:
while True:
d = sock.read()
data += d.decode('ascii')
if not d:
break
except socket.sslerror as exc: # Connection closed
if not data:
raise IloCommunicationError("Communication with %s:%d failed: %s" % (self.hostname, self.port, str(exc)))
self._debug(1, "Received %d bytes" % len(data))
self._debug(2, data)
if 'Set-Cookie:' not in data:
# Seen on ilo3 with corrupt filesystem
body = re.search('<body>(.*)</body>', data, flags=re.DOTALL).group(1)
body = re.sub('<[^>]*>', '', body).strip()
body = re.sub('Return to last page', '', body).strip()
body = re.sub(r'\s+', ' ', body).strip()
raise IloError(body)
self.cookie = re.search('Set-Cookie: *(.*)', data).group(1)
self._debug(2, "Cookie: %s" % self.cookie)
def _get_socket(self):
"""Set up a subprocess or an https connection and do an HTTP/raw socket request"""
if self.read_response or self.save_request:
class FakeSocket(object):
def __init__(self, rfile=None, wfile=None):
self.input = rfile and open(rfile, 'rb') or io.BytesIO()
self.output = wfile and open(wfile, 'ab') or io.BytesIO()
self.read = self.input.read
self.write = self.output.write
data = self.input.read(4)
self.input.seek(0)
self.protocol = data == b'HTTP' and ILO_HTTP or ILO_RAW
def close(self):
self.input.close()
self.output.close()
shutdown = lambda *args: None
sock = FakeSocket(self.read_response, self.save_request)
if self.read_response:
self.protocol = sock.protocol
return sock
if self.protocol == ILO_LOCAL:
self._debug(1, "Launching hponcfg")
try:
sp = subprocess.Popen([self.hponcfg, '--input', '--xmlverbose'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError as exc:
raise IloCommunicationError("Cannot run %s: %s" % (self.hponcfg, str(exc)))
sp.write = sp.stdin.write
sp.read = sp.stdout.read
return sp
self._debug(1, "Connecting to %s port %d" % (self.hostname, self.port))
err = None
for res in socket.getaddrinfo(self.hostname, self.port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
sock = None
try:
sock = socket.socket(af, socktype, proto)
sock.settimeout(self.timeout)
self._debug(2, "Connecting to %s port %d" % sa[:2])
sock.connect(sa)
except socket.timeout:
if sock is not None:
sock.close()
err = IloCommunicationError("Timeout connecting to %s port %d" % (self.hostname, self.port))
except socket.error as exc:
if sock is not None:
sock.close()
err = IloCommunicationError("Error connecting to %s port %d: %s" % (self.hostname, self.port, str(exc)))
if err is not None:
raise err
if not sock:
raise IloCommunicationError("Unable to resolve %s" % self.hostname)
try:
if not self.ssl_context:
self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
# Even more sadly, some iLOs are still using RC4-SHA
# which was dropped from the default cipher suite in
# Python 2.7.10 and Python 3.4.4. Add it back here :(
self.ssl_context.set_ciphers(ssl._DEFAULT_CIPHERS + ":RC4-SHA")
return self.ssl_context.wrap_socket(
sock, server_hostname=self.hostname)
except ssl.SSLError as exc:
raise IloCommunicationError("Cannot establish ssl session with %s:%d: %s" % (self.hostname, self.port, str(exc)))
def _communicate(self, xml, protocol, progress=None, save=True):
sock = self._get_socket()
if self.read_response:
protocol = sock.protocol
msglen = len(self.XML_HEADER + xml)
if protocol == ILO_HTTP:
extra_header = b''
if self.cookie:
extra_header = b"\r\nCookie: %s" % self.cookie.encode('ascii')
http_header = self.HTTP_HEADER % (msglen, extra_header)
msglen += len(http_header)
self._debug(1, "Sending XML request, %d bytes" % msglen)
if protocol == ILO_HTTP:
self._debug(2, http_header)
sock.write(http_header)
self._debug(2, self.XML_HEADER + xml)
# XML header and data need to arrive in 2 distinct packets
if self.protocol != ILO_LOCAL:
sock.write(self.XML_HEADER)
if b'$EMBED' in xml:
pre, name, post = re.compile(rb'(.*)\$EMBED:(.*)\$(.*)', re.DOTALL).match(xml).groups()
sock.write(pre)
sent = 0
fwlen = os.path.getsize(name)
with open(name, 'rb') as fd:
fw = fd.read()
while sent < fwlen:
written = sock.write(fw[sent:sent+self.BLOCK_SIZE])
sent += written
if callable(progress):
progress("Sending request %d/%d bytes (%d%%)" % (sent, fwlen, 100.0*sent/fwlen))
sock.write(post.strip())
else:
sock.write(xml)
# And grab the data
if self.save_request and save:
sock.close()
return None, None
if self.protocol == ILO_LOCAL:
# hponcfg doesn't return data until stdin is closed
sock.stdin.close()
data = ''
try:
while True:
d = sock.read().decode('ascii', 'iloxml_replace')
data += d
if not d:
break
if callable(progress) and d.strip().endswith('</RIBCL>'):
d = d[d.find('<?xml'):]
while '<?xml' in d:
end = d.find('<?xml', 5)
if end == -1:
msg = self._parse_message(d, include_inform=True)
if msg:
progress(msg)
break
else:
msg = self._parse_message(d[:end], include_inform=True)
if msg:
progress(msg)
d = d[end:]
except socket.sslerror as exc: # Connection closed
if not data:
raise IloCommunicationError("Communication with %s:%d failed: %s" % (self.hostname, self.port, str(exc)))
self._debug(1, "Received %d bytes" % len(data))
if self.protocol == ILO_LOCAL:
sock.stdout.close()
sock.wait()
elif sock.shutdown:
# On OSX this may cause an ENOTCONN, Linux/Windows ignore that situation
try:
sock.shutdown(socket.SHUT_RDWR)
except socket.error as exc:
if exc.errno == errno.ENOTCONN:
pass
else:
raise
sock.close()
# Stript out garbage from hponcfg
if self.protocol == ILO_LOCAL:
data = data[data.find('<'):data.rfind('>')+1]
if self.save_response and save:
with open(self.save_response, 'a') as fd:
fd.write(data)
# Do we have HTTP?
header_ = ''
if protocol == ILO_HTTP and data.startswith('HTTP/1.1 200'):
header, data = data.split('\r\n\r\n', 1)
header_ = header
header = [x.split(':', 1) for x in header.split('\r\n')[1:]]
header = dict([(x[0].lower(), x[1].strip()) for x in header])
if header['transfer-encoding'] == 'chunked':
_data, data = data, ''
while _data:
clen, _data = _data.split('\r\n', 1)
clen = int(clen, 16)
if clen == 0:
break
data += _data[:clen]
_data = _data[clen+2:]
elif data.startswith('HTTP/1.1 404'):
# We must be using iLO2 or older, they don't do HTTP for XML requests
# This case is only triggered by the protocol detection
header = None
elif not data.startswith('<?xml'):
if protocol == ILO_LOCAL:
raise IloError(sock.stderr.read().strip())
raise IloError("Remote returned bogus data, maybe it's not an iLO")
else:
header = None
self._debug(2, "%s\r\n\r\n%s" % (header_, data))
return header, data
def _root_element(self, element, **attrs):
"""Create a basic XML structure for a message. Return root and innermost element"""
if not self.delayed or not self._elements:
root = etree.Element('RIBCL', VERSION="2.0")
login = etree.SubElement(root, 'LOGIN', USER_LOGIN=self.login, PASSWORD=DoNotEscapeMe(self.password))
if self.delayed:
if self._elements:
root, login = self._elements
else:
self._elements = (root, login)
if self.delayed and len(login) and login[-1].tag == element and login[-1].attrib == attrs:
element = login[-1]
else:
element = etree.SubElement(login, element, **attrs)
return root, element
def _attempt_to_fix_broken_xml(self, data):
""" Many iLO versions have bugs that causes them to emit malformed XML.
This is a collection of workarounds and kludges to try and fix up the
data so ElementTree has a chance to parse it correctly"""
warnings.warn("iLO returned malformed XML, attempting to fix. Please contact HP to report a bug", IloXMLWarning)
if '<RIBCL VERSION="2.22"/>' in data:
data = data.replace('<RIBCL VERSION="2.22"/>', '<RIBCL VERSION="2.22">')
# Remove binary 01 in xml output. This bug was seen on a faulty PSU.
if '\x01' in data:
data = data.replace('\x01', '')
# Quite a few unescaped quotation mark bugs keep appearing. Let's try
# to fix up the XML by replacing the last occurrence of a quotation mark
# *before* the position of the error.
#
# Definitely not an optimal algorithm, but this is not a hot path.
# Let's favour correctness over hacks.
last_position = None
position = (0, 0)
while position != last_position:
last_position = position
try:
return etree.fromstring(data)
except etree.ParseError as e:
position = e.position
x = position[0]-1
y = position[1]
lines = data.splitlines()
y = lines[x].rfind('"', 0, y)
lines[x] = lines[x][:y] + '"' + lines[x][y+1:]
data = '\n'.join(lines)
# Couldn't fix it :(
raise
def _parse_message(self, data, include_inform=False):
"""Parse iLO responses into Element instances and remove useless messages"""
data = data.strip()
if not data:
return None
try:
message = etree.fromstring(data)
except etree.ParseError:
message = self._attempt_to_fix_broken_xml(data)
if message.tag == 'RIBCL':
for child in message:
if child.tag == 'INFORM':
if include_inform:
# Filter useless message:
if 'should be updated' in child.text:
return None
return child.text
# RESPONSE with status 0 also adds no value
# Maybe start adding <?xmlilo output-format="xml"?> to requests. TODO: check compatibility
elif child.tag == 'RESPONSE' and int(child.get('STATUS'), 16) == 0:
if child.get('MESSAGE') != 'No error':
warnings.warn(child.get('MESSAGE'), IloWarning)
# These are interesting, something went wrong
elif child.tag == 'RESPONSE':
if 'syntax error' in child.get('MESSAGE') and not self.protocol:
# This is triggered when doing protocol detection, ignore
pass
else:
status = int(child.get('STATUS'), 16)
message = child.get('MESSAGE')
if 'syntax error' in message:
message += '. You may have tried to use a feature this iLO version or firmware version does not support.'
for subclass in IloError.known_subclasses:
if status in subclass.possible_codes or message in subclass.possible_messages:
raise subclass(message, status)
raise IloError(message, status)
# And this type of message is the actual payload.
else:
return message
return None
# This shouldn't be reached as all messages are RIBCL messages. But who knows!
return message
def _element_children_to_dict(self, element):
"""Returns a dict with tag names of all child elements as keys and the
VALUE attributes as values"""
retval = {}
keys = [elt.tag.lower() for elt in element]
if len(keys) != 1 and len(set(keys)) == 1:
# Can't return a dict
retval = []
for elt in element:
# There are some special tags
fname = '_parse_%s_%s' % (element.tag.lower(), elt.tag.lower())
if hasattr(self, fname):
retval.update(getattr(self, fname)(elt))
continue
key, val, unit, description = elt.tag.lower(), elt.get('VALUE', elt.get('value', None)), elt.get('UNIT', None), elt.get('DESCRIPTION', None)
if val is None:
# HP is not best friends with consistency. Sometimes there are
# attributes, sometimes child tags and sometimes text nodes. Oh
# well, deal with it :)
if element.tag.lower() == 'rimp' or elt.tag.lower() in self.xmldata_ectd.get(element.tag.lower(), []) or elt.tag.lower() == 'temps':
val = self._element_children_to_dict(elt)
elif elt.attrib and list(elt):
val = self._element_to_dict(elt)
elif list(elt):
val = self._element_to_list(elt)
elif elt.text:
val = elt.text.strip()
elif elt.attrib:
val = self._element_to_dict(elt)
val = self._coerce(val)
if unit:
val = (val, unit)
if description and isinstance(val, str):
val = (val, description)
if isinstance(retval, list):
retval.append(val)
elif key in retval:
if isinstance(retval[key], dict):
retval[key].update(val)
elif not isinstance(retval[key], list):
retval[key] = [retval[key], val]
else:
retval[key].append(val)
else:
retval[key] = val
return retval
def _element_to_dict(self, element):
"""Returns a dict with tag attributes as items"""
retval = {}
for key, val in element.attrib.items():
retval[key.lower()] = self._coerce(val)
if list(element):
fields = []
for child in element:
if child.tag == 'FIELD':
fields.append(self._element_to_dict(child))
if fields:
names = [x['name'] for x in fields]
if len(names) == len(set(names)):
# Field names are unique, treat them like attributes
for field in fields:
retval[field['name']] = field['value']
else:
# Field names are not unique, such as the name "MAC"
retval['fields'] = fields
return retval
def _element_to_list(self, element):
tagnames = [x.tag for x in element]
if len(set(tagnames)) == 1:
return [self._element_children_to_dict(x) for x in element]
else:
return [(child.tag.lower(), self._element_to_dict(child)) for child in element]
def _coerce(self, val):
"""Do some data type coercion: unquote, turn integers into integers and
Y/N into booleans"""
if isinstance(val, basestring):
if val.startswith('"') and val.endswith('"'):
val = val[1:-1]
if val.isdigit():
val = int(val)
else:
val = {'Y': True, 'N': False, 'true': True, 'false': False}.get(val, val)
return val
def _raw(self, *tags):
if self.delayed:
raise IloError("Cannot use raw tags in delayed mode")
root, inner = self._root_element(tags[0][0], **(tags[0][1]))
for t in tags[1:]:
inner = etree.SubElement(inner, t[0], **t[1])
header, message = self._request(root)
fd = io.BytesIO()
etree.ElementTree(message).write(fd)
ret = fd.getvalue()
fd.close()
return ret
def _info_tag(self, infotype, tagname, returntags=None, attrib={}, process=lambda x: x):
root, inner = self._root_element(infotype, MODE='read')
etree.SubElement(inner, tagname, **attrib)
if self.delayed:
self._processors.append([self._process_info_tag, returntags or [tagname], process])
return
header, message = self._request(root)
if self.save_request:
return
return self._process_info_tag(message, returntags or [tagname], process)
def _process_info_tag(self, message, returntags, process):
if isinstance(returntags, basestring):
returntags = [returntags]
for tag in returntags:
if message.find(tag) is None:
continue
message = message.find(tag)
if list(message):
return process(self._element_children_to_dict(message))
else:
return process(self._element_to_dict(message))
raise IloError("Expected tag '%s' not found" % "' or '".join(returntags))
def _control_tag(self, controltype, tagname, returntag=None, attrib={}, elements=[], text=None):
root, inner = self._root_element(controltype, MODE='write')
inner = etree.SubElement(inner, tagname, **attrib)
if text:
inner.text = text
for element in elements:
inner.append(element)
if self.delayed:
if tagname == 'CERTIFICATE_SIGNING_REQUEST':
self._processors.append([self._process_control_tag, returntag or tagname])
return
header, message = self._request(root)
return self._process_control_tag(message, returntag or tagname)
def _process_control_tag(self, message, returntag):
if message is None:
return None
message = message.find(returntag)
if message.text.strip():
return message.text.strip()
if not message.attrib and not list(message):
return None
raise IloError("You've reached unknown territories, please report a bug")
if list(message):
return self._element_children_to_dict(message)
else:
return self._element_to_dict(message)
def call_delayed(self):
"""In delayed mode, calling a method on an iLO object will not cause an
immediate callout to the iLO. Instead, the method and parameters are
stored for future calls of this method. This method makes one
connection to the iLO and sends all commands as one XML document.
This speeds up applications that make many calls to the iLO by
removing seconds of overhead per call.
The return value of call_delayed is a list of return values for
individual methods that don't return None. This means that there may
be fewer items returned than methods called as only `get_*` methods
return data
Delayed calls only work on iLO 2 or newer"""
if not self._elements:
raise ValueError("No commands scheduled")
try:
root, inner = self._elements
header, message = self._request(root)
ret = []
if message is not None:
if not isinstance(message, list):
message = [message]
for message, processor in zip(message, self._processors):
ret.append(processor.pop(0)(message, *processor))
finally:
self._processors = []
self._elements = None
return ret
def abort_dir_test(self):
"""Abort authentication directory test"""
return self._control_tag('DIR_INFO', 'ABORT_DIR_TEST')
def activate_license(self, key):
"""Activate an iLO advanced license"""
license = etree.Element('ACTIVATE', KEY=key)
return self._control_tag('RIB_INFO', 'LICENSE', elements=[license])
def add_federation_group(self, group_name, group_key, admin_priv=False,
remote_cons_priv=True, reset_server_priv=False,
virtual_media_priv=False, config_ilo_priv=True, login_priv=False):
"""Add a new federation group"""
attrs = locals()
elements = []
for attribute in [x for x in attrs.keys() if x.endswith('_priv')]:
val = ['No', 'Yes'][bool(attrs[attribute])]
elements.append(etree.Element(attribute.upper(), VALUE=val))
return self._control_tag('RIB_INFO', 'ADD_FEDERATION_GROUP', elements=elements,
attrib={'GROUP_NAME': group_name, 'GROUP_KEY': group_key})
def add_sso_server(self, server=None, import_from=None, certificate=None):
"""Add an SSO server by name (only if SSO trust level is lowered) or by
importing a certificate from a server or directly"""
if [server, import_from, certificate].count(None) != 2:
raise ValueError("You must specify exactly one of server, import_from or certificate")
if server:
return self._control_tag('SSO_INFO', 'SSO_SERVER', attrib={'NAME': server})
if import_from:
return self._control_tag('SSO_INFO', 'SSO_SERVER', attrib={'IMPORT_FROM': import_from})
if certificate:
return self._control_tag('SSO_INFO', 'IMPORT_CERTIFICATE', text=certificate)
def add_user(self, user_login, user_name, password, admin_priv=False,
remote_cons_priv=True, reset_server_priv=False,
virtual_media_priv=False, config_ilo_priv=True):
"""Add a new user to the iLO interface with the specified name,
password and permissions. Permission attributes should be boolean
values."""
attrs = locals()
elements = []
for attribute in [x for x in attrs.keys() if x.endswith('_priv')]:
val = ['No', 'Yes'][bool(attrs[attribute])]
elements.append(etree.Element(attribute.upper(), VALUE=val))
return self._control_tag('USER_INFO', 'ADD_USER', elements=elements,
attrib={'USER_LOGIN': user_login, 'USER_NAME': user_name, 'PASSWORD': DoNotEscapeMe(password)})
def ahs_clear_data(self):
"""Clears Active Health System information log"""
return self._control_tag('RIB_INFO', 'AHS_CLEAR_DATA')
def cert_fqdn(self, use_fqdn):
"""Configure whether to use the fqdn or the short hostname for certificate requests"""
use_fqdn = str({True: 'Yes', False: 'No'}.get(use_fqdn, use_fqdn))
return self._control_tag('RIB_INFO', 'CERT_FQDN', attrib={'VALUE': use_fqdn})
def certificate_signing_request(self, country=None, state=None, locality=None, organization=None,
organizational_unit=None, common_name=None):
"""Get a certificate signing request from the iLO"""
vars = locals()
del vars['self']
vars = [('CSR_' + x.upper(), vars[x]) for x in vars if vars[x]]
elements = map(lambda x: etree.Element(x[0], attrib={'VALUE': str(x[1])}), vars)
return self._control_tag('RIB_INFO', 'CERTIFICATE_SIGNING_REQUEST', elements=elements)
def clear_ilo_event_log(self):
"""Clears the iLO event log"""
return self._control_tag('RIB_INFO', 'CLEAR_EVENTLOG')
def clear_server_event_log(self):
"""Clears the server event log"""
return self._control_tag('SERVER_INFO', 'CLEAR_IML')
def clear_server_power_on_time(self):
"""Clears the server power on time"""
return self._control_tag('SERVER_INFO', 'CLEAR_SERVER_POWER_ON_TIME')
def computer_lock_config(self, computer_lock=None, computer_lock_key=None):
"""Configure the computer lock settings"""
if computer_lock_key:
computer_lock = "custom"
if not computer_lock:
raise ValueError("A value must be specified for computer_lock")
elements = [etree.Element('COMPUTER_LOCK', VALUE=computer_lock)]
if computer_lock_key:
elements.append(etree.Element('COMPUTER_LOCK_KEY', VALUE=computer_lock_key))
return self._control_tag('RIB_INFO', 'COMPUTER_LOCK_CONFIG', elements=elements)
def dc_registration_complete(self):
"""Complete the ERS registration of your device after calling
set_ers_direct_connect"""
return self._control_tag('RIB_INFO', 'DC_REGISTRATION_COMPLETE')
def delete_federation_group(self, group_name):
"""Delete the specified federation group membership"""
return self._control_tag('RIB_INFO', 'DELETE_FEDERATION_GROUP', attrib={'GROUP_NAME': group_name})
def delete_sso_server(self, index):
"""Delete an SSO server by index"""
return self._control_tag('SSO_INFO', 'DELETE_SERVER',
attrib={'INDEX': str(index)})
def delete_user(self, user_login):
"""Delete the specified user from the ilo"""
return self._control_tag('USER_INFO', 'DELETE_USER', attrib={'USER_LOGIN': user_login})
def deactivate_license(self):
"""Delete the license key from the iLO"""
element = etree.Element('DEACTIVATE')
return self._control_tag('RIB_INFO', 'LICENSE', elements=[element])
def disable_ers(self):
"""Disable Insight Remote Support functionality and unregister the server"""
return self._control_tag('RIB_INFO', 'DISABLE_ERS')
def eject_virtual_floppy(self):
"""Eject the virtual floppy"""
return self._control_tag('RIB_INFO', 'EJECT_VIRTUAL_FLOPPY')
def eject_virtual_media(self, device="cdrom"):
"""Eject the virtual media attached to the specified device"""
return self._control_tag('RIB_INFO', 'EJECT_VIRTUAL_MEDIA',
attrib={"DEVICE": device.upper()})
def ers_ahs_submit(self, message_id, bb_days):
"""Submity AHS data to the insight remote support server"""
elements = [
etree.Element('MESSAGE_ID', attrib={'VALUE': str(message_id)}),
etree.Element('BB_DAYS', attrib={'VALUE': str(bb_days)}),
]
return self._control_tag('RIB_INFO', 'TRIGGER_BB_DATA', elements=elements)
def fips_enable(self):
"""Enable FIPS standard to enforce AES/3DES encryption, can only be
reset with a call to factory_defaults. Resets Administrator password
and license key"""
return self._control_tag('RIB_INFO', 'FIPS_ENABLE')
def factory_defaults(self):
"""Reset the iLO to factory default settings"""
return self._control_tag('RIB_INFO', 'FACTORY_DEFAULTS')
def force_format(self):
"""Forcefully format the iLO's internal NAND flash. Only use this when
the iLO is having severe problems and its self-test fails"""
return self._control_tag('RIB_INFO', 'FORCE_FORMAT', attrib={'VALUE': 'all'})
def get_ahs_status(self):
"""Get active health system logging status"""
return self._info_tag('RIB_INFO', 'GET_AHS_STATUS')
def get_all_users(self):
"""Get a list of all loginnames"""
def process(data):
if isinstance(data, dict):
data = data.values()
return [x for x in data if x]
return self._info_tag('USER_INFO', 'GET_ALL_USERS', process=process)
def get_all_user_info(self):
"""Get basic and authorization info of all users"""
def process(data):
if isinstance(data, dict):
data = data.values()
return dict([(x['user_login'], x) for x in data])
return self._info_tag('USER_INFO', 'GET_ALL_USER_INFO', process=process)
def get_asset_tag(self):
"""Gets the server asset tag"""
# The absence of an asset tag is communicated in a warning and there
# will be *NO* returntag, hence the AttributeError.
try:
return self._info_tag('SERVER_INFO', 'GET_ASSET_TAG')
except AttributeError:
return {'asset_tag': None}
def get_cert_subject_info(self):
"""Get ssl certificate subject information"""
return self._info_tag('RIB_INFO', 'GET_CERT_SUBJECT_INFO', 'CSR_CERT_SETTINGS')
def get_critical_temp_remain_off(self):