From bf75c16302361e749fad9833bf49cae9d9319761 Mon Sep 17 00:00:00 2001
From: Matthias Valvekens
Date: Thu, 18 Aug 2022 22:40:39 +0200
Subject: [PATCH] Add ISO/TS 32003 support

---
NOTE(review): the line breaks and indentation of this patch were restored
from a whitespace-collapsed copy, using the hunk line counts and Python
syntax as a guide. Indentation of unchanged context lines is a best guess,
and the author e-mail address was not present in that copy. Run
`git apply --check` against the target tree before relying on it.

Summary of the change, as visible in the hunks below:
  * api.py: adds SecurityHandlerVersion.AES_GCM (6); adds a
    get_extensions() hook on crypt filters whose results are merged into
    the existing get_extensions() list; derive_object_key() now returns
    self.shared_key instead of raising NotImplementedError.
  * filter_mixins.py: adds the ISO32003 developer extension and
    AESGCMCryptFilterMixin (/AESV4, 32-byte key, output is
    12-byte nonce + ciphertext + tag).
  * pubkey.py, standard.py: add AES-GCM crypt filters, their default
    configurations and /AESV4 registration; build_from_pw() gains a
    use_gcm flag that defaults to True.
  * test_crypt.py: adds round-trip and tamper-detection tests.

 pyhanko/pdf_utils/crypt/api.py           |  23 ++++-
 pyhanko/pdf_utils/crypt/filter_mixins.py |  90 ++++++++++++++++-
 pyhanko/pdf_utils/crypt/pubkey.py        |  43 ++++++++-
 pyhanko/pdf_utils/crypt/standard.py      |  47 ++++++++-
 pyhanko_tests/test_crypt.py              | 117 ++++++++++++++++++++++-
 5 files changed, 305 insertions(+), 15 deletions(-)

diff --git a/pyhanko/pdf_utils/crypt/api.py b/pyhanko/pdf_utils/crypt/api.py
index d4cdfb8b..995bba23 100644
--- a/pyhanko/pdf_utils/crypt/api.py
+++ b/pyhanko/pdf_utils/crypt/api.py
@@ -1,6 +1,6 @@
 import enum
 from dataclasses import dataclass
-from typing import Callable, Dict, List, Optional, Set, Tuple, Type
+from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, Type
 
 from pyhanko.pdf_utils import generic, misc
 from pyhanko.pdf_utils.crypt.cred_ser import SerialisableCredential
@@ -74,6 +74,7 @@ class SecurityHandlerVersion(misc.VersionEnum):
     RC4_LONGER_KEYS = 2
     RC4_OR_AES128 = 4
     AES256 = 5
+    AES_GCM = 6
 
     OTHER = None
     """
@@ -471,11 +472,17 @@ def get_min_pdf_version(self) -> Optional[Tuple[int, int]]:
         return None
 
     def get_extensions(self) -> List[DeveloperExtension]:
+        exts = []
         if self.pdf_mac_enabled:
             from .pdfmac import ISO32004
 
-            return [ISO32004]
-        return []
+            exts.append(ISO32004)
+
+        for cf in self.crypt_filter_config.filters():
+            cf_exts = cf.get_extensions()
+            if cf_exts is not None:
+                exts.extend(cf_exts)
+        return exts
 
 
 class CryptFilter:
@@ -525,6 +532,12 @@ def method(self) -> generic.NameObject:
         """
         raise NotImplementedError
 
+    def get_extensions(self) -> Optional[List[DeveloperExtension]]:
+        """
+        Get applicable developer extensions for this crypt filter.
+        """
+        return None
+
     @property
     def keylen(self) -> int:
         """
@@ -624,7 +637,7 @@ def derive_object_key(self, idnum, generation) -> bytes:
         :return:
             The local key to use for this object.
         """
-        raise NotImplementedError
+        return self.shared_key
 
     def set_embedded_only(self):
         self._embedded_only = True
@@ -781,7 +794,7 @@ def __contains__(self, item):
             or item in self._crypt_filters
         )
 
-    def filters(self):
+    def filters(self) -> Iterable['CryptFilter']:
         """Enumerate all crypt filters in this configuration."""
         return self._crypt_filters.values()
 
diff --git a/pyhanko/pdf_utils/crypt/filter_mixins.py b/pyhanko/pdf_utils/crypt/filter_mixins.py
index c8338483..d2ce9f8e 100644
--- a/pyhanko/pdf_utils/crypt/filter_mixins.py
+++ b/pyhanko/pdf_utils/crypt/filter_mixins.py
@@ -1,7 +1,12 @@
 import abc
 import secrets
+import struct
+from typing import List, Optional
 
-from pyhanko.pdf_utils import generic
+import cryptography.exceptions
+from cryptography.hazmat.primitives.ciphers.aead import AESGCM
+
+from pyhanko.pdf_utils import generic, misc
 from pyhanko.pdf_utils.crypt._util import (
     aes_cbc_decrypt,
     aes_cbc_encrypt,
@@ -9,8 +14,19 @@
 )
 from pyhanko.pdf_utils.crypt.api import CryptFilter, SecurityHandlerVersion
 
+from ..extensions import DeveloperExtension, DevExtensionMultivalued
 from ._legacy import legacy_derive_object_key
 
+ISO32003 = DeveloperExtension(
+    prefix_name=generic.pdf_name('/ISO_'),
+    base_version=generic.pdf_name('/2.0'),
+    extension_level=32003,
+    extension_revision=':2023',
+    url='https://www.iso.org/standard/45876.html',
+    compare_by_level=False,
+    multivalued=DevExtensionMultivalued.ALWAYS,
+)
+
 
 class RC4CryptFilterMixin(CryptFilter, abc.ABC):
     """
@@ -157,3 +173,75 @@ def derive_object_key(self, idnum, generation) -> bytes:
         return legacy_derive_object_key(
             self.shared_key, idnum, generation, use_aes=True
         )
+
+
+class AESGCMCryptFilterMixin(CryptFilter, abc.ABC):
+    """Mixin for AES GCM-based crypt filters (ISO 32003)"""
+
+    method = generic.NameObject('/AESV4')
+
+    def __init__(self: 'AESGCMCryptFilterMixin', **kwargs):
+        super().__init__(**kwargs)
+        self.__counter: int = 0
+
+    @property
+    def keylen(self) -> int:
+        return 32
+
+    def _get_nonce(self) -> bytes:
+        # nonce is 12 bytes, we use 8 for the counter and 4 random ones
+        # (mostly because there's no convenient way to do a 12-byte counter with
+        # struct.pack)
+        # Crypt filter instances are not shared between documents, so this
+        # should be plenty unique enough.
+        random_part = secrets.token_bytes(4)
+        self.__counter += 1
+        counter_part = struct.pack('>Q', self.__counter)
+        return random_part + counter_part
+
+    def encrypt(self, key, plaintext: bytes, params=None):
+        """
+        Encrypt data using AES-GCM.
+
+        :param key:
+            The key to use.
+        :param plaintext:
+            The plaintext to be encrypted.
+        :param params:
+            Ignored.
+        :return:
+            The resulting ciphertext and tag, prepended with a 12-byte nonce
+        """
+
+        nonce = self._get_nonce()
+        ciphertext = AESGCM(key).encrypt(
+            nonce=nonce, data=plaintext, associated_data=None
+        )
+        return nonce + ciphertext
+
+    def decrypt(self, key, ciphertext: bytes, params=None) -> bytes:
+        """
+        Decrypt data using AES-GCM.
+
+        :param key:
+            The key to use.
+        :param ciphertext:
+            The ciphertext to be decrypted, prepended with a 12-byte
+            initialisation vector, and suffixed with the 16-byte authentication
+            tag.
+        :param params:
+            Ignored.
+        :return:
+            The resulting plaintext.
+        """
+        nonce, data = ciphertext[:12], ciphertext[12:]
+        try:
+            plaintext = AESGCM(key).decrypt(
+                nonce=nonce, data=data, associated_data=None
+            )
+        except cryptography.exceptions.InvalidTag:
+            raise misc.PdfReadError("Invalid GCM tag")
+        return plaintext
+
+    def get_extensions(self) -> Optional[List[DeveloperExtension]]:
+        return [ISO32003]
diff --git a/pyhanko/pdf_utils/crypt/pubkey.py b/pyhanko/pdf_utils/crypt/pubkey.py
index 36bba1df..dcf2fdef 100644
--- a/pyhanko/pdf_utils/crypt/pubkey.py
+++ b/pyhanko/pdf_utils/crypt/pubkey.py
@@ -57,7 +57,11 @@
     build_crypt_filter,
 )
 from .cred_ser import SerialisableCredential, SerialisedCredential
-from .filter_mixins import AESCryptFilterMixin, RC4CryptFilterMixin
+from .filter_mixins import (
+    AESCryptFilterMixin,
+    AESGCMCryptFilterMixin,
+    RC4CryptFilterMixin,
+)
 from .permissions import PubKeyPermissions
 
 logger = logging.getLogger(__name__)
@@ -229,6 +233,14 @@ class PubKeyAESCryptFilter(PubKeyCryptFilter, AESCryptFilterMixin):
     pass
 
 
+class PubKeyAESGCMCryptFilter(PubKeyCryptFilter, AESGCMCryptFilterMixin):
+    """
+    AES-GCM crypt filter for public key security handlers.
+    """
+
+    pass
+
+
 class PubKeyRC4CryptFilter(PubKeyCryptFilter, RC4CryptFilterMixin):
     """
     RC4 crypt filter for public key security handlers.
@@ -289,9 +301,18 @@ def _pubkey_aes_config(keylen, recipients=None, encrypt_metadata=True):
     )
 
 
-"""
-Type alias for a callable that produces a crypt filter from a dictionary.
-"""
+def _pubkey_gcm_config(recipients=None, encrypt_metadata=True):
+    return CryptFilterConfiguration(
+        {
+            DEFAULT_CRYPT_FILTER: PubKeyAESGCMCryptFilter(
+                acts_as_default=True,
+                recipients=recipients,
+                encrypt_metadata=encrypt_metadata,
+            )
+        },
+        default_stream_filter=DEFAULT_CRYPT_FILTER,
+        default_string_filter=DEFAULT_CRYPT_FILTER,
+    )
 
 
 @enum.unique
@@ -1168,6 +1189,12 @@ def _build_aes256_pubkey_cf(cfdict, acts_as_default):
     )
 
 
+def _build_aesgcm_pubkey_cf(cfdict, acts_as_default):
+    return PubKeyAESGCMCryptFilter(
+        acts_as_default=acts_as_default, **_read_generic_pubkey_cf_info(cfdict)
+    )
+
+
 @SecurityHandler.register
 class PubKeySecurityHandler(SecurityHandler):
     """
@@ -1181,6 +1208,7 @@ class PubKeySecurityHandler(SecurityHandler):
         generic.NameObject('/V2'): _build_legacy_pubkey_cf,
         generic.NameObject('/AESV2'): _build_aes128_pubkey_cf,
         generic.NameObject('/AESV3'): _build_aes256_pubkey_cf,
+        generic.NameObject('/AESV4'): _build_aesgcm_pubkey_cf,
         generic.NameObject('/Identity'): lambda _, __: IdentityCryptFilter(),
     }
 
@@ -1312,9 +1340,16 @@ def __init__(
                     encrypt_metadata=encrypt_metadata,
                     recipients=recipient_objs,
                 )
+            elif version == SecurityHandlerVersion.AES_GCM:
+                crypt_filter_config = _pubkey_gcm_config(
+                    recipients=recipient_objs, encrypt_metadata=encrypt_metadata
+                )
             elif version >= SecurityHandlerVersion.AES256:
                 # there's a reasonable default config that we can fall back to
                 # here
+                # NOTE: we _don't_ use GCM by default. With the way PDF
+                # encryption works, the authentication guarantees are not
+                # worth much anyhow (need ISO 32004-style solution).
                 crypt_filter_config = _pubkey_aes_config(
                     keylen=32,
                     encrypt_metadata=encrypt_metadata,
diff --git a/pyhanko/pdf_utils/crypt/standard.py b/pyhanko/pdf_utils/crypt/standard.py
index d0d2ac6b..80e76b61 100644
--- a/pyhanko/pdf_utils/crypt/standard.py
+++ b/pyhanko/pdf_utils/crypt/standard.py
@@ -31,7 +31,11 @@
     SecurityHandlerVersion,
 )
 from .cred_ser import SerialisableCredential, SerialisedCredential
-from .filter_mixins import AESCryptFilterMixin, RC4CryptFilterMixin
+from .filter_mixins import (
+    AESCryptFilterMixin,
+    AESGCMCryptFilterMixin,
+    RC4CryptFilterMixin,
+)
 from .permissions import StandardPermissions
 
 
@@ -124,6 +128,7 @@ class StandardSecuritySettingsRevision(misc.VersionEnum):
     RC4_EXTENDED = 3
     RC4_OR_AES128 = 4
     AES256 = 6
+    AES_GCM = 7
     OTHER = None
     """
     Placeholder value for custom security handlers.
@@ -203,6 +208,14 @@ class StandardAESCryptFilter(StandardCryptFilter, AESCryptFilterMixin):
     pass
 
 
+class StandardAESGCMCryptFilter(StandardCryptFilter, AESGCMCryptFilterMixin):
+    """
+    AES-GCM crypt filter for the standard security handler.
+    """
+
+    pass
+
+
 class StandardRC4CryptFilter(StandardCryptFilter, RC4CryptFilterMixin):
     """
     RC4 crypt filter for the standard security handler.
@@ -230,6 +243,14 @@ def _std_aes_config(keylen):
     )
 
 
+def _std_gcm_config():
+    return CryptFilterConfiguration(
+        {STD_CF: StandardAESGCMCryptFilter()},
+        default_stream_filter=STD_CF,
+        default_string_filter=STD_CF,
+    )
+
+
 def _build_legacy_standard_crypt_filter(
     cfdict: generic.DictionaryObject, _acts_as_default
 ):
@@ -258,6 +279,7 @@ class StandardSecurityHandler(SecurityHandler):
         generic.NameObject('/AESV3'): lambda _, __: StandardAESCryptFilter(
             keylen=32
         ),
+        generic.NameObject('/AESV4'): lambda _, __: StandardAESGCMCryptFilter(),
         generic.NameObject('/Identity'): lambda _, __: IdentityCryptFilter(),
     }
 
@@ -396,6 +418,7 @@ def build_from_pw(
         perms: StandardPermissions = StandardPermissions.allow_everything(),
         encrypt_metadata=True,
         pdf_mac: bool = True,
+        use_gcm: bool = True,
         **kwargs,
     ):
         """
@@ -416,6 +439,15 @@ def build_from_pw(
             as well.
         :param pdf_mac:
             Include an ISO 32004 MAC.
+        :param use_gcm:
+            Use AES-GCM (ISO 32003) to encrypt strings and streams.
+
+            .. danger::
+                Due to the way PDF encryption works, the authentication
+                guarantees of AES-GCM only apply to the content of individual
+                strings and streams. The PDF file structure itself is not
+                authenticated. Document-level integrity protection is provided
+                by the ``pdf_mac=True`` option.
         :return:
             A :class:`StandardSecurityHandler` instance.
         """
@@ -473,9 +505,16 @@ def build_from_pw(
         else:
             kdf_salt = None
 
+        if use_gcm:
+            version = SecurityHandlerVersion.AES_GCM
+            revision = StandardSecuritySettingsRevision.AES_GCM
+        else:
+            version = SecurityHandlerVersion.AES256
+            revision = StandardSecuritySettingsRevision.AES256
+
         sh = cls(
-            version=SecurityHandlerVersion.AES256,
-            revision=StandardSecuritySettingsRevision.AES256,
+            version=version,
+            revision=revision,
             legacy_keylen=32,
             perm_flags=perms,
             odata=o_entry,
@@ -530,6 +569,8 @@ def __init__(
                 crypt_filter_config = _std_rc4_config(5)
             elif version == SecurityHandlerVersion.RC4_LONGER_KEYS:
                 crypt_filter_config = _std_rc4_config(legacy_keylen)
+            elif version == SecurityHandlerVersion.AES_GCM:
+                crypt_filter_config = _std_gcm_config()
             elif (
                 version >= SecurityHandlerVersion.AES256
                 and crypt_filter_config is None
diff --git a/pyhanko_tests/test_crypt.py b/pyhanko_tests/test_crypt.py
index 626fdaa4..fc65ea6f 100644
--- a/pyhanko_tests/test_crypt.py
+++ b/pyhanko_tests/test_crypt.py
@@ -1050,7 +1050,7 @@ def test_security_handler_version_deser():
     assert (
         SecurityHandlerVersion.from_number(5) == SecurityHandlerVersion.AES256
     )
-    assert SecurityHandlerVersion.from_number(6) == SecurityHandlerVersion.OTHER
+    assert SecurityHandlerVersion.from_number(0) == SecurityHandlerVersion.OTHER
     assert (
         SecurityHandlerVersion.from_number(None) == SecurityHandlerVersion.OTHER
     )
@@ -1060,7 +1060,7 @@ def test_security_handler_version_deser():
         == StandardSecuritySettingsRevision.AES256
     )
     assert (
-        StandardSecuritySettingsRevision.from_number(7)
+        StandardSecuritySettingsRevision.from_number(0)
         == StandardSecuritySettingsRevision.OTHER
     )
 
@@ -1603,6 +1603,119 @@ def test_tolerate_direct_encryption_dict_in_nonstrict():
     assert b'Hello' in data
 
 
+def test_gcm_standard():
+    w = writer.copy_into_new_writer(PdfFileReader(BytesIO(MINIMAL)))
+
+    sh = StandardSecurityHandler.build_from_pw(
+        "secret", pdf_mac=False, use_gcm=True
+    )
+    w._assign_security_handler(sh)
+    out = BytesIO()
+    w.write(out)
+
+    r = PdfFileReader(out)
+    r.decrypt("secret")
+    page_content = r.root['/Pages']['/Kids'][0]['/Contents'].data
+    assert b"Hello" in page_content
+
+    iso_exts = {
+        int(ext.get_object()['/ExtensionLevel'])
+        for ext in r.root['/Extensions']['/ISO_']
+    }
+    assert iso_exts == {32003}
+
+
+def _gcm_standard_tamper(tamperer):
+    w = writer.copy_into_new_writer(PdfFileReader(BytesIO(MINIMAL)))
+
+    sh = StandardSecurityHandler.build_from_pw(
+        "secret", pdf_mac=False, use_gcm=True
+    )
+    w._assign_security_handler(sh)
+    out = BytesIO()
+    w.write(out)
+
+    class NeverDecryptReader(PdfFileReader):
+        def __init__(self):
+            super().__init__(out)
+
+        @property
+        def security_handler(self):
+            return None
+
+    r = NeverDecryptReader()
+    w = IncrementalPdfFileWriter.from_reader(r)
+    page_dict = w.root['/Pages']['/Kids'][0]
+    content: generic.StreamObject = page_dict['/Contents']
+    content._encoded_data = tamperer(content.encoded_data)
+    w.update_container(content)
+    w._update_meta = lambda: None
+    w.write_in_place()
+
+    r = PdfFileReader(out)
+    r.decrypt("secret")
+
+    # this should work
+    assert "https" in r.root['/Extensions']['/ISO_'][0]['/URL']
+
+    # this shouldn't
+    with pytest.raises(misc.PdfReadError, match="Invalid GCM tag"):
+        len(r.root['/Pages']['/Kids'][0]['/Contents'].data)
+
+
+def test_gcm_change_content():
+    def tamper(ciphertext):
+        out = BytesIO()
+        out.write(ciphertext)
+        out.seek(14)
+        out.write(b"\xde\xad\xbe\xef")
+        return out.getvalue()
+
+    _gcm_standard_tamper(tamper)
+
+
+def test_gcm_remove_tag():
+    def tamper(ciphertext):
+        return ciphertext[:-16]
+
+    _gcm_standard_tamper(tamper)
+
+
+def test_gcm_change_nonce():
+    def tamper(ciphertext):
+        out = BytesIO()
+        out.write(ciphertext)
+        out.seek(0)
+        out.write(bytes(12))
+        return out.getvalue()
+
+    _gcm_standard_tamper(tamper)
+
+
+def test_gcm_pubkey():
+    w = writer.copy_into_new_writer(PdfFileReader(BytesIO(MINIMAL)))
+
+    sh = PubKeySecurityHandler.build_from_certs(
+        [PUBKEY_TEST_DECRYPTER.cert],
+        version=SecurityHandlerVersion.AES_GCM,
+        pdf_mac=False,
+    )
+    w._assign_security_handler(sh)
+    out = BytesIO()
+    w.write(out)
+
+    r = PdfFileReader(out)
+    r.decrypt_pubkey(PUBKEY_TEST_DECRYPTER)
+    page_content = r.root['/Pages']['/Kids'][0]['/Contents'].data
+    assert b"Hello" in page_content
+
+    iso_exts = {
+        int(ext.get_object()['/ExtensionLevel'])
+        for ext in r.root['/Extensions']['/ISO_']
+    }
+    assert iso_exts == {32003}
+
+
 def test_tolerate_empty_encrypted_string():
     with open(
         os.path.join(PDF_DATA_DIR, 'minimal-aes256-empty-encrypted-string.pdf'),