Skip to content

Commit

Permalink
Improve executable discovery of a PythonInfo inside a given fol… (#1443)
Browse files Browse the repository at this point in the history
- Use architecture, version, implementation and platform extensions for
candiates
- Cache PythonInfo.from_exe by path (resolving symlinks)
- Ensure what we discover has the same version, implementation and
architecture.
- Improve our test suite for this functionality.
Signed-off-by: Bernat Gabor <[email protected]>
  • Loading branch information
saytosid authored and gaborbernat committed Nov 29, 2019
1 parent f2fc080 commit ad42aa5
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 36 deletions.
82 changes: 50 additions & 32 deletions src/virtualenv/interpreters/discovery/py_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,46 +120,67 @@ def system_executable(self):
if env_prefix:
if self.real_prefix is None and self.base_executable is not None:
return self.base_executable
return self.find_exe(env_prefix)
return self.find_exe_based_of(inside_folder=env_prefix)
else:
return self.executable

def find_exe(self, home):
def find_exe_based_of(self, inside_folder):
# we don't know explicitly here, do some guess work - our executable name should tell
exe_base_name = os.path.basename(self.executable)
possible_names = self._find_possible_exe_names(exe_base_name)
possible_folders = self._find_possible_folders(exe_base_name, home)
possible_names = self._find_possible_exe_names()
possible_folders = self._find_possible_folders(inside_folder)
for folder in possible_folders:
for name in possible_names:
candidate = os.path.join(folder, name)
if os.path.exists(candidate):
return candidate
info = PythonInfo.from_exe(candidate)
keys = {"implementation", "architecture", "version_info"}
if all(getattr(info, k) == getattr(self, k) for k in keys):
return candidate
what = "|".join(possible_names) # pragma: no cover
raise RuntimeError("failed to detect {} in {}".format(what, "|".join(possible_folders))) # pragma: no cover

def _find_possible_folders(self, exe_base_name, home):
def _find_possible_folders(self, inside_folder):
candidate_folder = OrderedDict()
if self.executable.startswith(self.prefix):
relative = self.executable[len(self.prefix) : -len(exe_base_name)]
candidate_folder["{}{}".format(home, relative)] = None
candidate_folder[home] = None
base = os.path.dirname(self.executable)
# following path pattern of the current
if base.startswith(self.prefix):
relative = base[len(self.prefix) :]
candidate_folder["{}{}".format(inside_folder, relative)] = None

# or at root level
candidate_folder[inside_folder] = None
return list(candidate_folder.keys())

@staticmethod
def _find_possible_exe_names(exe_base_name):
exe_no_suffix = os.path.splitext(exe_base_name)[0]
def _find_possible_exe_names(self):
name_candidate = OrderedDict()
for ext in EXTENSIONS:
for name in [self.implementation, "python"]:
for at in range(3, -1, -1):
cur_ver = sys.version_info[0:at]
version = ".".join(str(i) for i in cur_ver)
name = "{}{}{}".format(exe_no_suffix, version, ext)
name_candidate[name] = None
version = ".".join(str(i) for i in self.version_info[:at])
for arch in ["-{}".format(self.architecture), ""]:
for ext in EXTENSIONS:
candidate = "{}{}{}{}".format(name, version, arch, ext)
name_candidate[candidate] = None
return list(name_candidate.keys())

__cache_from_exe = {}

@classmethod
def from_exe(cls, exe, raise_on_error=True):
key = os.path.realpath(exe)
if key in cls.__cache_from_exe:
result, failure = cls.__cache_from_exe[key]
else:
failure, result = cls._load_for_exe(exe)
cls.__cache_from_exe[key] = result, failure
if failure is not None:
if raise_on_error:
raise failure
else:
logging.debug("%s", str(failure))
return result

@classmethod
def _load_for_exe(cls, exe):
path = "{}.py".format(os.path.splitext(__file__)[0])
cmd = [exe, path]
# noinspection DuplicatedCode
Expand All @@ -172,19 +193,16 @@ def from_exe(cls, exe, raise_on_error=True):
code = process.returncode
except OSError as os_error:
out, err, code = "", os_error.strerror, os_error.errno
if code != 0:
if raise_on_error:
msg = "failed to query {} with code {}{}{}".format(
exe, code, " out: []".format(out) if out else "", " err: []".format(err) if err else ""
)
raise RuntimeError(msg)
else:
logging.debug("failed %s with code %s out %s err %s", cmd, code, out, err)
return None

result = cls.from_json(out)
result.executable = exe # keep original executable as this may contain initialization code
return result
result, failure = None, None
if code == 0:
result = cls.from_json(out)
result.executable = exe # keep original executable as this may contain initialization code
else:
msg = "failed to query {} with code {}{}{}".format(
exe, code, " out: []".format(out) if out else "", " err: []".format(err) if err else ""
)
failure = RuntimeError(msg)
return failure, result

def satisfies(self, spec, impl_must_match):
"""check if a given specification can be satisfied by the this python interpreter instance"""
Expand Down
8 changes: 5 additions & 3 deletions tests/unit/interpreters/create/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ def get_venv(tmp_path_factory):
root_python = get_root(tmp_path_factory)
dest = tmp_path_factory.mktemp("venv")
subprocess.check_call([str(root_python), "-m", "venv", "--without-pip", str(dest)])
return CURRENT.find_exe(str(dest))
# sadly creating a virtual environment does not tell us where the executable lives in general case
# so discover using some heuristic
return CURRENT.find_exe_based_of(inside_folder=str(dest))


def get_virtualenv(tmp_path_factory):
Expand All @@ -41,14 +43,14 @@ def get_virtualenv(tmp_path_factory):
virtualenv_at = str(tmp_path_factory.mktemp("venv-for-virtualenv"))
builder = EnvBuilder(symlinks=not IS_WIN)
builder.create(virtualenv_at)
venv_for_virtualenv = CURRENT.find_exe(virtualenv_at)
venv_for_virtualenv = CURRENT.find_exe_based_of(inside_folder=virtualenv_at)
cmd = venv_for_virtualenv, "-m", "pip", "install", "virtualenv==16.6.1"
subprocess.check_call(cmd)

virtualenv_python = tmp_path_factory.mktemp("virtualenv")
cmd = venv_for_virtualenv, "-m", "virtualenv", virtualenv_python
subprocess.check_call(cmd)
return CURRENT.find_exe(virtualenv_python)
return CURRENT.find_exe_based_of(inside_folder=virtualenv_python)


PYTHON = {"root": get_root, "venv": get_venv, "virtualenv": get_virtualenv}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_bad_exe_py_info_no_raise(tmp_path, caplog, capsys):
assert not out
assert len(caplog.messages) == 1
msg = caplog.messages[0]
assert repr(exe) in msg
assert str(exe) in msg
assert "code" in msg


Expand Down Expand Up @@ -88,3 +88,24 @@ def test_satisfy_not_version(spec):
parsed_spec = PythonSpec.from_string_spec("{}{}".format(CURRENT.implementation, spec))
matches = CURRENT.satisfies(parsed_spec, True)
assert matches is False


def test_py_info_cached(mocker, tmp_path):
mocker.spy(PythonInfo, "_load_for_exe")
with pytest.raises(RuntimeError):
PythonInfo.from_exe(str(tmp_path))
with pytest.raises(RuntimeError):
PythonInfo.from_exe(str(tmp_path))
assert PythonInfo._load_for_exe.call_count == 1


@pytest.mark.skipif(sys.platform == "win32", reason="symlink is not guaranteed to work on windows")
def test_py_info_cached_symlink(mocker, tmp_path):
mocker.spy(PythonInfo, "_load_for_exe")
with pytest.raises(RuntimeError):
PythonInfo.from_exe(str(tmp_path))
symlinked = tmp_path / "a"
symlinked.symlink_to(tmp_path)
with pytest.raises(RuntimeError):
PythonInfo.from_exe(str(symlinked))
assert PythonInfo._load_for_exe.call_count == 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import logging
import os
import sys

import pytest

from virtualenv.interpreters.discovery.py_info import CURRENT, EXTENSIONS


def test_discover_empty_folder(tmp_path, monkeypatch):
with pytest.raises(RuntimeError):
CURRENT.find_exe_based_of(inside_folder=str(tmp_path))


@pytest.mark.skipif(sys.platform == "win32", reason="symlink is not guaranteed to work on windows")
@pytest.mark.parametrize("suffix", EXTENSIONS)
@pytest.mark.parametrize("arch", [CURRENT.architecture, ""])
@pytest.mark.parametrize("version", [".".join(str(i) for i in CURRENT.version_info[0:i]) for i in range(3, 0, -1)])
@pytest.mark.parametrize("impl", [CURRENT.implementation, "python"])
@pytest.mark.parametrize("into", [CURRENT.prefix[len(CURRENT.executable) :], ""])
def test_discover_ok(tmp_path, monkeypatch, suffix, impl, version, arch, into, caplog):
caplog.set_level(logging.DEBUG)
folder = tmp_path / into
folder.mkdir(parents=True, exist_ok=True)
dest = folder / "{}{}".format(impl, version, arch, suffix)
os.symlink(CURRENT.executable, str(dest))
inside_folder = str(tmp_path)
assert CURRENT.find_exe_based_of(inside_folder) == str(dest)
assert not caplog.text

dest.rename(dest.parent / (dest.name + "-1"))
with pytest.raises(RuntimeError):
CURRENT.find_exe_based_of(inside_folder)

0 comments on commit ad42aa5

Please sign in to comment.