Skip to content

Commit

Permalink
Implement service history parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthiasValvekens committed Nov 17, 2024
1 parent 8611c2d commit 1b27a63
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 26 deletions.
102 changes: 89 additions & 13 deletions pyhanko/sign/validation/qualified/eutl_parse.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import itertools
import logging
from datetime import datetime
from typing import (
FrozenSet,
Generator,
Iterable,
List,
Optional,
Set,
Tuple,
Expand Down Expand Up @@ -56,6 +58,16 @@
PREFERRED_LANGUAGE: str = 'en'


def _service_name_from_intl_string(
intl_string: Optional[ts_119612.InternationalNamesType],
) -> str:
return (
_extract_from_intl_string(intl_string.name)
if intl_string
else "unknown"
)


def _extract_from_intl_string(
intl_string: Tuple[
Union[ts_119612.MultiLangStringType, ts_119612.MultiLangNormStringType],
Expand Down Expand Up @@ -228,9 +240,7 @@ def _interpret_historical_service_info_for_ca(
)
)
)
service_name = None
if service_info.service_name:
service_name = _extract_from_intl_string(service_info.service_name.name)
service_name = _service_name_from_intl_string(service_info.service_name)
qualifications: FrozenSet[Qualification] = frozenset()
expired_revinfo_date = None
additional_info = []
Expand Down Expand Up @@ -262,7 +272,6 @@ def _interpret_historical_service_info_for_ca(
except KeyError:
additional_info.append(additional_info_entry)
elif ext.critical:
# TODO more informative exception / only ditch the current SDI
raise TSPServiceParsingError(
f"Cannot process a critical extension "
f"in service named '{service_name}'.\n"
Expand All @@ -271,16 +280,17 @@ def _interpret_historical_service_info_for_ca(
valid_from_date = service_info.status_starting_time
if valid_from_date is None:
raise TSPServiceParsingError(
"The validity start of the current status of the the service named "
f"{service_name} is not known. This is an error."
f"The validity start of the status of "
f"the the service named {service_name} is not known. "
f"This is an error."
)
base_service_info = BaseServiceInformation(
service_type=_required(
service_info.service_type_identifier, "Service type identifier"
).value,
valid_from=valid_from_date.to_datetime(),
valid_until=next_update_at,
service_name=service_name or "unknown",
service_name=service_name,
provider_certs=tuple(certs),
additional_info_certificate_type=frozenset(asi_qc_type),
other_additional_info=frozenset(additional_info),
Expand All @@ -293,9 +303,53 @@ def _interpret_historical_service_info_for_ca(
)


def _read_service_history(history_items, validity_start, service_name):
errors_encountered = []
item_index_sorted_by_date = sorted(
(
(orig_ix, item.status_starting_time.to_datetime())
for orig_ix, item in enumerate(history_items)
if item.status_starting_time
),
key=lambda t: t[1],
reverse=True,
)
end_of_validity_by_orig_ix = {
orig_ix: next_start
for (orig_ix, cur_start), next_start in zip(
item_index_sorted_by_date,
itertools.chain(
(validity_start.to_datetime(),),
(st for _, st in item_index_sorted_by_date[:-1]),
),
)
}

for orig_ix, validity_end in end_of_validity_by_orig_ix.items():
history_item = history_items[orig_ix]
if history_item.service_status != STATUS_GRANTED:
continue
try:
validity_end = end_of_validity_by_orig_ix[orig_ix]
yield _interpret_historical_service_info_for_ca(
history_item,
next_update_at=validity_end,
)
except TSPServiceParsingError as e:
logger.debug(
f"Failed to parse item {orig_ix + 1} in history "
f"of service {service_name}. This history "
f"entry will not be processed further.",
exc_info=e,
)
errors_encountered.append(e)
return errors_encountered


def _interpret_service_info_for_cas(
services: Iterable[ts_119612.TSPService],
):
errors_encountered = []
for service in services:
service_info = service.service_information
if (
Expand All @@ -304,14 +358,33 @@ def _interpret_service_info_for_cas(
):
continue

service_name = _service_name_from_intl_string(service_info.service_name)
# TODO allow the user to specify if they also want to include
# other statuses (e.g. national level)
# TODO evaluate historical definitions too in case of point-in-time
# work, store that info on the object
if service_info.service_status != STATUS_GRANTED:
continue
# TODO process errors in individual services
yield _interpret_service_info_for_ca(service)
if service_info.service_status == STATUS_GRANTED:
try:
yield _interpret_service_info_for_ca(service)
except TSPServiceParsingError as e:
logger.warning(
f"Failed to process current status "
f"of service {service_name}. This history "
f"entry will not be processed further.",
exc_info=e,
)
errors_encountered.append(e)
continue

validity_start = service_info.status_starting_time
if validity_start and service.service_history:
history_items = service.service_history.service_history_instance
history_errors = yield from _read_service_history(
history_items, validity_start, service_name
)
errors_encountered.extend(history_errors)

return errors_encountered


def _raw_tl_parse(tl_xml: str) -> ts_119612.TrustServiceStatusList:
Expand All @@ -329,10 +402,13 @@ def _raw_tl_parse(tl_xml: str) -> ts_119612.TrustServiceStatusList:
# TODO introduce a similar method for other types of service (TSAs etc)
def read_qualified_certificate_authorities(
tl_xml: str,
) -> Generator[CAServiceInformation, None, None]:
) -> Generator[CAServiceInformation, None, List[TSPServiceParsingError]]:
parse_result = _raw_tl_parse(tl_xml)
tspl = parse_result.trust_service_provider_list
errors_encountered = []
for tsp in _required(tspl, "TSP list").trust_service_provider:
yield from _interpret_service_info_for_cas(
tsp_errors = yield from _interpret_service_info_for_cas(
_required(tsp.tspservices, "TSP services").tspservice
)
errors_encountered.extend(tsp_errors)
return errors_encountered
12 changes: 1 addition & 11 deletions pyhanko/sign/validation/qualified/tsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,7 @@
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import (
Dict,
FrozenSet,
Generator,
Iterable,
Optional,
Set,
Tuple,
Union,
)
from typing import Dict, FrozenSet, Generator, Iterable, Optional, Set, Tuple

from asn1crypto import x509
from pyhanko_certvalidator.authority import (
Expand All @@ -36,7 +27,6 @@
'TSPTrustManager',
'QcCertType',
'AdditionalServiceInformation',
'TSPServiceParsingError',
'BaseServiceInformation',
'Qualifier',
'Criterion',
Expand Down
150 changes: 148 additions & 2 deletions pyhanko_tests/test_trusted_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

from pyhanko.generated.etsi import ts_119612
from pyhanko.sign.validation.qualified import assess, eutl_parse, q_status, tsp
from pyhanko.sign.validation.qualified.eutl_parse import (
CA_QC_URI,
STATUS_GRANTED,
)
from pyhanko.sign.validation.settings import KeyUsageConstraints

from .samples import CERTOMANCER
Expand Down Expand Up @@ -50,7 +54,10 @@ def _raw_tlservice_parse(xml: str) -> ts_119612.TSPService:


def test_parse_cas_from_real_tl_smoke_test():
assert len(_read_cas_from_file(TEST_REAL_TL)) == 52
cas_read = _read_cas_from_file(TEST_REAL_TL)
current_cas = [ca for ca in cas_read if not ca.base_info.valid_until]
assert len(current_cas) == 52
assert len(cas_read) == 73


ETSI_NS = 'http://uri.etsi.org'
Expand Down Expand Up @@ -89,7 +96,7 @@ def test_parse_ca_with_unsupported_critical_qualifier():

parse_result = _raw_tlservice_parse(xml)
with pytest.raises(
tsp.TSPServiceParsingError,
eutl_parse.TSPServiceParsingError,
match="critical",
):
eutl_parse._interpret_service_info_for_ca(parse_result)
Expand Down Expand Up @@ -1158,3 +1165,142 @@ async def test_conclude_qualified_convergence():
assessor = assess.QualificationAssessor(tsp_registry=registry)
status = assessor.check_entity_cert_qualified(path)
assert status.qualified


def test_parse_service_history_intervals():
xml = f"""
<TSPService {NAMESPACES}>
<ServiceInformation>
<ServiceName><Name xml:lang="en">Test</Name></ServiceName>
<ServiceTypeIdentifier>{CA_QC_URI}</ServiceTypeIdentifier>
<ServiceStatus>{STATUS_GRANTED}</ServiceStatus>
<ServiceDigitalIdentity/>
<StatusStartingTime>
2020-11-01T00:00:00Z
</StatusStartingTime>
</ServiceInformation>
<ServiceHistory>
<ServiceHistoryInstance>
<ServiceName><Name xml:lang="en">Test</Name></ServiceName>
<ServiceTypeIdentifier>{CA_QC_URI}</ServiceTypeIdentifier>
<ServiceStatus>{STATUS_GRANTED}</ServiceStatus>
<ServiceDigitalIdentity/>
<StatusStartingTime>
2017-11-01T00:00:00Z
</StatusStartingTime>
</ServiceHistoryInstance>
<ServiceHistoryInstance>
<ServiceName><Name xml:lang="en">Test</Name></ServiceName>
<ServiceTypeIdentifier>{CA_QC_URI}</ServiceTypeIdentifier>
<ServiceStatus>{STATUS_GRANTED}</ServiceStatus>
<ServiceDigitalIdentity/>
<StatusStartingTime>
2019-11-01T00:00:00Z
</StatusStartingTime>
</ServiceHistoryInstance>
</ServiceHistory>
</TSPService>
"""

parse_result = _raw_tlservice_parse(xml)
result = eutl_parse._interpret_service_info_for_cas([parse_result])
date1 = datetime(2017, 11, 1, tzinfo=timezone.utc)
date2 = datetime(2019, 11, 1, tzinfo=timezone.utc)
date3 = datetime(2020, 11, 1, tzinfo=timezone.utc)
intervals = [
(r.base_info.valid_from, r.base_info.valid_until) for r in result
]
assert intervals == [(date3, None), (date2, date3), (date1, date2)]


def test_parse_service_history_intervals_skip_not_granted():
xml = f"""
<TSPService {NAMESPACES}>
<ServiceInformation>
<ServiceName><Name xml:lang="en">Test</Name></ServiceName>
<ServiceTypeIdentifier>{CA_QC_URI}</ServiceTypeIdentifier>
<ServiceStatus>{STATUS_GRANTED}</ServiceStatus>
<ServiceDigitalIdentity/>
<StatusStartingTime>
2020-11-01T00:00:00Z
</StatusStartingTime>
</ServiceInformation>
<ServiceHistory>
<ServiceHistoryInstance>
<ServiceName><Name xml:lang="en">Test</Name></ServiceName>
<ServiceTypeIdentifier>{CA_QC_URI}</ServiceTypeIdentifier>
<ServiceStatus>{STATUS_GRANTED}</ServiceStatus>
<ServiceDigitalIdentity/>
<StatusStartingTime>
2017-11-01T00:00:00Z
</StatusStartingTime>
</ServiceHistoryInstance>
<ServiceHistoryInstance>
<ServiceName><Name xml:lang="en">Test</Name></ServiceName>
<ServiceTypeIdentifier>{CA_QC_URI}</ServiceTypeIdentifier>
<ServiceStatus>urn:blah</ServiceStatus>
<ServiceDigitalIdentity/>
<StatusStartingTime>
2019-11-01T00:00:00Z
</StatusStartingTime>
</ServiceHistoryInstance>
</ServiceHistory>
</TSPService>
"""

parse_result = _raw_tlservice_parse(xml)
result = eutl_parse._interpret_service_info_for_cas([parse_result])
date1 = datetime(2017, 11, 1, tzinfo=timezone.utc)
date2 = datetime(2019, 11, 1, tzinfo=timezone.utc)
date3 = datetime(2020, 11, 1, tzinfo=timezone.utc)
intervals = [
(r.base_info.valid_from, r.base_info.valid_until) for r in result
]
assert intervals == [
(date3, None),
# gap where status is not granted
(date1, date2),
]


def test_parse_service_history_intervals_skip_invalid_entries():
xml = f"""
<TSPService {NAMESPACES}>
<ServiceInformation>
<ServiceName><Name xml:lang="en">Test</Name></ServiceName>
<ServiceTypeIdentifier>{CA_QC_URI}</ServiceTypeIdentifier>
<ServiceStatus>{STATUS_GRANTED}</ServiceStatus>
<ServiceDigitalIdentity/>
<StatusStartingTime>
2020-11-01T00:00:00Z
</StatusStartingTime>
</ServiceInformation>
<ServiceHistory>
<ServiceHistoryInstance>
<ServiceName><Name xml:lang="en">Test</Name></ServiceName>
<ServiceTypeIdentifier>{CA_QC_URI}</ServiceTypeIdentifier>
<ServiceStatus>{STATUS_GRANTED}</ServiceStatus>
<ServiceDigitalIdentity/>
<StatusStartingTime>
2017-11-01T00:00:00Z
</StatusStartingTime>
</ServiceHistoryInstance>
<ServiceHistoryInstance>
<ServiceName><Name xml:lang="en">Test</Name></ServiceName>
<ServiceStatus>{STATUS_GRANTED}</ServiceStatus>
<StatusStartingTime>2019-11-01T00:00:00Z</StatusStartingTime>
</ServiceHistoryInstance>
</ServiceHistory>
</TSPService>
"""

parse_result = _raw_tlservice_parse(xml)
result = eutl_parse._interpret_service_info_for_cas([parse_result])
date2 = datetime(2020, 11, 1, tzinfo=timezone.utc)
intervals = [
(r.base_info.valid_from, r.base_info.valid_until) for r in result
]
assert len(intervals) == 2
assert intervals[0] == (date2, None)
# don't assert on second interval for now; let's call
# that one undefined behaviour

0 comments on commit 1b27a63

Please sign in to comment.