From a6aa31a8f2ea5e2fd77199c5496606a39ed1eb6a Mon Sep 17 00:00:00 2001 From: Matthias Valvekens Date: Thu, 7 Dec 2023 23:52:53 +0100 Subject: [PATCH] Parsing of service definitions from EUTL --- pyhanko/sign/validation/eutl.py | 339 ++++++++++++++++++++++------ pyhanko/sign/validation/settings.py | 8 +- pyhanko_tests/test_trusted_list.py | 331 +++++++++++++++++++++++++++ 3 files changed, 601 insertions(+), 77 deletions(-) create mode 100644 pyhanko_tests/test_trusted_list.py diff --git a/pyhanko/sign/validation/eutl.py b/pyhanko/sign/validation/eutl.py index 428b63b3..9d59c8c6 100644 --- a/pyhanko/sign/validation/eutl.py +++ b/pyhanko/sign/validation/eutl.py @@ -1,24 +1,54 @@ import enum +import logging from dataclasses import dataclass from datetime import datetime -from typing import List, Optional, Set, Tuple, Union +from typing import ( + FrozenSet, + Generator, + Iterable, + Optional, + Tuple, + TypeVar, + Union, +) from asn1crypto import x509 -from pyhanko_certvalidator import InvalidCertificateError +from pyhanko_certvalidator.errors import InvalidCertificateError +from xsdata.formats.dataclass.parsers import XmlParser +from xsdata.formats.dataclass.parsers.config import ParserConfig -from pyhanko.generated.etsi import ts_119612, ts_119612_sie -from pyhanko.sign.validation import KeyUsageConstraints +from pyhanko.generated.etsi import ( + ts_119612, + ts_119612_extra, + ts_119612_sie, + xades, +) +from pyhanko.sign.validation.settings import KeyUsageConstraints + +logger = logging.getLogger(__name__) CA_QC_URI = 'http://uri.etsi.org/TrstSvc/Svctype/CA/QC' QTST_URI = 'http://uri.etsi.org/TrstSvc/Svctype/TSA/QTST' STATUS_GRANTED = 'http://uri.etsi.org/TrstSvc/TrustedList/Svcstatus/granted' +class TSPServiceParsingError(ValueError): + pass + + +@dataclass(frozen=True) +class AdditionalServiceInformation: + uri: str + critical: bool + textual_info: Optional[str] + + @dataclass(frozen=True) class BaseServiceInformation: service_type: str service_name: str - provider_certs: List[x509.Certificate] + provider_certs: Tuple[x509.Certificate, ...] + additional_info: FrozenSet[AdditionalServiceInformation] class Qualifier(enum.Enum): @@ -43,10 +73,7 @@ def uri(self): ) -class CriteriaAssertionType(enum.Enum): - NONE = 0 - AT_LEAST_ONE = 1 - ALL = 2 +_BY_URI = {q.uri: q for q in Qualifier} class Criterion: @@ -68,7 +95,7 @@ def matches(self, cert: x509.Certificate): @dataclass(frozen=True) class PolicySetCriterion(Criterion): - required_policy_oids: Set[str] + required_policy_oids: FrozenSet[str] def matches(self, cert: x509.Certificate): policy_ext = cert.certificate_policies_value or () @@ -78,7 +105,7 @@ def matches(self, cert: x509.Certificate): @dataclass(frozen=True) class CertSubjectDNCriterion(Criterion): - required_rdn_part_oids: Set[str] + required_rdn_part_oids: FrozenSet[str] def matches(self, cert: x509.Certificate): subject_dn: x509.Name = cert.subject @@ -88,17 +115,39 @@ def matches(self, cert: x509.Certificate): return self.required_rdn_part_oids.issubset(found_rdn_part_oids) +@enum.unique +class CriteriaCombination(enum.Enum): + ALL = 'all' + AT_LEAST_ONE = 'atLeastOne' + NONE = 'none' + + +@dataclass(frozen=True) +class CriteriaList(Criterion): + combine_as: CriteriaCombination + criteria: FrozenSet[Criterion] + + def matches(self, cert: x509.Certificate): + if self.combine_as == CriteriaCombination.ALL: + return all(c.matches(cert) for c in self.criteria) + elif self.combine_as == CriteriaCombination.AT_LEAST_ONE: + return any(c.matches(cert) for c in self.criteria) + elif self.combine_as == CriteriaCombination.NONE: + return not any(c.matches(cert) for c in self.criteria) + else: + raise NotImplementedError + + @dataclass(frozen=True) class Qualification: - qualifiers: List[Qualifier] - criteria_assertion_type: CriteriaAssertionType - criteria_list: List[Criterion] + qualifiers: FrozenSet[Qualifier] + criteria_list: CriteriaList @dataclass(frozen=True) class CAServiceInformation: base_info: BaseServiceInformation - qualifications: List[Qualification] + qualifications: FrozenSet[Qualification] expired_certs_revocation_info: Optional[datetime] @@ -119,6 +168,15 @@ def _extract_from_intl_string( return first_value +T = TypeVar('T') + + +def _required(thing: Optional[T], descr: str) -> T: + if thing is None: + raise TSPServiceParsingError(f"{descr} must be provided") + return thing + + def _as_certs(sdi: ts_119612.ServiceDigitalIdentity): for digital_id in sdi.digital_id: cert_bytes = digital_id.x509_certificate @@ -126,46 +184,184 @@ def _as_certs(sdi: ts_119612.ServiceDigitalIdentity): yield x509.Certificate.load(cert_bytes) -def _get_qualifications(qualifications: ts_119612_sie.Qualifications): - for qual in qualifications.qualification_element: - criteria = [] - if qual.criteria_list.policy_set: - criteria.append( - # TODO also take policy qualifiers into account - PolicySetCriterion( - required_policy_oids={ - oid.identifier.value - for policy in qual.criteria_list.policy_set - for oid in policy.policy_identifier - } +def _process_criteria_list( + criteria: Optional[ts_119612_sie.CriteriaListType], +) -> CriteriaList: + entries = frozenset(_process_criteria_list_entries(criteria)) + if entries: + assert criteria is not None + assertion_type = _required( + criteria.assert_value, "Criteria assertion type" + ).value + combine_as = CriteriaCombination(str(assertion_type)) + return CriteriaList(combine_as=combine_as, criteria=entries) + else: + raise TSPServiceParsingError("No criteria") + + +def _parse_xades_oid(oid: xades.ObjectIdentifierType) -> str: + return _required(oid.identifier, "Identifier tag in OID").value + + +def _process_criteria_list_entries( + criteria_list: Optional[ts_119612_sie.CriteriaListType], +) -> Generator[Criterion, None, None]: + if not criteria_list: + return + if criteria_list.policy_set: + # TODO also take policy qualifiers into account + for policy_set_criterion in criteria_list.policy_set: + yield PolicySetCriterion( + required_policy_oids=frozenset( + _parse_xades_oid(policy) + for policy in policy_set_criterion.policy_identifier ) ) - if qual.criteria_list.key_usage: - key_usage_must_have = { - bit.name.name.lower() - for ku in qual.criteria_list.key_usage - for bit in ku.key_usage_bit + if criteria_list.key_usage: + for ku_criterion in criteria_list.key_usage: + key_usage_must_have = frozenset( + _required(bit.name, "Key usage bit type name").name.lower() + for bit in ku_criterion.key_usage_bit if bit.value == True - } - key_usage_forbidden = { - bit.name.name.lower() - for ku in qual.criteria_list.key_usage - for bit in ku.key_usage_bit + ) + key_usage_forbidden = frozenset( + _required(bit.name, "Key usage bit type name").name.lower() + for bit in ku_criterion.key_usage_bit if bit.value == False - } + ) # TODO check EKUs - criteria.append( - KeyUsageCriterion( - settings=KeyUsageConstraints( - key_usage=key_usage_must_have, - key_usage_forbidden=key_usage_forbidden, + yield KeyUsageCriterion( + settings=KeyUsageConstraints( + key_usage=key_usage_must_have, + key_usage_forbidden=key_usage_forbidden, + ) + ) + sublists: Iterable[ + ts_119612_sie.CriteriaListType + ] = criteria_list.criteria_list + if sublists: + for sublist in sublists: + yield _process_criteria_list(sublist) + if criteria_list.other_criteria_list: + for el in criteria_list.other_criteria_list.content: + if isinstance(el, ts_119612_extra.CertSubjectDNAttribute): + yield CertSubjectDNCriterion( + frozenset( + _parse_xades_oid(oid_xml) + for oid_xml in el.attribute_oid ) ) + elif isinstance(el, ts_119612_extra.ExtendedKeyUsage): + kpids = frozenset( + _parse_xades_oid(kpid_xml) for kpid_xml in el.key_purpose_id + ) + yield KeyUsageCriterion( + settings=KeyUsageConstraints(extd_key_usage=kpids) + ) + else: + raise TSPServiceParsingError( + f"Unknown criterion {el} in qualifier definition" + ) + + +def _process_qualifiers(qualifiers: Iterable[ts_119612_sie.QualifierType]): + for qual in qualifiers: + try: + yield _BY_URI[qual.uri] + except KeyError: + logger.info(f"Qualifier {qual.uri} in SDI ignored...") + + +def _get_qualifications(qualifications: ts_119612_sie.Qualifications): + for qual in qualifications.qualification_element: + quals_xml = qual.qualifiers + if not quals_xml: + continue + # note: all the qualifiers we currently support only make sense + # for qualified CAs, not other types of services + # (qualified or otherwise). + qualifiers = frozenset(_process_qualifiers(quals_xml.qualifier)) + criteria = _process_criteria_list(qual.criteria_list) + if qualifiers and criteria: + yield Qualification(qualifiers=qualifiers, criteria_list=criteria) + else: + logger.warning(f"Could not process qualifiers in {quals_xml}") + + +def _process_additional_info( + info: ts_119612.AdditionalServiceInformation, critical: bool +): + return AdditionalServiceInformation( + uri=_required(info.uri, "Additional service info URI").value, + textual_info=info.information_value, + critical=critical, + ) + + +def _interpret_service_info_for_ca( + service: ts_119612.TSPService, +): + service_info = service.service_information + assert service_info is not None + certs = list( + _as_certs( + _required( + service_info.service_digital_identity, + "Service digital identity", ) + ) + ) + service_name = None + if service_info.service_name: + service_name = _extract_from_intl_string(service_info.service_name.name) + qualifications: FrozenSet[Qualification] = frozenset() + expired_revinfo_date = None + additional_info = [] + extensions_xml = ( + service_info.service_information_extensions.extension + if service_info.service_information_extensions + else () + ) + for ext in extensions_xml: + for ext_content in ext.content: + if isinstance(ext_content, ts_119612_sie.Qualifications): + qualifications = frozenset(_get_qualifications(ext_content)) + elif isinstance(ext_content, ts_119612.ExpiredCertsRevocationInfo): + expired_revinfo_date = _required( + ext_content.value, + "Date in expired certs revocation info cutoff", + ).to_datetime() + elif isinstance( + ext_content, ts_119612.AdditionalServiceInformation + ): + additional_info.append( + _process_additional_info(ext_content, ext.critical or False) + ) + elif ext.critical: + # TODO more informative exception / only ditch the current SDI + raise TSPServiceParsingError( + f"Cannot process a critical extension " + f"in service named '{service_name}'.\n" + f"Content: {ext_content}" + ) + base_service_info = BaseServiceInformation( + service_type=_required( + service_info.service_type_identifier, "Service type identifier" + ), + service_name=service_name or "unknown", + provider_certs=tuple(certs), + additional_info=frozenset(additional_info), + ) + + return CAServiceInformation( + base_info=base_service_info, + qualifications=qualifications, + expired_certs_revocation_info=expired_revinfo_date, + ) def _interpret_service_info_for_cas( - services: List[ts_119612.TSPService], + services: Iterable[ts_119612.TSPService], ): for service in services: service_info = service.service_information @@ -174,7 +370,6 @@ def _interpret_service_info_for_cas( or service_info.service_type_identifier != CA_QC_URI ): continue - certs = list(_as_certs(service_info.service_digital_identity)) # TODO allow the user to specify if they also want to include # other statuses (e.g. national level) @@ -182,32 +377,30 @@ def _interpret_service_info_for_cas( # work, store that info on the object if service_info.service_status != STATUS_GRANTED: continue - service_name = None - if service_info.service_name: - service_name = _extract_from_intl_string( - service_info.service_name.name - ) - base_service_info = BaseServiceInformation( - service_type=service_info.service_type_identifier, - service_name=service_name or "unknown", - provider_certs=certs, - ) - qualifications = [] - expired_revinfo_date = None - for ext in service_info.service_information_extensions.extension: - ext_content = ext.content - if isinstance(ext_content, ts_119612_sie.Qualifications): - qualifications = list(_get_qualifications(ext_content)) - elif isinstance(ext_content, ts_119612.ExpiredCertsRevocationInfo): - expired_revinfo_date = ext_content.value.to_datetime() - elif ext.critical: - # TODO more informative exception / only ditch the current SDI - raise ValueError( - f"Cannot process a critical extension " - f"in {base_service_info}" - ) - return CAServiceInformation( - base_info=base_service_info, - qualifications=qualifications, - expired_certs_revocation_info=expired_revinfo_date, + yield _interpret_service_info_for_ca(service) + + +def _raw_tl_parse(tl_xml: str) -> ts_119612.TrustServiceStatusList: + parser = XmlParser( + config=ParserConfig( + load_dtd=False, + process_xinclude=False, + fail_on_unknown_properties=False, + fail_on_unknown_attributes=False, + ), + ) + return parser.from_string(tl_xml, clazz=ts_119612.TrustServiceStatusList) + + +# TODO introduce a similar method for other types of service (TSAs etc) + + +def read_qualified_certificate_authorities( + tl_xml: str, +) -> Generator[CAServiceInformation, None, None]: + parse_result = _raw_tl_parse(tl_xml) + tspl = parse_result.trust_service_provider_list + for tsp in _required(tspl, "TSP list").trust_service_provider: + yield from _interpret_service_info_for_cas( + _required(tsp.tspservices, "TSP services").tspservice ) diff --git a/pyhanko/sign/validation/settings.py b/pyhanko/sign/validation/settings.py index ed300d14..c2ef3e31 100644 --- a/pyhanko/sign/validation/settings.py +++ b/pyhanko/sign/validation/settings.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Optional, Set +from typing import Iterable, Optional from asn1crypto import x509 from pyhanko_certvalidator.errors import InvalidCertificateError @@ -30,14 +30,14 @@ class KeyUsageConstraints(ConfigurableMixin): Bring extended key usage semantics in line with :rfc:`5280` (PKIX). """ - key_usage: Optional[Set[str]] = None + key_usage: Optional[Iterable[str]] = None """ All or some (depending on :attr:`match_all_key_usage`) of these key usage extensions must be present in the signer's certificate. If not set or empty, all key usages are considered acceptable. """ - key_usage_forbidden: Optional[Set[str]] = None + key_usage_forbidden: Optional[Iterable[str]] = None """ These key usages must not be present in the signer's certificate. @@ -46,7 +46,7 @@ class KeyUsageConstraints(ConfigurableMixin): compatibility with certificate seed value settings in ISO 32000. """ - extd_key_usage: Optional[Set[str]] = None + extd_key_usage: Optional[Iterable[str]] = None """ List of acceptable key purposes that can appear in an extended key usage extension in the signer's certificate, if such an extension is at all diff --git a/pyhanko_tests/test_trusted_list.py b/pyhanko_tests/test_trusted_list.py new file mode 100644 index 00000000..503b3636 --- /dev/null +++ b/pyhanko_tests/test_trusted_list.py @@ -0,0 +1,331 @@ +from datetime import datetime, timezone +from pathlib import Path + +import pytest +from xsdata.formats.dataclass.parsers import XmlParser +from xsdata.formats.dataclass.parsers.config import ParserConfig + +from pyhanko.generated.etsi import ts_119612 +from pyhanko.sign.validation import KeyUsageConstraints, eutl + + +def _read_cas_from_file(path: Path): + with path.open('r', encoding='utf8') as inf: + tl_str = inf.read() + return list(eutl.read_qualified_certificate_authorities(tl_str)) + + +def _raw_tlservice_parse(xml: str) -> ts_119612.TSPService: + parser = XmlParser( + config=ParserConfig( + load_dtd=False, + process_xinclude=False, + fail_on_unknown_properties=False, + fail_on_unknown_attributes=False, + ), + ) + return parser.from_string(xml, clazz=ts_119612.TSPService) + + +TEST_DATA_DIR = Path('pyhanko_tests') / 'data' / 'tl' +TEST_REAL_TL = TEST_DATA_DIR / 'tsl-be.xml' + + +def test_parse_cas_from_real_tl_smoke_test(): + assert len(_read_cas_from_file(TEST_REAL_TL)) == 52 + + +ETSI_NS = 'http://uri.etsi.org' +NAMESPACES = ' '.join( + [ + f'xmlns="{ETSI_NS}/02231/v2#"', + f'xmlns:xades="{ETSI_NS}/01903/v1.3.2#"', + f'xmlns:q="{ETSI_NS}/TrstSvc/SvcInfoExt/eSigDir-1999-93-EC-TrustedList/#"', + f'xmlns:extra="http://uri.etsi.org/02231/v2/additionaltypes#"', + ] +) + + +def test_parse_ca_with_unsupported_critical_qualifier(): + xml = f""" + + + + Test + + + http://uri.etsi.org/TrstSvc/Svctype/CA/QC + + + + + + + + + + """ + + parse_result = _raw_tlservice_parse(xml) + with pytest.raises(eutl.TSPServiceParsingError, match="critical"): + eutl._interpret_service_info_for_ca(parse_result) + + +def test_parse_ca_with_unsupported_noncritical_extension(): + xml = f""" + + + + Test + + + http://uri.etsi.org/TrstSvc/Svctype/CA/QC + + + + + + + + + + """ + + parse_result = _raw_tlservice_parse(xml) + assert eutl._interpret_service_info_for_ca(parse_result) is not None + + +def test_parse_ca_with_extensions(): + qual_xml = """ + + + + + + true + + + + 2.999 + + + + + + 2.999 + + + + + 2.999 + + + + + """ + xml = f""" + + + + Test + + + http://uri.etsi.org/TrstSvc/Svctype/CA/QC + + + + + + + {qual_xml} + + + + + + 2016-06-30T21:00:00Z + + + + + + """ + + parse_result = _raw_tlservice_parse(xml) + result = eutl._interpret_service_info_for_ca(parse_result) + assert result.expired_certs_revocation_info == datetime( + 2016, 6, 30, 21, 0, 0, tzinfo=timezone.utc + ) + qualification = list(result.qualifications)[0] + assert qualification.qualifiers == frozenset([eutl.Qualifier.WITH_SSCD]) + criteria = qualification.criteria_list.criteria + assert eutl.PolicySetCriterion(frozenset(['2.999'])) in criteria + assert ( + eutl.KeyUsageCriterion( + KeyUsageConstraints( + key_usage=frozenset(['non_repudiation']), + key_usage_forbidden=frozenset(), + ) + ) + in criteria + ) + assert ( + eutl.KeyUsageCriterion( + KeyUsageConstraints(extd_key_usage=frozenset(['2.999'])) + ) + in criteria + ) + assert ( + eutl.CertSubjectDNCriterion(required_rdn_part_oids=frozenset(['2.999'])) + in criteria + ) + + +@pytest.mark.parametrize( + 'qualifiers', + [ + '', + '', + '', + ], +) +def test_parse_omit_empty_or_unknown_quals(qualifiers): + qual_xml = f""" + {qualifiers} + + + true + + + + 2.999 + + + + """ + xml = f""" + + + + Test + + + http://uri.etsi.org/TrstSvc/Svctype/CA/QC + + + + + + + {qual_xml} + + + + + + + """ + + parse_result = _raw_tlservice_parse(xml) + result = eutl._interpret_service_info_for_ca(parse_result) + assert len(result.qualifications) == 0 + + +@pytest.mark.parametrize( + 'names', + [ + 'Test', + 'Test', + 'TestFout', + 'FoutTest', + 'TestZzz', + ], +) +def test_parse_service_name(names): + xml = f""" + + + {names} + + http://uri.etsi.org/TrstSvc/Svctype/CA/QC + + + + + """ + + parse_result = _raw_tlservice_parse(xml) + result = eutl._interpret_service_info_for_ca(parse_result) + assert result.base_info.service_name == 'Test' + + +@pytest.mark.parametrize( + 'criteria_xml', + [ + ' ', + '', + ( + '' + '' + '' + ), + ( + '' + '' + 'true' + '' + '' + '' + ), + ], +) +def test_criteria_parsing_failure(criteria_xml): + qual_xml = """ + + + + """ + xml = f""" + + + + Test + + + http://uri.etsi.org/TrstSvc/Svctype/CA/QC + + + + + + + {qual_xml} + {criteria_xml} + + + + + + + """ + + parse_result = _raw_tlservice_parse(xml) + with pytest.raises(eutl.TSPServiceParsingError): + eutl._interpret_service_info_for_ca(parse_result) + + +def test_parse_service_no_type(): + xml = f""" + + + Test + + + + """ + + parse_result = _raw_tlservice_parse(xml) + with pytest.raises(eutl.TSPServiceParsingError, match='Service type'): + eutl._interpret_service_info_for_ca(parse_result)