diff --git a/setup.py b/setup.py index 88eedea..054f95c 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ def readme(): setup( name=_pkg_name, - version='0.9.9', + version='0.9.10', description="SAML2 SPID Service Provider validation tool that can be run from the command line", long_description=readme(), long_description_content_type='text/markdown', @@ -33,7 +33,6 @@ def readme(): 'pysaml2>=6.5.1', 'xmlschema>=1.5.1', 'requests>=2.25.1', - 'pyXMLSecurity>=0.21', 'lxml>=4.6.2', 'Jinja2>=2.11.3', # 'sslyze>=4.0.4', # todo diff --git a/src/spid_sp_test/__init__.py b/src/spid_sp_test/__init__.py index 977fd6d..8bf167b 100644 --- a/src/spid_sp_test/__init__.py +++ b/src/spid_sp_test/__init__.py @@ -21,49 +21,74 @@ def __init__(self, *args, **kwargs): def report_to_dict(self): res = {self.category: {self.__class__.__name__: self.results}} - return res def is_ok(self, msg): if not self.error_counter: - self.handle_result("info", f"{msg}") + # self.handle_result( + # "info", + # msg, + # method = method or msg + # ) return True else: self.error_counter = 0 return False def handle_result( - self, level: str, title: str, description: str = "", traceback: str = None + self, + level: str, + title: str, + description: str = "", + traceback: str = None, + references: list = [], + method: str = "", + test_id: str = "", ): msg = f"{title}" - getattr(self.logger, level, "debug")(msg) + getattr(self.logger, level, "debug")(f"{method}: {msg}") value = f"{description}" if not traceback else f"{description}: {traceback }" + + data = { + "test_id": test_id, + "test": title, + "value": value.decode() if isinstance(value, bytes) else value, + "references": references, + "method": method, + } + if level not in ("error", "debug", "critical", "warning"): # here report as json - self.results.append( - { - "result": "success", - "test": title, - # "value": value - } - ) + data["result"] = "success" + self.results.append(data) elif level in ("error", "critical"): self.handle_error(title, description, traceback) elif level == "warning": - data = { - "result": "warning", - "test": title, - "value": value, - } + data["result"] = "warning" self.results.append(data) self.warnings.append(data) - def handle_error(self, error_message, description="", traceback: str = None): - getattr(self.logger, "error")(error_message) + def handle_error( + self, + error_message, + description="", + traceback: str = None, + references: list = [], + method: str = "", + test_id: str = "", + ): + self.logger.error(error_message) self.error_counter += 1 # here report as json value = f"{description}" if not traceback else f"{description}: {traceback }" - data = {"result": "failure", "test": error_message, "value": value} + data = { + "test_id": test_id, + "result": "failure", + "test": error_message, + "value": value.decode() if isinstance(value, bytes) else value, + "references": references, + "method": method, + } self.errors.append(data) self.results.append(data) @@ -74,127 +99,30 @@ def _assert( description="", traceback: str = None, level: str = "info", + **kwargs, ): - if not check and level == "info": - self.handle_error(error_message, description, traceback) - elif not check and level == "warning": - self.handle_result(level, f"{error_message}", description, traceback) + if not check and level != "warning": + self.handle_error(error_message, description, traceback, **kwargs) else: - level = "info" if level in ("warning",) else level - self.handle_result(level, f"{error_message}", description, traceback) - - def _assertTrue( - self, - check, - error_message, - description="", - traceback: str = None, - level: str = "info", - ): - self._assert(check, error_message, description, traceback, level) - - def _assertFalse( - self, - check, - error_message, - description="", - traceback: str = None, - level: str = "info", - ): - self._assert(not check, error_message, description, traceback, level) - - def _assertIsNotNone( - self, - check, - error_message, - description="", - traceback: str = None, - level: str = "info", - ): - self._assert(check, error_message, description, traceback, level) - - def _assertIn( - self, - first, - second, - error_message, - description="", - traceback: str = None, - level: str = "info", - ): - self._assert((first in second), error_message, description, traceback, level) - - def _assertGreaterEqual( - self, - first, - second, - error_message, - description="", - traceback: str = None, - level: str = "info", - ): - self._assert((first >= second), error_message, description, traceback, level) + # level = "info" if level in ("warning",) else level + self.handle_result(level, error_message, description, traceback, **kwargs) - def _assertGreater( - self, - first, - second, - error_message, - description="", - traceback: str = None, - level: str = "info", - ): - self._assert((first > second), error_message, description, traceback, level) + def _assertTrue(self, *args, **kwargs): + self._assert(*args, **kwargs) - def _assertEqual( - self, - first, - second, - error_message, - description="", - traceback: str = None, - level: str = "info", - ): - self._assert((first == second), error_message, description, traceback, level) + def _assertFalse(self, check, *args, **kwargs): + self._assert(not check, *args, **kwargs) - def _assertIsValidHttpsUrl( - self, - check, - error_message, - description="", - traceback: str = None, - level: str = "info", - ): - self._assert( - re.match("https://", check if check else ""), description, traceback, level - ) + def _assertIsValidHttpsUrl(self, check, *args, **kwargs): + self._assert(re.match("https://", check if check else ""), *args, **kwargs) - def _assertHttpUrlWithoutPort( - self, - check, - error_message, - description="", - traceback: str = None, - level: str = "info", - ): + def _assertHttpUrlWithoutPort(self, check, *args, **kwargs): self._assert( - re.match(HTTP_NO_PORT_REGEX, check if check else ""), - description, - traceback, - level, + re.match(HTTP_NO_PORT_REGEX, check if check else ""), *args, **kwargs ) - def _assertIsValidHttpUrl( - self, - check, - error_message, - description="", - traceback: str = None, - level: str = "info", - ): - self._assert( - re.match("https?://", check if check else ""), description, traceback, level - ) + def _assertIsValidHttpUrl(self, check, *args, **kwargs): + self._assert(re.match("https?://", check if check else ""), *args, **kwargs) # maybe useful .. one day ?! # idp_server = self.idp() diff --git a/src/spid_sp_test/authn_request.py b/src/spid_sp_test/authn_request.py index 7484585..1d68667 100644 --- a/src/spid_sp_test/authn_request.py +++ b/src/spid_sp_test/authn_request.py @@ -23,8 +23,6 @@ from spid_sp_test import BASE_DIR, AbstractSpidCheck from saml2.server import Server -from saml2.sigver import CryptoBackendXMLSecurity - # from saml2.sigver import CryptoBackendXmlSec1 sys.path.append(os.path.join(os.path.dirname(__file__), os.pardir)) from tempfile import NamedTemporaryFile @@ -208,10 +206,13 @@ def __init__( request_content_type=request_content_type, ) except binascii.Error as exp: - _msg = "[2.0.0] Base64 decode of AuthnRequest MUST be correct" + _msg = "Base64 decode of AuthnRequest MUST be correct" logger.critical(_msg + f": {exp}") - self._assertTrue(False, _msg, description=exp) - self.is_ok(f"{self.__class__.__name__}.test_xmldsig-pre") + _method = f"{self.__class__.__name__}.test_xmldsig-pre" + self._assertTrue( + False, _msg, test_id=["2.0.0"], description=exp, method=_method + ) + self.is_ok(_method) raise exp try: @@ -255,7 +256,12 @@ def idp(self): def test_xsd(self): """Test if the XSD validates and if the signature is valid""" - + _method = f"{self.__class__.__name__}.test_xsd" + _data = dict( + test_id="", + references=[], + method=_method, + ) _orig_pos = os.getcwd() os.chdir(self.xsds_files_path) authn_request = self.authn_request_decoded.decode() @@ -267,13 +273,13 @@ def test_xsd(self): schema.validate(authn_request) self.handle_result("error", " ".join((msg, "-> FAILED!"))) raise Exception("Validation Error") - logger.info(" ".join((msg, "-> OK"))) + logger.info(" ".join((msg))) except Exception as e: os.chdir(_orig_pos) - self.handle_result("error", "-> ".join((msg, f"{e}"))) + self.handle_result("error", msg, description=e, **_data) os.chdir(_orig_pos) - return self.is_ok(f"{self.__class__.__name__}.test_xsd") + return self.is_ok(_method) def test_xmldsig(self): certs = self.md.xpath( @@ -282,8 +288,10 @@ def test_xmldsig(self): ) desc = certs - error_kwargs = dict(description=desc) if desc else {} - + _method = f"{self.__class__.__name__}.test_xmldsig" + _data = dict( + test_id="", references=[], method=_method, description="".join(desc)[:128] + ) msg = ( "The AuthnRequest MUST validate against XSD " "and MUST have a valid signature" @@ -297,142 +305,176 @@ def test_xmldsig(self): msg, "AuthnRequest Signature validation failed: certificates are missing.", ), - **error_kwargs, + **_data, ), ) - return self.is_ok(f"{self.__class__.__name__}.test_xsd_and_xmldsig") + return self.is_ok(_method) else: is_valid = False for cert in certs: - if self.IS_HTTP_REDIRECT: - with NamedTemporaryFile(suffix=".xml") as cert_file: - if cert[-1] != "\n": - cert += "\n" - cert_file.write( - f"-----BEGIN CERTIFICATE-----\n{cert}-----END CERTIFICATE-----".encode() - ) - cert_file.seek(0) - _sigalg = self.authn_request.get("SigAlg", "") - quoted_req = urllib.parse.quote_plus( - self.authn_request["SAMLRequest"] - ) - quoted_rs = urllib.parse.quote_plus( - self.authn_request.get("RelayState") or "" - ) - quoted_sigalg = urllib.parse.quote_plus(_sigalg) - authn_req = ( - f"SAMLRequest={quoted_req}&" - f"RelayState={quoted_rs}&" - f"SigAlg={quoted_sigalg}" - ) + cert_file = NamedTemporaryFile(suffix=".pem") + if cert[-1] != "\n": + cert += "\n" + cert_file.write( + f"-----BEGIN CERTIFICATE-----\n{cert}-----END CERTIFICATE-----".encode() + ) + cert_file.seek(0) - payload_file = NamedTemporaryFile(suffix=".xml") - payload_file.write(authn_req.encode()) - payload_file.seek(0) + pubkey_file = NamedTemporaryFile(suffix=".crt") + x509_cert = subprocess.getoutput( + f"openssl x509 -in {cert_file.name} -noout -pubkey" + ) + pubkey_file.write(x509_cert.encode()) + pubkey_file.seek(0) - signature_file = NamedTemporaryFile(suffix=".sign") - signature_file.write( - base64.b64decode(self.authn_request["Signature"].encode()) - ) - signature_file.seek(0) + if self.IS_HTTP_REDIRECT: + _sigalg = self.authn_request.get("SigAlg", "") + quoted_req = urllib.parse.quote_plus( + self.authn_request["SAMLRequest"] + ) + quoted_rs = urllib.parse.quote_plus( + self.authn_request.get("RelayState") or "" + ) + quoted_sigalg = urllib.parse.quote_plus(_sigalg) + authn_req = ( + f"SAMLRequest={quoted_req}&" + f"RelayState={quoted_rs}&" + f"SigAlg={quoted_sigalg}" + ) - pubkey_file = NamedTemporaryFile(suffix=".crt") - x509_cert = subprocess.getoutput( - f"openssl x509 -in {cert_file.name} -noout -pubkey" - ) - pubkey_file.write(x509_cert.encode()) - pubkey_file.seek(0) + payload_file = NamedTemporaryFile(suffix=".xml") + payload_file.write(authn_req.encode()) + payload_file.seek(0) - dgst = _sigalg.split("-")[-1] - signature = signature_file.name + signature_file = NamedTemporaryFile(suffix=".sign") + signature_file.write( + base64.b64decode(self.authn_request["Signature"].encode()) + ) + signature_file.seek(0) - ver_cmd = ( - f"openssl dgst -{dgst} " - f"-verify {pubkey_file.name} " - f"-signature {signature} {payload_file.name}" - ) - exit_msg = subprocess.getoutput(ver_cmd) - error_kwargs["description"] = exit_msg - if "Verified OK" in exit_msg: - is_valid = True - else: - is_valid = False + dgst = _sigalg.split("-")[-1] + signature = signature_file.name + + ver_cmd = ( + f"openssl dgst -{dgst} " + f"-verify {pubkey_file.name} " + f"-signature {signature} {payload_file.name}" + ) + exit_msg = subprocess.getoutput(ver_cmd) + _data["description"] = exit_msg + if "Verified OK" in exit_msg: + is_valid = True + else: + is_valid = False else: - # pyXMLSecurity allows to pass a certificate without store it on a file - backend = CryptoBackendXMLSecurity() - is_valid = backend.validate_signature( - self.authn_request_decoded, - cert_file=cert, - cert_type="pem", - node_name=constants.NODE_NAME, - node_id=None, + tmp_file = NamedTemporaryFile(suffix=".xml") + tmp_file.write(self.authn_request_decoded) + tmp_file.seek(0) + cmd = ( + 'xmlsec1 --verify --insecure --id-attr:ID ' + '"urn:oasis:names:tc:SAML:2.0:protocol:AuthnRequest" ' + # f'--pubkey-cert-pem {cert_file.name} ' + f'--pubkey-pem {pubkey_file.name} ' + f"{tmp_file.name} " ) - if is_valid: - break - self._assertTrue( - is_valid, "AuthnRequest Signature validation failed", **error_kwargs - ) - return self.is_ok(f"{self.__class__.__name__}.test_xmldsig") + try: + out = subprocess.run( + cmd, + shell=True, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + if out.returncode == 0: + is_valid = True + break + except subprocess.CalledProcessError as err: + lines = [msg] + if err.stderr: + stderr = "stderr: " + "\nstderr: ".join( + list(filter(None, err.stderr.decode("utf-8").split(r"\n"))) + ) + lines.append(stderr) + if err.stdout: + stdout = "stdout: " + "\nstdout: ".join( + list(filter(None, err.stdout.decode("utf-8").split(r"\n"))) + ) + lines.append(stdout) + _msg = "\n".join(lines) + self.handle_result( + "error", _msg, description="Description", traceback=_msg, **_data + ) + return + self._assertTrue(is_valid, "AuthnRequest Signature validation failed", **_data) + return self.is_ok(_method) def test_AuthnRequest(self): """Test the compliance of AuthnRequest element""" req = self.doc.xpath("/AuthnRequest") - - req_desc = [etree.tostring(ent).decode() for ent in req if req] - error_kwargs = dict(description=req_desc) if req_desc else {} + _method = f"{self.__class__.__name__}.test_AuthnRequest" + _data = dict( + test_id="", + references=["TR pag. 8"], + method=_method, + ) self._assertTrue( - (len(req) == 1), "One AuthnRequest element MUST be present", **error_kwargs + (len(req) == 1), "One AuthnRequest element MUST be present", **_data ) if req: req = req[0] else: - return self.is_ok(f"{self.__class__.__name__}.test_AuthnRequest") + return self.is_ok(_method) for attr in ("ID", "Version", "IssueInstant", "Destination"): self._assertTrue( (attr in req.attrib), - f"The {attr} attribute MUST be present - TR pag. 8 ", - **error_kwargs, + f"The {attr} attribute MUST be present", + description=attr, + **_data, ) value = req.get(attr) if attr == "ID": - self._assertIsNotNone( + self._assertTrue( value, - f"The {attr} attribute MUST have a value - TR pag. 8 ", - **error_kwargs, + f"The {attr} attribute MUST have a value", + description=value, + **_data, ) if attr == "Version": exp = "2.0" - self._assertEqual( - value, - exp, - f"The {attr} attribute MUST be {exp} - TR pag. 8 ", - **error_kwargs, + self._assertTrue( + value == exp, + f"The {attr} attribute MUST be {exp}", + description=value, + **_data, ) if attr == "IssueInstant": - self._assertIsNotNone( + self._assertTrue( value, - f"The {attr} attribute MUST have a value - TR pag. 8 ", - **error_kwargs, + f"The {attr} attribute MUST have a value", + description=value, + **_data, ) self._assertTrue( bool(constants.UTC_STRING.search(value)), - f"The {attr} attribute MUST be a valid UTC string - TR pag. 8 ", - **error_kwargs, + f"The {attr} attribute MUST be a valid UTC string", + description=value, + **_data, ) if attr == "Destination": - self._assertIsNotNone( + self._assertTrue( value, - f"The {attr} attribute MUST have a value - TR pag. 8 ", - **error_kwargs, + f"The {attr} attribute MUST have a value", + description=value, + **_data, ) allowed_destinations = [ @@ -448,32 +490,39 @@ def test_AuthnRequest(self): "The Destination attribute SHOULD be the address to " "which the request has been sent but can also be the EnityID of IdP (Av. SPID n.11)", description=value, + **_data, ) if self.production: self._assertIsValidHttpsUrl( value, - f"The {attr} attribute MUST be a valid HTTPS url - TR pag. 8 ", - **error_kwargs, + f"The {attr} attribute MUST be a valid HTTPS url", + description=value, + **_data, ) self._assertHttpUrlWithoutPort( value, 'The entityID attribute MUST not contains any custom tcp ports, eg: ":8000"', + description=value, + **_data, ) self._assertTrue( ("IsPassive" not in req.attrib), "The IsPassive attribute MUST not be present - TR pag. 9 ", - **error_kwargs, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_AuthnRequest") + return self.is_ok(_method) def test_AuthnRequest_SPID(self): """Test the compliance of AuthnRequest element""" req = self.doc.xpath("/AuthnRequest")[0] - - req_desc = [etree.tostring(ent).decode() for ent in req if req is not None] - error_kwargs = dict(description=req_desc) if req_desc else {} + _method = f"{self.__class__.__name__}.test_AuthnRequest_SPID" + _data = dict( + test_id="", + references=["TR pag. 8"], + method=_method, + ) acr = req.xpath("//RequestedAuthnContext/AuthnContextClassRef") acr_desc = [etree.tostring(_acr).decode() for _acr in acr] @@ -483,15 +532,17 @@ def test_AuthnRequest_SPID(self): if bool(constants.SPID_LEVEL_23.search(level)): self._assertTrue( ("ForceAuthn" in req.attrib), - "The ForceAuthn attribute MUST be present if SPID level > 1 - TR pag. 8 ", + "The ForceAuthn attribute MUST be present if SPID level > 1", description=acr_desc, + **_data, ) value = req.get("ForceAuthn") if value: self._assertTrue( (value.lower() in constants.BOOLEAN_TRUE), "The ForceAuthn attribute MUST be true or 1 - TR pag. 8 ", - **error_kwargs, + description=value, + **_data, ) attr = "AssertionConsumerServiceIndex" @@ -510,20 +561,22 @@ def test_AuthnRequest_SPID(self): self._assertTrue( value in availableassertionindexes, - f"The {attr} attribute MUST be equal to an AssertionConsumerService index - TR pag. 8 ", + f"The {attr} attribute MUST be equal to an AssertionConsumerService index", description=acss_desc, + **_data, ) - self._assertIsNotNone( + self._assertTrue( value, - f"The {attr} attribute MUST have a value- TR pag. 8 ", - **error_kwargs, + f"The {attr} attribute MUST have a value", + description=value, + **_data, ) - self._assertGreaterEqual( - int(value), - 0, - f"The {attr} attribute MUST be >= 0 - TR pag. 8 and pag. 20", - **error_kwargs, + self._assertTrue( + int(value) >= 0, + f"The {attr} attribute MUST be >= 0", + description=value, + **_data, ) else: @@ -536,41 +589,47 @@ def test_AuthnRequest_SPID(self): for attr in ["AssertionConsumerServiceURL", "ProtocolBinding"]: self._assertTrue( (attr in req.attrib), - f"The {attr} attribute MUST be present - TR pag. 8 ", - **error_kwargs, + f"The {attr} attribute MUST be present", + description=req.attrib, + **_data, ) value = req.get(attr) - self._assertIsNotNone( + self._assertTrue( value, - f"The {attr} attribute MUST have a value - TR pag. 8 ", - **error_kwargs, + f"The {attr} attribute MUST have a value", + description=value, + **_data, ) if attr == "AssertionConsumerServiceURL": if self.production: self._assertIsValidHttpsUrl( value, - f"The {attr} attribute MUST be a valid HTTPS url - TR pag. 8 and pag. 16", - **error_kwargs, + f"The {attr} attribute MUST be a valid HTTPS url", + description=value, + **_data, ) self._assertHttpUrlWithoutPort( value, 'The entityID attribute MUST not contains any custom tcp ports, eg: ":8000"', + description=value, + **_data, ) self._assertTrue( value in availableassertionlocations, - f"The {attr} attribute MUST be equal to an AssertionConsumerService Location - TR pag. 8 ", - **error_kwargs, + f"The {attr} attribute MUST be equal to an AssertionConsumerService Location", + description=value, + **_data, ) if attr == "ProtocolBinding": exp = "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" - self._assertEqual( - value, - exp, - f"The {attr} attribute MUST be {exp} - TR pag. 8 ", - **error_kwargs, + self._assertTrue( + value == exp, + f"The {attr} attribute MUST be {exp}", + description=value, + **_data, ) attr = "AttributeConsumingServiceIndex" @@ -578,31 +637,32 @@ def test_AuthnRequest_SPID(self): availableattributeindexes = [] acss = self.md.xpath( - "//EntityDescriptor/SPSSODescriptor" "/AttributeConsumingService" + "//EntityDescriptor/SPSSODescriptor/AttributeConsumingService" ) for acs in acss: index = acs.get("index") availableattributeindexes.append(index) value = req.get(attr) - self._assertIsNotNone( + self._assertTrue( value, - f"The {attr} attribute MUST have a value - TR pag. 8", - **error_kwargs, + f"The {attr} attribute MUST have a value", + description=value, + **_data, ) - self._assertGreaterEqual( - int(value), - 0, - f"The {attr} attribute MUST be >= 0 - TR pag. 8 and pag. 20", - **error_kwargs, + self._assertTrue( + int(value) >= 0, + f"The {attr} attribute MUST be >= 0", + description=value, + **_data, ) self._assertTrue( value in availableattributeindexes, - f"The {attr} attribute MUST be equal to an AttributeConsumingService index - TR pag. 8 ", - **error_kwargs, + f"The {attr} attribute MUST be equal to an AttributeConsumingService index", + description=value, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_AuthnRequest_SPID") - + return self.is_ok(_method) def test_Subject(self): """Test the compliance of Subject element""" @@ -610,106 +670,120 @@ def test_Subject(self): subj = self.doc.xpath("//AuthnRequest/Subject") desc = [etree.tostring(ent).decode() for ent in subj if subj] - error_kwargs = dict(description=desc) if desc else {} + _method = f"{self.__class__.__name__}.test_Subject" + _data = dict( + test_id="", + references=["TR pag. 9"], + method=_method, + description="".join(desc)[:128], + ) if len(subj) > 1: - self._assertEqual( - len(subj), - 1, - "Only one Subject element can be present - TR pag. 9", - **error_kwargs, + self._assertTrue( + len(subj) == 1, + "Only one Subject element can be present", + description=subj, + **_data, ) if len(subj) == 1: subj = subj[0] name_id = subj.xpath("./NameID") - self._assertEqual( - len(name_id), - 1, - "One NameID element in Subject element MUST be present - TR pag. 9", - **error_kwargs, + self._assertTrue( + len(name_id) == 1, + "One NameID element in Subject element MUST be present", + description=name_id, + **_data, ) name_id = name_id[0] for attr in ["Format", "NameQualifier"]: self._assertTrue( (attr in name_id.attrib), - f"The {attr} attribute MUST be present - TR pag. 9", - **error_kwargs, + f"The {attr} attribute MUST be present", + description=name_id.attrib, + **_data, ) value = name_id.get(attr) - - self._assertIsNotNone( + self._assertTrue( value, - f"The {attr} attribute MUST have a value - TR pag. 9", - **error_kwargs, + f"The {attr} attribute MUST have a value", + description=value, + **_data, ) if attr == "Format": exp = "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified" - self._assertEqual( - value, - exp, - f"The {attr} attribute MUST be {exp} - TR pag. 9", - **error_kwargs, + self._assertTrue( + value == exp, + f"The {attr} attribute MUST be {exp}", + description=value, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_Subject") + return self.is_ok(_method) def test_Issuer(self): """Test the compliance of Issuer element""" e = self.doc.xpath("//AuthnRequest/Issuer") desc = [etree.tostring(ent).decode() for ent in e if e] - error_kwargs = dict(description=desc) if desc else {} + _method = f"{self.__class__.__name__}.test_Issuer" + _data = dict( + test_id="", + references=["TR pag. 9"], + method=_method, + description="".join(desc)[:128], + ) self._assertTrue( (len(e) == 1), - "One Issuer element MUST be present - TR pag. 9", - error_kwargs, + "One Issuer element MUST be present", + **_data, ) if not e: - return self.is_ok(f"{self.__class__.__name__}.test_AuthnRequest") + return self.is_ok(_method) else: e = e[0] - self._assertIsNotNone( - e.text, "The Issuer element MUST have a value - TR pag. 9", **error_kwargs - ) + self._assertTrue(e.text, "The Issuer element MUST have a value", **_data) entitydescriptor = self.md.xpath("//EntityDescriptor") entityid = entitydescriptor[0].get("entityID") - self._assertEqual( - e.text, - entityid, - "The Issuer's value MUST be equal to entityID - TR pag. 9", - **error_kwargs, + _data.pop("description") + self._assertTrue( + e.text == entityid, + "The Issuer's value MUST be equal to entityID", + description=e.text, + **_data, ) for attr in ["Format", "NameQualifier"]: self._assertTrue( (attr in e.attrib), - f"The {attr} attribute MUST be present - TR pag. 9", - **error_kwargs, + f"The {attr} attribute MUST be present", + description=e.attrib, + **_data, ) value = e.get(attr) - self._assertIsNotNone( + self._assertTrue( value, - f"The {attr} attribute MUST have a value - TR pag. 9", - **error_kwargs, + f"The {attr} attribute MUST have a value", + description=value, + **_data, ) if attr == "Format": exp = "urn:oasis:names:tc:SAML:2.0:nameid-format:entity" - self._assertEqual( - value, - exp, - f"The {attr} attribute MUST be {exp} - TR pag. 9", - **error_kwargs, + self._assertTrue( + value == exp, + f"The {attr} attribute MUST be {exp}", + description=value, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_Issuer") + return self.is_ok(_method) def test_NameIDPolicy(self): """Test the compliance of NameIDPolicy element""" @@ -717,61 +791,79 @@ def test_NameIDPolicy(self): e = self.doc.xpath("//AuthnRequest/NameIDPolicy") desc = [etree.tostring(ent).decode() for ent in e if e] - error_kwargs = dict(description=desc) if desc else {} + _method = f"{self.__class__.__name__}.test_NameIDPolicy" + _data = dict( + test_id="", + references=["TR pag. 9"], + method=_method, + description="".join(desc)[:128], + ) self._assertTrue( (len(e) == 1), - "One NameIDPolicy element MUST be present - TR pag. 9", - **error_kwargs, + "One NameIDPolicy element MUST be present", + **_data, ) if not e: - return self.is_ok(f"{self.__class__.__name__}.test_AuthnRequest") + return self.is_ok(_method) else: e = e[0] - self._assertTrue( - ("AllowCreate" not in e.attrib), - "The AllowCreate attribute MUST not be present - AV n.5 ", - **error_kwargs, - ) - + _data.pop("description") attr = "Format" self._assertTrue( (attr in e.attrib), - f"The {attr} attribute MUST be present - TR pag. 9", - **error_kwargs, + f"The {attr} attribute MUST be present", + description=e.attrib, + **_data, ) value = e.get(attr) - - self._assertIsNotNone( - value, f"The {attr} attribute MUST have a value - TR pag. 9", **error_kwargs + self._assertTrue( + value, + f"The {attr} attribute MUST have a value", + description=value, + **_data, ) if attr == "Format": exp = "urn:oasis:names:tc:SAML:2.0:nameid-format:transient" - self._assertEqual( - value, - exp, - f"The {attr} attribute MUST be {exp} - TR pag. 9", - **error_kwargs, + self._assertTrue( + value == exp, + f"The {attr} attribute MUST be {exp}", + description=value, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_NameIDPolicy") + + _data["references"] = ["AV n.5"] + self._assertTrue( + ("AllowCreate" not in e.attrib), + "The AllowCreate attribute MUST not be present", + description=value, + **_data, + ) + return self.is_ok(_method) def test_Conditions(self): """Test the compliance of Conditions element""" e = self.doc.xpath("//AuthnRequest/Conditions") desc = [etree.tostring(ent).decode() for ent in e if e] - error_kwargs = dict(description=desc) if desc else {} + _method = f"{self.__class__.__name__}.test_Conditions" + _data = dict( + test_id="", + references=["TR pag. 9"], + method=_method, + description="".join(desc)[:128], + ) if len(e) > 1: - self._assertEqual( - len(1), - 1, - "Only one Conditions element is allowed - TR pag. 9", - **error_kwargs, + self._assertTrue( + len(1) == 1, + "Only one Conditions element is allowed", + description=e, + **_data, ) if len(e) == 1: @@ -779,170 +871,186 @@ def test_Conditions(self): for attr in ["NotBefore", "NotOnOrAfter"]: self._assertTrue( (attr in e.attrib), - f"The {attr} attribute MUST be present - TR pag. 9", - **error_kwargs, + f"The {attr} attribute MUST be present", + description=e, + **_data, ) value = e.get(attr) - - self._assertIsNotNone( + self._assertTrue( value, - f"The {attr} attribute MUST have a value - TR pag. 9", - **error_kwargs, + f"The {attr} attribute MUST have a value", + description=value, + **_data, ) self._assertTrue( bool(constants.regex.UTC_STRING.search(value)), - f"The {attr} attribute MUST have avalid UTC string - TR pag. 9", - **error_kwargs, + f"The {attr} attribute MUST have avalid UTC string", + description=value, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_Conditions") + return self.is_ok(_method) def test_RequestedAuthnContext(self): """Test the compliance of RequestedAuthnContext element""" e = self.doc.xpath("//AuthnRequest/RequestedAuthnContext") + _method = f"{self.__class__.__name__}.test_RequestedAuthnContext" + _data = dict( + test_id="", + references=["TR pag. 9", "TR pag. 10"], + method=_method, + ) - desc = [etree.tostring(ent).decode() for ent in e if e] - error_kwargs = dict(description=desc) if desc else {} - - self._assertEqual( - len(e), - 1, - "Only one RequestedAuthnContext element MUST be present - TR pag. 9", - **error_kwargs, + self._assertTrue( + len(e) == 1, + "Only one RequestedAuthnContext element MUST be present", + description=[etree.tostring(_val).decode() for _val in e], + **_data, ) if e: e = e[0] - attr = "Comparison" self._assertTrue( (attr in e.attrib), - f"The {attr} attribute MUST be present - TR pag. 10", - **error_kwargs, + f"The {attr} attribute MUST be present", + description=e.attrib, + **_data, ) value = e.get(attr) - self._assertIsNotNone( + self._assertTrue( value, - f"The {attr} attribute MUST have a value - TR pag. 10", - **error_kwargs, + f"The {attr} attribute MUST have a value", + description=value, + **_data, ) allowed = ["exact", "minimum", "better", "maximum"] - self._assertIn( - value, - allowed, - "Attribute not valid - TR pag. 10", + self._assertTrue( + value in allowed, + "Attribute not valid", description=f"The {attr} attribute MUST be one of [{', '.join(allowed)}]", + **_data, ) acr = e.xpath("./AuthnContextClassRef") - self._assertEqual( - len(acr), - 1, - "Only one AuthnContexClassRef element MUST be present - TR pag. 9", + self._assertTrue( + len(acr) == 1, + "Only one AuthnContexClassRef element MUST be present", description=[etree.tostring(_acr).decode() for _acr in acr], + **_data, ) if acr: acr = acr[0] - self._assertIsNotNone( + self._assertTrue( acr.text, - "The AuthnContexClassRef element MUST have a value - TR pag. 9", + "The AuthnContexClassRef element MUST have a value", description=etree.tostring(acr), + **_data, ) self._assertTrue( bool(constants.SPID_LEVEL_ALL.search(acr.text)), - "The AuthnContextClassRef element MUST have a valid SPID level - TR pag. 9 and AV n.5", + "The AuthnContextClassRef element MUST have a valid SPID level", description=etree.tostring(acr), + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_RequestedAuthnContext") + return self.is_ok(_method) def test_Signature(self): """Test the compliance of Signature element""" + _method = f"{self.__class__.__name__}.test_Signature" + _data = dict( + test_id="", + references=["TR pag. 10"], + method=_method, + ) if not self.IS_HTTP_REDIRECT: sign = self.doc.xpath("//AuthnRequest/Signature") - - desc = [etree.tostring(ent).decode() for ent in sign if sign] - error_kwargs = dict(description=desc) if desc else {} + # desc = [etree.tostring(ent).decode() for ent in sign if sign] self._assertTrue( (len(sign) == 1), - "The Signature element MUST be present - TR pag. 10", - **error_kwargs, + "The Signature element MUST be present", + **_data, ) method = sign[0].xpath("./SignedInfo/SignatureMethod") self._assertTrue( (len(method) == 1), - "The SignatureMethod element MUST be present- TR pag. 10", - **error_kwargs, + "The SignatureMethod element MUST be present", + **_data, ) self._assertTrue( ("Algorithm" in method[0].attrib), - "The Algorithm attribute MUST be present " - "in SignatureMethod element - TR pag. 10", - **error_kwargs, + "The Algorithm attribute MUST be present " "in SignatureMethod element", + **_data, ) alg = method[0].get("Algorithm") - self._assertIn( - alg, - constants.ALLOWED_XMLDSIG_ALGS, - "The signature algorithm MUST be valid - TR pag. 10", + self._assertTrue( + alg in constants.ALLOWED_XMLDSIG_ALGS, + "The signature algorithm MUST be valid", description=f"One of {', '.join(constants.ALLOWED_XMLDSIG_ALGS)}", + **_data, ) # noqa method = sign[0].xpath("./SignedInfo/Reference/DigestMethod") self._assertTrue( (len(method) == 1), "The DigestMethod element MUST be present", - **error_kwargs, + **_data, ) self._assertTrue( ("Algorithm" in method[0].attrib), - "The Algorithm attribute MUST be present " - "in DigestMethod element - TR pag. 10", - **error_kwargs, + "The Algorithm attribute MUST be present " "in DigestMethod element", + **_data, ) alg = method[0].get("Algorithm") - self._assertIn( - alg, - constants.ALLOWED_DGST_ALGS, + self._assertTrue( + alg in constants.ALLOWED_DGST_ALGS, ( - ("The digest algorithm MUST be one of [%s] - TR pag. 10") + ("The digest algorithm MUST be one of [%s]") % (", ".join(constants.ALLOWED_DGST_ALGS)) ), - **error_kwargs, + **_data, ) # save the grubbed certificate for future alanysis # cert = sign[0].xpath('./KeyInfo/X509Data/X509Certificate')[0] # dump_pem.dump_request_pem(cert, 'authn', 'signature', DATA_DIR) - return self.is_ok(f"{self.__class__.__name__}.test_Signature") + return self.is_ok(_method) def test_RelayState(self): """Test the compliance of RelayState parameter""" + _method = f"{self.__class__.__name__}.test_RelayState" + _data = dict( + test_id="", + references=["TR pag. 14", "TR pag. 15"], + method=_method, + ) if ("RelayState" in self.params) and self.params.get("RelayState"): relaystate = self.params["RelayState"] self._assertTrue( (relaystate.find("http") == -1), - "RelayState MUST not be immediately intelligible - TR pag. 14 or pag. 15", + "RelayState MUST not be immediately intelligible", description=relaystate, + **_data, ) else: self._assertTrue( False, - "RelayState is missing - TR pag. 14 or pag. 15", + "RelayState is missing", description="Missing RelayState", ) - return self.is_ok(f"{self.__class__.__name__}.test_RelayState") + return self.is_ok(_method) def test_profile_saml2core(self): self.test_xsd() diff --git a/src/spid_sp_test/authn_request_extra.py b/src/spid_sp_test/authn_request_extra.py index 3b442cd..f7469d8 100644 --- a/src/spid_sp_test/authn_request_extra.py +++ b/src/spid_sp_test/authn_request_extra.py @@ -1,10 +1,9 @@ import re -from . authn_request import SpidSpAuthnReqCheck +from .authn_request import SpidSpAuthnReqCheck class SpidSpAuthnReqCheckExtra(SpidSpAuthnReqCheck): - def __init__(self, *args, **kwargs): super(SpidSpAuthnReqCheckExtra, self).__init__(*args, **kwargs) @@ -15,6 +14,12 @@ def test_AuthnRequest_SPID_extra(self): # ForceAuthn MUST be true if 'Comparison' is 'minimum' and # SPID level is L1 + _method = f"{self.__class__.__name__}.test_AuthnRequest_SPID_extra" + _data = dict( + test_id="", + references=[], + method=_method, + ) req = self.doc.xpath("/AuthnRequest") rac = None @@ -38,22 +43,25 @@ def test_AuthnRequest_SPID_extra(self): "The ForceAuthn attribute MUST be present " "because of minimum/SpidL2", description=req.attrib, + **_data, ) self._assertTrue( req.get("ForceAuthn", "").lower() in ("true", 1, "1"), "The ForceAuthn attribute MUST be True " "because of minimum/SpidL2", description=req.attrib, + **_data, ) else: self.handle_error( - "AuthnRequest or RequestAuthnContext or AuthnContextClassRef missing" + "AuthnRequest or RequestAuthnContext or AuthnContextClassRef missing", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_AuthnRequest_extra") - + return self.is_ok(_method) def test_authnrequest_no_newlines(self): + _method = f"{self.__class__.__name__}.test_authnrequest_no_newlines" self._assertFalse( re.match(r"^[\t\n\s\r\ ]*", self.authn_request_decoded), ( @@ -62,9 +70,9 @@ def test_authnrequest_no_newlines(self): ), description=self.metadata[0:10], level="warning", + method=_method, ) - return self.is_ok(f"{self.__class__.__name__}.test_authnrequest_no_newlines") - + return self.is_ok(_method) def test_profile_spid_sp(self): super().test_profile_spid_sp() diff --git a/src/spid_sp_test/constants.py b/src/spid_sp_test/constants.py index c13f1c3..e564543 100644 --- a/src/spid_sp_test/constants.py +++ b/src/spid_sp_test/constants.py @@ -418,6 +418,4 @@ "ZW", ] -XML_NAMESPACES = { - 'spid':'https://spid.gov.it/saml-extensions' -} +XML_NAMESPACES = {"spid": "https://spid.gov.it/saml-extensions"} diff --git a/src/spid_sp_test/metadata.py b/src/spid_sp_test/metadata.py index b0699a3..25d3b44 100644 --- a/src/spid_sp_test/metadata.py +++ b/src/spid_sp_test/metadata.py @@ -65,8 +65,8 @@ def get(self, metadata_url: str): return request.content def xsd_check(self, xsds_files: list = ["saml-schema-metadata-2.0.xsd"]): - _msg = "Found metadata" - self.handle_result("debug", _msg, description=self.metadata.decode()) + _method = f"{self.__class__.__name__}.xsd_check" + logger.debug(self.metadata.decode()) _orig_pos = os.getcwd() os.chdir(self.xsds_files_path) metadata = self.metadata.decode() @@ -77,110 +77,136 @@ def xsd_check(self, xsds_files: list = ["saml-schema-metadata-2.0.xsd"]): schema = xmlschema.XMLSchema(schema_file) if not schema.is_valid(metadata): schema.validate(metadata) - self.handle_result("error", " ".join((msg))) + self.handle_result( + level="error", + title=msg, + description=msg, + references="", + method=_method, + ) # raise Exception('Validation Error') - logger.info(" ".join((msg, "-> OK"))) break except Exception as e: os.chdir(_orig_pos) logger.error(f"{msg}: {e}") self.handle_result( - "error", msg, description="xsd test failed", traceback=f"{e}" + "error", + msg, + description="xsd test failed", + traceback=f"{e}", + method=_method, ) os.chdir(_orig_pos) - return self.is_ok(f"{self.__class__.__name__}.xsd_check") + if not self.errors: + self._assertTrue(True, _method, description=msg, method=_method) + return self.is_ok(_method) def test_EntityDescriptor(self): entity_desc = self.doc.xpath("//EntityDescriptor") + desc = [ + etree.tostring(ent).decode()[:128] for ent in entity_desc if entity_desc + ] + _method = f"{self.__class__.__name__}.test_EntityDescriptor" + _data = dict(test_id="", references=["TR pag. 19"], method=_method) self._assertTrue( - self.doc.attrib.get("entityID"), - ( - f"Missing entityID in {self.doc.attrib}: " - "The entityID attribute MUST be present - TR pag. 19" - ), - description=self.doc.attrib.get("entityID"), + len(entity_desc) == 1, + "Only one EntityDescriptor element MUST be present", + description=desc, + **_data, ) self._assertTrue( - len(entity_desc) == 1, - "Only one EntityDescriptor element MUST be present - TR pag. 19", - description=self.doc.attrib.get("entityID"), + self.doc.attrib.get("entityID"), + "The entityID attribute MUST be present", + description=self.doc.attrib, + **_data, ) self._assertTrue( entity_desc[0].get("entityID"), - "The entityID attribute MUST have a value - TR pag. 19", + "The entityID attribute MUST have a value", description=entity_desc[0].get("entityID"), + **_data, ) if self.production: self._assertIsValidHttpsUrl( self.doc.attrib.get("entityID"), "The entityID attribute MUST be a valid HTTPS url", + **_data, ) self._assertHttpUrlWithoutPort( self.doc.attrib.get("entityID"), - 'The entityID attribute MUST not contain any custom tcp ports, eg: ":8000"', + 'The entityID attribute MUST not contains any custom tcp ports, eg: ":8000"', + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_EntityDescriptor") + return self.is_ok(_method) def test_SPSSODescriptor(self): spsso = self.doc.xpath("//EntityDescriptor/SPSSODescriptor") - desc = [etree.tostring(ent).decode() for ent in spsso if spsso] - error_kwargs = dict(description=desc) if desc else {} + desc = [etree.tostring(ent).decode()[:128] for ent in spsso if spsso] + + _method = f"{self.__class__.__name__}.test_SPSSODescriptor" + _data = dict(test_id="", references=[""], method=_method, description=desc) self._assertTrue( (len(spsso) == 1), "Only one SPSSODescriptor element MUST be present", - **error_kwargs, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_SPSSODescriptor") + return self.is_ok(_method) def test_SPSSODescriptor_SPID(self): spsso = self.doc.xpath("//EntityDescriptor/SPSSODescriptor") - desc = [etree.tostring(ent).decode() for ent in spsso if spsso] - error_kwargs = dict(description=desc) if desc else {} + desc = [etree.tostring(ent).decode()[:128] for ent in spsso if spsso] + _method = f"{self.__class__.__name__}.test_SPSSODescriptor_SPID" + _data = dict( + test_id="", references=["TR pag. 20"], method=_method, description=desc + ) for attr in ["protocolSupportEnumeration", "AuthnRequestsSigned"]: self._assertTrue( (attr in spsso[0].attrib), - f"The {attr} attribute MUST be present - TR pag. 20", - **error_kwargs, + f"The {attr} attribute MUST be present", + **_data, ) a = spsso[0].get(attr) - self._assertIsNotNone( + self._assertTrue( a, - f"The {attr} attribute MUST have a value - TR pag. 20", - **error_kwargs, + f"The {attr} attribute MUST have a value", + **_data, ) if attr == "AuthnRequestsSigned" and a: - self._assertEqual( - a.lower(), - "true", - f"The {attr} attribute MUST be true - TR pag. 20", - **error_kwargs, + self._assertTrue( + a.lower() == "true", + f"The {attr} attribute MUST be true", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_SPSSODescriptor_SPID") + return self.is_ok(_method) def test_NameIDFormat_Transient(self): spsso = self.doc.xpath("//EntityDescriptor/SPSSODescriptor/NameIDFormat") desc = [etree.tostring(ent).decode() for ent in spsso if spsso] - error_kwargs = dict(description=desc) if desc else {} + + _method = f"{self.__class__.__name__}.test_NameIDFormat_Transient" + _data = dict( + test_id="", references=["TR pag. ..."], method=_method, description=desc + ) if spsso: _rule = "urn:oasis:names:tc:SAML:2.0:nameid-format:transient" self._assertTrue( (spsso[0].text == _rule), - f"The NameIDFormat MUST {_rule}", - **error_kwargs, + f"The NameIDFormat MUST be {_rule}", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_NameIDFormat_SPID") + return self.is_ok(_method) def test_xmldsig(self): """Verify the SP metadata signature""" @@ -192,12 +218,17 @@ def test_xmldsig(self): "--verify", "--insecure", "--id-attr:ID", - "urn:oasis:names:tc:SAML:2.0:metadata:" "EntityDescriptor", + "urn:oasis:names:tc:SAML:2.0:metadata:EntityDescriptor", tmp_file.name, ] cmd = " ".join(xmlsec_cmd) - is_valid = True - msg = "the metadata signature MUST be valid - TR pag. 19" + msg = "the metadata signature MUST be valid" + + _data = dict( + test_id="", + references=["TR pag. 19"], + method=f"{self.__class__.__name__}.test_xmldsig", + ) try: subprocess.run( @@ -208,7 +239,6 @@ def test_xmldsig(self): stderr=subprocess.PIPE, ) except subprocess.CalledProcessError as err: - is_valid = False lines = [msg] if err.stderr: stderr = "stderr: " + "\nstderr: ".join( @@ -221,122 +251,131 @@ def test_xmldsig(self): ) lines.append(stdout) _msg = "\n".join(lines) - self.handle_result("error", msg, description="Description", traceback=_msg) + self.handle_result( + "error", _msg, description="Description", traceback=_msg, **_data + ) return xmlsec_cmd_string = " ".join(xmlsec_cmd) - _msg = f"{self.__class__.__name__}.test_xmldsig: OK" - self.handle_result("info", _msg, description=f"`{xmlsec_cmd_string}`") - return is_valid + self.handle_result("info", msg, description=f"{xmlsec_cmd_string}", **_data) def test_Signature(self): """Test the compliance of Signature element""" + _method = f"{self.__class__.__name__}.test_Signature" sign = self.doc.xpath("//EntityDescriptor/Signature") desc = [etree.tostring(ent).decode() for ent in sign if sign] - error_kwargs = dict(description=desc) if desc else {} + + _data = dict( + test_id="", + description="".join(desc)[:128] or "", + references=["TR pag. 19"], + method=_method, + ) + self._assertTrue( (len(sign) > 0), - "The Signature element MUST be present - TR pag. 19", - **error_kwargs, + "The Signature element MUST be present", + **_data, ) - error_kwargs = dict(description=desc, traceback="") if not sign: self.handle_result( "error", - "The SignatureMethod element MUST be present - TR pag. 19", - **error_kwargs, + "The SignatureMethod element MUST be present", + **_data, ) self.handle_result( "error", - "The Algorithm attribute MUST be present in SignatureMethod element - TR pag. 19", - **error_kwargs, + "The Algorithm attribute MUST be present in SignatureMethod element", + **_data, ) self.handle_result( "error", - "The signature algorithm MUST be valid - TR pag. 19", + "The signature algorithm MUST be valid", description=f"Must be one of [{', '.join(constants.ALLOWED_XMLDSIG_ALGS)}]", + **_data, ) self.handle_result( "error", - "The Algorithm attribute MUST be present in DigestMethod element - TR pag. 19", - **error_kwargs, + "The Algorithm attribute MUST be present in DigestMethod element", + **_data, ) + + _data.pop("description") self.handle_result( "error", - "The digest algorithm MUST be valid - TR pag. 19", + "The digest algorithm MUST be valid", description=f"Must be one of [{', '.join(constants.ALLOWED_DGST_ALGS)}]", + **_data, ) else: method = sign[0].xpath("./SignedInfo/SignatureMethod") desc = [etree.tostring(ent).decode() for ent in method if method] - error_kwargs = dict(description=desc) if desc else {} + _data["description"] = "".join(desc)[:128] self._assertTrue( (len(method) > 0), - "The SignatureMethod element MUST be present - TR pag. 19", - **error_kwargs, + "The SignatureMethod element MUST be present", + **_data, ) self._assertTrue( ("Algorithm" in method[0].attrib), - "The Algorithm attribute MUST be present " - "in SignatureMethod element - TR pag. 19", - **error_kwargs, + "The Algorithm attribute MUST be present " "in SignatureMethod element", + **_data, ) + _data.pop("description") alg = method[0].get("Algorithm") - self._assertIn( - alg, - constants.ALLOWED_XMLDSIG_ALGS, - "The signature algorithm MUST be valid - TR pag. 19", + self._assertTrue( + alg in constants.ALLOWED_XMLDSIG_ALGS, + "The signature algorithm MUST be valid", description=f"One of {(', '.join(constants.ALLOWED_XMLDSIG_ALGS))}", + **_data, ) method = sign[0].xpath("./SignedInfo/Reference/DigestMethod") self._assertTrue( (len(method) == 1), - "The DigestMethod element MUST be present - TR pag. 19", - **error_kwargs, + "The DigestMethod element MUST be present", + **_data, ) self._assertTrue( ("Algorithm" in method[0].attrib), - "The Algorithm attribute MUST be present " - "in DigestMethod element - TR pag. 19", - **error_kwargs, + "The Algorithm attribute MUST be present in DigestMethod element", + **_data, ) alg = method[0].get("Algorithm") - self._assertIn( - alg, - constants.ALLOWED_DGST_ALGS, - "The digest algorithm MUST be valid - TR pag. 19", + self._assertTrue( + alg in constants.ALLOWED_DGST_ALGS, + "The digest algorithm MUST be valid", description=f"One of {(', '.join(constants.ALLOWED_DGST_ALGS))}", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_Signature") + return self.is_ok(_method) def test_KeyDescriptor(self): """Test the compliance of KeyDescriptor element(s)""" + _method = f"{self.__class__.__name__}.test_KeyDescriptor" kds = self.doc.xpath( "//EntityDescriptor/SPSSODescriptor" '/KeyDescriptor[@use="signing"]' ) - self._assertGreaterEqual( - len(kds), - 1, - "At least one signing KeyDescriptor MUST be present - TR pag. 19", + _data = dict(test_id="", references=["TR pag. 19"], method=_method) + self._assertTrue( + len(kds) >= 1, "At least one signing KeyDescriptor MUST be present", **_data ) desc = [etree.tostring(ent).decode() for ent in kds if kds] - error_kwargs = dict(description=desc, traceback="") for kd in kds: certs = kd.xpath("./KeyInfo/X509Data/X509Certificate") - self._assertGreaterEqual( - len(certs), - 1, - "At least one signing x509 MUST be present - TR pag. 19", - **error_kwargs, + self._assertTrue( + len(certs) >= 1, + "At least one signing x509 MUST be present", + description="".join(desc)[:128] or "", + **_data, ) kds = self.doc.xpath( @@ -345,133 +384,142 @@ def test_KeyDescriptor(self): for kd in kds: certs = kd.xpath("./KeyInfo/X509Data/X509Certificate") - self._assertGreaterEqual( - len(certs), - 1, - "At least one encryption x509 MUST be present - TR pag. 19", - **error_kwargs, + self._assertTrue( + len(certs) >= 1, + "At least one encryption x509 MUST be present", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_KeyDescriptor") + return self.is_ok(_method) def test_SingleLogoutService(self): """Test the compliance of SingleLogoutService element(s)""" slos = self.doc.xpath( "//EntityDescriptor/SPSSODescriptor" "/SingleLogoutService" ) - self._assertGreaterEqual( - len(slos), - 1, - "One or more SingleLogoutService elements MUST be present - AV n. 3", + _method = f"{self.__class__.__name__}.test_SingleLogoutService" + desc = [etree.tostring(ent).decode() for ent in slos if slos] + _data = dict( + test_id="", + references=["AV n. 3"], + method=_method, + description="".join(desc)[:128], ) - desc = [etree.tostring(ent).decode() for ent in slos if slos] - error_kwargs = dict(description=desc) + self._assertTrue( + len(slos) >= 1, + "One or more SingleLogoutService elements MUST be present", + **_data, + ) for slo in slos: for attr in ["Binding", "Location"]: self._assertTrue( (attr in slo.attrib), - f"The {attr} attribute in SingleLogoutService element MUST be present - AV n. 3", - **error_kwargs, + f"The {attr} attribute in SingleLogoutService element MUST be present", + **_data, ) _attr = slo.get(attr) - self._assertIsNotNone( + self._assertTrue( _attr, f"The {attr} attribute in SingleLogoutService element MUST have a value", - **error_kwargs, + **_data, ) if attr == "Binding": - self._assertIn( - _attr, - constants.ALLOWED_SINGLELOGOUT_BINDINGS, + self._assertTrue( + _attr in constants.ALLOWED_SINGLELOGOUT_BINDINGS, ( ( - "The %s attribute in SingleLogoutService element MUST be one of [%s] - AV n. 3" + "The %s attribute in SingleLogoutService element MUST be one of [%s]" ) % (attr, ", ".join(constants.ALLOWED_BINDINGS)) # noqa ), - **error_kwargs, # noqa + **_data, # noqa ) if attr == "Location" and self.production: self._assertIsValidHttpsUrl( _attr, f"The {attr} attribute " "in SingleLogoutService element " - "MUST be a valid HTTPS URL - AV n. 1 and n. 3", - **error_kwargs, + "MUST be a valid HTTPS URL", + **_data, ) self._assertHttpUrlWithoutPort( _attr, 'The entityID attribute MUST not contain any custom tcp ports, eg: ":8000"', + **_data, ) elif attr == "Location": self._assertIsValidHttpUrl( _attr, f"The {attr} attribute " "in SingleLogoutService element " - "MUST be a valid HTTP URL - AV n. 1 and n. 3", - **error_kwargs, + "MUST be a valid HTTP URL", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_SingleLogoutService") + return self.is_ok(_method) def test_AssertionConsumerService(self): """Test the compliance of AssertionConsumerService element(s)""" acss = self.doc.xpath( "//EntityDescriptor/SPSSODescriptor" "/AssertionConsumerService" ) - + _method = f"{self.__class__.__name__}.test_AssertionConsumerService" desc = [etree.tostring(ent).decode() for ent in acss if acss] - error_kwargs = dict(description=desc) if desc else {} + _data = dict( + test_id="", + references=["TR pag. 20"], + method=_method, + description="".join(desc)[:128], + ) - self._assertGreaterEqual( - len(acss), - 1, - "At least one AssertionConsumerService " "MUST be present - TR pag. 20", + self._assertTrue( + len(acss) >= 1, + "At least one AssertionConsumerService MUST be present", + **_data, ) for acs in acss: for attr in ["index", "Binding", "Location"]: self._assertTrue( (attr in acs.attrib), - f"The {attr} attribute MUST be present - TR pag. 20", + f"The {attr} attribute MUST be present", + **_data, ) _attr = acs.get(attr) if attr == "index": - self._assertGreaterEqual( - int(_attr), - 0, - f"The {attr} attribute MUST be >= 0 - TR pag. 20", - **error_kwargs, + self._assertTrue( + int(_attr) >= 0, + f"The {attr} attribute MUST be >= 0", + **_data, ) elif attr == "Binding": - self._assertIn( - _attr, - constants.ALLOWED_BINDINGS, + self._assertTrue( + _attr in constants.ALLOWED_BINDINGS, ( - ("The %s attribute MUST be one of [%s] - TR pag. 20") + ("The %s attribute MUST be one of [%s]") % (attr, ", ".join(constants.ALLOWED_BINDINGS)) ), - **error_kwargs, + **_data, ) elif attr == "Location" and self.production: self._assertIsValidHttpsUrl( _attr, - f"The {attr} attribute MUST be a " - "valid HTTPS url - TR pag. 20 and AV n. 1", - **error_kwargs, + f"The {attr} attribute MUST be a valid HTTPS url", + **_data, ) self._assertHttpUrlWithoutPort( _attr, 'The entityID attribute MUST not contain any custom tcp ports, eg: ":8000"', + **_data, ) else: pass - return self.is_ok(f"{self.__class__.__name__}.test_AssertionConsumerService") + return self.is_ok(_method) def test_AssertionConsumerService_SPID(self): acss = self.doc.xpath( @@ -479,13 +527,19 @@ def test_AssertionConsumerService_SPID(self): "/AssertionConsumerService" '[@isDefault="true"]' ) + _method = f"{self.__class__.__name__}.test_AssertionConsumerService_SPID" desc = [etree.tostring(ent).decode() for ent in acss if acss] - error_kwargs = dict(description=desc) if desc else {} + _data = dict( + test_id="", + references=["TR pag. 20"], + method=_method, + description="".join(desc)[:128], + ) self._assertTrue( (len(acss) == 1), - "Only one default AssertionConsumerService " "MUST be present - TR pag. 20", - **error_kwargs, + "Only one default AssertionConsumerService MUST be present", + **_data, ) acss = self.doc.xpath( @@ -496,111 +550,120 @@ def test_AssertionConsumerService_SPID(self): ) self._assertTrue( (len(acss) == 1), - "Must be present the default AssertionConsumerService " - "with index = 0 - TR pag. 20", - **error_kwargs, - ) - return self.is_ok( - f"{self.__class__.__name__}.test_AssertionConsumerService_SPID" + "Must be present the default AssertionConsumerService with index = 0", + **_data, ) + return self.is_ok(_method) def test_AttributeConsumingService(self): """Test the compliance of AttributeConsumingService element(s)""" acss = self.doc.xpath( "//EntityDescriptor/SPSSODescriptor" "/AttributeConsumingService" ) - + _method = f"{self.__class__.__name__}.test_AttributeConsumingService" desc = [etree.tostring(ent).decode() for ent in acss if acss] - error_kwargs = dict(description=desc) if desc else {} + _data = dict( + test_id="", + references=["TR pag. 20"], + method=_method, + description="".join(desc)[:128], + ) - self._assertGreaterEqual( - len(acss), - 1, - "One or more AttributeConsumingService elements MUST be present - TR pag. 20", - **error_kwargs, + self._assertTrue( + len(acss) >= 1, + "One or more AttributeConsumingService elements MUST be present", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_AttributeConsumingService") + return self.is_ok(_method) def test_AttributeConsumingService_SPID( self, allowed_attributes=constants.SPID_ATTRIBUTES ): acss = self.doc.xpath( - "//EntityDescriptor/SPSSODescriptor" "/AttributeConsumingService" + "//EntityDescriptor/SPSSODescriptor/AttributeConsumingService" ) - desc = [etree.tostring(ent).decode() for ent in acss if acss] - error_kwargs = dict(description=desc) if desc else {} + _method = f"{self.__class__.__name__}.test_AttributeConsumingService_SPID" + _data = dict( + test_id="", + references=["TR pag. 20"], + method=_method, + ) + _desc = "".join(desc)[:128] for acs in acss: self._assertTrue( ("index" in acs.attrib), "The index attribute in AttributeConsumigService element MUST be present", - **error_kwargs, + description=_desc, + **_data, ) idx = int(acs.get("index")) - self._assertGreaterEqual( - idx, - 0, - "The index attribute in AttributeConsumigService " - "element MUST be >= 0 - TR pag. 20", - **error_kwargs, + self._assertTrue( + idx >= 0, + "The index attribute in AttributeConsumigService element MUST be >= 0", + description=_desc, + **_data, ) sn = acs.xpath("./ServiceName") self._assertTrue( - (len(sn) > 0), "The ServiceName element MUST be present", **error_kwargs + (len(sn) > 0), + "The ServiceName element MUST be present", + description=_desc, + **_data, ) for sns in sn: - self._assertIsNotNone( + self._assertTrue( sns.text, "The ServiceName element MUST have a value", - **error_kwargs, + description=_desc, + **_data, ) ras = acs.xpath("./RequestedAttribute") - self._assertGreaterEqual( - len(ras), - 1, - "One or more RequestedAttribute elements MUST be present - TR pag. 20", - **error_kwargs, + self._assertTrue( + len(ras) >= 1, + "One or more RequestedAttribute elements MUST be present", + description=_desc, + **_data, ) for ra in ras: self._assertTrue( ("Name" in ra.attrib), "The Name attribute in RequestedAttribute element " - "MUST be present - TR pag. 20 and AV n. 6", - **error_kwargs, + "MUST be present", + description=_desc, + **_data, ) - self._assertIn( - ra.get("Name"), - allowed_attributes, + self._assertTrue( + ra.get("Name") in allowed_attributes, f'The "{ra.attrib.values()[0]}" attribute in RequestedAttribute element MUST be valid', - description=f"one of [{', '.join(allowed_attributes)}] - TR pag. 20 and AV n.6", + description=f"one of [{', '.join(allowed_attributes)}]", + **_data, ) al = acs.xpath("RequestedAttribute/@Name") - self._assertEqual( - len(al), - len(set(al)), - "AttributeConsumigService MUST not contain duplicated RequestedAttribute - TR pag. 20", - **error_kwargs, + self._assertTrue( + len(al) == len(set(al)), + "AttributeConsumigService MUST not contain duplicated RequestedAttribute", + description=_desc, + **_data, ) - return self.is_ok( - f"{self.__class__.__name__}.test_AttributeConsumingService_SPID" - ) + return self.is_ok(_method) def test_Organization(self): """Test the compliance of Organization element""" orgs = self.doc.xpath("//EntityDescriptor/Organization") desc = [etree.tostring(ent).decode() for ent in orgs if orgs] - error_kwargs = dict(description=desc) if desc else {} + _method = f"{self.__class__.__name__}.test_Organization" + _data = dict(description=desc or "", references=["TR pag. 20"], method=_method) self._assertTrue( - (len(orgs) == 1), - "Only one Organization element can be present - TR pag. 20", + (len(orgs) == 1), "Only one Organization element can be present", **_data ) enames = ["OrganizationName", "OrganizationDisplayName", "OrganizationURL"] @@ -610,11 +673,10 @@ def test_Organization(self): org = orgs[0] for ename in enames: elements = org.xpath(f"./{ename}") - self._assertGreater( - len(elements), - 0, - f"One or more {ename} elements MUST be present - TR pag. 20", - **error_kwargs, + self._assertTrue( + len(elements) > 0, + f"One or more {ename} elements MUST be present", + **_data, ) for element in elements: @@ -623,8 +685,8 @@ def test_Organization(self): "{http://www.w3.org/XML/1998/namespace}lang" in element.attrib ), # noqa - f"The lang attribute in {ename} element MUST be present - TR pag. 20", # noqa - **error_kwargs, + f"The lang attribute in {ename} element MUST be present", # noqa + **_data, ) lang = element.attrib.items()[0][1] @@ -635,8 +697,8 @@ def test_Organization(self): self._assertTrue( element.text, - f"The {ename} element MUST have a value - TR pag. 20", - **error_kwargs, + f"The {ename} element MUST have a value", + **_data, ) if ename == "OrganizationURL" and self.production: @@ -648,8 +710,8 @@ def test_Organization(self): OrganizationURLvalue = f"https://{OrganizationURLvalue}" self._assertIsValidHttpUrl( OrganizationURLvalue, - f"The {ename} -element MUST be a valid URL - TR pag. 20", - **error_kwargs, + f"The {ename} -element MUST be a valid URL", + **_data, ) # lang counter check @@ -661,7 +723,7 @@ def test_Organization(self): "The elements OrganizationName, OrganizationDisplayName and OrganizationURL " "MUST have the same number of lang attributes" ), # noqa - **error_kwargs, + **_data, ) self._assertTrue( @@ -670,10 +732,10 @@ def test_Organization(self): "The elements OrganizationName, OrganizationDisplayName and OrganizationURL " "MUST have at least an it language enabled" ), # noqa - **error_kwargs, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_Organization") + return self.is_ok(_method) def test_profile_saml2core(self): self.xsd_check(xsds_files=["saml-schema-metadata-2.0.xsd"]) diff --git a/src/spid_sp_test/metadata_ag.py b/src/spid_sp_test/metadata_ag.py index f4d8e3b..926a7b6 100644 --- a/src/spid_sp_test/metadata_ag.py +++ b/src/spid_sp_test/metadata_ag.py @@ -14,26 +14,36 @@ def test_extensions_public_ag( ], must=False, ): - + _method = f"{self.__class__.__name__}.test_extensions_public_ag" + _data = dict( + test_id="", + references=[], + method=_method, + ) for ext_type in ext_types: ctype = self.doc.xpath(ext_type) if must: self._assertTrue( - ctype, - f"The {ext_type} element MUST be present", + ctype, f"The {ext_type} element MUST be present", **_data ) if ctype: self._assertFalse( ctype[0].text, f"The {ext_type.title()} element MUST be empty", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_extensions_public_ag") + return self.is_ok(_method) def test_entityid_qs(self): """The entityID MUST not contain the query-string part""" - + _method = f"{self.__class__.__name__}.test_entityid_qs" + _data = dict( + test_id="", + references=[], + method=_method, + ) entity_desc = self.doc.xpath("//EntityDescriptor") eid = entity_desc[0].get("entityID") @@ -43,17 +53,24 @@ def test_entityid_qs(self): qs[1], ("The entityID MUST not contain the query-string part"), description=eid, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_entityid_qs") + return self.is_ok(_method) def test_entityid_contains(self, value=""): """The entityID MUST contain ...""" - + _method = f"{self.__class__.__name__}.test_entityid_contains" + _data = dict( + test_id="", + references=[], + method=_method, + ) entity_desc = self.doc.xpath("//EntityDescriptor") eid = entity_desc[0].get("entityID") self._assertTrue( value in eid, (f"The entityID MUST contain {value}"), description=eid, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_entityid_contains") + return self.is_ok(_method) diff --git a/src/spid_sp_test/metadata_extra.py b/src/spid_sp_test/metadata_extra.py index 6c44b7f..a77e6d7 100644 --- a/src/spid_sp_test/metadata_extra.py +++ b/src/spid_sp_test/metadata_extra.py @@ -17,30 +17,47 @@ def __init__(self, *args, **kwargs): self.category = "metadata_extra" def test_metadata_no_newlines(self): + _method = f"{self.__class__.__name__}.test_metadata_no_newlines" + _data = dict( + test_id="", + references=[], + method=_method, + ) self._assertFalse( re.match(r"^[\t\n\s\r\ ]*", self.metadata), ("The XML of metadata should not " "contains newlines at the beginning."), description=self.metadata[0:10], level="warning", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_metadata_no_newlines") + return self.is_ok(_method) def test_entityid_match_url(self): + _method = f"{self.__class__.__name__}.test_entityid_match_url" + _data = dict( + test_id="", + references=[], + method=_method, + ) self._assertTrue( (self.doc.attrib.get("entityID") == self.metadata_url), f"The EntityID SHOULD be equal to {self.metadata_url}", description=f"{self.doc.attrib.get('entityID')}", level="warning", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_entityid_match_url") + return self.is_ok(_method) def test_Signature_extra(self): """Test the compliance of AuthnRequest element""" - sign = self.doc.xpath("//EntityDescriptor/Signature") desc = [etree.tostring(ent).decode() for ent in sign if sign] - error_kwargs = dict(description=desc) if desc else {} + + _method = f"{self.__class__.__name__}.test_Signature_extra" + _data = dict( + test_id="", references=[], method=_method, description="".join(desc)[:128] + ) for si in sign: certs = si.xpath("./KeyInfo/X509Data/X509Certificate") @@ -58,20 +75,19 @@ def test_Signature_extra(self): f"weak signature algorithm: {sign_cert[0].lower()}" ) ), - **error_kwargs, + **_data, ) exp = ["rsaEncryption", "id-ecPublicKey"] - self._assertIn( - sign_cert[2], - exp, + self._assertTrue( + sign_cert[2] in exp, ( ( f"The key type of certificate #{i} MUST be one of [%s] - TR pag. 19" ) % (", ".join(exp)) ), - **error_kwargs, + **_data, ) if sign_cert[2] == "rsaEncryption": @@ -84,7 +100,7 @@ def test_Signature_extra(self): self._assertTrue( (int(sign_cert[1]) >= exp), f"The key length of certificate #{i} MUST be >= {exp}. Instead it is {sign_cert[1]}", - **error_kwargs, + **_data, ) self._assertTrue( @@ -93,51 +109,58 @@ def test_Signature_extra(self): >= datetime.datetime.now() ), f"The certificate #{i} is expired. It was valid till {sign_cert[3]}", - **error_kwargs, + **_data, ) os.remove(fname) - return self.is_ok(f"{self.__class__.__name__}.test_Signature_extra") + return self.is_ok(_method) def test_SPSSODescriptor_extra(self): spsso = self.doc.xpath("//EntityDescriptor/SPSSODescriptor") - desc = [etree.tostring(ent).decode() for ent in spsso if spsso] - error_kwargs = dict(description=desc) if desc else {} + _method = f"{self.__class__.__name__}.test_SPSSODescriptor_extra" + _data = dict( + test_id="", + references=[], + method=_method, + ) for attr in ["protocolSupportEnumeration", "WantAssertionsSigned"]: self._assertTrue( - (attr in spsso[0].attrib), f"The {attr} attribute MUST be present" + (attr in spsso[0].attrib), + f"The {attr} attribute MUST be present", + description=spsso[0].attrib, + **_data, ) if attr == "protocolSupportEnumeration": a = spsso[0].get(attr) - self._assertIsNotNone( - a, f"The {attr} attribute MUST have a value", **error_kwargs + self._assertTrue( + a, f"The {attr} attribute MUST have a value", description=a, **_data ) - self._assertEqual( - a, - "urn:oasis:names:tc:SAML:2.0:protocol", + self._assertTrue( + a == "urn:oasis:names:tc:SAML:2.0:protocol", f"The {attr} attribute MUST be " "urn:oasis:names:tc:SAML:2.0:protocol", - **error_kwargs, + description=a, + **_data, ) if attr == "WantAssertionsSigned": a = spsso[0].get(attr) - self._assertIsNotNone( - a, f"The {attr} attribute MUST have a value", **error_kwargs + self._assertTrue( + a, f"The {attr} attribute MUST have a value", description=a, **_data ) if a: - self._assertEqual( - a.lower(), - "true", + self._assertTrue( + a.lower() == "true", f"The {attr} attribute MUST be true", - **error_kwargs, + description=a, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_SPSSODescriptor_extra") + return self.is_ok(_method) def test_AttributeConsumingService_extra(self): acss = self.doc.xpath( @@ -145,16 +168,18 @@ def test_AttributeConsumingService_extra(self): ) desc = [etree.tostring(ent).decode() for ent in acss if acss] - error_kwargs = dict(description=desc) if desc else {} + _method = f"{self.__class__.__name__}.test_AttributeConsumingService_extra" + _data = dict( + test_id="", references=[], method=_method, description="".join(desc)[:128] + ) for acs in acss: ras = acs.xpath("./RequestedAttribute") for ra in ras: a = ra.get("NameFormat") if a is not None: - self._assertIn( - a, - constants.ALLOWED_FORMATS, + self._assertTrue( + a in constants.ALLOWED_FORMATS, ( ( "The NameFormat attribute " @@ -163,18 +188,19 @@ def test_AttributeConsumingService_extra(self): ) % (", ".join(constants.ALLOWED_FORMATS)) ), - **error_kwargs, + **_data, ) - return self.is_ok( - f"{self.__class__.__name__}.test_AttributeConsumingService_extra" - ) + return self.is_ok(_method) def test_Organization_extra(self): orgs = self.doc.xpath("//EntityDescriptor/Organization") - self._assertTrue((len(orgs) == 1), "An Organization MUST be present") - desc = [etree.tostring(ent).decode() for ent in orgs if orgs] - error_kwargs = dict(description=desc) if desc else {} + _method = f"{self.__class__.__name__}.test_Organization_extra" + _data = dict( + test_id="", references=[], method=_method, description="".join(desc)[:128] + ) + + self._assertTrue((len(orgs) == 1), "An Organization MUST be present", **_data) if orgs: org = orgs[0] @@ -188,9 +214,9 @@ def test_Organization_extra(self): self._assertTrue( (len(e) == 1), f"An IT localised Organization {elem} MUST be present", - **error_kwargs, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_Organization") + return self.is_ok(_method) def test_profile_spid_sp(self): super().test_profile_spid_sp() diff --git a/src/spid_sp_test/metadata_private.py b/src/spid_sp_test/metadata_private.py index 63761d0..c4c0b5e 100644 --- a/src/spid_sp_test/metadata_private.py +++ b/src/spid_sp_test/metadata_private.py @@ -1,12 +1,19 @@ class SpidSpMetadataCheckPrivate(object): def test_Contacts_Priv(self): self.doc.xpath("//ContactPerson") + _method = f"{self.__class__.__name__}.test_Contacts_Priv" + _data = dict( + test_id="", + references=[], + method=_method, + ) ipacode = self.doc.xpath("//ContactPerson/Extensions/IPACode") self._assertFalse( ipacode, "The IPACode element MUST NOT be present", description=ipacode, + **_data, ) exts = self.doc.xpath("//ContactPerson/Extensions/CessionarioCommittente") @@ -14,12 +21,14 @@ def test_Contacts_Priv(self): (len(exts) == 1), ("The CessionarioCommittente element MUST be present"), description=exts, + **_data, ) if exts: exts[0] tise = self.doc.xpath( - "//ContactPerson/Extensions/CessionarioCommittente/TerzoIntermediarioSoggettoEmittente" + "//ContactPerson/Extensions/CessionarioCommittente/TerzoIntermediarioSoggettoEmittente", + **_data, ) if tise: tise = tise[0] @@ -27,6 +36,7 @@ def test_Contacts_Priv(self): tise.text, "If the TerzoIntermediarioSoggettoEmittente element if present it MUST have a value", description=tise, + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_Contacts_Priv") + return self.is_ok(_method) diff --git a/src/spid_sp_test/metadata_public.py b/src/spid_sp_test/metadata_public.py index 9346aa2..c347240 100644 --- a/src/spid_sp_test/metadata_public.py +++ b/src/spid_sp_test/metadata_public.py @@ -9,49 +9,53 @@ class SpidSpMetadataCheckPublic(object): - def test_Contacts_PubPriv( - self, contact_type="other", entity_type=""): + def test_Contacts_PubPriv(self, contact_type="other", entity_type=""): + _method = f"{self.__class__.__name__}.test_Contacts_PubPriv" xpatt = f"//ContactPerson[@contactType='{contact_type}'" if entity_type: xpatt += f" and @spid:entityType='{entity_type}']" else: - xpatt += ']' + xpatt += "]" - entity_desc = self.doc.xpath( - xpatt, - namespaces=XML_NAMESPACES - ) - self._assertTrue(entity_desc, "ContactPerson MUST be present") + _data = dict(references=["TR pag. 19"], method=_method) + entity_desc = self.doc.xpath(xpatt, namespaces=XML_NAMESPACES) + self._assertTrue(entity_desc, "ContactPerson MUST be present", **_data) if entity_desc: self._assertTrue( entity_desc[0].attrib.get("contactType"), ( f"Missing contactType in {entity_desc[0].attrib}: " - "The contactType attribute MUST be present - TR pag. 19" + "The contactType attribute MUST be present", ), + **_data, ) self._assertTrue( entity_desc[0].get("contactType"), - "The contactType attribute MUST have a value - TR pag. 19", + "The contactType attribute MUST have a value", + **_data, ) self._assertTrue( entity_desc[0].get("contactType") == contact_type, - f'The contactType must be "{contact_type}" - TR pag. 19', + f'The contactType must be "{contact_type}"', description=entity_desc[0].get("contactType"), + **_data, ) if not entity_type: self._assertTrue( len(entity_desc) == 1, - f'Only one ContactPerson element of contactType "{contact_type}" MUST be present', + "Only one ContactPerson element of contactType " + f'"{contact_type}" MUST be present', + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_Contacts_PubPriv") + return self.is_ok(_method) def test_Extensions_PubPriv(self): _conts = self.doc.xpath("//ContactPerson") - + _method = f"{self.__class__.__name__}.test_Extensions_PubPriv" + _data = dict(references=[""], method=_method) for cont in _conts: ext_cnt = 0 for child in cont.getchildren(): @@ -62,6 +66,7 @@ def test_Extensions_PubPriv(self): ext_cnt > 1, "Only one Extensions element inside ContactPerson element MUST be present", description=etree.tostring(cont).decode(), + **_data, ) orgs = self.doc.xpath("//EntityDescriptor/Organization/OrganizationName") @@ -75,89 +80,95 @@ def test_Extensions_PubPriv(self): company.text, "If the Company element is present it MUST have a value", description=company, + **_data, ) self._assertTrue( company.text == org.text, "If the Company element if present it MUST be equal to OrganizationName", description=(company.text, org.text), + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_Extensions_PubPriv") + return self.is_ok(_method) def test_contactperson_email(self, email_xpath="//ContactPerson/EmailAddress"): + _method = f"{self.__class__.__name__}.test_contactperson_email" + _data = dict(references=[""], method=_method) email = self.doc.xpath(email_xpath) self._assertTrue( email, f"The {email_xpath} element MUST be present", - description=email, + description=[etree.tostring(_val).decode() for _val in email], + **_data, ) if email: self._assertTrue( email[0].text, f"The {email_xpath} element MUST have a value", - description=email[0], + description=[etree.tostring(_val).decode() for _val in email], + **_data, ) self._assertTrue( re.match(EMAIL_REGEXP, email[0].text), f"The {email_xpath} element MUST be a valid email address", - description=email[0], + description=[etree.tostring(_val).decode() for _val in email], + **_data, ) - return self.is_ok( - f"{self.__class__.__name__}.test_contactperson_email-{email_xpath}" - ) + return self.is_ok(_method) def test_contactperson_phone(self, phone_xpath="//ContactPerson/TelephoneNumber"): + _method = f"{self.__class__.__name__}.test_contactperson_phone" + _data = dict(references=[""], method=_method) phone = self.doc.xpath(phone_xpath) if phone: phone = phone[0].text self._assertTrue( - phone, - f"The {phone_xpath} element MUST have a value", + phone, f"The {phone_xpath} element MUST have a value", **_data ) self._assertTrue( (" " not in phone), f"The {phone_xpath} element MUST not contain spaces", description=phone, + **_data, ) self._assertTrue( (phone[0:3] == "+39"), f'The {phone_xpath} element MUST start with "+39"', description=phone, level="warning", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_contactperson_phone") + return self.is_ok(_method) def test_Contacts_IPACode(self): self.doc.xpath("//ContactPerson") - + _method = f"{self.__class__.__name__}.test_Contacts_IPACode" + _data = dict(references=[""], method=_method) if self.production: ipacode = self.doc.xpath("//ContactPerson/Extensions/IPACode") - self._assertTrue( - ipacode, - "The IPACode element MUST be present", - ) + self._assertTrue(ipacode, "The IPACode element MUST be present", **_data) if ipacode: ipacode = ipacode[0] self._assertTrue( - ipacode.text, - "The IPACode element MUST have a value", + ipacode.text, "The IPACode element MUST have a value", **_data ) self._assertTrue( - ipacode.text, - "The IPACode element MUST have a value", + ipacode.text, "The IPACode element MUST have a value", **_data ) self._assertTrue( get_indicepa_by_ipacode(ipacode.text)[0] == 1, "The IPACode element MUST have a valid value present on IPA", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_Contacts_IPACode") + return self.is_ok(_method) def test_extensions_public_private(self, ext_type="Public", contact_type="other"): ext_type_not = "Private" if ext_type == "Public" else "Public" - + _method = f"{self.__class__.__name__}.test_extensions_public_private" + _data = dict(references=[""], method=_method) # only if other, billing doesn't have any Private element in it! ctype = self.doc.xpath( f'//ContactPerson[@contactType="{contact_type}"]/Extensions/{ext_type.title()}' @@ -166,37 +177,38 @@ def test_extensions_public_private(self, ext_type="Public", contact_type="other" ctype, f"Missing ContactPerson/Extensions/{ext_type.title()}, " "this element MUST be present", + **_data, ) if ctype: self._assertFalse( - ctype[0].text, - f"The {ext_type.title()} element MUST be empty", + ctype[0].text, f"The {ext_type.title()} element MUST be empty", **_data ) ctype = self.doc.xpath(f"//ContactPerson/Extensions/{ext_type_not.title()}") self._assertFalse( - ctype, - f"The {ext_type_not.title()} element MUST not be present", + ctype, f"The {ext_type_not.title()} element MUST not be present", **_data ) - return self.is_ok(f"{self.__class__.__name__}.test_extentions_public") + return self.is_ok(_method) def test_Contacts_VATFC(self, private=False): self.doc.xpath("//ContactPerson") - + _method = f"{self.__class__.__name__}.test_Contacts_VATFC" + _data = dict(references=[""], method=_method) vats = self.doc.xpath("//ContactPerson/Extensions/VATNumber") self._assertTrue( (len(vats) <= 1), "only one VATNumber element must be present", - description=vats, + description=[etree.tostring(_vats).decode() for _vats in vats], + **_data, ) if vats: self._assertTrue( - vats[0].text, - "The VATNumber element MUST have a value", + vats[0].text, "The VATNumber element MUST have a value", **_data ) self._assertTrue( (vats[0].text[:2] in ISO3166_CODES), "The VATNumber element MUST start with a valid ISO3166 Code", + **_data, ) fcs = self.doc.xpath("//ContactPerson/Extensions/FiscalCode") @@ -204,23 +216,25 @@ def test_Contacts_VATFC(self, private=False): self._assertTrue( (len(fcs) == 1), "only one FiscalCode element must be present", - description=fcs, + description=[etree.tostring(_fcs).decode() for _fcs in fcs], + **_data, ) fc = fcs[0] self._assertTrue( - fc.text, - "The FiscalCode element MUST have a value", + fc.text, "The FiscalCode element MUST have a value", **_data ) if private and not len(fcs) and not len(vats): self._assertTrue( False, "If the VATNumber is not present, the FiscalCode element MUST be present", + **_data, ) - return self.is_ok(f"{self.__class__.__name__}.test_Contacts_VATFC") + return self.is_ok(_method) def test_extensions_cie(self, ext_type="Public"): - + _method = f"{self.__class__.__name__}.test_extensions_cie" + _data = dict(references=[""], method=_method) attrs = ["Municipality"] if ext_type == "Private": @@ -239,27 +253,21 @@ def test_extensions_cie(self, ext_type="Public"): ctype = contact.xpath(f"Extensions/{ele}") # some special conditions ... - if ( - contact.attrib['contactType'] == 'technical' - and ele == "IPACode" - ): + if contact.attrib["contactType"] == "technical" and ele == "IPACode": continue - self._assertTrue( - ctype, - f"{ele} element MUST be present", - ) + self._assertTrue(ctype, f"{ele} element MUST be present", **_data) # is <= because already protected with the previous check self._assertTrue( (len(ctype) <= 1), f"only one {ele} element MUST be present", + **_data, ) if ctype: self._assertTrue( - ctype[0].text, - f"The {ele} element MUST have a value", + ctype[0].text, f"The {ele} element MUST have a value", **_data ) - return self.is_ok(f"{self.__class__.__name__}.test_extensions_cie") + return self.is_ok(_method) diff --git a/src/spid_sp_test/response.py b/src/spid_sp_test/response.py index 37717f1..db16545 100644 --- a/src/spid_sp_test/response.py +++ b/src/spid_sp_test/response.py @@ -207,7 +207,7 @@ def do_authnrequest(self): or settings.DEFAULT_RESPONSE["AuthnContextClassRef"], "IssueInstantMillis": now.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), "sign_response": settings.DEFAULT_RESPONSE["sign_response"], - "sign_assertion": settings.DEFAULT_RESPONSE["sign_assertion"] + "sign_assertion": settings.DEFAULT_RESPONSE["sign_assertion"], } self.relay_state = self.kwargs.get("relay_state") @@ -291,13 +291,20 @@ def load_test(self, test_name=None, attributes={}, response_attrs={}): return spid_response - def check_response(self, res, msg: str, attendeds=[]): + def check_response(self, res, msg: str, attendeds=[], test_id=""): if res.status_code in attendeds: status = True else: status = False status_code = f"[http status_code: {res.status_code}]" - self._assertTrue(status, f"{msg}: {status_code}") + self._assertTrue( + status, + f"{msg}: {status_code}", + method=f"{self.__class__.__name__}.check_response", + description=status_code, + references=[], + test_id=test_id, + ) return status, status_code def dump_html_response(self, fname, description, result, content): @@ -366,6 +373,7 @@ def test_profile_spid_sp(self): res, msg=f"Test [{i}] {test_display_desc}", attendeds=response_obj.conf["status_codes"], + test_id=i, ) if self.html_path: self.dump_html_response( diff --git a/src/spid_sp_test/responses/settings.py b/src/spid_sp_test/responses/settings.py index 622059c..62be84a 100644 --- a/src/spid_sp_test/responses/settings.py +++ b/src/spid_sp_test/responses/settings.py @@ -50,7 +50,7 @@ "AuthnContextClassRef": "https://www.spid.gov.it/SpidL1", "Attributes": ATTRIBUTES, "sign_response": True, - "sign_assertion": True + "sign_assertion": True, }