Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-97930: Merge with importlib_resources 5.9 #97929

Merged
merged 2 commits into from
Oct 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions Lib/importlib/resources/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,14 @@ def from_package(package):


@contextlib.contextmanager
def _tempfile(reader, suffix='',
# gh-93353: Keep a reference to call os.remove() in late Python
# finalization.
*, _os_remove=os.remove):
def _tempfile(
reader,
suffix='',
# gh-93353: Keep a reference to call os.remove() in late Python
# finalization.
*,
_os_remove=os.remove,
):
# Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
# blocks due to the need to close the temporary file to work on Windows
# properly.
Expand All @@ -89,13 +93,30 @@ def _tempfile(reader, suffix='',
pass


def _temp_file(path):
return _tempfile(path.read_bytes, suffix=path.name)


def _is_present_dir(path: Traversable) -> bool:
"""
Some Traversables implement ``is_dir()`` to raise an
exception (i.e. ``FileNotFoundError``) when the
directory doesn't exist. This function wraps that call
to always return a boolean and only return True
if there's a dir and it exists.
"""
with contextlib.suppress(FileNotFoundError):
return path.is_dir()
return False


@functools.singledispatch
def as_file(path):
"""
Given a Traversable object, return that object as a
path on the local file system in a context manager.
"""
return _tempfile(path.read_bytes, suffix=path.name)
return _temp_dir(path) if _is_present_dir(path) else _temp_file(path)


@as_file.register(pathlib.Path)
Expand All @@ -105,3 +126,34 @@ def _(path):
Degenerate behavior for pathlib.Path objects.
"""
yield path


@contextlib.contextmanager
def _temp_path(dir: tempfile.TemporaryDirectory):
"""
Wrap tempfile.TemporyDirectory to return a pathlib object.
"""
with dir as result:
yield pathlib.Path(result)


@contextlib.contextmanager
def _temp_dir(path):
"""
Given a traversable dir, recursively replicate the whole tree
to the file system in a context manager.
"""
assert path.is_dir()
with _temp_path(tempfile.TemporaryDirectory()) as temp_dir:
yield _write_contents(temp_dir, path)


def _write_contents(target, source):
child = target.joinpath(source.name)
if source.is_dir():
child.mkdir()
for item in source.iterdir():
_write_contents(child, item)
else:
child.open('wb').write(source.read_bytes())
return child
23 changes: 22 additions & 1 deletion Lib/importlib/resources/abc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import abc
import io
import itertools
import os
import pathlib
from typing import Any, BinaryIO, Iterable, Iterator, NoReturn, Text, Optional
from typing import runtime_checkable, Protocol
from typing import Union
Expand Down Expand Up @@ -53,6 +55,10 @@ def contents(self) -> Iterable[str]:
raise FileNotFoundError


class TraversalError(Exception):
pass


@runtime_checkable
class Traversable(Protocol):
"""
Expand Down Expand Up @@ -95,7 +101,6 @@ def is_file(self) -> bool:
Return True if self is a file
"""

@abc.abstractmethod
def joinpath(self, *descendants: StrPath) -> "Traversable":
"""
Return Traversable resolved with any descendants applied.
Expand All @@ -104,6 +109,22 @@ def joinpath(self, *descendants: StrPath) -> "Traversable":
and each may contain multiple levels separated by
``posixpath.sep`` (``/``).
"""
if not descendants:
return self
names = itertools.chain.from_iterable(
path.parts for path in map(pathlib.PurePosixPath, descendants)
)
target = next(names)
matches = (
traversable for traversable in self.iterdir() if traversable.name == target
)
try:
match = next(matches)
except StopIteration:
raise TraversalError(
"Target not found during traversal.", target, list(names)
)
return match.joinpath(*names)

def __truediv__(self, child: StrPath) -> "Traversable":
"""
Expand Down
16 changes: 7 additions & 9 deletions Lib/importlib/resources/readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,13 @@ def is_dir(self):
def is_file(self):
return False

def joinpath(self, child):
# first try to find child in current paths
for file in self.iterdir():
if file.name == child:
return file
# if it does not exist, construct it with the first path
return self._paths[0] / child

__truediv__ = joinpath
def joinpath(self, *descendants):
try:
return super().joinpath(*descendants)
except abc.TraversalError:
# One of the paths did not resolve (a directory does not exist).
# Just return something that will not exist.
return self._paths[0].joinpath(*descendants)

def open(self, *args, **kwargs):
raise FileNotFoundError(f'{self} is not a file')
Expand Down
14 changes: 0 additions & 14 deletions Lib/importlib/resources/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,6 @@ def iterdir(self):
def open(self, *args, **kwargs):
raise IsADirectoryError()

@staticmethod
def _flatten(compound_names):
for name in compound_names:
yield from name.split('/')

def joinpath(self, *descendants):
if not descendants:
return self
names = self._flatten(descendants)
target = next(names)
return next(
traversable for traversable in self.iterdir() if traversable.name == target
).joinpath(*names)


class TraversableReader(TraversableResources, SimpleReader):
"""
Expand Down
5 changes: 5 additions & 0 deletions Lib/test/test_importlib/resources/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ def test_join_path(self):
str(path.joinpath('imaginary'))[len(prefix) + 1 :],
os.path.join('namespacedata01', 'imaginary'),
)
self.assertEqual(path.joinpath(), path)

def test_join_path_compound(self):
path = MultiplexedPath(self.folder)
assert not path.joinpath('imaginary/foo.py').exists()

def test_repr(self):
self.assertEqual(
Expand Down
8 changes: 8 additions & 0 deletions Lib/test/test_importlib/resources/test_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ def test_submodule_contents_by_name(self):
{'__init__.py', 'binary.file'},
)

def test_as_file_directory(self):
with resources.as_file(resources.files('ziptestdata')) as data:
assert data.name == 'ziptestdata'
assert data.is_dir()
assert data.joinpath('subdirectory').is_dir()
assert len(list(data.iterdir()))
assert not data.parent.exists()


class ResourceFromZipsTest02(util.ZipSetupBase, unittest.TestCase):
ZIP_MODULE = zipdata02 # type: ignore
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Apply changes from importlib_resources 5.8 and 5.9: ``Traversable.joinpath``
provides a concrete implementation. ``as_file`` now supports directories of
resources.