diff --git a/news/5522.vendor.rst b/news/5522.vendor.rst new file mode 100644 index 0000000000..f4b407fef2 --- /dev/null +++ b/news/5522.vendor.rst @@ -0,0 +1,3 @@ + * Bump version of requirementslib to 2.2.1 + * Bump version of vistir to 0.7.5 + * Bump version of colorama to 0.4.6 diff --git a/pipenv/__init__.py b/pipenv/__init__.py index 7f5d2cef7f..75231b0f62 100644 --- a/pipenv/__init__.py +++ b/pipenv/__init__.py @@ -38,21 +38,26 @@ if "urllib3" in sys.modules: del sys.modules["urllib3"] -from pipenv.vendor.vistir.misc import get_text_stream - -stdout = get_text_stream("stdout") -stderr = get_text_stream("stderr") if os.name == "nt": - from pipenv.vendor.vistir.misc import _can_use_color, _wrap_for_color + from pipenv.vendor import colorama - if _can_use_color(stdout): - stdout = _wrap_for_color(stdout) - if _can_use_color(stderr): - stderr = _wrap_for_color(stderr) + # Backward compatability with vistir + no_color = False + for item in ("ANSI_COLORS_DISABLED", "VISTIR_DISABLE_COLORS", "CI"): + warnings.warn( + ( + f"Please do not use {item}, as it will be removed in future versions." + "\nUse NO_COLOR instead." + ), + DeprecationWarning, + stacklevel=2, + ) + if os.getenv(item): + no_color = True -sys.stdout = stdout -sys.stderr = stderr + if not os.getenv("NO_COLOR") or no_color: + colorama.just_fix_windows_console() from . import resolver # noqa from .cli import cli diff --git a/pipenv/vendor/colorama/__init__.py b/pipenv/vendor/colorama/__init__.py index b149ed79b0..383101cdb3 100644 --- a/pipenv/vendor/colorama/__init__.py +++ b/pipenv/vendor/colorama/__init__.py @@ -1,6 +1,7 @@ # Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. -from .initialise import init, deinit, reinit, colorama_text +from .initialise import init, deinit, reinit, colorama_text, just_fix_windows_console from .ansi import Fore, Back, Style, Cursor from .ansitowin32 import AnsiToWin32 -__version__ = '0.4.4' +__version__ = '0.4.6' + diff --git a/pipenv/vendor/colorama/ansitowin32.py b/pipenv/vendor/colorama/ansitowin32.py index 6039a05432..abf209e60c 100644 --- a/pipenv/vendor/colorama/ansitowin32.py +++ b/pipenv/vendor/colorama/ansitowin32.py @@ -4,7 +4,7 @@ import os from .ansi import AnsiFore, AnsiBack, AnsiStyle, Style, BEL -from .winterm import WinTerm, WinColor, WinStyle +from .winterm import enable_vt_processing, WinTerm, WinColor, WinStyle from .win32 import windll, winapi_test @@ -37,6 +37,12 @@ def __enter__(self, *args, **kwargs): def __exit__(self, *args, **kwargs): return self.__wrapped.__exit__(*args, **kwargs) + def __setstate__(self, state): + self.__dict__ = state + + def __getstate__(self): + return self.__dict__ + def write(self, text): self.__convertor.write(text) @@ -57,7 +63,9 @@ def closed(self): stream = self.__wrapped try: return stream.closed - except AttributeError: + # AttributeError in the case that the stream doesn't support being closed + # ValueError for the case that the stream has already been detached when atexit runs + except (AttributeError, ValueError): return True @@ -86,15 +94,22 @@ def __init__(self, wrapped, convert=None, strip=None, autoreset=False): # (e.g. Cygwin Terminal). In this case it's up to the terminal # to support the ANSI codes. conversion_supported = on_windows and winapi_test() + try: + fd = wrapped.fileno() + except Exception: + fd = -1 + system_has_native_ansi = not on_windows or enable_vt_processing(fd) + have_tty = not self.stream.closed and self.stream.isatty() + need_conversion = conversion_supported and not system_has_native_ansi # should we strip ANSI sequences from our output? if strip is None: - strip = conversion_supported or (not self.stream.closed and not self.stream.isatty()) + strip = need_conversion or not have_tty self.strip = strip # should we should convert ANSI sequences into win32 calls? if convert is None: - convert = conversion_supported and not self.stream.closed and self.stream.isatty() + convert = need_conversion and have_tty self.convert = convert # dict of ansi codes to win32 functions and parameters @@ -256,3 +271,7 @@ def convert_osc(self, text): if params[0] in '02': winterm.set_title(params[1]) return text + + + def flush(self): + self.wrapped.flush() diff --git a/pipenv/vendor/colorama/initialise.py b/pipenv/vendor/colorama/initialise.py index 430d066872..d5fd4b71fe 100644 --- a/pipenv/vendor/colorama/initialise.py +++ b/pipenv/vendor/colorama/initialise.py @@ -6,13 +6,27 @@ from .ansitowin32 import AnsiToWin32 -orig_stdout = None -orig_stderr = None +def _wipe_internal_state_for_tests(): + global orig_stdout, orig_stderr + orig_stdout = None + orig_stderr = None + + global wrapped_stdout, wrapped_stderr + wrapped_stdout = None + wrapped_stderr = None -wrapped_stdout = None -wrapped_stderr = None + global atexit_done + atexit_done = False + + global fixed_windows_console + fixed_windows_console = False -atexit_done = False + try: + # no-op if it wasn't registered + atexit.unregister(reset_all) + except AttributeError: + # python 2: no atexit.unregister. Oh well, we did our best. + pass def reset_all(): @@ -55,6 +69,29 @@ def deinit(): sys.stderr = orig_stderr +def just_fix_windows_console(): + global fixed_windows_console + + if sys.platform != "win32": + return + if fixed_windows_console: + return + if wrapped_stdout is not None or wrapped_stderr is not None: + # Someone already ran init() and it did stuff, so we won't second-guess them + return + + # On newer versions of Windows, AnsiToWin32.__init__ will implicitly enable the + # native ANSI support in the console as a side-effect. We only need to actually + # replace sys.stdout/stderr if we're in the old-style conversion mode. + new_stdout = AnsiToWin32(sys.stdout, convert=None, strip=None, autoreset=False) + if new_stdout.convert: + sys.stdout = new_stdout + new_stderr = AnsiToWin32(sys.stderr, convert=None, strip=None, autoreset=False) + if new_stderr.convert: + sys.stderr = new_stderr + + fixed_windows_console = True + @contextlib.contextmanager def colorama_text(*args, **kwargs): init(*args, **kwargs) @@ -78,3 +115,7 @@ def wrap_stream(stream, convert, strip, autoreset, wrap): if wrapper.should_wrap(): stream = wrapper.stream return stream + + +# Use this for initial setup as well, to reduce code duplication +_wipe_internal_state_for_tests() diff --git a/pipenv/vendor/colorama/win32.py b/pipenv/vendor/colorama/win32.py index c2d8360336..841b0e270a 100644 --- a/pipenv/vendor/colorama/win32.py +++ b/pipenv/vendor/colorama/win32.py @@ -4,6 +4,8 @@ STDOUT = -11 STDERR = -12 +ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 + try: import ctypes from ctypes import LibraryLoader @@ -89,6 +91,20 @@ def __str__(self): ] _SetConsoleTitleW.restype = wintypes.BOOL + _GetConsoleMode = windll.kernel32.GetConsoleMode + _GetConsoleMode.argtypes = [ + wintypes.HANDLE, + POINTER(wintypes.DWORD) + ] + _GetConsoleMode.restype = wintypes.BOOL + + _SetConsoleMode = windll.kernel32.SetConsoleMode + _SetConsoleMode.argtypes = [ + wintypes.HANDLE, + wintypes.DWORD + ] + _SetConsoleMode.restype = wintypes.BOOL + def _winapi_test(handle): csbi = CONSOLE_SCREEN_BUFFER_INFO() success = _GetConsoleScreenBufferInfo( @@ -150,3 +166,15 @@ def FillConsoleOutputAttribute(stream_id, attr, length, start): def SetConsoleTitle(title): return _SetConsoleTitleW(title) + + def GetConsoleMode(handle): + mode = wintypes.DWORD() + success = _GetConsoleMode(handle, byref(mode)) + if not success: + raise ctypes.WinError() + return mode.value + + def SetConsoleMode(handle, mode): + success = _SetConsoleMode(handle, mode) + if not success: + raise ctypes.WinError() diff --git a/pipenv/vendor/colorama/winterm.py b/pipenv/vendor/colorama/winterm.py index 0fdb4ec4e9..aad867e8c8 100644 --- a/pipenv/vendor/colorama/winterm.py +++ b/pipenv/vendor/colorama/winterm.py @@ -1,6 +1,12 @@ # Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. -from . import win32 +try: + from msvcrt import get_osfhandle +except ImportError: + def get_osfhandle(_): + raise OSError("This isn't windows!") + +from . import win32 # from wincon.h class WinColor(object): @@ -167,3 +173,23 @@ def erase_line(self, mode=0, on_stderr=False): def set_title(self, title): win32.SetConsoleTitle(title) + + +def enable_vt_processing(fd): + if win32.windll is None or not win32.winapi_test(): + return False + + try: + handle = get_osfhandle(fd) + mode = win32.GetConsoleMode(handle) + win32.SetConsoleMode( + handle, + mode | win32.ENABLE_VIRTUAL_TERMINAL_PROCESSING, + ) + + mode = win32.GetConsoleMode(handle) + if mode & win32.ENABLE_VIRTUAL_TERMINAL_PROCESSING: + return True + # Can get TypeError in testsuite where 'fd' is a Mock() + except (OSError, TypeError): + return False diff --git a/pipenv/vendor/requirementslib/__init__.py b/pipenv/vendor/requirementslib/__init__.py index 69a421cfa3..7af3391b8b 100644 --- a/pipenv/vendor/requirementslib/__init__.py +++ b/pipenv/vendor/requirementslib/__init__.py @@ -5,7 +5,7 @@ from .models.pipfile import Pipfile from .models.requirements import Requirement -__version__ = "2.2.0" +__version__ = "2.2.1" logger = logging.getLogger(__name__) diff --git a/pipenv/vendor/requirementslib/models/setup_info.py b/pipenv/vendor/requirementslib/models/setup_info.py index 0b6f427aaa..71b979587f 100644 --- a/pipenv/vendor/requirementslib/models/setup_info.py +++ b/pipenv/vendor/requirementslib/models/setup_info.py @@ -4,6 +4,7 @@ import contextlib import os import shutil +import subprocess as sp import sys from collections.abc import Iterable, Mapping from contextlib import ExitStack @@ -30,7 +31,6 @@ ) from pipenv.patched.pip._vendor.platformdirs import user_cache_dir from pipenv.vendor.vistir.contextmanagers import cd, temp_path -from pipenv.vendor.vistir.misc import run from pipenv.vendor.vistir.path import create_tracked_tempdir, rmtree from ..environment import MYPY_RUNNING @@ -94,16 +94,8 @@ def pep517_subprocess_runner(cmd, cwd=None, extra_environ=None): if extra_environ: env.update(extra_environ) - run( - cmd, - cwd=cwd, - env=env, - block=True, - combine_stderr=True, - return_object=False, - write_to_stdout=False, - nospin=True, - ) + cmd_as_str = " ".join(cmd) + sp.run(cmd_as_str, cwd=cwd, env=env, stdout=sp.PIPE, stderr=sp.STDOUT, shell=True) class BuildEnv(envbuild.BuildEnvironment): @@ -117,14 +109,8 @@ def pip_install(self, reqs): "--prefix", self.path, ] + list(reqs) - run( - cmd, - block=True, - combine_stderr=True, - return_object=False, - write_to_stdout=False, - nospin=True, - ) + + sp.run(cmd, shell=True, stderr=sp.PIPE, stdout=sp.PIPE) class HookCaller(wrappers.Pep517HookCaller): @@ -892,13 +878,12 @@ def run_setup(script_path, egg_base=None): # We couldn't import everything needed to run setup except Exception: python = os.environ.get("PIP_PYTHON_PATH", sys.executable) - out, _ = run( + + sp.run( [python, "setup.py"] + args, cwd=target_cwd, - block=True, - combine_stderr=False, - return_object=False, - nospin=True, + stdout=sp.PIPE, + stderr=sp.PIPE, ) finally: _setup_stop_after = None diff --git a/pipenv/vendor/vendor.txt b/pipenv/vendor/vendor.txt index b42ea3ed16..c3b58032f3 100644 --- a/pipenv/vendor/vendor.txt +++ b/pipenv/vendor/vendor.txt @@ -2,7 +2,7 @@ attrs==22.1.0 cerberus==1.3.4 click-didyoumean==0.0.3 click==8.0.3 -colorama==0.4.4 +colorama==0.4.6 dparse==0.6.2 markupsafe==2.0.1 pexpect==4.8.0 @@ -12,9 +12,9 @@ ptyprocess==0.7.0 pyparsing==3.0.9 python-dotenv==0.19.0 pythonfinder==1.3.1 -requirementslib==2.2.0 +requirementslib==2.2.1 ruamel.yaml==0.17.21 shellingham==1.5.0 toml==0.10.2 tomlkit==0.9.2 -vistir==0.7.4 +vistir==0.7.5 diff --git a/pipenv/vendor/vistir/__init__.py b/pipenv/vendor/vistir/__init__.py index a584da365c..9d09aaeb21 100644 --- a/pipenv/vendor/vistir/__init__.py +++ b/pipenv/vendor/vistir/__init__.py @@ -1,66 +1,42 @@ # -*- coding=utf-8 -*- +import importlib -from .contextmanagers import ( - atomic_open_for_write, - cd, - open_file, - replaced_stream, - replaced_streams, - spinner, - temp_environ, - temp_path, -) -from .cursor import hide_cursor, show_cursor -from .misc import ( - StreamWrapper, - chunked, - decode_for_output, - divide, - get_wrapped_stream, - load_path, - partialclass, - run, - shell_escape, - take, - to_bytes, - to_text, -) -from .path import create_tracked_tempdir, create_tracked_tempfile, mkdir_p, rmtree -from .spin import create_spinner +__newpaths = { + 'create_spinner': 'vistir.spin', + 'cd': 'vistir.contextmanagers', + 'atomic_open_for_write': 'vistir.contextmanagers', + 'open_file': 'vistir.contextmanagers', + 'replaced_stream': 'vistir.contextmanagers', + 'replaced_streams': 'vistir.contextmanagers', + 'spinner': 'vistir.contextmanagers', + 'temp_environ': 'vistir.contextmanagers', + 'temp_path': 'vistir.contextmanagers', + 'hide_cursor': 'vistir.cursor', + 'show_cursor': 'vistir.cursor', + 'StreamWrapper': 'vistir.misc', + 'chunked':'vistir.misc', + 'decode_for_output': 'vistir.misc', + 'divide': 'vistir.misc', + 'get_wrapped_stream': 'vistir.misc', + 'load_path': 'vistir.misc', + 'partialclass': 'vistir.misc', + 'run': 'vistir.misc', + 'shell_escape': 'vistir.misc', + 'take': 'vistir.misc', + 'to_bytes': 'vistir.misc', + 'to_text': 'vistir.misc', + 'create_tracked_tempdir': 'vistir.path', + 'create_tracked_tempfile': 'vistir.path', + 'mkdir_p': 'vistir.path', + 'rmtree': 'vistir.path', +} -__version__ = "0.7.4" +from warnings import warn +def __getattr__(name): + warn((f"Importing {name} directly from vistir is deprecated.\nUse 'from {__newpaths[name]} import {name}' instead.\n" + "This import path will be removed in vistir 0.8"), + DeprecationWarning) + return getattr(importlib.import_module(__newpaths[name]), name) -__all__ = [ - "shell_escape", - "load_path", - "run", - "partialclass", - "temp_environ", - "temp_path", - "cd", - "atomic_open_for_write", - "open_file", - "rmtree", - "mkdir_p", - "TemporaryDirectory", - "NamedTemporaryFile", - "partialmethod", - "spinner", - "create_spinner", - "create_tracked_tempdir", - "create_tracked_tempfile", - "decode_for_output", - "to_text", - "to_bytes", - "take", - "chunked", - "divide", - "StringIO", - "get_wrapped_stream", - "StreamWrapper", - "replaced_stream", - "replaced_streams", - "show_cursor", - "hide_cursor", -] +__version__ = "0.7.5" diff --git a/pipenv/vendor/vistir/spin.py b/pipenv/vendor/vistir/spin.py deleted file mode 100644 index b950c66c17..0000000000 --- a/pipenv/vendor/vistir/spin.py +++ /dev/null @@ -1,496 +0,0 @@ -# -*- coding=utf-8 -*- -import functools -import os -import signal -import sys -import threading -import time -import typing -import warnings - -from io import StringIO - -import pipenv.vendor.colorama as colorama - -from .cursor import hide_cursor, show_cursor -from .misc import decode_for_output, to_text -from .termcolors import COLOR_MAP, COLORS, DISABLE_COLORS, colored - -if typing.TYPE_CHECKING: - from typing import ( - Any, - Callable, - ContextManager, - Dict, - IO, - Optional, - Text, - Type, - TypeVar, - Union, - ) - - TSignalMap = Dict[ - Type[signal.SIGINT], - Callable[..., int, str, Union["DummySpinner", "VistirSpinner"]], - ] - _T = TypeVar("_T", covariant=True) - -try: - import yaspin - import yaspin.spinners - import yaspin.core - - Spinners = yaspin.spinners.Spinners - SpinBase = yaspin.core.Yaspin - -except ImportError: # pragma: no cover - yaspin = None - Spinners = None - SpinBase = None - -if os.name == "nt": # pragma: no cover - - def handler(signum, frame, spinner): - """Signal handler, used to gracefully shut down the ``spinner`` instance - when specified signal is received by the process running the ``spinner``. - - ``signum`` and ``frame`` are mandatory arguments. Check ``signal.signal`` - function for more details. - """ - spinner.fail() - spinner.stop() - - -else: # pragma: no cover - - def handler(signum, frame, spinner): - """Signal handler, used to gracefully shut down the ``spinner`` instance - when specified signal is received by the process running the ``spinner``. - - ``signum`` and ``frame`` are mandatory arguments. Check ``signal.signal`` - function for more details. - """ - spinner.red.fail("✘") - spinner.stop() - - -CLEAR_LINE = chr(27) + "[K" - -TRANSLATION_MAP = {10004: u"OK", 10008: u"x"} - - -decode_output = functools.partial(decode_for_output, translation_map=TRANSLATION_MAP) - - -class DummySpinner(object): - def __init__(self, text="", **kwargs): - # type: (str, Any) -> None - if DISABLE_COLORS: - colorama.init() - self.text = decode_output(text) if text else "" - self.stdout = kwargs.get("stdout", sys.stdout) - self.stderr = kwargs.get("stderr", sys.stderr) - self.out_buff = StringIO() - self.write_to_stdout = kwargs.get("write_to_stdout", False) - super(DummySpinner, self).__init__() - - def __enter__(self): - if self.text and self.text != "None": - if self.write_to_stdout: - self.write(self.text) - return self - - def __exit__(self, exc_type, exc_val, tb): - if exc_type: - import traceback - - formatted_tb = traceback.format_exception(exc_type, exc_val, tb) - self.write_err("".join(formatted_tb)) - self._close_output_buffer() - return False - - def __getattr__(self, k): # pragma: no cover - try: - retval = super(DummySpinner, self).__getattribute__(k) - except AttributeError: - if k in COLOR_MAP.keys() or k.upper() in COLORS: - return self - raise - else: - return retval - - def _close_output_buffer(self): - if self.out_buff and not self.out_buff.closed: - try: - self.out_buff.close() - except Exception: - pass - - def fail(self, exitcode=1, text="FAIL"): - # type: (int, str) -> None - if text is not None and text != "None": - if self.write_to_stdout: - self.write(text) - else: - self.write_err(text) - self._close_output_buffer() - - def ok(self, text="OK"): - # type: (str) -> int - if text is not None and text != "None": - if self.write_to_stdout: - self.write(text) - else: - self.write_err(text) - self._close_output_buffer() - return 0 - - def hide_and_write(self, text, target=None): - # type: (str, Optional[str]) -> None - if not target: - target = self.stdout - if text is None or isinstance(text, str) and text == "None": - pass - target.write(decode_output(u"\r", target_stream=target)) - self._hide_cursor(target=target) - target.write(decode_output(u"{0}\n".format(text), target_stream=target)) - target.write(CLEAR_LINE) - self._show_cursor(target=target) - - def write(self, text=None): - # type: (Optional[str]) -> None - if not self.write_to_stdout: - return self.write_err(text) - if text is None or isinstance(text, str) and text == "None": - pass - if not self.stdout.closed: - stdout = self.stdout - else: - stdout = sys.stdout - stdout.write(decode_output(u"\r", target_stream=stdout)) - text = to_text(text) - line = decode_output(u"{0}\n".format(text), target_stream=stdout) - stdout.write(line) - stdout.write(CLEAR_LINE) - - def write_err(self, text=None): - # type: (Optional[str]) -> None - if text is None or isinstance(text, str) and text == "None": - pass - text = to_text(text) - if not self.stderr.closed: - stderr = self.stderr - else: - if sys.stderr.closed: - print(text) - return - stderr = sys.stderr - stderr.write(decode_output(u"\r", target_stream=stderr)) - line = decode_output(u"{0}\n".format(text), target_stream=stderr) - stderr.write(line) - stderr.write(CLEAR_LINE) - - @staticmethod - def _hide_cursor(target=None): - # type: (Optional[IO]) -> None - pass - - @staticmethod - def _show_cursor(target=None): - # type: (Optional[IO]) -> None - pass - - -if SpinBase is None: - SpinBase = DummySpinner - - -class VistirSpinner(SpinBase): - "A spinner class for handling spinners on windows and posix." - - def __init__(self, *args, **kwargs): - # type: (Any, Any) - """ - Get a spinner object or a dummy spinner to wrap a context. - - Keyword Arguments: - :param str spinner_name: A spinner type e.g. "dots" or "bouncingBar" (default: {"bouncingBar"}) - :param str start_text: Text to start off the spinner with (default: {None}) - :param dict handler_map: Handler map for signals to be handled gracefully (default: {None}) - :param bool nospin: If true, use the dummy spinner (default: {False}) - :param bool write_to_stdout: Writes to stdout if true, otherwise writes to stderr (default: True) - """ - - self.handler = handler - colorama.init() - sigmap = {} # type: TSignalMap - if handler: - sigmap.update({signal.SIGINT: handler, signal.SIGTERM: handler}) - handler_map = kwargs.pop("handler_map", {}) - if os.name == "nt": - sigmap[signal.SIGBREAK] = handler - else: - sigmap[signal.SIGALRM] = handler - if handler_map: - sigmap.update(handler_map) - spinner_name = kwargs.pop("spinner_name", "bouncingBar") - start_text = kwargs.pop("start_text", None) - _text = kwargs.pop("text", "Running...") - kwargs["text"] = start_text if start_text is not None else _text - kwargs["sigmap"] = sigmap - kwargs["spinner"] = getattr(Spinners, spinner_name, "") - write_to_stdout = kwargs.pop("write_to_stdout", True) - self.stdout = kwargs.pop("stdout", sys.stdout) - self.stderr = kwargs.pop("stderr", sys.stderr) - self.out_buff = StringIO() - self.write_to_stdout = write_to_stdout - self.is_dummy = bool(yaspin is None) - self._stop_spin = None # type: Optional[threading.Event] - self._hide_spin = None # type: Optional[threading.Event] - self._spin_thread = None # type: Optional[threading.Thread] - super(VistirSpinner, self).__init__(*args, **kwargs) - if DISABLE_COLORS: - colorama.deinit() - - def ok(self, text=u"OK", err=False): - # type: (str, bool) -> None - """Set Ok (success) finalizer to a spinner.""" - # Do not display spin text for ok state - self._text = None - - _text = to_text(text) if text else u"OK" - err = err or not self.write_to_stdout - self._freeze(_text, err=err) - - def fail(self, text=u"FAIL", err=False): - # type: (str, bool) -> None - """Set fail finalizer to a spinner.""" - # Do not display spin text for fail state - self._text = None - - _text = text if text else u"FAIL" - err = err or not self.write_to_stdout - self._freeze(_text, err=err) - - def hide_and_write(self, text, target=None): - # type: (str, Optional[str]) -> None - if not target: - target = self.stdout - if text is None or isinstance(text, str) and text == u"None": - pass - target.write(decode_output(u"\r")) - self._hide_cursor(target=target) - target.write(decode_output(u"{0}\n".format(text))) - target.write(CLEAR_LINE) - self._show_cursor(target=target) - - def write(self, text): # pragma: no cover - # type: (str) -> None - if not self.write_to_stdout: - return self.write_err(text) - stdout = self.stdout - if self.stdout.closed: - stdout = sys.stdout - stdout.write(decode_output(u"\r", target_stream=stdout)) - stdout.write(decode_output(CLEAR_LINE, target_stream=stdout)) - if text is None: - text = "" - text = decode_output(u"{0}\n".format(text), target_stream=stdout) - stdout.write(text) - self.out_buff.write(text) - - def write_err(self, text): # pragma: no cover - # type: (str) -> None - """Write error text in the terminal without breaking the spinner.""" - stderr = self.stderr - if self.stderr.closed: - stderr = sys.stderr - stderr.write(decode_output(u"\r", target_stream=stderr)) - stderr.write(decode_output(CLEAR_LINE, target_stream=stderr)) - if text is None: - text = "" - text = decode_output(u"{0}\n".format(text), target_stream=stderr) - self.stderr.write(text) - self.out_buff.write(decode_output(text, target_stream=self.out_buff)) - - def start(self): - # type: () -> None - if self._sigmap: - self._register_signal_handlers() - - target = self.stdout if self.write_to_stdout else self.stderr - if target.isatty(): - self._hide_cursor(target=target) - - self._stop_spin = threading.Event() - self._hide_spin = threading.Event() - self._spin_thread = threading.Thread(target=self._spin) - self._spin_thread.start() - - def stop(self): - # type: () -> None - if self._dfl_sigmap: - # Reset registered signal handlers to default ones - self._reset_signal_handlers() - - if self._spin_thread: - self._stop_spin.set() - self._spin_thread.join() - - target = self.stdout if self.write_to_stdout else self.stderr - if target.isatty(): - target.write("\r") - - if self.write_to_stdout: - self._clear_line() - else: - self._clear_err() - - if target.isatty(): - self._show_cursor(target=target) - self.out_buff.close() - - def _freeze(self, final_text, err=False): - # type: (str, bool) -> None - """Stop spinner, compose last frame and 'freeze' it.""" - if not final_text: - final_text = "" - target = self.stderr if err else self.stdout - if target.closed: - target = sys.stderr if err else sys.stdout - text = to_text(final_text) - last_frame = self._compose_out(text, mode="last") - self._last_frame = decode_output(last_frame, target_stream=target) - - # Should be stopped here, otherwise prints after - # self._freeze call will mess up the spinner - self.stop() - target.write(self._last_frame) - - def _compose_color_func(self): - # type: () -> Callable[..., str] - fn = functools.partial( - colored, color=self._color, on_color=self._on_color, attrs=list(self._attrs) - ) - return fn - - def _compose_out(self, frame, mode=None): - # type: (str, Optional[str]) -> Text - # Ensure Unicode input - - frame = to_text(frame) - if self._text is None: - self._text = u"" - text = to_text(self._text) - if self._color_func is not None: - frame = self._color_func(frame) - if self._side == "right": - frame, text = text, frame - # Mode - frame = to_text(frame) - if not mode: - out = u"\r{0} {1}".format(frame, text) - else: - out = u"{0} {1}\n".format(frame, text) - return out - - def _spin(self): - # type: () -> None - target = self.stdout if self.write_to_stdout else self.stderr - clear_fn = self._clear_line if self.write_to_stdout else self._clear_err - while not self._stop_spin.is_set(): - - if self._hide_spin.is_set(): - # Wait a bit to avoid wasting cycles - time.sleep(self._interval) - continue - - # Compose output - spin_phase = next(self._cycle) - out = self._compose_out(spin_phase) - out = decode_output(out, target) - - # Write - target.write(out) - clear_fn() - target.flush() - - # Wait - time.sleep(self._interval) - target.write("\b") - - def _register_signal_handlers(self): - # type: () -> None - # SIGKILL cannot be caught or ignored, and the receiving - # process cannot perform any clean-up upon receiving this - # signal. - try: - if signal.SIGKILL in self._sigmap.keys(): - raise ValueError( - "Trying to set handler for SIGKILL signal. " - "SIGKILL cannot be cought or ignored in POSIX systems." - ) - except AttributeError: - pass - - for sig, sig_handler in self._sigmap.items(): - # A handler for a particular signal, once set, remains - # installed until it is explicitly reset. Store default - # signal handlers for subsequent reset at cleanup phase. - dfl_handler = signal.getsignal(sig) - self._dfl_sigmap[sig] = dfl_handler - - # ``signal.SIG_DFL`` and ``signal.SIG_IGN`` are also valid - # signal handlers and are not callables. - if callable(sig_handler): - # ``signal.signal`` accepts handler function which is - # called with two arguments: signal number and the - # interrupted stack frame. ``functools.partial`` solves - # the problem of passing spinner instance into the handler - # function. - sig_handler = functools.partial(sig_handler, spinner=self) - - signal.signal(sig, sig_handler) - - def _reset_signal_handlers(self): - # type: () -> None - for sig, sig_handler in self._dfl_sigmap.items(): - signal.signal(sig, sig_handler) - - @staticmethod - def _hide_cursor(target=None): - # type: (Optional[IO]) -> None - if not target: - target = sys.stdout - hide_cursor(stream=target) - - @staticmethod - def _show_cursor(target=None): - # type: (Optional[IO]) -> None - if not target: - target = sys.stdout - show_cursor(stream=target) - - @staticmethod - def _clear_err(): - # type: () -> None - sys.stderr.write(CLEAR_LINE) - - @staticmethod - def _clear_line(): - # type: () -> None - sys.stdout.write(CLEAR_LINE) - - -def create_spinner(*args, **kwargs): - warnings.warn( - ('This function is deprecated and will be removed in version 0.8.' - 'Consider using yaspin directly instead, or user rich.status'), - DeprecationWarning, stacklevel=2) - # type: (Any, Any) -> Union[DummySpinner, VistirSpinner] - nospin = kwargs.pop("nospin", False) - use_yaspin = kwargs.pop("use_yaspin", not nospin) - if nospin or not use_yaspin: - return DummySpinner(*args, **kwargs) - return VistirSpinner(*args, **kwargs) diff --git a/tasks/vendoring/__init__.py b/tasks/vendoring/__init__.py index b1b9d9f670..f7aaa0f6bd 100644 --- a/tasks/vendoring/__init__.py +++ b/tasks/vendoring/__init__.py @@ -312,6 +312,7 @@ def install(ctx, vendor_dir, package=None): def post_install_cleanup(ctx, vendor_dir): + log("Removing unused modules and files ...") remove_all(vendor_dir.glob("*.dist-info")) remove_all(vendor_dir.glob("*.egg-info")) @@ -321,8 +322,14 @@ def post_install_cleanup(ctx, vendor_dir): drop_dir(vendor_dir / "shutil_backports") drop_dir(vendor_dir / "cerberus" / "tests") drop_dir(vendor_dir / "cerberus" / "benchmarks") + drop_dir(vendor_dir / "colorama" / "tests") remove_all(vendor_dir.glob("toml.py")) + # this function is called twice hence try ... except ... + try: + (vendor_dir / "vistir" / "spin.py").unlink() + except FileNotFoundError: + pass @invoke.task diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index dc4929caa9..a33149674d 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -6,6 +6,7 @@ import shlex import shutil import traceback +import subprocess as sp import sys import warnings from pathlib import Path @@ -21,7 +22,6 @@ from pipenv.utils.processes import subprocess_run from pipenv.vendor import toml, tomlkit from pipenv.vendor.vistir.contextmanagers import temp_environ -from pipenv.vendor.vistir.misc import run from pipenv.vendor.vistir.path import ( create_tracked_tempdir, handle_remove_readonly ) @@ -498,10 +498,7 @@ def create(self): cmd = [ python, "-m", "virtualenv", self.path.absolute().as_posix() ] - c = run( - cmd, verbose=False, return_object=True, write_to_stdout=False, - combine_stderr=False, block=True, nospin=True, - ) + c = sp.run(cmd) assert c.returncode == 0 def activate(self):