Skip to content

Commit

Permalink
Merge pull request #414 from MatthiasValvekens/bugfix/crypt-import-bu…
Browse files Browse the repository at this point in the history
…gfixes

Fix issues with object importing and tolerate unpadded empty strings in encrypted documents
  • Loading branch information
MatthiasValvekens authored Mar 27, 2024
2 parents 04a97c0 + a605c21 commit 6871e2b
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 89 deletions.
3 changes: 2 additions & 1 deletion pyhanko/pdf_utils/crypt/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def aes_cbc_decrypt(key, data, iv, use_padding=True):
decryptor = cipher.decryptor()
plaintext = decryptor.update(data) + decryptor.finalize()

if use_padding:
# we tolerate empty messages that don't have padding
if use_padding and len(plaintext) > 0:
unpadder = padding.PKCS7(128).unpadder()
return unpadder.update(plaintext) + unpadder.finalize()
else:
Expand Down
249 changes: 161 additions & 88 deletions pyhanko/pdf_utils/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
for the original license.
"""

import logging
import os
import typing
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union, cast
Expand Down Expand Up @@ -53,6 +54,8 @@
'copy_into_new_writer',
]

logger = logging.getLogger(__name__)


# TODO move this to content.py?
def init_xobject_dictionary(
Expand Down Expand Up @@ -755,88 +758,13 @@ def import_object(
a new instance.
"""

return self._import_object(obj, {}, obj_stream)

def _import_object(
self, obj: generic.PdfObject, reference_map: dict, obj_stream
) -> generic.PdfObject:
# TODO check the spec for guidance on fonts. Do font identifiers have
# to be globally unique?

# TODO deal with container_ref

if isinstance(obj, generic.DecryptedObjectProxy):
obj = obj.decrypted
if isinstance(obj, generic.IndirectObject):
try:
return reference_map[obj.reference]
except KeyError:
refd = obj.get_object()
# Add a placeholder to reserve the reference value.
# This ensures correct behaviour in recursive calls
# with self-references.
new_ido = self.allocate_placeholder()
reference_map[obj.reference] = new_ido
imported = self._import_object(refd, reference_map, obj_stream)

# if the imported object is a bare reference and/or a stream
# object, we can't put it into an object stream.
if isinstance(imported, OBJSTREAM_FORBIDDEN):
obj_stream = None

# fill in the placeholder
self.add_object(
imported, obj_stream=obj_stream, idnum=new_ido.idnum
)
return new_ido
elif isinstance(obj, generic.DictionaryObject):
raw_dict = {
k: self._import_object(v, reference_map, obj_stream)
for k, v in obj.items()
if k != '/Metadata'
}
try:
# make sure to import metadata streams as such
meta_ref = obj.get_value_as_reference('/Metadata')
# ensure a MetadataStream object ends up in the cache
meta_ref.get_pdf_handler().get_object(
meta_ref, as_metadata_stream=True
)
# ...then import the reference
raw_dict['/Metadata'] = self._import_object(
generic.IndirectObject(
meta_ref.idnum, meta_ref.generation, meta_ref.pdf
),
reference_map,
obj_stream,
)
except (KeyError, IndirectObjectExpected):
pass

if isinstance(obj, generic.StreamObject):
stm_cls = generic.StreamObject
# again, make sure to import metadata streams as such
try:
# noinspection PyUnresolvedReferences
from pyhanko.pdf_utils.metadata import xmp_xml

if isinstance(obj, xmp_xml.MetadataStream):
stm_cls = xmp_xml.MetadataStream
except ImportError: # pragma: nocover
pass
# In the vast majority of use cases, I'd expect the content
# to be available in encoded form by default.
# By initialising the stream object in this way, we avoid
# a potentially costly decoding operation.
return stm_cls(raw_dict, encoded_data=obj.encoded_data)
else:
return generic.DictionaryObject(raw_dict)
elif isinstance(obj, generic.ArrayObject):
return generic.ArrayObject(
self._import_object(v, reference_map, obj_stream) for v in obj
)
else:
return obj
importer = _ObjectImporter(
source=obj.get_container_ref().get_pdf_handler(),
target=self,
obj_stream=obj_stream,
reference_map={},
)
return importer.import_object(obj)

def import_page_as_xobject(
self, other: PdfHandler, page_ix=0, inherit_filters=True
Expand Down Expand Up @@ -1222,6 +1150,138 @@ def _populate_trailer(self, trailer):
super()._populate_trailer(trailer)


class _ObjectImporter:

def __init__(
self,
source: PdfHandler,
target: BasePdfFileWriter,
reference_map: Dict[generic.Reference, generic.IndirectObject],
obj_stream: Optional[ObjectStream],
):
self.source = source
self.target = target
self.obj_stream = obj_stream
self.queued_references: List[
Tuple[generic.Reference, generic.Reference]
] = []
self.reference_map = reference_map

def import_object(self, obj: generic.PdfObject) -> generic.PdfObject:
result = self._ingest(obj)

while self.queued_references:
source_ref, target_ref = self.queued_references.pop()
source_obj = source_ref.get_object()
imported = self._ingest(source_obj)

# if the imported object is a bare reference and/or a stream
# object, we can't put it into an object stream.
if isinstance(imported, OBJSTREAM_FORBIDDEN):
obj_stream = None
else:
obj_stream = self.obj_stream

# fill in the placeholder
self.target.add_object(
imported, obj_stream=obj_stream, idnum=target_ref.idnum
)

return result

def _ingest(self, obj: generic.PdfObject):
if isinstance(obj, generic.DecryptedObjectProxy):
obj = obj.decrypted
if isinstance(obj, generic.IndirectObject):
return self.process_reference(obj.reference)
elif isinstance(obj, generic.DictionaryObject):
raw_dict = {
k: self._ingest(v) for k, v in obj.items() if k != '/Metadata'
}
try:
# make sure to import metadata streams as such
meta_ref = obj.get_value_as_reference('/Metadata')
# ensure a MetadataStream object ends up in the cache
meta_ref.get_pdf_handler().get_object(
meta_ref, as_metadata_stream=True
)
# ...then import the reference
raw_dict['/Metadata'] = self.process_reference(meta_ref)
except (KeyError, IndirectObjectExpected):
pass

if isinstance(obj, generic.StreamObject):
stm_cls = generic.StreamObject
# again, make sure to import metadata streams as such
try:
# noinspection PyUnresolvedReferences
from pyhanko.pdf_utils.metadata import xmp_xml

if isinstance(obj, xmp_xml.MetadataStream):
stm_cls = xmp_xml.MetadataStream
except ImportError: # pragma: nocover
pass
# In the vast majority of use cases, I'd expect the content
# to be available in encoded form by default.
# By initialising the stream object in this way, we avoid
# a potentially costly decoding operation.
return stm_cls(raw_dict, encoded_data=obj.encoded_data)
else:
return generic.DictionaryObject(raw_dict)
elif isinstance(obj, generic.ArrayObject):
return generic.ArrayObject(self._ingest(v) for v in obj)
else:
return obj

def process_reference(self, ref: generic.Reference) -> generic.PdfObject:
try:
return self.reference_map[ref]
except KeyError:
# Add a placeholder to reserve the reference value.
new_ido = self.target.allocate_placeholder()
self.reference_map[ref] = new_ido
self.queued_references.append((ref, new_ido.reference))
return new_ido

def preprocess_signature_data(self):
# Signature /Contents is never encrypted => ensure we respect that
# (even though the import operation is guaranteed to break the signature
# there are valid use cases for stripping the encryption on such files,
# e.g. for downstream processing)
from ..sign.fields import enumerate_sig_fields

signature_dict_refs = [
field_value.reference
for fq_name, field_value, field_ref in enumerate_sig_fields(
self.source, filled_status=True
)
# this is the case in all valid PDFs
if isinstance(field_value, generic.IndirectObject)
]
if signature_dict_refs:
logger.warning(
"Source document contains filled signature fields--the copy "
"operation will invalidate them."
)
for ref in signature_dict_refs:
sig_dict = ref.get_object()
assert isinstance(sig_dict, generic.DictionaryObject)
raw_dict = {
k: self._ingest(v)
for k, v in sig_dict.items()
if k != '/Contents'
}
raw_dict['/Contents'] = generic.ByteStringObject(
sig_dict.raw_get(
'/Contents', decrypt=generic.EncryptedObjAccess.RAW
).original_bytes
)
self.reference_map[ref] = self.target.add_object(
generic.DictionaryObject(raw_dict),
obj_stream=None,
)


def copy_into_new_writer(
input_handler: PdfHandler, writer_kwargs: Optional[dict] = None
) -> PdfFileWriter:
Expand Down Expand Up @@ -1254,16 +1314,25 @@ def copy_into_new_writer(
w = PdfFileWriter(init_page_tree=False, **writer_kwargs)
input_root_ref = input_handler.root_ref
output_root_ref = w.root_ref
# call _import_object in such a way that we translate the input handler's
# call _ObjectImporter in such a way that we translate the input handler's
# root to the new writer's root.
# From a technical PoV this doesn't matter, but it makes the output file
# somewhat "cleaner" (i.e. it doesn't leave an orphaned document catalog
# cluttering up the file)
new_root_dict = w._import_object(
input_handler.root,
reference_map={input_root_ref: output_root_ref},
importer = _ObjectImporter(
source=input_handler,
target=w,
reference_map={
input_root_ref: generic.IndirectObject(
idnum=output_root_ref.idnum,
generation=output_root_ref.generation,
pdf=w,
)
},
obj_stream=None,
)
importer.preprocess_signature_data()
new_root_dict = importer.import_object(input_handler.root)
# override the old root ref
ix = (output_root_ref.generation, output_root_ref.idnum)
w.objects[ix] = new_root_dict
Expand All @@ -1278,9 +1347,13 @@ def copy_into_new_writer(
except KeyError:
info_dict = None
if info_dict is not None:
imported_info = w._import_object(
info_dict, reference_map={}, obj_stream=None
importer = _ObjectImporter(
source=input_handler,
target=w,
reference_map={},
obj_stream=None,
)
imported_info = importer.import_object(info_dict)
w._info = w.add_object(imported_info)

return w
Binary file not shown.
Binary file not shown.
16 changes: 16 additions & 0 deletions pyhanko_tests/test_crypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -1569,3 +1569,19 @@ def test_tolerate_direct_encryption_dict_in_nonstrict():
r.decrypt('ownersecret')
data = r.root['/Pages']['/Kids'][0]['/Contents'].data
assert b'Hello' in data


def test_tolerate_empty_encrypted_string():
with open(
os.path.join(PDF_DATA_DIR, 'minimal-aes256-empty-encrypted-string.pdf'),
'rb',
) as inf:
r = PdfFileReader(inf)
r.decrypt('secret')
obj = r.root.raw_get('/Blah', decrypt=generic.EncryptedObjAccess.PROXY)
assert isinstance(obj, generic.DecryptedObjectProxy)
decrypted = obj.decrypted
assert isinstance(
decrypted, (generic.TextStringObject, generic.ByteStringObject)
)
assert decrypted.original_bytes == b""
Loading

0 comments on commit 6871e2b

Please sign in to comment.