diff --git a/examples/hmac_secret.py b/examples/hmac_secret.py index 5937bf2..28bcac8 100644 --- a/examples/hmac_secret.py +++ b/examples/hmac_secret.py @@ -39,7 +39,6 @@ from fido2.client import Fido2Client, WindowsClient from fido2.ctap2.extensions import HmacSecretExtension from exampleutils import CliInteraction -from functools import partial import ctypes import sys import os @@ -74,7 +73,7 @@ def enumerate_devices(): user_interaction=CliInteraction(), # By default only the PRF extension is allowed, we need to explicitly # configure the client to allow hmac-secret - extension_types=[partial(HmacSecretExtension, allow_hmac_secret=True)], + extensions=[HmacSecretExtension(allow_hmac_secret=True)], ) if "hmac-secret" in client.info.extensions: break diff --git a/fido2/client.py b/fido2/client.py index 36ad6cd..ded74e1 100644 --- a/fido2/client.py +++ b/fido2/client.py @@ -32,11 +32,16 @@ from .ctap1 import Ctap1, APDU, ApduError from .ctap2 import Ctap2, AssertionResponse, Info from .ctap2.pin import ClientPin, PinProtocol -from .ctap2.extensions import Ctap2Extension, ClientExtensionOutputs +from .ctap2.extensions import ( + Ctap2Extension, + ClientExtensionOutputs, + AuthenticationExtensionProcessor, +) from .webauthn import ( Aaguid, AttestationObject, CollectedClientData, + PublicKeyCredentialRpEntity, PublicKeyCredentialDescriptor, PublicKeyCredentialCreationOptions, PublicKeyCredentialRequestOptions, @@ -45,6 +50,7 @@ AuthenticatorAttestationResponse, AuthenticatorAssertionResponse, AttestationConveyancePreference, + ResidentKeyRequirement, _as_cbor, ) from .cose import ES256 @@ -52,7 +58,6 @@ from .utils import sha256 from enum import IntEnum, unique from urllib.parse import urlparse -from dataclasses import replace from threading import Timer, Event from typing import ( Type, @@ -61,6 +66,7 @@ Optional, Mapping, Sequence, + Tuple, ) import abc @@ -298,11 +304,25 @@ def selection(self, event: Optional[Event]) -> None: raise NotImplementedError() @abc.abstractmethod - def do_make_credential(self, *args) -> AuthenticatorAttestationResponse: + def do_make_credential( + self, + options: PublicKeyCredentialCreationOptions, + client_data: CollectedClientData, + rp: PublicKeyCredentialRpEntity, + rp_id: str, + enterprise_rpid_list: Optional[Sequence[str]], + event: Event, + ) -> AuthenticatorAttestationResponse: raise NotImplementedError() @abc.abstractmethod - def do_get_assertion(self, *args) -> AssertionSelection: + def do_get_assertion( + self, + options: PublicKeyCredentialRequestOptions, + client_data: CollectedClientData, + rp_id: str, + event: Event, + ) -> AssertionSelection: raise NotImplementedError() @@ -328,6 +348,7 @@ def do_make_credential( options, client_data, rp, + rp_id, enterprise_rpid_list, event, ): @@ -345,7 +366,7 @@ def do_make_credential( ): raise CtapError(CtapError.ERR.UNSUPPORTED_OPTION) - app_param = sha256(rp.id.encode()) + app_param = sha256(rp_id.encode()) dummy_param = b"\0" * 32 for cred in exclude_list or []: @@ -386,9 +407,9 @@ def do_get_assertion( self, options, client_data, + rp_id, event, ): - rp_id = options.rp_id allow_list = options.allow_credentials user_verification = options.user_verification @@ -421,24 +442,20 @@ def __init__( self, client_data: CollectedClientData, assertions: Sequence[AssertionResponse], - extensions: Sequence[Ctap2Extension], - pin_token: Optional[str], - pin_protocol: Optional[PinProtocol], + extensions: Sequence[AuthenticationExtensionProcessor], + pin_token: Optional[bytes], ): super().__init__(client_data, assertions) self._extensions = extensions self._pin_token = pin_token - self._pin_protocol = pin_protocol def _get_extension_results(self, assertion): # Process extenstion outputs extension_outputs = {} try: for ext in self._extensions: - output = ext.process_get_output( - assertion, self._pin_token, self._pin_protocol - ) - if output is not None: + output = ext.prepare_outputs(assertion, self._pin_token) + if output: extension_outputs.update(output) except ValueError as e: raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e) @@ -456,13 +473,20 @@ def __init__( self, device: CtapDevice, user_interaction: UserInteraction, - extensions: Sequence[Type[Ctap2Extension]], + extension_types: Sequence[Type[Ctap2Extension]], + extensions: Sequence[Ctap2Extension], ): self.ctap2 = Ctap2(device) self.info = self.ctap2.info - self.extensions = extensions + self._extension_types = extension_types + self._extensions = extensions self.user_interaction = user_interaction + def _get_extensions(self) -> Sequence[Ctap2Extension]: + if self._extensions: + return self._extensions + return [ext(self.ctap2) for ext in self._extension_types] + def _filter_creds( self, rp_id, cred_list, pin_protocol, pin_token, event, on_keepalive ): @@ -592,15 +616,14 @@ def _get_token( ) def _get_auth_params( - self, rp_id, user_verification, permissions, event, on_keepalive + self, pin_protocol, rp_id, user_verification, permissions, event, on_keepalive ): self.info = self.ctap2.get_info() # Make sure we have "fresh" info - pin_protocol = None pin_token = None internal_uv = False if self._should_use_uv(user_verification, permissions): - client_pin = ClientPin(self.ctap2) + client_pin = ClientPin(self.ctap2, pin_protocol) allow_internal_uv = ( permissions & ~( @@ -612,26 +635,23 @@ def _get_auth_params( pin_token = self._get_token( client_pin, permissions, rp_id, event, on_keepalive, allow_internal_uv ) - if pin_token: - pin_protocol = client_pin.protocol - else: + if not pin_token: internal_uv = True - return pin_protocol, pin_token, internal_uv + return pin_token, internal_uv def do_make_credential( self, options, client_data, rp, + rp_id, enterprise_rpid_list, event, ): user = options.user key_params = options.pub_key_cred_params exclude_list = options.exclude_credentials - extensions = options.extensions selection = options.authenticator_selection or AuthenticatorSelectionCriteria() - rk = selection.require_resident_key user_verification = selection.user_verification on_keepalive = _user_keepalive(self.user_interaction) @@ -642,36 +662,43 @@ def do_make_credential( if self.info.options.get("ep"): if enterprise_rpid_list is not None: # Platform facilitated - if rp.id in enterprise_rpid_list: + if rp_id in enterprise_rpid_list: enterprise_attestation = 2 else: # Vendor facilitated enterprise_attestation = 1 - # Gather up permissions + # Negotiate PIN/UV protocol version + for proto in ClientPin.PROTOCOLS: + if proto.VERSION in self.info.pin_uv_protocols: + pin_protocol: Optional[PinProtocol] = proto() + break + else: + pin_protocol = None + + # Gather UV permissions permissions = ClientPin.PERMISSION.MAKE_CREDENTIAL if exclude_list: # We need this for filtering the exclude_list permissions |= ClientPin.PERMISSION.GET_ASSERTION - # Get extension permissions - extension_instances = [cls(self.ctap2) for cls in self.extensions] + # Initialize extensions and add extension permissions used_extensions = [] - client_inputs = extensions or {} - for ext in extension_instances: - # TODO: Move options to the constructor instead - ext._create_options = options - permissions |= ext.get_create_permissions(client_inputs) + for e in self._get_extensions(): + ext = e.make_credential(self.ctap2, options, pin_protocol) + if ext: + used_extensions.append(ext) + permissions |= ext.permissions def _do_make(): # Handle auth - pin_protocol, pin_token, internal_uv = self._get_auth_params( - rp.id, user_verification, permissions, event, on_keepalive + pin_token, internal_uv = self._get_auth_params( + pin_protocol, rp_id, user_verification, permissions, event, on_keepalive ) if exclude_list: exclude_cred = self._filter_creds( - rp.id, exclude_list, pin_protocol, pin_token, event, on_keepalive + rp_id, exclude_list, pin_protocol, pin_token, event, on_keepalive ) # We know the request will fail if exclude_cred is not None here # BUT DO NOT FAIL EARLY! We still need to prompt for UP, so we keep @@ -682,32 +709,40 @@ def _do_make(): # Process extensions extension_inputs = {} try: - for ext in extension_instances: - auth_input = ext.process_create_input(client_inputs) - if auth_input is not None: - used_extensions.append(ext) - extension_inputs[ext.NAME] = auth_input - elif ext._used: - # TODO: Make this cleaner - used_extensions.append(ext) + for ext in used_extensions: + auth_input = ext.prepare_inputs(pin_token) + if auth_input: + extension_inputs.update(auth_input) except ValueError as e: raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e) + can_rk = self.info.options.get("rk") + rk = selection.resident_key == ResidentKeyRequirement.REQUIRED or ( + selection.resident_key == ResidentKeyRequirement.PREFERRED and can_rk + ) + if not (rk or internal_uv): options = None else: options = {} if rk: + if not can_rk: + raise ClientError.ERR.CONFIGURATION_UNSUPPORTED( + "Resident key not supported" + ) options["rk"] = True if internal_uv: options["uv"] = True # Calculate pin_auth client_data_hash = client_data.hash - if pin_token: - pin_auth = pin_protocol.authenticate(pin_token, client_data_hash) + if pin_protocol and pin_token: + pin_auth: Tuple[Optional[bytes], Optional[int]] = ( + pin_protocol.authenticate(pin_token, client_data_hash), + pin_protocol.VERSION, + ) else: - pin_auth = None + pin_auth = (None, None) # Perform make credential return ( @@ -719,13 +754,11 @@ def _do_make(): [_as_cbor(exclude_cred)] if exclude_cred else None, extension_inputs or None, options, - pin_auth, - pin_protocol.VERSION if pin_protocol else None, + *pin_auth, enterprise_attestation, event=event, on_keepalive=on_keepalive, ), - pin_protocol, pin_token, ) @@ -733,7 +766,7 @@ def _do_make(): reconnected = False while True: try: - att_obj, pin_protocol, pin_token = _do_make() + att_obj, pin_token = _do_make() break except CtapError as e: # The Authenticator may still require UV, try again @@ -759,7 +792,7 @@ def _do_make(): extension_outputs = {} try: for ext in used_extensions: - output = ext.process_create_output(att_obj, pin_token, pin_protocol) + output = ext.prepare_outputs(att_obj, pin_token) if output is not None: extension_outputs.update(output) except ValueError as e: @@ -775,30 +808,38 @@ def do_get_assertion( self, options, client_data, + rp_id, event, ): rp_id = options.rp_id allow_list = options.allow_credentials - extensions = options.extensions user_verification = options.user_verification on_keepalive = _user_keepalive(self.user_interaction) - # Gather up permissions + # Negotiate PIN/UV protocol version + for proto in ClientPin.PROTOCOLS: + if proto.VERSION in self.info.pin_uv_protocols: + pin_protocol: Optional[PinProtocol] = proto() + break + else: + pin_protocol = None + + # Gather UV permissions permissions = ClientPin.PERMISSION.GET_ASSERTION - # Get extension permissions - extension_instances = [cls(self.ctap2) for cls in self.extensions] - client_inputs = extensions or {} - for ext in extension_instances: - # TODO: Move options to get_get_permissions and process_get_input - ext._get_options = options - permissions |= ext.get_get_permissions(client_inputs) + # Initialize extensions and add extension permissions + used_extensions = [] + for e in self._get_extensions(): + ext = e.get_assertion(self.ctap2, options, pin_protocol) + if ext: + used_extensions.append(ext) + permissions |= ext.permissions def _do_auth(): # Handle auth - pin_protocol, pin_token, internal_uv = self._get_auth_params( - rp_id, user_verification, permissions, event, on_keepalive + pin_token, internal_uv = self._get_auth_params( + pin_protocol, rp_id, user_verification, permissions, event, on_keepalive ) if allow_list: @@ -813,18 +854,11 @@ def _do_auth(): # Process extensions extension_inputs = {} - used_extensions = [] try: - for ext in extension_instances: - # TODO: Move to process_get_input() - ext._selected = selected_cred - auth_input = ext.process_get_input(client_inputs) - if auth_input is not None: - used_extensions.append(ext) - extension_inputs[ext.NAME] = auth_input - elif ext._used: - # TODO: Make this cleaner - used_extensions.append(ext) + for ext in used_extensions: + inputs = ext.prepare_inputs(selected_cred, pin_token) + if inputs: + extension_inputs.update(inputs) except ValueError as e: raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e) @@ -832,10 +866,13 @@ def _do_auth(): # Calculate pin_auth client_data_hash = client_data.hash - if pin_token: - pin_auth = pin_protocol.authenticate(pin_token, client_data_hash) + if pin_protocol and pin_token: + pin_auth: Tuple[Optional[bytes], Optional[int]] = ( + pin_protocol.authenticate(pin_token, client_data_hash), + pin_protocol.VERSION, + ) else: - pin_auth = None + pin_auth = (None, None) if allow_list and not selected_cred: # We still need to send a dummy value if there was an allow_list @@ -849,18 +886,13 @@ def _do_auth(): [_as_cbor(selected_cred)] if selected_cred else None, extension_inputs or None, options, - pin_auth, - pin_protocol.VERSION if pin_protocol else None, + *pin_auth, event=event, on_keepalive=on_keepalive, ) return _Ctap2ClientAssertionSelection( - client_data, - assertions, - used_extensions, - pin_token, - pin_protocol, + client_data, assertions, used_extensions, pin_token ) dev = self.ctap2.device @@ -905,8 +937,10 @@ def __init__( device: CtapDevice, origin: str, verify: Callable[[str, str], bool] = verify_rp_id, + # TODO 2.0: Replace extension_types with extensions extension_types: Sequence[Type[Ctap2Extension]] = _default_extensions(), user_interaction: UserInteraction = UserInteraction(), + extensions: Sequence[Ctap2Extension] = [], ): super().__init__(origin, verify) @@ -915,7 +949,7 @@ def __init__( try: self._backend: _ClientBackend = _Ctap2ClientBackend( - device, user_interaction, extension_types + device, user_interaction, extension_types, extensions ) except (ValueError, CtapError): self._backend = _Ctap1ClientBackend(device, user_interaction) @@ -930,6 +964,17 @@ def selection(self, event: Optional[Event] = None) -> None: except CtapError as e: raise _ctap2client_err(e) + def _get_rp_id(self, rp_id: Optional[str]) -> str: + if rp_id is None: + url = urlparse(self.origin) + if url.scheme != "https" or not url.netloc: + raise ClientError.ERR.BAD_REQUEST( + "RP ID required for non-https origin." + ) + return url.netloc + else: + return rp_id + def make_credential( self, options: PublicKeyCredentialCreationOptions, @@ -949,16 +994,10 @@ def make_credential( timer.start() rp = options.rp - if rp.id is None: - url = urlparse(self.origin) - if url.scheme != "https" or not url.netloc: - raise ClientError.ERR.BAD_REQUEST( - "RP ID required for non-https origin." - ) - rp = replace(rp, id=url.netloc) + rp_id = self._get_rp_id(rp.id) - logger.debug(f"Register a new credential for RP ID: {rp.id}") - self._verify_rp_id(rp.id) + logger.debug(f"Register a new credential for RP ID: {rp_id}") + self._verify_rp_id(rp_id) client_data = self._build_client_data( CollectedClientData.TYPE.CREATE, options.challenge @@ -969,6 +1008,7 @@ def make_credential( options, client_data, rp, + rp_id, self._enterprise_rpid_list, event, ) @@ -996,8 +1036,9 @@ def get_assertion( timer.daemon = True timer.start() - logger.debug(f"Assert a credential for RP ID: {options.rp_id}") - self._verify_rp_id(options.rp_id) + rp_id = self._get_rp_id(options.rp_id) + logger.debug(f"Assert a credential for RP ID: {rp_id}") + self._verify_rp_id(rp_id) client_data = self._build_client_data( CollectedClientData.TYPE.GET, options.challenge @@ -1007,6 +1048,7 @@ def get_assertion( return self._backend.do_get_assertion( options, client_data, + rp_id, event, ) except CtapError as e: diff --git a/fido2/ctap2/extensions.py b/fido2/ctap2/extensions.py index b2df7ca..ae4406b 100644 --- a/fido2/ctap2/extensions.py +++ b/fido2/ctap2/extensions.py @@ -36,6 +36,7 @@ PublicKeyCredentialCreationOptions, PublicKeyCredentialRequestOptions, AuthenticatorSelectionCriteria, + ResidentKeyRequirement, ) from enum import Enum, unique from dataclasses import dataclass @@ -45,6 +46,14 @@ class ClientExtensionOutputs(Mapping[str, Any]): + """Holds extension output from a call to MakeCredential or GetAssertion. + + When accessed as a dict, all bytes values will be serialized to base64url encoding, + capable of being serialized to JSON. + + When accessed using attributes, richer types will instead be returned. + """ + def __init__(self, outputs: Mapping[str, Any]): self._members = {k: v for k, v in outputs.items() if v is not None} @@ -69,26 +78,178 @@ def __repr__(self): return repr(dict(self)) +class ExtensionProcessor(abc.ABC): + def __init__( + self, + permissions: ClientPin.PERMISSION = ClientPin.PERMISSION(0), + inputs: Optional[Dict[str, Any]] = None, + outputs: Optional[Dict[str, Any]] = None, + ): + self.permissions = permissions + self._inputs = inputs + self._outputs = outputs + + +class RegistrationExtensionProcessor(ExtensionProcessor): + """Processing state for a CTAP2 extension, for single use. + + The ExtensionProcessor holds state and logic for client processing of an extension, + for a registration (MakeCredential) call. + + :param permissions: PinUvAuthToken permissions required by the extension. + """ + + def prepare_inputs(self, pin_token: Optional[bytes]) -> Optional[Dict[str, Any]]: + "Prepare authenticator extension inputs, to be passed to the Authenenticator." + return self._inputs + + def prepare_outputs( + self, + response: AttestationResponse, + pin_token: Optional[bytes], + ) -> Optional[Dict[str, Any]]: + "Prepare client extension outputs, to be returned to the caller." + return self._outputs + + +class AuthenticationExtensionProcessor(ExtensionProcessor): + """Processing state for a CTAP2 extension, for single use. + + The ExtensionProcessor holds state and logic for client processing of an extension, + for an authentication (GetAssertion) call. + + :param permissions: PinUvAuthToken permissions required by the extension. + """ + + def prepare_inputs( + self, + selected: Optional[PublicKeyCredentialDescriptor], + pin_token: Optional[bytes], + ) -> Optional[Dict[str, Any]]: + "Prepare authenticator extension inputs, to be passed to the Authenenticator." + return self._inputs + + def prepare_outputs( + self, + response: AssertionResponse, + pin_token: Optional[bytes], + ) -> Optional[Dict[str, Any]]: + "Prepare client extension outputs, to be returned to the caller." + return self._outputs + + +# TODO 2.0: Make changes as described below class Ctap2Extension(abc.ABC): - """Base class for Ctap2 extensions. + """Base class for CTAP2 extensions. + + As of python-fido2 1.2 these instances can be used for multiple requests and + should be invoked via the make_credential and get_assertion methods. Subclasses are instantiated for a single request, if the Authenticator supports the extension. + + From python-fido2 2.0 the following methods will be fully removed: + get_create_permissions, process_create_input, process_create_output, + process_create_input_with_permissions, + get_get_permissions, process_get_input, process_get_output, + process_get_input_with_permissions. + + The following changes will also be made: + __init__() will no longer allow passing a ctap2 instance. + is_supported() will require a ctap2 instance to be passed. """ NAME: str = None # type: ignore - def __init__(self, ctap: Ctap2): - self.ctap = ctap - # TODO: Pass options and selected to the various methods that need them instead - self._create_options: PublicKeyCredentialCreationOptions - self._get_options: PublicKeyCredentialRequestOptions - self._selected: Optional[PublicKeyCredentialDescriptor] - self._used = False + def __init__(self, ctap: Optional[Ctap2] = None): + self._ctap = ctap + + @property + def ctap(self) -> Ctap2: + ctap = self._ctap + if not ctap: + raise ValueError( + "Accessed self.ctap when no ctap instance has been passed to __init__" + ) + return ctap - def is_supported(self) -> bool: + def is_supported(self, ctap: Optional[Ctap2] = None) -> bool: """Whether or not the extension is supported by the authenticator.""" - return self.NAME in self.ctap.info.extensions + if not ctap: + warnings.warn( + "Calling is_supported without a Ctap2 instance is deprecated.", + DeprecationWarning, + ) + ctap = ctap or self._ctap + if not ctap: + raise ValueError("No Ctap2 instance available") + return self.NAME in ctap.info.extensions + def make_credential( + self, + ctap: Ctap2, + options: PublicKeyCredentialCreationOptions, + pin_protocol: Optional[PinProtocol], + ) -> Optional[RegistrationExtensionProcessor]: + """Start client extension processing for registration.""" + # This implementation is for LEGACY PURPOSES! + # Subclasses should override this method instead of: + # process_create_input, process_create_output, and get_create_permissions + warnings.warn( + "This extension does not override make_credential, which is deprecated.", + DeprecationWarning, + ) + inputs = dict(options.extensions or {}) + self._ctap = ctap + ext = self + + class Processor(RegistrationExtensionProcessor): + def prepare_inputs(self, pin_token): + processed = ext.process_create_input(inputs) + self._has_input = processed is not None + return {ext.NAME: processed} if self._has_input else None + + def prepare_outputs(self, response, pin_token): + if self._has_input: + processed = ext.process_create_output( + response, pin_token, pin_protocol + ) + return processed + + return Processor(self.get_create_permissions(inputs)) + + def get_assertion( + self, + ctap: Ctap2, + options: PublicKeyCredentialRequestOptions, + pin_protocol: Optional[PinProtocol], + ) -> Optional[AuthenticationExtensionProcessor]: + """Start client extension processing for authentication.""" + # This implementation is for LEGACY PURPOSES! + # Subclasses should override this method instead of: + # process_get_input, process_get_output, and get_get_permissions + warnings.warn( + "This extension does not override get_assertion, which is deprecated.", + DeprecationWarning, + ) + inputs = dict(options.extensions or {}) + self._ctap = ctap + ext = self + + class Processor(AuthenticationExtensionProcessor): + _has_input: bool + + def prepare_inputs(self, selected, pin_token): + processed = ext.process_get_input(inputs) + self._has_input = processed is not None + return {ext.NAME: processed} if self._has_input else None + + def prepare_outputs(self, response, pin_token): + if self._has_input: + return ext.process_get_output(response, pin_token, pin_protocol) + + return Processor(self.get_get_permissions(inputs)) + + # TODO 2.0: Remove the remaining methods of this class def get_create_permissions(self, inputs: Dict[str, Any]) -> ClientPin.PERMISSION: return ClientPin.PERMISSION(0) @@ -102,7 +263,7 @@ def process_create_input_with_permissions( self, inputs: Dict[str, Any] ) -> Tuple[Any, ClientPin.PERMISSION]: warnings.warn( - "This method is deprecated, use get_create_permissions.", DeprecationWarning + "This method is deprecated, use make_credential().", DeprecationWarning ) return self.process_create_input(inputs), self.get_create_permissions(inputs) @@ -110,7 +271,7 @@ def process_create_input_with_permissions( def process_create_output( self, attestation_response: AttestationResponse, - token: Optional[str], + token: Optional[bytes], pin_protocol: Optional[PinProtocol], ) -> Optional[Dict[str, Any]]: """Return client extension output given attestation_response, or None.""" @@ -129,14 +290,14 @@ def process_get_input_with_permissions( self, inputs: Dict[str, Any] ) -> Tuple[Any, ClientPin.PERMISSION]: warnings.warn( - "This method is deprecated, use get_get_permissions.", DeprecationWarning + "This method is deprecated, use get_assertion().", DeprecationWarning ) return self.process_get_input(inputs), self.get_get_permissions(inputs) def process_get_output( self, assertion_response: AssertionResponse, - token: Optional[str], + token: Optional[bytes], pin_protocol: Optional[PinProtocol], ) -> Optional[Dict[str, Any]]: """Return client extension output given assertion_response, or None.""" @@ -185,63 +346,145 @@ class HmacSecretExtension(Ctap2Extension): NAME = "hmac-secret" SALT_LEN = 32 - def __init__(self, ctap, pin_protocol=None, allow_hmac_secret=False): + def __init__(self, ctap=None, pin_protocol=None, allow_hmac_secret=False): super().__init__(ctap) + if pin_protocol: + warnings.warn( + "Initializing HmacSecretExtension with pin_protocol is deprecated, " + "pin_protocol will be ignored.", + DeprecationWarning, + ) self.pin_protocol = pin_protocol self._allow_hmac_secret = allow_hmac_secret + def make_credential(self, ctap, options, pin_protocol): + inputs = options.extensions or {} + prf = inputs.get("prf") is not None + hmac = self._allow_hmac_secret and inputs.get("hmacCreateSecret") is True + if self.is_supported(ctap) and (prf or hmac): + + class Processor(RegistrationExtensionProcessor): + def prepare_inputs(self, pin_token): + return {HmacSecretExtension.NAME: True} + + def prepare_outputs(self, response, pin_token): + extensions = response.auth_data.extensions or {} + enabled = extensions.get(HmacSecretExtension.NAME, False) + if prf: + return {"prf": _PrfOutputs(enabled=enabled)} + else: + return {"hmacCreateSecret": enabled} + + return Processor() + + def get_assertion(self, ctap, options, pin_protocol): + inputs = options.extensions or {} + prf = _PrfInputs.from_dict(inputs.get("prf")) + hmac = ( + _HmacGetSecretInput.from_dict(inputs.get("hmacGetSecret")) + if self._allow_hmac_secret + else None + ) + + if pin_protocol and self.is_supported(ctap) and (prf or hmac): + client_pin = ClientPin(ctap, pin_protocol) + key_agreement, shared_secret = client_pin._get_shared_secret() + + class Processing(AuthenticationExtensionProcessor): + def prepare_inputs(self, selected, pin_token): + if prf: + secrets = prf.eval + by_creds = prf.eval_by_credential + if by_creds: + # Make sure all keys are valid IDs from allow_credentials + allow_list = options.allow_credentials + if not allow_list: + raise ValueError( + "evalByCredentials requires allowCredentials" + ) + ids = {websafe_encode(c.id) for c in allow_list} + if not ids.issuperset(by_creds): + raise ValueError( + "evalByCredentials contains invalid key" + ) + if selected: + key = websafe_encode(selected.id) + if key in by_creds: + secrets = by_creds[key] + + if not secrets: + return + + salts = ( + _prf_salt(secrets.first), + ( + _prf_salt(secrets.second) + if secrets.second is not None + else b"" + ), + ) + else: + assert hmac is not None # nosec + salts = hmac.salt1, hmac.salt2 or b"" + + if not ( + len(salts[0]) == HmacSecretExtension.SALT_LEN + and ( + not salts[1] + or len(salts[1]) == HmacSecretExtension.SALT_LEN + ) + ): + raise ValueError("Invalid salt length") + + salt_enc = pin_protocol.encrypt(shared_secret, salts[0] + salts[1]) + salt_auth = pin_protocol.authenticate(shared_secret, salt_enc) + + return { + HmacSecretExtension.NAME: { + 1: key_agreement, + 2: salt_enc, + 3: salt_auth, + 4: pin_protocol.VERSION, + } + } + + def prepare_outputs(self, response, pin_token): + extensions = response.auth_data.extensions or {} + value = extensions.get(HmacSecretExtension.NAME) + + if value: + decrypted = client_pin.protocol.decrypt(shared_secret, value) + output1 = decrypted[: HmacSecretExtension.SALT_LEN] + output2 = decrypted[HmacSecretExtension.SALT_LEN :] or None + else: + return None + + if prf: + return { + "prf": _PrfOutputs(results=_PrfValues(output1, output2)) + } + else: + return {"hmacGetSecret": _HmacGetSecretOutput(output1, output2)} + + return Processing() + + # TODO 2.0: Remove the remaining methods of this class def process_create_input(self, inputs): - if self.is_supported(): - if inputs.get("hmacCreateSecret") is True and self._allow_hmac_secret: - self.prf = False - return True - elif inputs.get("prf") is not None: - self.prf = True - return True - - def process_create_output(self, attestation_response, *args): - enabled = attestation_response.auth_data.extensions.get(self.NAME, False) - if self.prf: - return {"prf": _PrfOutputs(enabled=enabled)} + if self.is_supported() and inputs.get("hmacCreateSecret") is True: + return True - else: - return {"hmacCreateSecret": enabled} + def process_create_output(self, attestation_response, *args, **kwargs): + enabled = attestation_response.auth_data.extensions.get(self.NAME, False) + return {"hmacCreateSecret": enabled} def process_get_input(self, inputs): if not self.is_supported(): return - prf = _PrfInputs.from_dict(inputs.get("prf")) - if prf: - secrets = prf.eval - by_creds = prf.eval_by_credential - if by_creds: - # Make sure all keys are valid IDs from allow_credentials - allow_list = self._get_options.allow_credentials - if not allow_list: - raise ValueError("evalByCredentials requires allowCredentials") - ids = {websafe_encode(c.id) for c in allow_list} - if not ids.issuperset(by_creds): - raise ValueError("evalByCredentials contains invalid key") - if self._selected: - key = websafe_encode(self._selected.id) - if key in by_creds: - secrets = by_creds[key] - - if not secrets: - return - - salts = ( - _prf_salt(secrets.first), - _prf_salt(secrets.second) if secrets.second is not None else b"", - ) - self.prf = True - else: - get_secret = _HmacGetSecretInput.from_dict(inputs.get("hmacGetSecret")) - if not get_secret or not self._allow_hmac_secret: - return - salts = get_secret.salt1, get_secret.salt2 or b"" - self.prf = False + get_secret = _HmacGetSecretInput.from_dict(inputs.get("hmacGetSecret")) + if not get_secret: + return + salts = get_secret.salt1, get_secret.salt2 or b"" if not ( len(salts[0]) == HmacSecretExtension.SALT_LEN @@ -249,7 +492,9 @@ def process_get_input(self, inputs): ): raise ValueError("Invalid salt length") - client_pin = ClientPin(self.ctap, self.pin_protocol) + if not self._ctap: + raise ValueError("No Ctap2 instance available") + client_pin = ClientPin(self._ctap, self.pin_protocol) key_agreement, self.shared_secret = client_pin._get_shared_secret() if self.pin_protocol is None: self.pin_protocol = client_pin.protocol @@ -264,17 +509,14 @@ def process_get_input(self, inputs): 4: self.pin_protocol.VERSION, } - def process_get_output(self, assertion_response, *args): + def process_get_output(self, assertion_response, *args, **kwargs): value = assertion_response.auth_data.extensions.get(self.NAME) + assert self.pin_protocol is not None # nosec decrypted = self.pin_protocol.decrypt(self.shared_secret, value) output1 = decrypted[: HmacSecretExtension.SALT_LEN] output2 = decrypted[HmacSecretExtension.SALT_LEN :] or None - - if self.prf: - return {"prf": _PrfOutputs(results=_PrfValues(output1, output2))} - else: - return {"hmacGetSecret": _HmacGetSecretOutput(output1, output2)} + return {"hmacGetSecret": _HmacGetSecretOutput(output1, output2)} @dataclass(eq=False, frozen=True) @@ -298,9 +540,65 @@ class LargeBlobExtension(Ctap2Extension): NAME = "largeBlobKey" - def is_supported(self): - return super().is_supported() and self.ctap.info.options.get("largeBlobs") + def is_supported(self, ctap=None): + ctap = ctap or self._ctap + assert ctap is not None # nosec + return super().is_supported(ctap) and ctap.info.options.get("largeBlobs", False) + + def make_credential(self, ctap, options, pin_protocol): + inputs = options.extensions or {} + data = _LargeBlobInputs.from_dict(inputs.get("largeBlob")) + if data: + if data.read or data.write: + raise ValueError("Invalid set of parameters") + if data.support == "required" and not self.is_supported(ctap): + raise ValueError("Authenticator does not support large blob storage") + + class Processor(RegistrationExtensionProcessor): + def prepare_inputs(self, pin_token): + return {LargeBlobExtension.NAME: True} + + def prepare_outputs(self, response, pin_token): + return { + "largeBlob": _LargeBlobOutputs( + supported=response.large_blob_key is not None + ) + } + + return Processor() + + def get_assertion(self, ctap, options, pin_protocol): + inputs = options.extensions or {} + data = _LargeBlobInputs.from_dict(inputs.get("largeBlob")) + if data: + if data.support or (data.read and data.write): + raise ValueError("Invalid set of parameters") + if not self.is_supported(ctap): + raise ValueError("Authenticator does not support large blob storage") + + class Processor(AuthenticationExtensionProcessor): + def prepare_outputs(self, response, pin_token): + blob_key = response.large_blob_key + if blob_key: + if data.read: + large_blobs = LargeBlobs(ctap) + blob = large_blobs.get_blob(blob_key) + return {"largeBlob": _LargeBlobOutputs(blob=blob)} + elif data.write: + large_blobs = LargeBlobs(ctap, pin_protocol, pin_token) + large_blobs.put_blob(blob_key, data.write) + return {"largeBlob": _LargeBlobOutputs(written=True)} + + return Processor( + ( + ClientPin.PERMISSION.LARGE_BLOB_WRITE + if data.write + else ClientPin.PERMISSION(0) + ), + inputs={LargeBlobExtension.NAME: True}, + ) + # TODO 2.0: Remove the remaining methods of this class def process_create_input(self, inputs): data = _LargeBlobInputs.from_dict(inputs.get("largeBlob")) if data: @@ -310,7 +608,7 @@ def process_create_input(self, inputs): raise ValueError("Authenticator does not support large blob storage") return True - def process_create_output(self, attestation_response, *args): + def process_create_output(self, attestation_response, *args, **kwargs): return { "largeBlob": _LargeBlobOutputs( supported=attestation_response.large_blob_key is not None @@ -338,15 +636,15 @@ def process_get_input(self, inputs): def process_get_output(self, assertion_response, token, pin_protocol): blob_key = assertion_response.large_blob_key - if self._action is True: # Read - large_blobs = LargeBlobs(self.ctap) - blob = large_blobs.get_blob(blob_key) - return {"largeBlob": _LargeBlobOutputs(blob=blob)} - - elif self._action: # Write - large_blobs = LargeBlobs(self.ctap, pin_protocol, token) - large_blobs.put_blob(blob_key, self._action) - return {"largeBlob": _LargeBlobOutputs(written=True)} + if blob_key: + if self._action is True: # Read + large_blobs = LargeBlobs(self.ctap) + blob = large_blobs.get_blob(blob_key) + return {"largeBlob": _LargeBlobOutputs(blob=blob)} + elif self._action: # Write + large_blobs = LargeBlobs(self.ctap, pin_protocol, token) + large_blobs.put_blob(blob_key, self._action) + return {"largeBlob": _LargeBlobOutputs(written=True)} class CredBlobExtension(Ctap2Extension): @@ -356,6 +654,20 @@ class CredBlobExtension(Ctap2Extension): NAME = "credBlob" + def make_credential(self, ctap, options, pin_protocol): + inputs = options.extensions or {} + if self.is_supported(): + blob = inputs.get("credBlob") + assert ctap.info.max_cred_blob_length is not None # nosec + if blob and len(blob) <= ctap.info.max_cred_blob_length: + return RegistrationExtensionProcessor(inputs={self.NAME: blob}) + + def get_assertion(self, ctap, options, pin_protocol): + inputs = options.extensions or {} + if self.is_supported(ctap) and inputs.get("getCredBlob") is True: + return AuthenticationExtensionProcessor(inputs={self.NAME: True}) + + # TODO 2.0: Remove the remaining methods of this class def process_create_input(self, inputs): if self.is_supported(): blob = inputs.get("credBlob") @@ -382,6 +694,20 @@ class POLICY(Enum): ALWAYS_RUN = True NAME = "credProtect" + def make_credential(self, ctap, options, pin_protocol): + inputs = options.extensions or {} + policy = inputs.get("credentialProtectionPolicy") + if policy: + index = list(CredProtectExtension.POLICY).index( + CredProtectExtension.POLICY(policy) + ) + enforce = inputs.get("enforceCredentialProtectionPolicy", False) + if enforce and not self.is_supported(ctap) and index > 0: + raise ValueError("Authenticator does not support Credential Protection") + + return RegistrationExtensionProcessor(inputs={self.NAME: index + 1}) + + # TODO 2.0: Remove the remaining methods of this class def process_create_input(self, inputs): policy = inputs.get("credentialProtectionPolicy") if policy: @@ -401,9 +727,18 @@ class MinPinLengthExtension(Ctap2Extension): NAME = "minPinLength" - def is_supported(self): # NB: There is no key in the extensions field. - return "setMinPINLength" in self.ctap.info.options + def is_supported(self, ctap=None): + # NB: There is no key in the extensions field. + ctap = ctap or self._ctap + assert ctap is not None # nosec + return "setMinPINLength" in ctap.info.options + + def make_credential(self, ctap, options, pin_protocol): + inputs = options.extensions or {} + if self.is_supported(ctap) and inputs.get(self.NAME) is True: + return RegistrationExtensionProcessor(inputs={self.NAME: True}) + # TODO 2.0: Remove the remaining methods of this class def process_create_input(self, inputs): if self.is_supported() and inputs.get(self.NAME) is True: return True @@ -421,19 +756,20 @@ class CredPropsExtension(Ctap2Extension): NAME = "credProps" - def is_supported(self): # NB: There is no key in the extensions field. + def is_supported(self, ctap=None): # NB: There is no key in the extensions field. return True - def process_create_input(self, inputs): + def make_credential(self, ctap, options, pin_protocol): + inputs = options.extensions or {} if inputs.get(self.NAME) is True: - # This extension doesn't provide any input to the authenticator, - # but still needs to add output. - self._used = True - - def process_create_output(self, attestation_response, *args): - selection = ( - self._create_options.authenticator_selection - or AuthenticatorSelectionCriteria() - ) - rk = selection.require_resident_key - return {"credProps": _CredPropsOutputs(rk=rk)} + selection = ( + options.authenticator_selection or AuthenticatorSelectionCriteria() + ) + rk = selection.resident_key == ResidentKeyRequirement.REQUIRED or ( + selection.resident_key == ResidentKeyRequirement.PREFERRED + and ctap.info.options.get("rk") + ) + + return RegistrationExtensionProcessor( + outputs={self.NAME: _CredPropsOutputs(rk=rk)} + )