diff --git a/.github/workflows/github-ci.yaml b/.github/workflows/github-ci.yaml index 820ccdcaa5..d5d9bb4d4d 100644 --- a/.github/workflows/github-ci.yaml +++ b/.github/workflows/github-ci.yaml @@ -57,12 +57,12 @@ jobs: runs-on: ubuntu-20.04 strategy: matrix: - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13-dev"] use-crypto-lib: ["cryptography"] include: - - python-version: "3.7" + - python-version: "3.8" use-crypto-lib: "pycryptodome" - - python-version: "3.7" + - python-version: "3.8" use-crypto-lib: "none" steps: - name: Update APT packages @@ -83,14 +83,14 @@ jobs: key: cache-downloaded-files - name: Setup Python uses: actions/setup-python@v5 - if: matrix.python-version == '3.7' || matrix.python-version == '3.8' || matrix.python-version == '3.9' || matrix.python-version == '3.10' + if: matrix.python-version == '3.8' || matrix.python-version == '3.9' || matrix.python-version == '3.10' with: python-version: ${{ matrix.python-version }} cache: 'pip' cache-dependency-path: '**/requirements/ci.txt' - name: Setup Python (3.11+) uses: actions/setup-python@v5 - if: matrix.python-version == '3.11' || matrix.python-version == '3.12' + if: matrix.python-version == '3.11' || matrix.python-version == '3.12' || matrix.python-version == '3.13-dev' with: python-version: ${{ matrix.python-version }} allow-prereleases: true @@ -102,11 +102,11 @@ jobs: - name: Install requirements (Python 3) run: | pip install -r requirements/ci.txt - if: matrix.python-version == '3.7' || matrix.python-version == '3.8' || matrix.python-version == '3.9' || matrix.python-version == '3.10' + if: matrix.python-version == '3.8' || matrix.python-version == '3.9' || matrix.python-version == '3.10' - name: Install requirements (Python 3.11+) run: | pip install -r requirements/ci-3.11.txt - if: matrix.python-version == '3.11' || matrix.python-version == '3.12' + if: matrix.python-version == '3.11' || matrix.python-version == '3.12' || matrix.python-version == '3.13-dev' - name: Remove pycryptodome and cryptography run: | pip uninstall pycryptodome cryptography -y diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 9f782ec080..b1a4fb27f3 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -12,6 +12,9 @@ on: permissions: contents: write +env: + HEAD_COMMIT_MESSAGE: ${{ github.event.head_commit.message }} + jobs: build_and_publish: name: Publish a new version @@ -24,7 +27,7 @@ jobs: - name: Extract version from commit message id: extract_version run: | - VERSION=$(echo "${{ github.event.head_commit.message }}" | grep -oP '(?<=REL: )\d+\.\d+\.\d+') + VERSION=$(echo "$HEAD_COMMIT_MESSAGE" | grep -oP '(?<=REL: )\d+\.\d+\.\d+') echo "version=$VERSION" >> $GITHUB_OUTPUT - name: Extract tag message from commit message @@ -32,7 +35,7 @@ jobs: run: | VERSION="${{ steps.extract_version.outputs.version }}" delimiter="$(openssl rand -hex 8)" - MESSAGE=$(echo "${{ github.event.head_commit.message }}" | sed "0,/REL: $VERSION/s///" ) + MESSAGE=$(echo "$HEAD_COMMIT_MESSAGE" | sed "0,/REL: $VERSION/s///" ) echo "message<<${delimiter}" >> $GITHUB_OUTPUT echo "$MESSAGE" >> $GITHUB_OUTPUT echo "${delimiter}" >> $GITHUB_OUTPUT diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 84f0b6ee43..89fec3b14e 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -19,6 +19,7 @@ history and [GitHub's 'Contributors' feature](https://github.com/py-pdf/pypdf/gr * [ediamondscience](https://github.com/ediamondscience) * [Ermeson, Felipe](https://github.com/FelipeErmeson) * [Freitag, François](https://github.com/francoisfreitag) +* [Gagnon, William G.](https://github.com/williamgagnon) * [Górny, Michał](https://github.com/mgorny) * [Grillo, Miguel](https://github.com/Ineffable22) * [Gutteridge, David H.](https://github.com/dhgutteridge) diff --git a/docs/modules/PageObject.rst b/docs/modules/PageObject.rst index 45e81b6ab9..b4524b4437 100644 --- a/docs/modules/PageObject.rst +++ b/docs/modules/PageObject.rst @@ -6,14 +6,12 @@ The PageObject Class :undoc-members: :show-inheritance: -.. autoclass:: pypdf._utils.ImageFile +.. autoclass:: pypdf._page.VirtualListImages :members: :undoc-members: :show-inheritance: - :exclude-members: IndirectObject -.. autoclass:: pypdf._utils.File +.. autoclass:: pypdf._page.ImageFile :members: + :inherited-members: File :undoc-members: - :show-inheritance: - :exclude-members: IndirectObject diff --git a/docs/user/file-size.md b/docs/user/file-size.md index 0ee72e37e3..d47ddcc0ed 100644 --- a/docs/user/file-size.md +++ b/docs/user/file-size.md @@ -9,23 +9,17 @@ Some PDF documents contain the same object multiple times. For example, if an image appears three times in a PDF it could be embedded three times. Or it can be embedded once and referenced twice. -This can be done by reading and writing the file: +When adding data to a PdfWriter, the data is copied while respecting the original format. +For example, if two pages include the same image which is duplicated in the source document, the object will be duplicated in the PdfWriter object. -```python -from pypdf import PdfReader, PdfWriter - -reader = PdfReader("big-old-file.pdf") -writer = PdfWriter() +Additionally, when you delete objects in a document, pypdf cannot easily identify whether the objects are used elsewhere or not or if the user wants to keep them in. When writing the PDF file, these objects will be hidden within (part of the file, but not displayed). -for page in reader.pages: - writer.add_page(page) +In order to reduce the file size, use a compression call: `writer.compress_identical_objects(remove_identicals=True, remove_orphans=True)` -if reader.metadata is not None: - writer.add_metadata(reader.metadata) +* `remove_identicals` enables/disables compression merging identical objects. +* `remove_orphans` enables/disables suppression of unused objects. -with open("smaller-new-file.pdf", "wb") as fp: - writer.write(fp) -``` +It is recommended to apply this process just before writing to the file/stream. It depends on the PDF how well this works, but we have seen an 86% file reduction (from 5.7 MB to 0.8 MB) within a real PDF. diff --git a/pypdf/_cmap.py b/pypdf/_cmap.py index 9a2d10a611..6c5996703f 100644 --- a/pypdf/_cmap.py +++ b/pypdf/_cmap.py @@ -3,11 +3,10 @@ from typing import Any, Dict, List, Tuple, Union, cast from ._codecs import adobe_glyphs, charset_encoding -from ._utils import b_, logger_error, logger_warning +from ._utils import logger_error, logger_warning from .generic import ( DecodedStreamObject, DictionaryObject, - IndirectObject, NullObject, StreamObject, ) @@ -127,6 +126,8 @@ def build_char_map_from_dict( "/ETenms-B5-V": "cp950", "/UniCNS-UTF16-H": "utf-16-be", "/UniCNS-UTF16-V": "utf-16-be", + "/UniGB-UTF16-H": "gb18030", + "/UniGB-UTF16-V": "gb18030", # UCS2 in code } @@ -258,8 +259,8 @@ def prepare_cm(ft: DictionaryObject) -> bytes: tu = ft["/ToUnicode"] cm: bytes if isinstance(tu, StreamObject): - cm = b_(cast(DecodedStreamObject, ft["/ToUnicode"]).get_data()) - elif isinstance(tu, str) and tu.startswith("/Identity"): + cm = cast(DecodedStreamObject, ft["/ToUnicode"]).get_data() + else: # if (tu is None) or cast(str, tu).startswith("/Identity"): # the full range 0000-FFFF will be processed cm = b"beginbfrange\n<0000> <0001> <0000>\nendbfrange" if isinstance(cm, str): @@ -448,34 +449,27 @@ def compute_space_width( en: int = cast(int, ft["/LastChar"]) if st > space_code or en < space_code: raise Exception("Not in range") - if w[space_code - st] == 0: + if w[space_code - st].get_object() == 0: raise Exception("null width") - sp_width = w[space_code - st] + sp_width = w[space_code - st].get_object() except Exception: if "/FontDescriptor" in ft and "/MissingWidth" in cast( DictionaryObject, ft["/FontDescriptor"] ): - sp_width = ft["/FontDescriptor"]["/MissingWidth"] # type: ignore + sp_width = ft["/FontDescriptor"]["/MissingWidth"].get_object() # type: ignore else: # will consider width of char as avg(width)/2 m = 0 cpt = 0 - for x in w: - if x > 0: - m += x + for xx in w: + xx = xx.get_object() + if xx > 0: + m += xx cpt += 1 sp_width = m / max(1, cpt) / 2 - if isinstance(sp_width, IndirectObject): - # According to - # 'Table 122 - Entries common to all font descriptors (continued)' - # the MissingWidth should be a number, but according to #2286 it can - # be an indirect object - obj = sp_width.get_object() - if obj is None or isinstance(obj, NullObject): - return 0.0 - return obj # type: ignore - + if sp_width is None or isinstance(sp_width, NullObject): + sp_width = 0.0 return sp_width diff --git a/pypdf/_doc_common.py b/pypdf/_doc_common.py index d4c5c43c3c..4f607340db 100644 --- a/pypdf/_doc_common.py +++ b/pypdf/_doc_common.py @@ -49,7 +49,6 @@ from ._page import PageObject, _VirtualList from ._page_labels import index2label as page_index2page_label from ._utils import ( - b_, deprecate_with_replacement, logger_warning, parse_iso8824_date, @@ -1122,7 +1121,12 @@ def _flatten( obj = page.get_object() if obj: # damaged file may have invalid child in /Pages - self._flatten(obj, inherit, **addt) + try: + self._flatten(obj, inherit, **addt) + except RecursionError: + raise PdfReadError( + "Maximum recursion depth reached during page flattening." + ) elif t == "/Page": for attr_in, value in list(inherit.items()): # if the page has it's own value, it does not inherit the @@ -1258,7 +1262,7 @@ def xfa(self) -> Optional[Dict[str, Any]]: if isinstance(f, IndirectObject): field = cast(Optional[EncodedStreamObject], f.get_object()) if field: - es = zlib.decompress(b_(field._data)) + es = zlib.decompress(field._data) retval[tag] = es return retval diff --git a/pypdf/_encryption.py b/pypdf/_encryption.py index 5ddd8d0efe..e5cdd9324e 100644 --- a/pypdf/_encryption.py +++ b/pypdf/_encryption.py @@ -43,7 +43,7 @@ rc4_encrypt, ) -from ._utils import b_, logger_warning +from ._utils import logger_warning from .generic import ( ArrayObject, ByteStringObject, @@ -78,7 +78,7 @@ def encrypt_object(self, obj: PdfObject) -> PdfObject: elif isinstance(obj, StreamObject): obj2 = StreamObject() obj2.update(obj) - obj2.set_data(self.stm_crypt.encrypt(b_(obj._data))) + obj2.set_data(self.stm_crypt.encrypt(obj._data)) for key, value in obj.items(): # Dont forget the Stream dict. obj2[key] = self.encrypt_object(value) obj = obj2 @@ -96,7 +96,7 @@ def decrypt_object(self, obj: PdfObject) -> PdfObject: data = self.str_crypt.decrypt(obj.original_bytes) obj = create_string_object(data) elif isinstance(obj, StreamObject): - obj._data = self.stm_crypt.decrypt(b_(obj._data)) + obj._data = self.stm_crypt.decrypt(obj._data) for key, value in obj.items(): # Dont forget the Stream dict. obj[key] = self.decrypt_object(value) elif isinstance(obj, DictionaryObject): diff --git a/pypdf/_merger.py b/pypdf/_merger.py index 7176a1adf7..a52a354e38 100644 --- a/pypdf/_merger.py +++ b/pypdf/_merger.py @@ -46,7 +46,6 @@ from ._utils import ( StrByteType, deprecate_with_replacement, - str_, ) from ._writer import PdfWriter from .constants import GoToActionArguments, TypArguments, TypFitArguments @@ -82,6 +81,15 @@ def __init__(self, pagedata: PageObject, src: PdfReader, id: int) -> None: self.id = id +# transfered from _utils : as this function is only required here +# and merger will be soon deprecated +def str_(b: Any) -> str: # pragma: no cover + if isinstance(b, bytes): + return b.decode("latin-1") + else: + return str(b) # will return b.__str__() if defined + + class PdfMerger: """ Use :class:`PdfWriter` instead. diff --git a/pypdf/_page.py b/pypdf/_page.py index 63038d9d07..17ec04477f 100644 --- a/pypdf/_page.py +++ b/pypdf/_page.py @@ -28,8 +28,9 @@ # POSSIBILITY OF SUCH DAMAGE. import math -import sys +from dataclasses import dataclass from decimal import Decimal +from io import BytesIO from pathlib import Path from typing import ( Any, @@ -38,6 +39,7 @@ Iterable, Iterator, List, + Literal, Optional, Sequence, Set, @@ -58,9 +60,8 @@ ) from ._utils import ( CompressedTransformationMatrix, - File, - ImageFile, TransformationMatrixType, + _human_readable_bytes, logger_warning, matrix_multiply, ) @@ -85,11 +86,13 @@ StreamObject, ) -if sys.version_info >= (3, 8): - from typing import Literal -else: - from typing_extensions import Literal +try: + from PIL.Image import Image + pil_not_imported = False +except ImportError: + Image = object # type: ignore + pil_not_imported = True # error will be raised only when using images MERGE_CROP_BOX = "cropbox" # pypdf<=3.4.0 used 'trimbox' @@ -307,6 +310,160 @@ def apply_on( return list(pt1) if isinstance(pt, list) else pt1 +@dataclass +class ImageFile: + """ + Image within the PDF file. *This object is not designed to be built.* + + This object should not be modified except using :func:`ImageFile.replace` to replace the image with a new one. + """ + + name: str = "" + """ + Filename as identified within the PDF file. + """ + + data: bytes = b"" + """ + Data as bytes. + """ + + image: Optional[Image] = None + """ + Data as PIL image. + """ + + indirect_reference: Optional[IndirectObject] = None + """ + Reference to the object storing the stream. + """ + + def replace(self, new_image: Image, **kwargs: Any) -> None: + """ + Replace the image with a new PIL image. + + Args: + new_image (PIL.Image.Image): The new PIL image to replace the existing image. + **kwargs: Additional keyword arguments to pass to `Image.save()`. + + Raises: + TypeError: If the image is inline or in a PdfReader. + TypeError: If the image does not belong to a PdfWriter. + TypeError: If `new_image` is not a PIL Image. + + Note: + This method replaces the existing image with a new image. + It is not allowed for inline images or images within a PdfReader. + The `kwargs` parameter allows passing additional parameters + to `Image.save()`, such as quality. + """ + if pil_not_imported: + raise ImportError( + "pillow is required to do image extraction. " + "It can be installed via 'pip install pypdf[image]'" + ) + + from ._reader import PdfReader + + # to prevent circular import + from .filters import _xobj_to_image + from .generic import DictionaryObject, PdfObject + + if self.indirect_reference is None: + raise TypeError("Cannot update an inline image.") + if not hasattr(self.indirect_reference.pdf, "_id_translated"): + raise TypeError("Cannot update an image not belonging to a PdfWriter.") + if not isinstance(new_image, Image): + raise TypeError("new_image shall be a PIL Image") + b = BytesIO() + new_image.save(b, "PDF", **kwargs) + reader = PdfReader(b) + assert reader.pages[0].images[0].indirect_reference is not None + self.indirect_reference.pdf._objects[self.indirect_reference.idnum - 1] = ( + reader.pages[0].images[0].indirect_reference.get_object() + ) + cast( + PdfObject, self.indirect_reference.get_object() + ).indirect_reference = self.indirect_reference + # change the object attributes + extension, byte_stream, img = _xobj_to_image( + cast(DictionaryObject, self.indirect_reference.get_object()) + ) + assert extension is not None + self.name = self.name[: self.name.rfind(".")] + extension + self.data = byte_stream + self.image = img + + def __str__(self) -> str: + return f"{self.__class__.__name__}(name={self.name}, data: {_human_readable_bytes(len(self.data))})" + + def __repr__(self) -> str: + return self.__str__()[:-1] + f", hash: {hash(self.data)})" + + +class VirtualListImages(Sequence[ImageFile]): + """ + Provides access to images referenced within a page. + Only one copy will be returned if the usage is used on the same page multiple times. + See :func:`PageObject.images` for more details. + """ + + def __init__( + self, + ids_function: Callable[[], List[Union[str, List[str]]]], + get_function: Callable[[Union[str, List[str], Tuple[str]]], ImageFile], + ) -> None: + self.ids_function = ids_function + self.get_function = get_function + self.current = -1 + + def __len__(self) -> int: + return len(self.ids_function()) + + def keys(self) -> List[Union[str, List[str]]]: + return self.ids_function() + + def items(self) -> List[Tuple[Union[str, List[str]], ImageFile]]: + return [(x, self[x]) for x in self.ids_function()] + + @overload + def __getitem__(self, index: Union[int, str, List[str]]) -> ImageFile: + ... + + @overload + def __getitem__(self, index: slice) -> Sequence[ImageFile]: + ... + + def __getitem__( + self, index: Union[int, slice, str, List[str], Tuple[str]] + ) -> Union[ImageFile, Sequence[ImageFile]]: + lst = self.ids_function() + if isinstance(index, slice): + indices = range(*index.indices(len(self))) + lst = [lst[x] for x in indices] + cls = type(self) + return cls((lambda: lst), self.get_function) + if isinstance(index, (str, list, tuple)): + return self.get_function(index) + if not isinstance(index, int): + raise TypeError("invalid sequence indices type") + len_self = len(lst) + if index < 0: + # support negative indexes + index = len_self + index + if index < 0 or index >= len_self: + raise IndexError("sequence index out of range") + return self.get_function(lst[index]) + + def __iter__(self) -> Iterator[ImageFile]: + for i in range(len(self)): + yield self[i] + + def __str__(self) -> str: + p = [f"Image_{i}={n}" for i, n in enumerate(self.ids_function())] + return f"[{', '.join(p)}]" + + class PageObject(DictionaryObject): """ PageObject represents a single page within a PDF file. @@ -397,33 +554,6 @@ def create_blank_page( return page - @property - def _old_images(self) -> List[File]: # deprecated - """ - Get a list of all images of the page. - - This requires pillow. You can install it via 'pip install pypdf[image]'. - - For the moment, this does NOT include inline images. They will be added - in future. - """ - images_extracted: List[File] = [] - if RES.XOBJECT not in self[PG.RESOURCES]: # type: ignore - return images_extracted - - x_object = self[PG.RESOURCES][RES.XOBJECT].get_object() # type: ignore - for obj in x_object: - if x_object[obj][IA.SUBTYPE] == "/Image": - extension, byte_stream, img = _xobj_to_image(x_object[obj]) - if extension is not None: - filename = f"{obj[1:]}{extension}" - images_extracted.append(File(name=filename, data=byte_stream)) - images_extracted[-1].image = img - images_extracted[-1].indirect_reference = x_object[ - obj - ].indirect_reference - return images_extracted - def _get_ids_image( self, obj: Optional[DictionaryObject] = None, @@ -501,7 +631,7 @@ def _get_image( return self._get_image(ids, cast(DictionaryObject, xobjs[id[0]])) @property - def images(self) -> List[ImageFile]: + def images(self) -> VirtualListImages: """ Read-only property emulating a list of images on a page. @@ -511,20 +641,19 @@ def images(self) -> List[ImageFile]: - An integer Examples: - reader.pages[0].images[0] # return fist image - reader.pages[0].images['/I0'] # return image '/I0' - # return image '/Image1' within '/TP1' Xobject/Form: - reader.pages[0].images['/TP1','/Image1'] - for img in reader.pages[0].images: # loop within all objects + * `reader.pages[0].images[0]` # return fist image + * `reader.pages[0].images['/I0']` # return image '/I0' + * `reader.pages[0].images['/TP1','/Image1']` # return image '/Image1' within '/TP1' Xobject/Form + * `for img in reader.pages[0].images:` # loops through all objects images.keys() and images.items() can be used. The ImageFile has the following properties: - `.name` : name of the object - `.data` : bytes of the object - `.image` : PIL Image Object - `.indirect_reference` : object reference + * `.name` : name of the object + * `.data` : bytes of the object + * `.image` : PIL Image Object + * `.indirect_reference` : object reference and the following methods: `.replace(new_image: PIL.Image.Image, **kwargs)` : @@ -538,7 +667,7 @@ def images(self) -> List[ImageFile]: Inline images are extracted and named ~0~, ~1~, ..., with the indirect_reference set to None. """ - return _VirtualListImages(self._get_ids_image, self._get_image) # type: ignore + return VirtualListImages(self._get_ids_image, self._get_image) def _translate_value_inlineimage(self, k: str, v: PdfObject) -> PdfObject: """Translate values used in inline image""" @@ -852,7 +981,7 @@ def _add_transformation_matrix( FloatObject(e), FloatObject(f), ], - " cm", + b"cm", ], ) return contents @@ -870,7 +999,7 @@ def _get_contents_as_bytes(self) -> Optional[bytes]: if isinstance(obj, list): return b"".join(x.get_object().get_data() for x in obj) else: - return cast(bytes, cast(EncodedStreamObject, obj).get_data()) + return cast(EncodedStreamObject, obj).get_data() else: return None @@ -1063,11 +1192,11 @@ def _merge_page( rect.height, ], ), - "re", + b"re", ), ) - page2content.operations.insert(1, ([], "W")) - page2content.operations.insert(2, ([], "n")) + page2content.operations.insert(1, ([], b"W")) + page2content.operations.insert(2, ([], b"n")) if page2transformation is not None: page2content = page2transformation(page2content) page2content = PageObject._content_stream_rename( @@ -1201,11 +1330,11 @@ def _merge_page_writer( rect.height, ], ), - "re", + b"re", ), ) - page2content.operations.insert(1, ([], "W")) - page2content.operations.insert(2, ([], "n")) + page2content.operations.insert(1, ([], b"W")) + page2content.operations.insert(2, ([], b"n")) if page2transformation is not None: page2content = page2transformation(page2content) page2content = PageObject._content_stream_rename( @@ -1753,7 +1882,7 @@ def process_operation(operator: bytes, operands: List[Any]) -> None: cmap = ( unknown_char_map[2], unknown_char_map[3], - "???" + operands[0], + f"???{operands[0]}", None, ) try: @@ -2399,60 +2528,3 @@ def process_font(f: DictionaryObject) -> None: for a in cast(DictionaryObject, cast(DictionaryObject, obj["/AP"])["/N"]): _get_fonts_walk(cast(DictionaryObject, a), fnt, emb) return fnt, emb # return the sets for each page - - -class _VirtualListImages(Sequence[ImageFile]): - def __init__( - self, - ids_function: Callable[[], List[Union[str, List[str]]]], - get_function: Callable[[Union[str, List[str], Tuple[str]]], ImageFile], - ) -> None: - self.ids_function = ids_function - self.get_function = get_function - self.current = -1 - - def __len__(self) -> int: - return len(self.ids_function()) - - def keys(self) -> List[Union[str, List[str]]]: - return self.ids_function() - - def items(self) -> List[Tuple[Union[str, List[str]], ImageFile]]: - return [(x, self[x]) for x in self.ids_function()] - - @overload - def __getitem__(self, index: Union[int, str, List[str]]) -> ImageFile: - ... - - @overload - def __getitem__(self, index: slice) -> Sequence[ImageFile]: - ... - - def __getitem__( - self, index: Union[int, slice, str, List[str], Tuple[str]] - ) -> Union[ImageFile, Sequence[ImageFile]]: - lst = self.ids_function() - if isinstance(index, slice): - indices = range(*index.indices(len(self))) - lst = [lst[x] for x in indices] - cls = type(self) - return cls((lambda: lst), self.get_function) - if isinstance(index, (str, list, tuple)): - return self.get_function(index) - if not isinstance(index, int): - raise TypeError("invalid sequence indices type") - len_self = len(lst) - if index < 0: - # support negative indexes - index = len_self + index - if index < 0 or index >= len_self: - raise IndexError("sequence index out of range") - return self.get_function(lst[index]) - - def __iter__(self) -> Iterator[ImageFile]: - for i in range(len(self)): - yield self[i] - - def __str__(self) -> str: - p = [f"Image_{i}={n}" for i, n in enumerate(self.ids_function())] - return f"[{', '.join(p)}]" diff --git a/pypdf/_protocols.py b/pypdf/_protocols.py index 9f413660bb..b5fa14879c 100644 --- a/pypdf/_protocols.py +++ b/pypdf/_protocols.py @@ -2,13 +2,7 @@ from abc import abstractmethod from pathlib import Path -from typing import IO, Any, Dict, List, Optional, Tuple, Union - -try: - # Python 3.8+: https://peps.python.org/pep-0586 - from typing import Protocol -except ImportError: - from typing_extensions import Protocol # type: ignore[assignment] +from typing import IO, Any, Dict, List, Optional, Protocol, Tuple, Union from ._utils import StrByteType, StreamType diff --git a/pypdf/_reader.py b/pypdf/_reader.py index aeababa7b7..f505abf75b 100644 --- a/pypdf/_reader.py +++ b/pypdf/_reader.py @@ -33,6 +33,7 @@ from pathlib import Path from types import TracebackType from typing import ( + TYPE_CHECKING, Any, Callable, Dict, @@ -47,11 +48,9 @@ from ._doc_common import PdfDocCommon, convert_to_int from ._encryption import Encryption, PasswordType -from ._page import PageObject from ._utils import ( StrByteType, StreamType, - b_, logger_warning, read_non_whitespace, read_previous_line, @@ -78,11 +77,15 @@ NullObject, NumberObject, PdfObject, + StreamObject, TextStringObject, read_object, ) from .xmp import XmpInformation +if TYPE_CHECKING: + from ._page import PageObject + class PdfReader(PdfDocCommon): """ @@ -188,7 +191,10 @@ def close(self) -> None: @property def root_object(self) -> DictionaryObject: """Provide access to "/Root". Standardized with PdfWriter.""" - return cast(DictionaryObject, self.trailer[TK.ROOT].get_object()) + root = self.trailer[TK.ROOT] + if root is None: + raise PdfReadError('Cannot find "/Root" key in trailer') + return cast(DictionaryObject, root.get_object()) @property def _info(self) -> Optional[DictionaryObject]: @@ -274,22 +280,6 @@ def xmp_metadata(self) -> Optional[XmpInformation]: finally: self._override_encryption = False - def _get_page(self, page_number: int) -> PageObject: - """ - Retrieve a page by number from this PDF file. - - Args: - page_number: The page number to retrieve - (pages begin at zero) - - Returns: - A :class:`PageObject` instance. - """ - if self.flattened_pages is None: - self._flatten() - assert self.flattened_pages is not None, "hint for mypy" - return self.flattened_pages[page_number] - def _get_page_number_by_indirect( self, indirect_reference: Union[None, int, NullObject, IndirectObject] ) -> Optional[int]: @@ -326,9 +316,7 @@ def _get_object_from_stream( obj_stm: EncodedStreamObject = IndirectObject(stmnum, 0, self).get_object() # type: ignore # This is an xref to a stream, so its type better be a stream assert cast(str, obj_stm["/Type"]) == "/ObjStm" - # /N is the number of indirect objects in the stream - assert idx < obj_stm["/N"] - stream_data = BytesIO(b_(obj_stm.get_data())) + stream_data = BytesIO(obj_stm.get_data()) for i in range(obj_stm["/N"]): # type: ignore read_non_whitespace(stream_data) stream_data.seek(-1, 1) @@ -542,7 +530,10 @@ def read_object_header(self, stream: StreamType) -> Tuple[int, int]: def cache_get_indirect_object( self, generation: int, idnum: int ) -> Optional[PdfObject]: - return self.resolved_objects.get((generation, idnum)) + try: + return self.resolved_objects.get((generation, idnum)) + except RecursionError: + raise PdfReadError("Maximum recursion depth reached.") def cache_indirect_object( self, generation: int, idnum: int, obj: Optional[PdfObject] @@ -932,7 +923,7 @@ def _read_pdf15_xref_stream( xrefstream = cast(ContentStream, read_object(stream, self)) assert cast(str, xrefstream["/Type"]) == "/XRef" self.cache_indirect_object(generation, idnum, xrefstream) - stream_data = BytesIO(b_(xrefstream.get_data())) + stream_data = BytesIO(xrefstream.get_data()) # Index pairs specify the subsections in the dictionary. If # none create one subsection that spans everything. idx_pairs = xrefstream.get("/Index", [0, xrefstream.get("/Size")]) @@ -1005,6 +996,41 @@ def _rebuild_xref_table(self, stream: StreamType) -> None: if generation not in self.xref: self.xref[generation] = {} self.xref[generation][idnum] = m.start(1) + + logger_warning("parsing for Object Streams", __name__) + for g in self.xref: + for i in self.xref[g]: + # get_object in manual + stream.seek(self.xref[g][i], 0) + try: + _ = self.read_object_header(stream) + o = cast(StreamObject, read_object(stream, self)) + if o.get("/Type", "") != "/ObjStm": + continue + strm = BytesIO(o.get_data()) + cpt = 0 + while True: + s = read_until_whitespace(strm) + if not s.isdigit(): + break + _i = int(s) + skip_over_whitespace(strm) + strm.seek(-1, 1) + s = read_until_whitespace(strm) + if not s.isdigit(): # pragma: no cover + break # pragma: no cover + _o = int(s) + self.xref_objStm[_i] = (i, _o) + cpt += 1 + if cpt != o.get("/N"): # pragma: no cover + logger_warning( # pragma: no cover + f"found {cpt} objects within Object({i},{g})" + f" whereas {o.get('/N')} expected", + __name__, + ) + except Exception: # could be of many cause + pass + stream.seek(0, 0) for m in re.finditer(rb"[\r\n \t][ \t]*trailer[\r\n \t]*(<<)", f_): stream.seek(m.start(1), 0) diff --git a/pypdf/_text_extraction/_layout_mode/_fixed_width_page.py b/pypdf/_text_extraction/_layout_mode/_fixed_width_page.py index 1be500959c..e7af1b2340 100644 --- a/pypdf/_text_extraction/_layout_mode/_fixed_width_page.py +++ b/pypdf/_text_extraction/_layout_mode/_fixed_width_page.py @@ -1,10 +1,9 @@ """Extract PDF text preserving the layout of the source PDF""" -import sys from itertools import groupby from math import ceil from pathlib import Path -from typing import Any, Dict, Iterator, List, Optional, Tuple +from typing import Any, Dict, Iterator, List, Literal, Optional, Tuple, TypedDict from ..._utils import logger_warning from .. import LAYOUT_NEW_BT_GROUP_SPACE_WIDTHS @@ -12,11 +11,6 @@ from ._text_state_manager import TextStateManager from ._text_state_params import TextStateParams -if sys.version_info >= (3, 8): - from typing import Literal, TypedDict -else: - from typing_extensions import Literal, TypedDict - class BTGroup(TypedDict): """ diff --git a/pypdf/_text_extraction/_layout_mode/_font.py b/pypdf/_text_extraction/_layout_mode/_font.py index a912fddb27..1d9617d74a 100644 --- a/pypdf/_text_extraction/_layout_mode/_font.py +++ b/pypdf/_text_extraction/_layout_mode/_font.py @@ -1,8 +1,9 @@ """Font constants and classes for "layout" mode text operations""" from dataclasses import dataclass, field -from typing import Any, Dict, Sequence, Union +from typing import Any, Dict, Sequence, Union, cast +from ...errors import ParseError from ...generic import IndirectObject from ._font_widths import STANDARD_WIDTHS @@ -43,7 +44,7 @@ def __post_init__(self) -> None: self.font_dictionary["/DescendantFonts"] ): while isinstance(d_font, IndirectObject): - d_font = d_font.get_object() # type: ignore[assignment] + d_font = d_font.get_object() self.font_dictionary["/DescendantFonts"][d_font_idx] = d_font ord_map = { ord(_target): _surrogate @@ -58,6 +59,7 @@ def __post_init__(self) -> None: skip_count = 0 _w = d_font.get("/W", []) for idx, w_entry in enumerate(_w): + w_entry = w_entry.get_object() if skip_count: skip_count -= 1 continue @@ -66,13 +68,18 @@ def __post_init__(self) -> None: # warning and or use reader's "strict" to force an ex??? continue # check for format (1): `int [int int int int ...]` - if isinstance(_w[idx + 1], Sequence): - start_idx, width_list = _w[idx : idx + 2] + w_next_entry = _w[idx + 1].get_object() + if isinstance(w_next_entry, Sequence): + start_idx, width_list = w_entry, w_next_entry self.width_map.update( { ord_map[_cidx]: _width for _cidx, _width in zip( - range(start_idx, start_idx + len(width_list), 1), + range( + cast(int, start_idx), + cast(int, start_idx) + len(width_list), + 1, + ), width_list, ) if _cidx in ord_map @@ -80,18 +87,31 @@ def __post_init__(self) -> None: ) skip_count = 1 # check for format (2): `int int int` - if not isinstance(_w[idx + 1], Sequence) and not isinstance( - _w[idx + 2], Sequence + elif isinstance(w_next_entry, (int, float)) and isinstance( + _w[idx + 2].get_object(), (int, float) ): - start_idx, stop_idx, const_width = _w[idx : idx + 3] + start_idx, stop_idx, const_width = ( + w_entry, + w_next_entry, + _w[idx + 2].get_object(), + ) self.width_map.update( { ord_map[_cidx]: const_width - for _cidx in range(start_idx, stop_idx + 1, 1) + for _cidx in range( + cast(int, start_idx), cast(int, stop_idx + 1), 1 + ) if _cidx in ord_map } ) skip_count = 2 + else: + # Note: this doesn't handle the case of out of bounds (reaching the end of the width definitions + # while expecting more elements). This raises an IndexError which is sufficient. + raise ParseError( + f"Invalid font width definition. Next elements: {w_entry}, {w_next_entry}, {_w[idx + 2]}" + ) # pragma: no cover + if not self.width_map and "/BaseFont" in self.font_dictionary: for key in STANDARD_WIDTHS: if self.font_dictionary["/BaseFont"].startswith(f"/{key}"): diff --git a/pypdf/_utils.py b/pypdf/_utils.py index 38c0d67d7a..e0034ccc4e 100644 --- a/pypdf/_utils.py +++ b/pypdf/_utils.py @@ -36,7 +36,7 @@ import warnings from dataclasses import dataclass from datetime import datetime, timezone -from io import DEFAULT_BUFFER_SIZE, BytesIO +from io import DEFAULT_BUFFER_SIZE from os import SEEK_CUR from typing import ( IO, @@ -47,7 +47,6 @@ Pattern, Tuple, Union, - cast, overload, ) @@ -336,34 +335,6 @@ def mark_location(stream: StreamType) -> None: stream.seek(-radius, 1) -B_CACHE: Dict[str, bytes] = {} - - -def b_(s: Union[str, bytes]) -> bytes: - if isinstance(s, bytes): - return s - bc = B_CACHE - if s in bc: - return bc[s] - try: - r = s.encode("latin-1") - if len(s) < 2: - bc[s] = r - return r - except Exception: - r = s.encode("utf-8") - if len(s) < 2: - bc[s] = r - return r - - -def str_(b: Any) -> str: - if isinstance(b, bytes): - return b.decode("latin-1") - else: - return str(b) # will return b.__str__() if defined - - @overload def ord_(b: str) -> int: ... @@ -390,20 +361,6 @@ def ord_(b: Union[int, str, bytes]) -> Union[int, bytes]: WHITESPACES_AS_REGEXP = b"[" + WHITESPACES_AS_BYTES + b"]" -def paeth_predictor(left: int, up: int, up_left: int) -> int: - p = left + up - up_left - dist_left = abs(p - left) - dist_up = abs(p - up) - dist_up_left = abs(p - up_left) - - if dist_left <= dist_up and dist_left <= dist_up_left: - return left - elif dist_up <= dist_up_left: - return up - else: - return up_left - - def deprecate(msg: str, stacklevel: int = 3) -> None: warnings.warn(msg, DeprecationWarning, stacklevel=stacklevel) @@ -414,12 +371,17 @@ def deprecation(msg: str) -> None: def deprecate_with_replacement(old_name: str, new_name: str, removed_in: str) -> None: """Raise an exception that a feature will be removed, but has a replacement.""" - deprecate(f"{old_name} is deprecated and will be removed in pypdf {removed_in}. Use {new_name} instead.", 4) + deprecate( + f"{old_name} is deprecated and will be removed in pypdf {removed_in}. Use {new_name} instead.", + 4, + ) def deprecation_with_replacement(old_name: str, new_name: str, removed_in: str) -> None: """Raise an exception that a feature was already removed, but has a replacement.""" - deprecation(f"{old_name} is deprecated and was removed in pypdf {removed_in}. Use {new_name} instead.") + deprecation( + f"{old_name} is deprecated and was removed in pypdf {removed_in}. Use {new_name} instead." + ) def deprecate_no_replacement(name: str, removed_in: str) -> None: @@ -562,10 +524,18 @@ def getter(self, method): # type: ignore # noqa: ANN001, ANN202 class File: from .generic import IndirectObject - name: str - data: bytes - image: Optional[Any] = None # optional ; direct image access - indirect_reference: Optional[IndirectObject] = None # optional ; link to PdfObject + name: str = "" + """ + Filename as identified within the PDF file. + """ + data: bytes = b"" + """ + Data as bytes. + """ + indirect_reference: Optional[IndirectObject] = None + """ + Reference to the object storing the stream. + """ def __str__(self) -> str: return f"{self.__class__.__name__}(name={self.name}, data: {_human_readable_bytes(len(self.data))})" @@ -574,66 +544,6 @@ def __repr__(self) -> str: return self.__str__()[:-1] + f", hash: {hash(self.data)})" -@dataclass -class ImageFile(File): - from .generic import IndirectObject - - image: Optional[Any] = None # optional ; direct PIL image access - indirect_reference: Optional[IndirectObject] = None # optional ; link to PdfObject - - def replace(self, new_image: Any, **kwargs: Any) -> None: - """ - Replace the Image with a new PIL image. - - Args: - new_image (PIL.Image.Image): The new PIL image to replace the existing image. - **kwargs: Additional keyword arguments to pass to `Image.Image.save()`. - - Raises: - TypeError: If the image is inline or in a PdfReader. - TypeError: If the image does not belong to a PdfWriter. - TypeError: If `new_image` is not a PIL Image. - - Note: - This method replaces the existing image with a new image. - It is not allowed for inline images or images within a PdfReader. - The `kwargs` parameter allows passing additional parameters - to `Image.Image.save()`, such as quality. - """ - from PIL import Image - - from ._reader import PdfReader - - # to prevent circular import - from .filters import _xobj_to_image - from .generic import DictionaryObject, PdfObject - - if self.indirect_reference is None: - raise TypeError("Can not update an inline image") - if not hasattr(self.indirect_reference.pdf, "_id_translated"): - raise TypeError("Can not update an image not belonging to a PdfWriter") - if not isinstance(new_image, Image.Image): - raise TypeError("new_image shall be a PIL Image") - b = BytesIO() - new_image.save(b, "PDF", **kwargs) - reader = PdfReader(b) - assert reader.pages[0].images[0].indirect_reference is not None - self.indirect_reference.pdf._objects[self.indirect_reference.idnum - 1] = ( - reader.pages[0].images[0].indirect_reference.get_object() - ) - cast( - PdfObject, self.indirect_reference.get_object() - ).indirect_reference = self.indirect_reference - # change the object attributes - extension, byte_stream, img = _xobj_to_image( - cast(DictionaryObject, self.indirect_reference.get_object()) - ) - assert extension is not None - self.name = self.name[: self.name.rfind(".")] + extension - self.data = byte_stream - self.image = img - - @functools.total_ordering class Version: COMPONENT_PATTERN = re.compile(r"^(\d+)(.*)$") diff --git a/pypdf/_writer.py b/pypdf/_writer.py index 00b9d498c0..a72e2a23df 100644 --- a/pypdf/_writer.py +++ b/pypdf/_writer.py @@ -27,20 +27,19 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. -import collections import decimal import enum import hashlib import re import uuid from io import BytesIO, FileIO, IOBase +from itertools import compress from pathlib import Path from types import TracebackType from typing import ( IO, Any, Callable, - Deque, Dict, Iterable, List, @@ -62,7 +61,7 @@ StrByteType, StreamType, _get_max_pdf_version_header, - b_, + deprecate, deprecate_with_replacement, logger_warning, ) @@ -157,12 +156,17 @@ def __init__( clone_from: Union[None, PdfReader, StrByteType, Path] = None, ) -> None: self._header = b"%PDF-1.3" - self._objects: List[PdfObject] = [] + self._objects: List[Optional[PdfObject]] = [] """The indirect objects in the PDF.""" - self._idnum_hash: Dict[bytes, IndirectObject] = {} - """Maps hash values of indirect objects to their IndirectObject instances.""" + """Maps hash values of indirect objects to the list of IndirectObjects. + This is used for compression. + """ + self._idnum_hash: Dict[bytes, Tuple[IndirectObject, List[IndirectObject]]] = {} + """List of already translated IDs. + dict[id(pdf)][(idnum, generation)] + """ self._id_translated: Dict[int, Dict[int, int]] = {} # The root of our page tree node. @@ -371,10 +375,13 @@ def get_object( indirect_reference: Union[int, IndirectObject], ) -> PdfObject: if isinstance(indirect_reference, int): - return self._objects[indirect_reference - 1] - if indirect_reference.pdf != self: + obj = self._objects[indirect_reference - 1] + elif indirect_reference.pdf != self: raise ValueError("pdf must be self") - return self._objects[indirect_reference.idnum - 1] + else: + obj = self._objects[indirect_reference.idnum - 1] + assert obj is not None # clarification for mypy + return obj def _replace_object( self, @@ -393,7 +400,9 @@ def _replace_object( obj = obj.clone(self) self._objects[indirect_reference - 1] = obj obj.indirect_reference = IndirectObject(indirect_reference, gen, self) - return self._objects[indirect_reference - 1] + + assert isinstance(obj, PdfObject) # clarification for mypy + return obj def _add_page( self, @@ -678,9 +687,10 @@ def add_attachment(self, filename: str, data: Union[str, bytes]) -> None: # Hello world! # endstream # endobj - + if isinstance(data, str): + data = data.encode("latin-1") file_entry = DecodedStreamObject() - file_entry.set_data(b_(data)) + file_entry.set_data(data) file_entry.update({NameObject(PA.TYPE): NameObject("/EmbeddedFile")}) # The Filespec entry @@ -1242,14 +1252,13 @@ def write_stream(self, stream: StreamType) -> None: "It may not be written to correctly.", __name__, ) + # deprecated to be removed in pypdf 6.0.0 : + # if not self._root: + # self._root = self._add_object(self._root_object) + # self._sweep_indirect_references(self._root) - if not self._root: - self._root = self._add_object(self._root_object) - - self._sweep_indirect_references(self._root) - - object_positions = self._write_pdf_structure(stream) - xref_location = self._write_xref_table(stream, object_positions) + object_positions, free_objects = self._write_pdf_structure(stream) + xref_location = self._write_xref_table(stream, object_positions, free_objects) self._write_trailer(stream, xref_location) def write(self, stream: Union[Path, StrByteType]) -> Tuple[bool, IO[Any]]: @@ -1282,8 +1291,9 @@ def write(self, stream: Union[Path, StrByteType]) -> Tuple[bool, IO[Any]]: return my_file, stream - def _write_pdf_structure(self, stream: StreamType) -> List[int]: + def _write_pdf_structure(self, stream: StreamType) -> Tuple[List[int], List[int]]: object_positions = [] + free_objects = [] # will contain list of all free entries stream.write(self.pdf_header.encode() + b"\n") stream.write(b"%\xE2\xE3\xCF\xD3\n") @@ -1296,15 +1306,26 @@ def _write_pdf_structure(self, stream: StreamType) -> List[int]: obj = self._encryption.encrypt_object(obj, idnum, 0) obj.write_to_stream(stream) stream.write(b"\nendobj\n") - return object_positions - - def _write_xref_table(self, stream: StreamType, object_positions: List[int]) -> int: + else: + object_positions.append(-1) + free_objects.append(i + 1) + free_objects.append(0) # add 0 to loop in accordance with PDF spec + return object_positions, free_objects + + def _write_xref_table( + self, stream: StreamType, object_positions: List[int], free_objects: List[int] + ) -> int: xref_location = stream.tell() stream.write(b"xref\n") stream.write(f"0 {len(self._objects) + 1}\n".encode()) - stream.write(f"{0:0>10} {65535:0>5} f \n".encode()) + stream.write(f"{free_objects[0]:0>10} {65535:0>5} f \n".encode()) + free_idx = 1 for offset in object_positions: - stream.write(f"{offset:0>10} {0:0>5} n \n".encode()) + if offset > 0: + stream.write(f"{offset:0>10} {0:0>5} n \n".encode()) + else: + stream.write(f"{free_objects[free_idx]:0>10} {1:0>5} f \n".encode()) + free_idx += 1 return xref_location def _write_trailer(self, stream: StreamType, xref_location: int) -> None: @@ -1349,6 +1370,79 @@ def add_metadata(self, infos: Dict[str, Any]) -> None: assert isinstance(self._info, DictionaryObject) self._info.update(args) + def compress_identical_objects( + self, + remove_identicals: bool = True, + remove_orphans: bool = True, + ) -> None: + """ + Parse the PDF file and merge objects that have same hash. + This will make objects common to multiple pages. + Recommended to be used just before writing output. + + Args: + remove_identicals: Remove identical objects. + remove_orphans: Remove unreferenced objects. + """ + + def replace_in_obj( + obj: PdfObject, crossref: Dict[IndirectObject, IndirectObject] + ) -> None: + if isinstance(obj, DictionaryObject): + key_val = obj.items() + elif isinstance(obj, ArrayObject): + key_val = enumerate(obj) # type: ignore + else: + return + assert isinstance(obj, (DictionaryObject, ArrayObject)) + for k, v in key_val: + if isinstance(v, IndirectObject): + orphans[v.idnum - 1] = False + if v in crossref: + obj[k] = crossref[v] + else: + """the filtering on DictionaryObject and ArrayObject only + will be performed within replace_in_obj""" + replace_in_obj(v, crossref) + + # _idnum_hash :dict[hash]=(1st_ind_obj,[other_indir_objs,...]) + self._idnum_hash = {} + orphans = [True] * len(self._objects) + # look for similar objects + for idx, obj in enumerate(self._objects): + if obj is None: + continue + assert isinstance(obj.indirect_reference, IndirectObject) + h = obj.hash_value() + if remove_identicals and h in self._idnum_hash: + self._idnum_hash[h][1].append(obj.indirect_reference) + self._objects[idx] = None + else: + self._idnum_hash[h] = (obj.indirect_reference, []) + + # generate the dict converting others to 1st + cnv = {v[0]: v[1] for v in self._idnum_hash.values() if len(v[1]) > 0} + cnv_rev: Dict[IndirectObject, IndirectObject] = {} + for k, v in cnv.items(): + cnv_rev.update(zip(v, (k,) * len(v))) + + # replace reference to merged objects + for obj in self._objects: + if isinstance(obj, (DictionaryObject, ArrayObject)): + replace_in_obj(obj, cnv_rev) + + # remove orphans (if applicable) + orphans[self.root_object.indirect_reference.idnum - 1] = False # type: ignore + + orphans[self._info.indirect_reference.idnum - 1] = False # type: ignore + + try: + orphans[self._ID.indirect_reference.idnum - 1] = False # type: ignore + except AttributeError: + pass + for i in compress(range(len(self._objects)), orphans): + self._objects[i] = None + def _sweep_indirect_references( self, root: Union[ @@ -1363,7 +1457,7 @@ def _sweep_indirect_references( TextStringObject, NullObject, ], - ) -> None: + ) -> None: # deprecated """ Resolving any circular references to Page objects. @@ -1379,73 +1473,13 @@ def _sweep_indirect_references( Args: root: The root of the PDF object tree to sweep. """ - stack: Deque[ - Tuple[ - Any, - Optional[Any], - Any, - List[PdfObject], - ] - ] = collections.deque() - discovered = [] - parent = None - grant_parents: List[PdfObject] = [] - key_or_id = None - - # Start from root - stack.append((root, parent, key_or_id, grant_parents)) - - while len(stack): - data, parent, key_or_id, grant_parents = stack.pop() - - # Build stack for a processing depth-first - if isinstance(data, (ArrayObject, DictionaryObject)): - for key, value in data.items(): - stack.append( - ( - value, - data, - key, - grant_parents + [parent] if parent is not None else [], - ) - ) - elif isinstance(data, IndirectObject) and data.pdf != self: - data = self._resolve_indirect_object(data) - - if str(data) not in discovered: - discovered.append(str(data)) - stack.append((data.get_object(), None, None, [])) - - # Check if data has a parent and if it is a dict or - # an array update the value - if isinstance(parent, (DictionaryObject, ArrayObject)): - if isinstance(data, StreamObject): - # a dictionary value is a stream; streams must be indirect - # objects, so we need to change this value. - data = self._resolve_indirect_object(self._add_object(data)) - - update_hashes = [] - - # Data changed and thus the hash value changed - if parent[key_or_id] != data: - update_hashes = [parent.hash_value()] + [ - grant_parent.hash_value() for grant_parent in grant_parents - ] - parent[key_or_id] = data - - # Update old hash value to new hash value - for old_hash in update_hashes: - indirect_reference = self._idnum_hash.pop(old_hash, None) - - if indirect_reference is not None: - indirect_reference_obj = indirect_reference.get_object() - - if indirect_reference_obj is not None: - self._idnum_hash[ - indirect_reference_obj.hash_value() - ] = indirect_reference + deprecate( + "_sweep_indirect_references has been removed, please report to dev team if this warning is observed", + ) - def _resolve_indirect_object(self, data: IndirectObject) -> IndirectObject: + def _resolve_indirect_object( + self, data: IndirectObject + ) -> IndirectObject: # deprecated """ Resolves an indirect object to an indirect object in this PDF file. @@ -1470,36 +1504,10 @@ def _resolve_indirect_object(self, data: IndirectObject) -> IndirectObject: Raises: ValueError: If the input stream is closed. """ - if hasattr(data.pdf, "stream") and data.pdf.stream.closed: - raise ValueError(f"I/O operation on closed file: {data.pdf.stream.name}") - - if data.pdf == self: - return data - - # Get real object indirect object - real_obj = data.pdf.get_object(data) - - if real_obj is None: - logger_warning( - f"Unable to resolve [{data.__class__.__name__}: {data}], " - "returning NullObject instead", - __name__, - ) - real_obj = NullObject() - - hash_value = real_obj.hash_value() - - # Check if object is handled - if hash_value in self._idnum_hash: - return self._idnum_hash[hash_value] - - if data.pdf == self: - self._idnum_hash[hash_value] = IndirectObject(data.idnum, 0, self) - # This is new object in this pdf - else: - self._idnum_hash[hash_value] = self._add_object(real_obj) - - return self._idnum_hash[hash_value] + deprecate( + "_resolve_indirect_object has been removed, please report to dev team if this warning is observed", + ) + return IndirectObject(0, 0, self) def get_reference(self, obj: PdfObject) -> IndirectObject: idnum = self._objects.index(obj) + 1 diff --git a/pypdf/_xobj_image_helpers.py b/pypdf/_xobj_image_helpers.py index 45b0c145be..d870b15897 100644 --- a/pypdf/_xobj_image_helpers.py +++ b/pypdf/_xobj_image_helpers.py @@ -2,11 +2,11 @@ import sys from io import BytesIO -from typing import Any, List, Tuple, Union, cast +from typing import Any, List, Literal, Tuple, Union, cast from ._utils import check_if_whitespace_only, logger_warning from .constants import ColorSpaces -from .errors import PdfReadError +from .errors import EmptyImageDataError, PdfReadError from .generic import ( ArrayObject, DecodedStreamObject, @@ -15,13 +15,6 @@ NullObject, ) -if sys.version_info[:2] >= (3, 8): - from typing import Literal -else: - # PEP 586 introduced typing.Literal with Python 3.8 - # For older Python versions, the backport typing_extensions is necessary: - from typing_extensions import Literal - if sys.version_info[:2] >= (3, 10): from typing import TypeAlias else: @@ -129,7 +122,7 @@ def bits2byte(data: bytes, size: Tuple[int, int], bits: int) -> bytes: by = 0 bit = 8 - bits for y in range(size[1]): - if (bit != 0) and (bit != 8 - bits): + if bit != 8 - bits: by += 1 bit = 8 - bits for x in range(size[0]): @@ -148,9 +141,14 @@ def _extended_image_frombytes( img = Image.frombytes(mode, size, data) except ValueError as exc: nb_pix = size[0] * size[1] - if len(data) % nb_pix != 0: + data_length = len(data) + if data_length == 0: + raise EmptyImageDataError( + "Data is 0 bytes, cannot process an image from empty data." + ) from exc + if data_length % nb_pix != 0: raise exc - k = nb_pix * len(mode) / len(data) + k = nb_pix * len(mode) / data_length data = b"".join([bytes((x,) * int(k)) for x in data]) img = Image.frombytes(mode, size, data) return img diff --git a/pypdf/annotations/_markup_annotations.py b/pypdf/annotations/_markup_annotations.py index 4db8dfdbf0..98a222483b 100644 --- a/pypdf/annotations/_markup_annotations.py +++ b/pypdf/annotations/_markup_annotations.py @@ -104,9 +104,9 @@ def __init__( self[NameObject("/Rect")] = RectangleObject(rect) font_str = "font: " - if bold is True: + if bold: font_str = f"{font_str}bold " - if italic is True: + if italic: font_str = f"{font_str}italic " font_str = f"{font_str}{font} {font_size}" font_str = f"{font_str};text-align:left;color:#{font_color}" diff --git a/pypdf/annotations/_non_markup_annotations.py b/pypdf/annotations/_non_markup_annotations.py index dcdb3b0ff8..6272cceee6 100644 --- a/pypdf/annotations/_non_markup_annotations.py +++ b/pypdf/annotations/_non_markup_annotations.py @@ -1,6 +1,5 @@ from typing import TYPE_CHECKING, Any, Optional, Tuple, Union -from ..constants import AnnotationFlag from ..generic._base import ( BooleanObject, NameObject, @@ -12,8 +11,6 @@ from ..generic._rectangle import RectangleObject from ._base import AnnotationDictionary -DEFAULT_ANNOTATION_FLAG = AnnotationFlag(0) - class Link(AnnotationDictionary): def __init__( diff --git a/pypdf/errors.py b/pypdf/errors.py index c962dec662..ad197ffc11 100644 --- a/pypdf/errors.py +++ b/pypdf/errors.py @@ -59,4 +59,8 @@ class EmptyFileError(PdfReadError): """Raised when a PDF file is empty or has no content.""" +class EmptyImageDataError(PyPdfError): + """Raised when trying to process an image that has no data.""" + + STREAM_TRUNCATED_PREMATURELY = "Stream has ended unexpectedly" diff --git a/pypdf/filters.py b/pypdf/filters.py index 137e3603a3..43730cc8e9 100644 --- a/pypdf/filters.py +++ b/pypdf/filters.py @@ -43,7 +43,7 @@ from ._utils import ( WHITESPACES_AS_BYTES, - b_, + deprecate, deprecate_with_replacement, deprecation_no_replacement, logger_warning, @@ -376,20 +376,18 @@ class LZWDecode: """ Taken from: - http://www.java2s.com/Open-Source/Java-Document/PDF/PDF- - Renderer/com/sun/pdfview/decode/LZWDecode.java.htm + http://www.java2s.com/Open-Source/Java-Document/PDF/PDF-Renderer/com/sun/pdfview/decode/LZWDecode.java.htm """ class Decoder: + STOP = 257 + CLEARDICT = 256 + def __init__(self, data: bytes) -> None: - self.STOP = 257 - self.CLEARDICT = 256 self.data = data self.bytepos = 0 self.bitpos = 0 - self.dict = [""] * 4096 - for i in range(256): - self.dict[i] = chr(i) + self.dict = [struct.pack("B", i) for i in range(256)] + [b""] * (4096 - 256) self.reset_dict() def reset_dict(self) -> None: @@ -416,7 +414,7 @@ def next_code(self) -> int: self.bytepos = self.bytepos + 1 return value - def decode(self) -> str: + def decode(self) -> bytes: """ TIFF 6.0 specification explains in sufficient details the steps to implement the LZW encode() and decode() algorithms. @@ -429,7 +427,7 @@ def decode(self) -> str: PdfReadError: If the stop code is missing """ cW = self.CLEARDICT - baos = "" + baos = b"" while True: pW = cW cW = self.next_code() @@ -444,11 +442,11 @@ def decode(self) -> str: else: if cW < self.dictlen: baos += self.dict[cW] - p = self.dict[pW] + self.dict[cW][0] + p = self.dict[pW] + self.dict[cW][0:1] self.dict[self.dictlen] = p self.dictlen += 1 else: - p = self.dict[pW] + self.dict[pW][0] + p = self.dict[pW] + self.dict[pW][0:1] baos += p self.dict[self.dictlen] = p self.dictlen += 1 @@ -460,11 +458,11 @@ def decode(self) -> str: return baos @staticmethod - def decode( + def _decodeb( data: bytes, decode_parms: Optional[DictionaryObject] = None, **kwargs: Any, - ) -> str: + ) -> bytes: """ Decode an LZW encoded data stream. @@ -476,9 +474,28 @@ def decode( decoded data. """ # decode_parms is unused here - return LZWDecode.Decoder(data).decode() + @staticmethod + def decode( + data: bytes, + decode_parms: Optional[DictionaryObject] = None, + **kwargs: Any, + ) -> str: # deprecated + """ + Decode an LZW encoded data stream. + + Args: + data: ``bytes`` or ``str`` text to decode. + decode_parms: a dictionary of parameter values. + + Returns: + decoded data. + """ + # decode_parms is unused here + deprecate("LZWDecode.decode will return bytes instead of str in pypdf 6.0.0") + return LZWDecode.Decoder(data).decode().decode("latin-1") + class ASCII85Decode: """Decodes string ASCII85-encoded data into a byte format.""" @@ -651,7 +668,7 @@ def decode( return tiff_header + data -def decode_stream_data(stream: Any) -> Union[bytes, str]: # utils.StreamObject +def decode_stream_data(stream: Any) -> bytes: # utils.StreamObject """ Decode the stream data based on the specified filters. @@ -678,7 +695,7 @@ def decode_stream_data(stream: Any) -> Union[bytes, str]: # utils.StreamObject decodparms = stream.get(SA.DECODE_PARMS, ({},) * len(filters)) if not isinstance(decodparms, (list, tuple)): decodparms = (decodparms,) - data: bytes = b_(stream._data) + data: bytes = stream._data # If there is not data to decode we should not try to decode the data. if data: for filter_type, params in zip(filters, decodparms): @@ -691,7 +708,7 @@ def decode_stream_data(stream: Any) -> Union[bytes, str]: # utils.StreamObject elif filter_type in (FT.RUN_LENGTH_DECODE, FTA.RL): data = RunLengthDecode.decode(data) elif filter_type in (FT.LZW_DECODE, FTA.LZW): - data = LZWDecode.decode(data, params) # type: ignore + data = LZWDecode._decodeb(data, params) elif filter_type in (FT.ASCII_85_DECODE, FTA.A85): data = ASCII85Decode.decode(data) elif filter_type == FT.DCT_DECODE: diff --git a/pypdf/generic/_base.py b/pypdf/generic/_base.py index 2d606b4184..f48dc66c38 100644 --- a/pypdf/generic/_base.py +++ b/pypdf/generic/_base.py @@ -30,18 +30,17 @@ import re from binascii import unhexlify from math import log10 +from struct import iter_unpack from typing import Any, Callable, ClassVar, Dict, Optional, Sequence, Union, cast from .._codecs import _pdfdoc_encoding_rev from .._protocols import PdfObjectProtocol, PdfWriterProtocol from .._utils import ( StreamType, - b_, deprecate_no_replacement, logger_warning, read_non_whitespace, read_until_regex, - str_, ) from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError @@ -240,6 +239,9 @@ def __init__(self, idnum: int, generation: int, pdf: Any) -> None: # PdfReader self.generation = generation self.pdf = pdf + def __hash__(self) -> int: + return hash((self.idnum, self.generation, id(self.pdf))) + def clone( self, pdf_dest: PdfWriterProtocol, @@ -308,6 +310,10 @@ def __getitem__(self, key: Any) -> Any: # items should be extracted from pointed Object return self._get_object_with_check()[key] # type: ignore + def __float__(self) -> str: + # in this case we are looking for the pointed data + return self.get_object().__float__() # type: ignore + def __str__(self) -> str: # in this case we are looking for the pointed data return self.get_object().__str__() @@ -369,10 +375,10 @@ def read_from_stream(stream: StreamType, pdf: Any) -> "IndirectObject": # PdfRe class FloatObject(float, PdfObject): def __new__( - cls, value: Union[str, Any] = "0.0", context: Optional[Any] = None + cls, value: Any = "0.0", context: Optional[Any] = None ) -> "FloatObject": try: - value = float(str_(value)) + value = float(value) return float.__new__(cls, value) except Exception as e: # If this isn't a valid decimal (happens in malformed PDFs) @@ -511,23 +517,38 @@ class TextStringObject(str, PdfObject): # noqa: SLOT000 autodetect_pdfdocencoding: bool autodetect_utf16: bool utf16_bom: bytes + _original_bytes: Optional[bytes] = None def __new__(cls, value: Any) -> "TextStringObject": + org = None if isinstance(value, bytes): + org = value value = value.decode("charmap") o = str.__new__(cls, value) + o._original_bytes = org o.autodetect_utf16 = False o.autodetect_pdfdocencoding = False o.utf16_bom = b"" if value.startswith(("\xfe\xff", "\xff\xfe")): + assert org is not None # for mypy + try: + o = str.__new__(cls, org.decode("utf-16")) + except UnicodeDecodeError as exc: + logger_warning( + f"{exc!s}\ninitial string:{exc.object!r}", + __name__, + ) + o = str.__new__(cls, exc.object[: exc.start].decode("utf-16")) + o._original_bytes = org o.autodetect_utf16 = True - o.utf16_bom = value[:2].encode("charmap") + o.utf16_bom = org[:2] else: try: encode_pdfdocencoding(o) o.autodetect_pdfdocencoding = True except UnicodeEncodeError: o.autodetect_utf16 = True + o.utf16_bom = codecs.BOM_UTF16_BE return o def clone( @@ -538,6 +559,7 @@ def clone( ) -> "TextStringObject": """Clone object into pdf_dest.""" obj = TextStringObject(self) + obj._original_bytes = self._original_bytes obj.autodetect_pdfdocencoding = self.autodetect_pdfdocencoding obj.autodetect_utf16 = self.autodetect_utf16 obj.utf16_bom = self.utf16_bom @@ -553,7 +575,10 @@ def original_bytes(self) -> bytes: if that occurs, this "original_bytes" property can be used to back-calculate what the original encoded bytes were. """ - return self.get_original_bytes() + if self._original_bytes is not None: + return self._original_bytes + else: + return self.get_original_bytes() def get_original_bytes(self) -> bytes: # We're a text string object, but the library is trying to get our raw @@ -578,6 +603,8 @@ def get_encoded_bytes(self) -> bytes: # nicer to look at in the PDF file. Sadly, we take a performance hit # here for trying... try: + if self._original_bytes is not None: + return self._original_bytes if self.autodetect_utf16: raise UnicodeEncodeError("", "forced", -1, -1, "") bytearr = encode_pdfdocencoding(self) @@ -599,15 +626,16 @@ def write_to_stream( ) bytearr = self.get_encoded_bytes() stream.write(b"(") - for c in bytearr: - if not chr(c).isalnum() and c != b" ": + for c_ in iter_unpack("c", bytearr): + c = cast(bytes, c_[0]) + if not c.isalnum() and c != b" ": # This: # stream.write(rf"\{c:0>3o}".encode()) # gives # https://github.com/davidhalter/parso/issues/207 - stream.write(("\\%03o" % c).encode()) + stream.write(b"\\%03o" % ord(c)) else: - stream.write(b_(chr(c))) + stream.write(c) stream.write(b")") @@ -710,12 +738,13 @@ def read_from_stream(stream: StreamType, pdf: Any) -> "NameObject": # PdfReader def encode_pdfdocencoding(unicode_string: str) -> bytes: - retval = bytearray() - for c in unicode_string: - try: - retval += b_(chr(_pdfdoc_encoding_rev[c])) - except KeyError: - raise UnicodeEncodeError( - "pdfdocencoding", c, -1, -1, "does not exist in translation table" - ) - return bytes(retval) + try: + return bytes([_pdfdoc_encoding_rev[k] for k in unicode_string]) + except KeyError: + raise UnicodeEncodeError( + "pdfdocencoding", + unicode_string, + -1, + -1, + "does not exist in translation table", + ) diff --git a/pypdf/generic/_data_structures.py b/pypdf/generic/_data_structures.py index 87d6886742..2c6e20e575 100644 --- a/pypdf/generic/_data_structures.py +++ b/pypdf/generic/_data_structures.py @@ -52,7 +52,6 @@ from .._utils import ( WHITESPACES, StreamType, - b_, deprecate_no_replacement, deprecate_with_replacement, logger_warning, @@ -843,7 +842,7 @@ def _reset_node_tree_relationship(child_obj: Any) -> None: class StreamObject(DictionaryObject): def __init__(self) -> None: - self._data: Union[bytes, str] = b"" + self._data: bytes = b"" self.decoded_self: Optional[DecodedStreamObject] = None def _clone( @@ -877,7 +876,7 @@ def _clone( pass super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited) - def get_data(self) -> Union[bytes, str]: + def get_data(self) -> bytes: return self._data def set_data(self, data: bytes) -> None: @@ -885,7 +884,7 @@ def set_data(self, data: bytes) -> None: def hash_value_data(self) -> bytes: data = super().hash_value_data() - data += b_(self._data) + data += self._data return data def write_to_stream( @@ -955,7 +954,7 @@ def flate_encode(self, level: int = -1) -> "EncodedStreamObject": retval[NameObject(SA.FILTER)] = f if params is not None: retval[NameObject(SA.DECODE_PARMS)] = params - retval._data = FlateDecode.encode(b_(self._data), level) + retval._data = FlateDecode.encode(self._data, level) return retval def decode_as_image(self) -> Any: @@ -993,7 +992,7 @@ def __init__(self) -> None: self.decoded_self: Optional[DecodedStreamObject] = None # This overrides the parent method: - def get_data(self) -> Union[bytes, str]: + def get_data(self) -> bytes: from ..filters import decode_stream_data if self.decoded_self is not None: @@ -1003,7 +1002,7 @@ def get_data(self) -> Union[bytes, str]: # create decoded object decoded = DecodedStreamObject() - decoded.set_data(b_(decode_stream_data(self))) + decoded.set_data(decode_stream_data(self)) for key, value in list(self.items()): if key not in (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS): decoded[key] = value @@ -1022,7 +1021,7 @@ def set_data(self, data: bytes) -> None: # deprecated super().set_data(FlateDecode.encode(data)) else: raise PdfReadError( - "Streams encoded with different filter from only FlateDecode is not supported" + "Streams encoded with a filter different from FlateDecode are not supported" ) @@ -1058,7 +1057,7 @@ def __init__( # The inner list has two elements: # Element 0: List # Element 1: str - self._operations: List[Tuple[Any, Any]] = [] + self._operations: List[Tuple[Any, bytes]] = [] # stream may be a StreamObject or an ArrayObject containing # multiple StreamObjects to be cat'd together. @@ -1069,14 +1068,14 @@ def __init__( if isinstance(stream, ArrayObject): data = b"" for s in stream: - data += b_(s.get_object().get_data()) + data += s.get_object().get_data() if len(data) == 0 or data[-1] != b"\n": data += b"\n" super().set_data(bytes(data)) else: stream_data = stream.get_data() assert stream_data is not None - super().set_data(b_(stream_data)) + super().set_data(stream_data) self.forced_encoding = forced_encoding def clone( @@ -1132,7 +1131,7 @@ def _clone( ignore_fields: """ src_cs = cast("ContentStream", src) - super().set_data(b_(src_cs._data)) + super().set_data(src_cs._data) self.pdf = pdf_dest self._operations = list(src_cs._operations) self.forced_encoding = src_cs.forced_encoding @@ -1249,10 +1248,10 @@ def get_data(self) -> bytes: for op in operands: op.write_to_stream(new_data) new_data.write(b" ") - new_data.write(b_(operator)) + new_data.write(operator) new_data.write(b"\n") self._data = new_data.getvalue() - return b_(self._data) + return self._data # This overrides the parent method: def set_data(self, data: bytes) -> None: @@ -1262,21 +1261,21 @@ def set_data(self, data: bytes) -> None: @property def operations(self) -> List[Tuple[Any, Any]]: if not self._operations and self._data: - self._parse_content_stream(BytesIO(b_(self._data))) + self._parse_content_stream(BytesIO(self._data)) self._data = b"" return self._operations @operations.setter - def operations(self, operations: List[Tuple[Any, Any]]) -> None: + def operations(self, operations: List[Tuple[Any, bytes]]) -> None: self._operations = operations self._data = b"" def isolate_graphics_state(self) -> None: if self._operations: - self._operations.insert(0, ([], "q")) - self._operations.append(([], "Q")) + self._operations.insert(0, ([], b"q")) + self._operations.append(([], b"Q")) elif self._data: - self._data = b"q\n" + b_(self._data) + b"\nQ\n" + self._data = b"q\n" + self._data + b"\nQ\n" # This overrides the parent method: def write_to_stream( diff --git a/pypdf/generic/_utils.py b/pypdf/generic/_utils.py index fdcdc33399..6fce6d0b22 100644 --- a/pypdf/generic/_utils.py +++ b/pypdf/generic/_utils.py @@ -2,7 +2,7 @@ from typing import Dict, List, Tuple, Union from .._codecs import _pdfdoc_encoding -from .._utils import StreamType, b_, logger_warning, read_non_whitespace +from .._utils import StreamType, logger_warning, read_non_whitespace from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfStreamError from ._base import ByteStringObject, TextStringObject @@ -16,7 +16,7 @@ def read_hex_string_from_stream( forced_encoding: Union[None, str, List[str], Dict[int, str]] = None, ) -> Union["TextStringObject", "ByteStringObject"]: stream.read(1) - txt = "" + arr = [] x = b"" while True: tok = read_non_whitespace(stream) @@ -26,13 +26,37 @@ def read_hex_string_from_stream( break x += tok if len(x) == 2: - txt += chr(int(x, base=16)) + arr.append(int(x, base=16)) x = b"" if len(x) == 1: x += b"0" - if len(x) == 2: - txt += chr(int(x, base=16)) - return create_string_object(b_(txt), forced_encoding) + if x != b"": + arr.append(int(x, base=16)) + return create_string_object(bytes(arr), forced_encoding) + + +__ESPACE_DICT__ = { + b"n": ord(b"\n"), + b"r": ord(b"\r"), + b"t": ord(b"\t"), + b"b": ord(b"\b"), + b"f": ord(b"\f"), + b"(": ord(b"("), + b")": ord(b")"), + b"/": ord(b"/"), + b"\\": ord(b"\\"), + b" ": ord(b" "), + b"%": ord(b"%"), + b"<": ord(b"<"), + b">": ord(b">"), + b"[": ord(b"["), + b"]": ord(b"]"), + b"#": ord(b"#"), + b"_": ord(b"_"), + b"&": ord(b"&"), + b"$": ord(b"$"), +} +__BACKSLASH_CODE__ = 92 def read_string_from_stream( @@ -54,30 +78,9 @@ def read_string_from_stream( break elif tok == b"\\": tok = stream.read(1) - escape_dict = { - b"n": b"\n", - b"r": b"\r", - b"t": b"\t", - b"b": b"\b", - b"f": b"\f", - b"c": rb"\c", - b"(": b"(", - b")": b")", - b"/": b"/", - b"\\": b"\\", - b" ": b" ", - b"%": b"%", - b"<": b"<", - b">": b">", - b"[": b"[", - b"]": b"]", - b"#": b"#", - b"_": b"_", - b"&": b"&", - b"$": b"$", - } try: - tok = escape_dict[tok] + txt.append(__ESPACE_DICT__[tok]) + continue except KeyError: if b"0" <= tok <= b"7": # "The number ddd may consist of one, two, or three @@ -85,6 +88,7 @@ def read_string_from_stream( # Three octal digits shall be used, with leading zeros # as needed, if the next character of the string is also # a digit." (PDF reference 7.3.4.2, p 16) + sav = stream.tell() - 1 for _ in range(2): ntok = stream.read(1) if b"0" <= ntok <= b"7": @@ -92,7 +96,13 @@ def read_string_from_stream( else: stream.seek(-1, 1) # ntok has to be analyzed break - tok = b_(chr(int(tok, base=8))) + i = int(tok, base=8) + if i > 255: + txt.append(__BACKSLASH_CODE__) + stream.seek(sav) + else: + txt.append(i) + continue elif tok in b"\n\r": # This case is hit when a backslash followed by a line # break occurs. If it's a multi-char EOL, consume the @@ -102,12 +112,13 @@ def read_string_from_stream( stream.seek(-1, 1) # Then don't add anything to the actual string, since this # line break was escaped: - tok = b"" + continue else: msg = f"Unexpected escaped string: {tok.decode('utf-8','ignore')}" logger_warning(msg, __name__) - txt.append(tok) - return create_string_object(b"".join(txt), forced_encoding) + txt.append(__BACKSLASH_CODE__) + txt.append(ord(tok)) + return create_string_object(bytes(txt), forced_encoding) def create_string_object( @@ -137,27 +148,45 @@ def create_string_object( out += forced_encoding[x] except Exception: out += bytes((x,)).decode("charmap") - return TextStringObject(out) + obj = TextStringObject(out) + obj._original_bytes = string + return obj elif isinstance(forced_encoding, str): if forced_encoding == "bytes": return ByteStringObject(string) - return TextStringObject(string.decode(forced_encoding)) + obj = TextStringObject(string.decode(forced_encoding)) + obj._original_bytes = string + return obj else: try: if string.startswith((codecs.BOM_UTF16_BE, codecs.BOM_UTF16_LE)): retval = TextStringObject(string.decode("utf-16")) + retval._original_bytes = string retval.autodetect_utf16 = True retval.utf16_bom = string[:2] return retval - else: - # This is probably a big performance hit here, but we need - # to convert string objects into the text/unicode-aware - # version if possible... and the only way to check if that's - # possible is to try. - # Some strings are strings, some are just byte arrays. - retval = TextStringObject(decode_pdfdocencoding(string)) - retval.autodetect_pdfdocencoding = True + if string.startswith(b"\x00"): + retval = TextStringObject(string.decode("utf-16be")) + retval._original_bytes = string + retval.autodetect_utf16 = True + retval.utf16_bom = codecs.BOM_UTF16_BE return retval + if string[1:2] == b"\x00": + retval = TextStringObject(string.decode("utf-16le")) + retval._original_bytes = string + retval.autodetect_utf16 = True + retval.utf16_bom = codecs.BOM_UTF16_LE + return retval + + # This is probably a big performance hit here, but we need + # to convert string objects into the text/unicode-aware + # version if possible... and the only way to check if that's + # possible is to try. + # Some strings are strings, some are just byte arrays. + retval = TextStringObject(decode_pdfdocencoding(string)) + retval._original_bytes = string + retval.autodetect_pdfdocencoding = True + return retval except UnicodeDecodeError: return ByteStringObject(string) else: diff --git a/pypdf/types.py b/pypdf/types.py index b8fbab92cf..e383dc7b1f 100644 --- a/pypdf/types.py +++ b/pypdf/types.py @@ -1,13 +1,7 @@ """Helpers for working with PDF types.""" import sys -from typing import List, Union - -if sys.version_info[:2] >= (3, 8): - # Python 3.8+: https://peps.python.org/pep-0586 - from typing import Literal -else: - from typing_extensions import Literal +from typing import List, Literal, Union if sys.version_info[:2] >= (3, 10): # Python 3.10+: https://www.python.org/dev/peps/pep-0484 diff --git a/requirements/ci-3.11.txt b/requirements/ci-3.11.txt index f382fe2b94..2101771181 100644 --- a/requirements/ci-3.11.txt +++ b/requirements/ci-3.11.txt @@ -6,7 +6,7 @@ # attrs==23.1.0 # via flake8-bugbear -coverage[toml]==7.3.0 +coverage[toml]==7.6.0 # via # -r requirements/ci.in # pytest-cov @@ -35,7 +35,7 @@ mypy-extensions==1.0.0 # via mypy packaging==23.1 # via pytest -pillow==10.0.1 +pillow==10.4.0 # via # -r requirements/ci.in # fpdf2 diff --git a/resources/Seige_of_Vicksburg_Sample_OCR-crazyones-merged.pdf b/resources/Seige_of_Vicksburg_Sample_OCR-crazyones-merged.pdf index 0e9633ac16..a53f28f0be 100644 Binary files a/resources/Seige_of_Vicksburg_Sample_OCR-crazyones-merged.pdf and b/resources/Seige_of_Vicksburg_Sample_OCR-crazyones-merged.pdf differ diff --git a/tests/test_cmap.py b/tests/test_cmap.py index 9dcfb252d5..8042d306eb 100644 --- a/tests/test_cmap.py +++ b/tests/test_cmap.py @@ -1,13 +1,19 @@ """Test the pypdf_cmap module.""" from io import BytesIO +from pathlib import Path import pytest -from pypdf import PdfReader +from pypdf import PdfReader, PdfWriter from pypdf._cmap import build_char_map +from pypdf.generic import ArrayObject, IndirectObject, NameObject, NullObject from . import get_data_from_url +TESTS_ROOT = Path(__file__).parent.resolve() +PROJECT_ROOT = TESTS_ROOT.parent +RESOURCE_ROOT = PROJECT_ROOT / "resources" + @pytest.mark.enable_socket() @pytest.mark.slow() @@ -206,3 +212,39 @@ def test_eten_b5(): """Issue #2356""" reader = PdfReader(BytesIO(get_data_from_url(name="iss2290.pdf"))) reader.pages[0].extract_text().startswith("1/7 \n富邦新終身壽險") + + +def test_missing_entries_in_cmap(): + """ + Issue #2702: this issue is observed on damaged pdfs + use of this file in test has been discarded as too slow/long + we will create the same error from crazyones + """ + pdf_path = RESOURCE_ROOT / "crazyones.pdf" + reader = PdfReader(pdf_path) + p = reader.pages[0] + p["/Resources"]["/Font"]["/F1"][NameObject("/ToUnicode")] = IndirectObject( + 99999999, 0, reader + ) + p.extract_text() + + +def test_null_missing_width(): + """For coverage of #2792""" + writer = PdfWriter(RESOURCE_ROOT / "crazyones.pdf") + page = writer.pages[0] + ft = page["/Resources"]["/Font"]["/F1"] + ft[NameObject("/Widths")] = ArrayObject() + ft["/FontDescriptor"][NameObject("/MissingWidth")] = NullObject() + page.extract_text() + + +@pytest.mark.enable_socket() +def test_unigb_utf16(): + """Cf #2812""" + url = ( + "https://github.com/user-attachments/files/16767536/W020240105322424121296.pdf" + ) + name = "iss2812.pdf" + reader = PdfReader(BytesIO(get_data_from_url(url, name=name))) + assert "《中国能源展望 2060(2024 年版)》编写委员会" in reader.pages[1].extract_text() diff --git a/tests/test_filters.py b/tests/test_filters.py index 146ce43cbd..6320958882 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -5,7 +5,6 @@ from io import BytesIO from itertools import product as cartesian_product from pathlib import Path -from unittest.mock import patch import pytest from PIL import Image @@ -225,14 +224,11 @@ def test_ccitt_fax_decode(): @pytest.mark.enable_socket() -@patch("pypdf._reader.logger_warning") -def test_decompress_zlib_error(mock_logger_warning): +def test_decompress_zlib_error(caplog): reader = PdfReader(BytesIO(get_data_from_url(name="tika-952445.pdf"))) for page in reader.pages: page.extract_text() - mock_logger_warning.assert_called_with( - "incorrect startxref pointer(3)", "pypdf._reader" - ) + assert "incorrect startxref pointer(3)" in caplog.text @pytest.mark.enable_socket() diff --git a/tests/test_generic.py b/tests/test_generic.py index b1079974ef..6b8ae0151c 100644 --- a/tests/test_generic.py +++ b/tests/test_generic.py @@ -494,6 +494,9 @@ def test_textstringobject_autodetect_utf16(): tso.autodetect_utf16 = True tso.utf16_bom = codecs.BOM_UTF16_BE assert tso.get_original_bytes() == b"\xfe\xff\x00f\x00o\x00o" + tso.utf16_bom = codecs.BOM_UTF16_LE + assert tso.get_original_bytes() == b"\xff\xfef\x00o\x00o\x00" + assert tso.get_encoded_bytes() == b"\xff\xfef\x00o\x00o\x00" def test_remove_child_not_in_tree(): @@ -1131,6 +1134,16 @@ def test_create_string_object_utf16_bom(): result.get_encoded_bytes() == b"\xff\xfeP\x00a\x00p\x00e\x00r\x00P\x00o\x00r\x00t\x00 \x001\x004\x00\x00\x00" ) + result = TextStringObject( + b"\xff\xfeP\x00a\x00p\x00e\x00r\x00P\x00o\x00r\x00t\x00 \x001\x004\x00\x00\x00" + ) + assert result == "PaperPort 14\x00" + assert result.autodetect_utf16 is True + assert result.utf16_bom == b"\xff\xfe" + assert ( + result.get_encoded_bytes() + == b"\xff\xfeP\x00a\x00p\x00e\x00r\x00P\x00o\x00r\x00t\x00 \x001\x004\x00\x00\x00" + ) # utf16-be without bom result = TextStringObject("ÿ") diff --git a/tests/test_images.py b/tests/test_images.py index 5955bf47c5..5fd7d0968a 100644 --- a/tests/test_images.py +++ b/tests/test_images.py @@ -462,3 +462,14 @@ def test_extract_image_from_object(caplog): co = reader.pages[0].get_contents() co.decode_as_image() assert "does not seem to be an Image" in caplog.text + + +@pytest.mark.enable_socket() +def test_4bits_images(caplog): + url = "https://github.com/user-attachments/files/16624406/tt.pdf" + name = "iss2411.pdf" + reader = PdfReader(BytesIO(get_data_from_url(url, name=name))) + url = "https://github.com/user-attachments/assets/53058564-9a28-4e4a-818f-a6528013d7dc" + name = "iss2411.png" + img = Image.open(BytesIO(get_data_from_url(url, name=name))) + assert image_similarity(reader.pages[0].images[1].image, img) == 1.0 diff --git a/tests/test_page.py b/tests/test_page.py index cb7b6c723f..72df648e45 100644 --- a/tests/test_page.py +++ b/tests/test_page.py @@ -1131,9 +1131,9 @@ def test_merge_page_resources_smoke_test(): # use these keys for some "operations", to validate renaming # (the operand name doesn't matter) contents1 = page1[NO("/Contents")] = ContentStream(None, None) - contents1.operations = [(ArrayObject(props1.keys()), "page1-contents")] + contents1.operations = [(ArrayObject(props1.keys()), b"page1-contents")] contents2 = page2[NO("/Contents")] = ContentStream(None, None) - contents2.operations = [(ArrayObject(props2.keys()), "page2-contents")] + contents2.operations = [(ArrayObject(props2.keys()), b"page2-contents")] expected_properties = { "/just1": "/just1-value", @@ -1438,3 +1438,12 @@ def test_negative_index(): src_abs = RESOURCE_ROOT / "git.pdf" reader = PdfReader(src_abs) assert reader.pages[0] == reader.pages[-1] + + +def test_get_contents_as_bytes(): + writer = PdfWriter(RESOURCE_ROOT / "crazyones.pdf") + co = writer.pages[0]["/Contents"][0] + expected = co.get_data() + assert writer.pages[0]._get_contents_as_bytes() == expected + writer.pages[0][NameObject("/Contents")] = writer.pages[0]["/Contents"][0] + assert writer.pages[0]._get_contents_as_bytes() == expected diff --git a/tests/test_reader.py b/tests/test_reader.py index 0a2a32b81a..bd3e6aa68c 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -115,7 +115,9 @@ def test_iss1943(): docinfo = reader.metadata docinfo.update( { - NameObject("/CreationDate"): TextStringObject("D:20230705005151Z00'00'"), + NameObject("/CreationDate"): TextStringObject( + "D:20230705005151Z00'00'" + ), NameObject("/ModDate"): TextStringObject("D:20230705005151Z00'00'"), } ) @@ -274,14 +276,22 @@ def test_get_images(src, expected_images): False, 0, False, - ["startxref on same line as offset", "incorrect startxref pointer(1)"], + [ + "startxref on same line as offset", + "incorrect startxref pointer(1)", + "parsing for Object Streams", + ], ), # error on startxref, but no strict => xref rebuilt,no fail ( False, True, 0, False, - ["startxref on same line as offset", "incorrect startxref pointer(1)"], + [ + "startxref on same line as offset", + "incorrect startxref pointer(1)", + "parsing for Object Streams", + ], ), ], ) @@ -342,7 +352,10 @@ def test_issue297(caplog): assert caplog.text == "" assert "Broken xref table" in exc.value.args[0] reader = PdfReader(path, strict=False) - assert normalize_warnings(caplog.text) == ["incorrect startxref pointer(1)"] + assert normalize_warnings(caplog.text) == [ + "incorrect startxref pointer(1)", + "parsing for Object Streams", + ] reader.pages[0] @@ -432,7 +445,7 @@ def test_get_form(src, expected, expected_get_fields, txt_file_path): def test_get_page_number(src, page_number): src = RESOURCE_ROOT / src reader = PdfReader(src) - reader._get_page(0) + reader.get_page(0) page = reader.pages[page_number] assert reader.get_page_number(page) == page_number @@ -605,9 +618,9 @@ def test_read_unknown_zero_pages(caplog): "startxref on same line as offset", ] assert normalize_warnings(caplog.text) == warnings - with pytest.raises(AttributeError) as exc: + with pytest.raises(PdfReadError) as exc: len(reader.pages) - assert exc.value.args[0] == "'NoneType' object has no attribute 'get_object'" + assert exc.value.args[0] == 'Cannot find "/Root" key in trailer' def test_read_encrypted_without_decryption(): @@ -896,11 +909,14 @@ def test_form_topname_with_and_without_acroform(caplog): def test_extract_text_xref_issue_2(caplog): # pdf/0264cf510015b2a4b395a15cb23c001e.pdf url = "https://corpora.tika.apache.org/base/docs/govdocs1/981/981961.pdf" - msg = "incorrect startxref pointer(2)" + msg = [ + "incorrect startxref pointer(2)", + "parsing for Object Streams", + ] reader = PdfReader(BytesIO(get_data_from_url(url, name="tika-981961.pdf"))) for page in reader.pages: page.extract_text() - assert normalize_warnings(caplog.text) == [msg] + assert normalize_warnings(caplog.text) == msg @pytest.mark.enable_socket() @@ -908,11 +924,13 @@ def test_extract_text_xref_issue_2(caplog): def test_extract_text_xref_issue_3(caplog): # pdf/0264cf510015b2a4b395a15cb23c001e.pdf url = "https://corpora.tika.apache.org/base/docs/govdocs1/977/977774.pdf" - msg = "incorrect startxref pointer(3)" + msg = [ + "incorrect startxref pointer(3)", + ] reader = PdfReader(BytesIO(get_data_from_url(url, name="tika-977774.pdf"))) for page in reader.pages: page.extract_text() - assert normalize_warnings(caplog.text) == [msg] + assert normalize_warnings(caplog.text) == msg @pytest.mark.enable_socket() @@ -1577,3 +1595,25 @@ def test_context_manager_with_stream(): with PdfReader(pdf_stream) as reader: assert not reader.stream.closed assert not pdf_stream.closed + + +@pytest.mark.enable_socket() +@pytest.mark.timeout(10) +def test_iss2761(): + url = "https://github.com/user-attachments/files/16312198/crash-b26d05712a29b241ac6f9dc7fff57428ba2d1a04.pdf" + name = "iss2761.pdf" + reader = PdfReader(BytesIO(get_data_from_url(url, name=name)), strict=False) + with pytest.raises(PdfReadError): + reader.pages[0].extract_text() + + +@pytest.mark.enable_socket() +def test_iss2817(): + """Test for rebuiling Xref_ObjStm""" + url = "https://github.com/user-attachments/files/16764070/crash-7e1356f1179b4198337f282304cb611aea26a199.pdf" + name = "iss2817.pdf" + reader = PdfReader(BytesIO(get_data_from_url(url, name=name))) + assert ( + reader.pages[0]["/Annots"][0].get_object()["/Contents"] + == "A\xa0\xa0\xa0\xa0\xa0\xa0\xa0\xa0\xa0 B" + ) diff --git a/tests/test_text_extraction.py b/tests/test_text_extraction.py index 1ffa68a3e6..dcd4e6caeb 100644 --- a/tests/test_text_extraction.py +++ b/tests/test_text_extraction.py @@ -10,6 +10,7 @@ from pypdf import PdfReader, mult from pypdf._text_extraction import set_custom_rtl +from pypdf.errors import ParseError from . import get_data_from_url @@ -156,3 +157,19 @@ def test_layout_mode_type0_font_widths(): encoding="utf-8" ) assert expected == reader.pages[0].extract_text(extraction_mode="layout") + + +@pytest.mark.enable_socket() +def test_layout_mode_indirect_sequence_font_widths(): + # Cover the situation where the sequence for font widths is an IndirectObject + # ref https://github.com/py-pdf/pypdf/pull/2788 + url = "https://github.com/user-attachments/files/16491621/2788_example.pdf" + name ="2788_example.pdf" + reader = PdfReader(BytesIO(get_data_from_url(url, name=name))) + assert reader.pages[0].extract_text(extraction_mode="layout") == "" + url = "https://github.com/user-attachments/files/16491619/2788_example_malformed.pdf" + name = "2788_example_malformed.pdf" + reader = PdfReader(BytesIO(get_data_from_url(url, name=name))) + with pytest.raises(ParseError) as exc: + reader.pages[0].extract_text(extraction_mode="layout") + assert str(exc.value).startswith("Invalid font width definition") diff --git a/tests/test_utils.py b/tests/test_utils.py index 81fcf9fb47..a4ddff8831 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -109,22 +109,6 @@ def test_mark_location(): Path("pypdf_pdfLocation.txt").unlink() # cleanup -@pytest.mark.parametrize( - ("input_str", "expected"), - [ - ("foo", b"foo"), - ("😀", "😀".encode()), - ("‰", "‰".encode()), - ("▷", "▷".encode()), - ("世", "世".encode()), - # A multi-character string example with non-latin-1 characters: - ("😀😃", "😀😃".encode()), - ], -) -def test_b(input_str: str, expected: bytes): - assert pypdf._utils.b_(input_str) == expected - - def test_deprecate_no_replacement(): with pytest.warns(DeprecationWarning) as warn: pypdf._utils.deprecate_no_replacement("foo", removed_in="3.0.0") @@ -132,24 +116,6 @@ def test_deprecate_no_replacement(): assert warn[0].message.args[0] == error_msg -@pytest.mark.parametrize( - ("left", "up", "upleft", "expected"), - [ - (0, 0, 0, 0), - (1, 0, 0, 1), - (0, 1, 0, 1), - (0, 0, 1, 0), - (1, 2, 3, 1), - (2, 1, 3, 1), - (1, 3, 2, 2), - (3, 1, 2, 2), - (3, 2, 1, 3), - ], -) -def test_paeth_predictor(left, up, upleft, expected): - assert pypdf._utils.paeth_predictor(left, up, upleft) == expected - - @pytest.mark.parametrize( ("dat", "pos", "to_read", "expected", "expected_pos"), [ diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 93bc0c9e5e..f307271e70 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -391,11 +391,11 @@ def test_merge(tmp_path, url, name): { "/Author": "Unknown", "/CreationDate": "Thursday, May 06, 1999 3:56:54 PM", - "/Creator": "C:DEBÆł8", + "/Creator": r"C:\DEB\6338", "/Keywords": "", "/Producer": "Acrobat PDFWriter 3.02 for Windows", "/Subject": "", - "/Title": "C:DEBÆł8-6R.PDF", + "/Title": r"C:\DEB\6338-6R.PDF", }, ) ], @@ -412,7 +412,7 @@ def test_get_metadata(url, name, expected_metadata): ("url", "name", "strict", "exception"), [ ( - "https://corpora.tika.apache.org/base/docs/govdocs1/938/938702.pdf", + "https://github.com/user-attachments/files/16624503/tika-938702.pdf", "tika-938702.pdf", False, None, # iss #1090 is now fixed @@ -980,7 +980,7 @@ def test_replace_image(tmp_path): # extra tests for coverage with pytest.raises(TypeError) as exc: reader.pages[0].images[0].replace(img) - assert exc.value.args[0] == "Can not update an image not belonging to a PdfWriter" + assert exc.value.args[0] == "Cannot update an image not belonging to a PdfWriter." i = writer.pages[0].images[0] with pytest.raises(TypeError) as exc: i.replace(reader.pages[0].images[0]) # missing .image @@ -988,7 +988,16 @@ def test_replace_image(tmp_path): i.indirect_reference = None # to behave like an inline image with pytest.raises(TypeError) as exc: i.replace(reader.pages[0].images[0].image) - assert exc.value.args[0] == "Can not update an inline image" + assert exc.value.args[0] == "Cannot update an inline image." + + import pypdf + + try: + pypdf._page.pil_not_imported = True + with pytest.raises(ImportError) as exc: + i.replace(reader.pages[0].images[0].image) + finally: + pypdf._page.pil_not_imported = False @pytest.mark.enable_socket() @@ -1015,7 +1024,7 @@ def test_inline_images(): with pytest.raises(TypeError) as exc: reader.pages[0].images[0].replace(img_ref) - assert exc.value.args[0] == "Can not update an inline image" + assert exc.value.args[0] == "Cannot update an inline image." _a = {} for x, y in reader.pages[2].images[0:-2].items(): @@ -1289,3 +1298,12 @@ def test_extract_empty_page(): name = "iss2533.pdf" reader = PdfReader(BytesIO(get_data_from_url(url, name))) assert reader.pages[1].extract_text(extraction_mode="layout") == "" + + +@pytest.mark.enable_socket() +def test_iss2815(): + """Cf #2815""" + url = "https://github.com/user-attachments/files/16760725/crash-c1920c7a064649e1191d7879952ec252473fc7e6.pdf" + name = "iss2815.pdf" + reader = PdfReader(BytesIO(get_data_from_url(url, name))) + assert reader.pages[0].extract_text() == "test command with wrong number of args" diff --git a/tests/test_writer.py b/tests/test_writer.py index 9dfeffdd89..b6a47a18c8 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -1290,7 +1290,7 @@ def test_attachments(): to_add = [ ("foobar.txt", b"foobarcontent"), ("foobar2.txt", b"foobarcontent2"), - ("foobar2.txt", b"2nd_foobarcontent"), + ("foobar2.txt", "2nd_foobarcontent"), ] for name, content in to_add: writer.add_attachment(name, content) @@ -2188,6 +2188,10 @@ def test_replace_object(): reader._replace_object(reader.pages[0].indirect_reference, reader.pages[0]) pg = PageObject.create_blank_page(writer, 1000, 1000) reader._replace_object(reader.pages[0].indirect_reference, pg) + pg = PageObject.create_blank_page(None, 1000, 1000) + pg[NameObject("/Contents")] = writer.pages[0]["/Contents"] + writer._add_object(pg) + writer.add_page(pg) def test_mime_jupyter(): @@ -2300,3 +2304,53 @@ def test_matrix_entry_in_field_annots(): auto_regenerate=False, ) assert "/Matrix" in writer.pages[0]["/Annots"][5].get_object()["/AP"]["/N"] + + +@pytest.mark.enable_socket() +def test_compress_identical_objects(): + """Cf #2728 and #2794""" + url = "https://github.com/user-attachments/files/16575458/tt2.pdf" + name = "iss2794.pdf" + in_bytes = BytesIO(get_data_from_url(url, name=name)) + writer = PdfWriter(in_bytes) + writer.compress_identical_objects(remove_orphans=False) + out1 = BytesIO() + writer.write(out1) + assert 0.5 * len(in_bytes.getvalue()) > len(out1.getvalue()) + writer.remove_page( + 1 + ) # page0 contains fields which keep reference to the deleted page + out2 = BytesIO() + writer.write(out2) + assert len(out1.getvalue()) - 100 < len(out2.getvalue()) + writer.compress_identical_objects(remove_identicals=False) + out3 = BytesIO() + writer.write(out3) + assert len(out2.getvalue()) > len(out3.getvalue()) + + +def test_set_need_appearances_writer(): + """Minimal test for coverage""" + writer = PdfWriter() + writer.set_need_appearances_writer() + + +def test_utf16_metadata(): + """See #2754""" + writer = PdfWriter(RESOURCE_ROOT / "crazyones.pdf") + writer.add_metadata( + { + "/Subject": "Invoice №AI_047", + } + ) + b = BytesIO() + writer.write(b) + b.seek(0) + reader = PdfReader(b) + assert reader.metadata.subject == "Invoice №AI_047" + bb = b.getvalue() + i = bb.find(b"/Subject") + assert bb[i : i + 100] == ( + b"/Subject (\\376\\377\\000I\\000n\\000v\\000o\\000i\\000c\\000e" + b"\\000 \\041\\026\\000A\\000I\\000\\137\\0000\\0004\\0007)" + ) diff --git a/tests/test_xmp.py b/tests/test_xmp.py index f864a9df9d..6615b93c8e 100644 --- a/tests/test_xmp.py +++ b/tests/test_xmp.py @@ -7,7 +7,7 @@ import pypdf.generic import pypdf.xmp -from pypdf import PdfReader +from pypdf import PdfReader, PdfWriter from pypdf.errors import PdfReadError from . import get_data_from_url @@ -42,6 +42,35 @@ def test_read_xmp_metadata_samples(src): } +def test_writer_xmp_metadata_samples(): + writer = PdfWriter(SAMPLE_ROOT / "020-xmp/output_with_metadata_pymupdf.pdf") + xmp = writer.xmp_metadata + assert xmp + assert xmp.dc_contributor == [] + assert xmp.dc_creator == ["John Doe"] + assert xmp.dc_source == "Martin Thoma" # attribute node + assert xmp.dc_description == {"x-default": "This is a text"} + assert xmp.dc_date == [datetime(1990, 4, 28, 0, 0)] + assert xmp.dc_title == {"x-default": "Sample PDF with XMP Metadata"} + assert xmp.custom_properties == { + "Style": "FooBarStyle", + "other": "worlds", + "⏰": "time", + } + co = pypdf.generic.ContentStream(None, None) + co.set_data( + xmp.stream.get_data().replace( + b'dc:source="Martin Thoma"', b'dc:source="Pubpub-Zz"' + ) + ) + writer.xmp_metadata = pypdf.xmp.XmpInformation(co) + b = BytesIO() + writer.write(b) + reader = PdfReader(b) + xmp2 = reader.xmp_metadata + assert xmp2.dc_source == "Pubpub-Zz" + + @pytest.mark.parametrize( ("src", "has_xmp"), [ diff --git a/tests/test_xobject_image_helpers.py b/tests/test_xobject_image_helpers.py index 63ecebd9b4..39b7131fcd 100644 --- a/tests/test_xobject_image_helpers.py +++ b/tests/test_xobject_image_helpers.py @@ -4,8 +4,8 @@ import pytest from pypdf import PdfReader -from pypdf._xobj_image_helpers import _handle_flate -from pypdf.errors import PdfReadError +from pypdf._xobj_image_helpers import _extended_image_frombytes, _handle_flate +from pypdf.errors import EmptyImageDataError, PdfReadError from pypdf.generic import ArrayObject, DecodedStreamObject, NameObject, NumberObject from . import get_data_from_url @@ -113,3 +113,12 @@ def test_handle_flate__image_mode_1(): colors=2, obj_as_text="dummy", ) + + +def test_extended_image_frombytes_zero_data(): + mode = "RGB" + size = (1, 1) + data = b"" + + with pytest.raises(EmptyImageDataError, match="Data is 0 bytes, cannot process an image from empty data."): + _extended_image_frombytes(mode, size, data)