Skip to content

Commit

Permalink
Vendor in latest pipdeptree to solve some of the pkg_resources conver…
Browse files Browse the repository at this point in the history
…sion errors.
  • Loading branch information
matteius committed Apr 24, 2024
1 parent 57dfac4 commit 201bbb8
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 186 deletions.
9 changes: 0 additions & 9 deletions pipenv/vendor/pipdeptree/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +0,0 @@
import os
import sys

pardir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# for finding pipdeptree itself
sys.path.append(pardir)
# for finding stuff in vendor and patched
sys.path.append(os.path.dirname(os.path.dirname(pardir)))

15 changes: 3 additions & 12 deletions pipenv/vendor/pipdeptree/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,23 @@

from __future__ import annotations

import os
import sys
from typing import Sequence

pardir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# for finding pipdeptree itself
sys.path.append(pardir)
# for finding stuff in vendor and patched
sys.path.append(os.path.dirname(os.path.dirname(pardir)))

from pipenv.vendor.pipdeptree._cli import get_options
from pipenv.vendor.pipdeptree._discovery import get_installed_distributions
from pipenv.vendor.pipdeptree._models import PackageDAG
from pipenv.vendor.pipdeptree._non_host import handle_non_host_target
from pipenv.vendor.pipdeptree._render import render
from pipenv.vendor.pipdeptree._validate import validate


def main(args: Sequence[str] | None = None) -> None | int:
"""CLI - The main function called as entry point."""
options = get_options(args)
result = handle_non_host_target(options)
if result is not None:
return result

pkgs = get_installed_distributions(local_only=options.local_only, user_only=options.user_only)
pkgs = get_installed_distributions(
interpreter=options.python, local_only=options.local_only, user_only=options.user_only
)
tree = PackageDAG.from_pkgs(pkgs)
is_text_output = not any([options.json, options.json_tree, options.output_format])

Expand Down
75 changes: 75 additions & 0 deletions pipenv/vendor/pipdeptree/_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from __future__ import annotations

from json import JSONDecodeError
from pathlib import Path
from typing import TYPE_CHECKING

from pipenv.patched.pip._internal.models.direct_url import (
DirectUrl, # noqa: PLC2701
DirectUrlValidationError, # noqa: PLC2701
)
from pipenv.patched.pip._internal.utils.egg_link import egg_link_path_from_sys_path # noqa: PLC2701
from pipenv.patched.pip._vendor.packaging.version import Version # noqa: PLC2701

if TYPE_CHECKING:
from importlib.metadata import Distribution


class PipBaseDistributionAdapter:
"""
An adapter class for pip's `pipenv.patched.pip._internal.metadata.BaseDistribution` abstract class.
It essentially wraps over an importlib.metadata.Distribution object and provides just enough fields/methods found in
pip's `BaseDistribution` so that we can use `pipenv.patched.pip._internal.operations.freeze.FrozenRequirement.from_dist()`.
:param dist: Represents an `importlib.metadata.Distribution` object.
"""

DIRECT_URL_METADATA_NAME = "direct_url.json"

def __init__(self, dist: Distribution) -> None:
self._dist = dist
self._raw_name = dist.metadata["Name"]
self._version = Version(dist.version)

@property
def raw_name(self) -> str:
return self._raw_name

@property
def version(self) -> Version:
return self._version

@property
def editable(self) -> bool:
return self.editable_project_location is not None

@property
def direct_url(self) -> DirectUrl | None:
result = None
json_str = self._dist.read_text(self.DIRECT_URL_METADATA_NAME)
try:
if json_str:
result = DirectUrl.from_json(json_str)
except (
UnicodeDecodeError,
JSONDecodeError,
DirectUrlValidationError,
):
return result
return result

@property
def editable_project_location(self) -> str | None:
direct_url = self.direct_url
if direct_url and direct_url.is_local_editable():
from pipenv.patched.pip._internal.utils.urls import url_to_path # noqa: PLC2701, PLC0415

return url_to_path(direct_url.url)

result = None
egg_link_path = egg_link_path_from_sys_path(self.raw_name)
if egg_link_path:
with Path(egg_link_path).open("r") as f:
result = f.readline().rstrip()
return result
103 changes: 81 additions & 22 deletions pipenv/vendor/pipdeptree/_discovery.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,94 @@
from __future__ import annotations

from typing import TYPE_CHECKING
import ast
import site
import subprocess # noqa: S404
import sys
from importlib.metadata import Distribution, distributions
from pathlib import Path
from typing import Iterable, Tuple

if TYPE_CHECKING:
from pipenv.patched.pip._vendor.pkg_resources import DistInfoDistribution
from pipenv.patched.pip._vendor.packaging.utils import canonicalize_name


def get_installed_distributions(
interpreter: str = str(sys.executable),
local_only: bool = False, # noqa: FBT001, FBT002
user_only: bool = False, # noqa: FBT001, FBT002
) -> list[DistInfoDistribution]:
try:
from pipenv.patched.pip._internal.metadata import pkg_resources # noqa: PLC0415, PLC2701
except ImportError:
# For backward compatibility with python ver. 2.7 and pip
# version 20.3.4 (the latest pip version that works with python
# version 2.7)
from pipenv.patched.pip._internal.utils import misc # noqa: PLC0415, PLC2701 # pragma: no cover

return misc.get_installed_distributions( # type: ignore[no-any-return,attr-defined]
local_only=local_only,
user_only=user_only,
)
) -> list[Distribution]:
# See https://docs.python.org/3/library/venv.html#how-venvs-work for more details.
in_venv = sys.prefix != sys.base_prefix
original_dists: Iterable[Distribution] = []
py_path = Path(interpreter).absolute()
using_custom_interpreter = py_path != Path(sys.executable).absolute()

if user_only:
original_dists = distributions(path=[site.getusersitepackages()])
elif using_custom_interpreter:
# We query the interpreter directly to get its `sys.path` list to be used by `distributions()`.
# If --python and --local-only are given, we ensure that we are only using paths associated to the interpreter's
# environment.
if local_only:
cmd = "import sys; print([p for p in sys.path if p.startswith(sys.prefix)])"
else:
cmd = "import sys; print(sys.path)"

args = [str(py_path), "-c", cmd]
result = subprocess.run(args, stdout=subprocess.PIPE, check=False, text=True) # noqa: S603
original_dists = distributions(path=ast.literal_eval(result.stdout))
elif local_only and in_venv:
venv_site_packages = [p for p in sys.path if p.startswith(sys.prefix)]
original_dists = distributions(path=venv_site_packages)
else:
dists = pkg_resources.Environment.from_paths(None).iter_installed_distributions(
local_only=local_only,
skip=(),
user_only=user_only,
)
return [d._dist for d in dists] # type: ignore[attr-defined] # noqa: SLF001
original_dists = distributions()

# Since importlib.metadata.distributions() can return duplicate packages, we need to handle this. pip's approach is
# to keep track of each package metadata it finds, and if it encounters one again it will simply just ignore it. We
# take it one step further and warn the user that there are duplicate packages in their environment.
# See https://github.com/pypa/pip/blob/7c49d06ea4be4635561f16a524e3842817d1169a/src/pip/_internal/metadata/importlib/_envs.py#L34
seen_dists: dict[str, Distribution] = {}
first_seen_to_already_seen_dists_dict: dict[Distribution, list[Distribution]] = {}
dists = []
for dist in original_dists:
normalized_name = canonicalize_name(dist.metadata["Name"])
if normalized_name not in seen_dists:
seen_dists[normalized_name] = dist
dists.append(dist)
continue
already_seen_dists = first_seen_to_already_seen_dists_dict.setdefault(seen_dists[normalized_name], [])
already_seen_dists.append(dist)

if first_seen_to_already_seen_dists_dict:
render_duplicated_dist_metadata_text(first_seen_to_already_seen_dists_dict)

return dists


FirstSeenWithDistsPair = Tuple[Distribution, Distribution]


def render_duplicated_dist_metadata_text(
first_seen_to_already_seen_dists_dict: dict[Distribution, list[Distribution]],
) -> None:
entries_to_pairs_dict: dict[str, list[FirstSeenWithDistsPair]] = {}
for first_seen, dists in first_seen_to_already_seen_dists_dict.items():
for dist in dists:
entry = str(dist.locate_file(""))
dist_list = entries_to_pairs_dict.setdefault(entry, [])
dist_list.append((first_seen, dist))

print("Warning!!! Duplicate package metadata found:", file=sys.stderr) # noqa: T201
for entry, pairs in entries_to_pairs_dict.items():
print(f'"{entry}"', file=sys.stderr) # noqa: T201
for first_seen, dist in pairs:
print( # noqa: T201
(
f" {dist.metadata['Name']:<32} {dist.version:<16} (using {first_seen.version},"
f" \"{first_seen.locate_file('')}\")"
),
file=sys.stderr,
)
print("-" * 72, file=sys.stderr) # noqa: T201


__all__ = [
Expand Down
62 changes: 43 additions & 19 deletions pipenv/vendor/pipdeptree/_models/dag.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
from __future__ import annotations

import sys
from collections import defaultdict, deque
from fnmatch import fnmatch
from itertools import chain
from typing import TYPE_CHECKING, Iterator, List, Mapping

from pipenv.patched.pip._vendor.packaging.utils import canonicalize_name

if TYPE_CHECKING:
from pipenv.patched.pip._vendor.pkg_resources import DistInfoDistribution
from importlib.metadata import Distribution


from .package import DistPackage, InvalidRequirementError, ReqPackage


from .package import DistPackage, ReqPackage, pep503_normalize
def render_invalid_reqs_text_if_necessary(dist_name_to_invalid_reqs_dict: dict[str, list[str]]) -> None:
if not dist_name_to_invalid_reqs_dict:
return

print("Warning!!! Invalid requirement strings found for the following distributions:", file=sys.stderr) # noqa: T201
for dist_name, invalid_reqs in dist_name_to_invalid_reqs_dict.items():
print(dist_name, file=sys.stderr) # noqa: T201

for invalid_req in invalid_reqs:
print(f' Skipping "{invalid_req}"', file=sys.stderr) # noqa: T201
print("-" * 72, file=sys.stderr) # noqa: T201


class PackageDAG(Mapping[DistPackage, List[ReqPackage]]):
Expand All @@ -36,23 +52,34 @@ class PackageDAG(Mapping[DistPackage, List[ReqPackage]]):
"""

@classmethod
def from_pkgs(cls, pkgs: list[DistInfoDistribution]) -> PackageDAG:
def from_pkgs(cls, pkgs: list[Distribution]) -> PackageDAG:
dist_pkgs = [DistPackage(p) for p in pkgs]
idx = {p.key: p for p in dist_pkgs}
m: dict[DistPackage, list[ReqPackage]] = {}
dist_name_to_invalid_reqs_dict: dict[str, list[str]] = {}
for p in dist_pkgs:
reqs = []
for r in p.requires():
# Requirement key is not sufficiently normalized in pkg_resources - apply additional normalization
d = idx.get(pep503_normalize(r.key))
# pip's _vendor.packaging.requirements.Requirement uses the exact casing of a dependency's name found in
# a project's build config, which is not ideal when rendering.
requires_iterator = p.requires()
while True:
try:
req = next(requires_iterator)
except InvalidRequirementError as err:
# We can't work with invalid requirement strings. Let's warn the user about them.
dist_name_to_invalid_reqs_dict.setdefault(p.project_name, []).append(str(err))
continue
except StopIteration:
break
d = idx.get(canonicalize_name(req.name))
# Distribution.requires only returns the name of requirements in the metadata file, which may not be the
# same as the name in PyPI. We should try to retain the original package names for requirements.
# See https://github.com/tox-dev/pipdeptree/issues/242
r.project_name = d.project_name if d is not None else r.project_name
pkg = ReqPackage(r, d)
req.name = d.project_name if d is not None else req.name
pkg = ReqPackage(req, d)
reqs.append(pkg)
m[p] = reqs

render_invalid_reqs_text_if_necessary(dist_name_to_invalid_reqs_dict)

return cls(m)

def __init__(self, m: dict[DistPackage, list[ReqPackage]]) -> None:
Expand Down Expand Up @@ -110,16 +137,11 @@ def filter_nodes(self, include: list[str] | None, exclude: set[str] | None) -> P
if include is None and exclude is None:
return self

# Note: In following comparisons, we use lower cased values so
# that user may specify `key` or `project_name`. As per the
# documentation, `key` is simply
# `project_name.lower()`. Refer:
# https://setuptools.readthedocs.io/en/latest/pkg_resources.html#distribution-objects
include_with_casing_preserved: list[str] = []
if include:
include_with_casing_preserved = include
include = [s.lower() for s in include]
exclude = {s.lower() for s in exclude} if exclude else set()
include = [canonicalize_name(i) for i in include]
exclude = {canonicalize_name(s) for s in exclude} if exclude else set()

# Check for mutual exclusion of show_only and exclude sets
# after normalizing the values to lowercase
Expand Down Expand Up @@ -164,7 +186,9 @@ def filter_nodes(self, include: list[str] | None, exclude: set[str] | None) -> P
# a dependency is missing
continue

non_existent_includes = [i for i in include_with_casing_preserved if i.lower() not in matched_includes]
non_existent_includes = [
i for i in include_with_casing_preserved if canonicalize_name(i) not in matched_includes
]
if non_existent_includes:
raise ValueError("No packages matched using the following patterns: " + ", ".join(non_existent_includes))

Expand Down Expand Up @@ -242,7 +266,7 @@ def reverse(self) -> PackageDAG: # type: ignore[override]
for v in vs:
assert isinstance(v, DistPackage)
node = next((p for p in m if p.key == v.key), v.as_parent_of(None))
m[node].append(k) # type: ignore[arg-type]
m[node].append(k)
if k.key not in child_keys:
assert isinstance(k, ReqPackage)
assert k.dist is not None
Expand Down
Loading

0 comments on commit 201bbb8

Please sign in to comment.