Skip to content

Commit

Permalink
Fix an issue with infinite recursion.
Browse files Browse the repository at this point in the history
Use a visited memo to check if the current object in the clone operation
has already been visited, and if so, do not add it to the list of
objects. This avoids infinite recursion in case there are links to
identical objects inside a PDF.
  • Loading branch information
Alexhuszagh committed Oct 23, 2023
1 parent 9047079 commit 71c8140
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions pypdf/generic/_data_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
List,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
Expand Down Expand Up @@ -187,14 +188,15 @@ def clone(
except Exception:
pass

visited = set()
d__ = cast(
"DictionaryObject",
self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
)
if ignore_fields is None:
ignore_fields = []
if len(d__.keys()) == 0:
d__._clone(self, pdf_dest, force_duplicate, ignore_fields)
d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
return d__

def _clone(
Expand All @@ -203,6 +205,7 @@ def _clone(
pdf_dest: PdfWriterProtocol,
force_duplicate: bool,
ignore_fields: Optional[Sequence[Union[str, int]]],
visited: Set[Tuple[int, int]],
) -> None:
"""
Update the object from src.
Expand Down Expand Up @@ -270,6 +273,13 @@ def _clone(
cur_obj.__class__(), pdf_dest, force_duplicate
),
)
# check to see if we've previously processed our item
idnum = clon.indirect_reference.idnum
generation = clon.indirect_reference.generation
if (idnum, generation) in visited:
cur_obj = None
break
visited.add((idnum, generation))
objs.append((cur_obj, clon))
assert prev_obj is not None
prev_obj[NameObject(k)] = clon.indirect_reference
Expand All @@ -282,7 +292,7 @@ def _clone(
except Exception:
cur_obj = None
for s, c in objs:
c._clone(s, pdf_dest, force_duplicate, ignore_fields)
c._clone(s, pdf_dest, force_duplicate, ignore_fields, visited)

for k, v in src.items():
if k not in ignore_fields:
Expand Down Expand Up @@ -798,6 +808,7 @@ def _clone(
pdf_dest: PdfWriterProtocol,
force_duplicate: bool,
ignore_fields: Optional[Sequence[Union[str, int]]],
visited: Set[Tuple[int, int]],
) -> None:
"""
Update the object from src.
Expand All @@ -820,7 +831,7 @@ def _clone(
)
except Exception:
pass
super()._clone(src, pdf_dest, force_duplicate, ignore_fields)
super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

def get_data(self) -> Union[bytes, str]:
return self._data
Expand Down Expand Up @@ -1048,6 +1059,7 @@ def clone(
except Exception:
pass

visited = set()
d__ = cast(
"ContentStream",
self._reference_clone(
Expand All @@ -1056,7 +1068,7 @@ def clone(
)
if ignore_fields is None:
ignore_fields = []
d__._clone(self, pdf_dest, force_duplicate, ignore_fields)
d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
return d__

def _clone(
Expand All @@ -1065,6 +1077,7 @@ def _clone(
pdf_dest: PdfWriterProtocol,
force_duplicate: bool,
ignore_fields: Optional[Sequence[Union[str, int]]],
visited: Set[Tuple[int, int]],
) -> None:
"""
Update the object from src.
Expand All @@ -1081,7 +1094,7 @@ def _clone(
self._operations = list(src_cs._operations)
self.forced_encoding = src_cs.forced_encoding
# no need to call DictionaryObject or anything
# like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields)
# like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

def _parse_content_stream(self, stream: StreamType) -> None:
# 7.8.2 Content Streams
Expand Down

0 comments on commit 71c8140

Please sign in to comment.