From 92dde0a84bb38df8a23f16b2ff9dcefaf9902c36 Mon Sep 17 00:00:00 2001 From: Frost Ming Date: Thu, 17 Dec 2020 10:09:26 +0800 Subject: [PATCH] retry failed jobs --- news/197.feature | 1 + pdm/core.py | 4 ++- pdm/installers/synchronizers.py | 45 ++++++++++++++++++--------------- 3 files changed, 29 insertions(+), 21 deletions(-) create mode 100644 news/197.feature diff --git a/news/197.feature b/news/197.feature new file mode 100644 index 0000000000..0ecab8afa1 --- /dev/null +++ b/news/197.feature @@ -0,0 +1 @@ +Retry failed jobs when syncing packages. diff --git a/pdm/core.py b/pdm/core.py index 066bb5249f..8b263056e5 100644 --- a/pdm/core.py +++ b/pdm/core.py @@ -109,7 +109,9 @@ def main(self, args=None, prog_name=None, obj=None, **extra): etype, err, traceback = sys.exc_info() if stream.verbosity > stream.NORMAL: raise err.with_traceback(traceback) - stream.echo("[{}]: {}".format(etype.__name__, err), err=True) + stream.echo( + f"{stream.red('[' + etype.__name__ + ']')}: {err}", err=True + ) sys.exit(1) def register_command( diff --git a/pdm/installers/synchronizers.py b/pdm/installers/synchronizers.py index f6225b31de..c8f8063a5b 100644 --- a/pdm/installers/synchronizers.py +++ b/pdm/installers/synchronizers.py @@ -1,8 +1,8 @@ import contextlib +import functools import multiprocessing import traceback from concurrent.futures.thread import ThreadPoolExecutor -from functools import partial from typing import Dict, List, Tuple from pip._vendor.pkg_resources import Distribution, safe_name @@ -61,19 +61,20 @@ def __exit__(self, *args, **kwargs): class Synchronizer: """Synchronize the working set with given installation candidates""" - RETRY_TIMES = 1 SEQUENTIAL_PACKAGES = ("pip", "setuptools", "wheel") def __init__( self, candidates: Dict[str, Candidate], environment: Environment, + retry_times: int = 1, ) -> None: self.candidates = candidates self.environment = environment self.parallel = environment.project.config["parallel_install"] self.all_candidates = environment.project.get_locked_candidates("__all__") self.working_set = environment.get_working_set() + self.retry_times = retry_times @contextlib.contextmanager def create_executor(self): @@ -271,33 +272,37 @@ def synchronize(self, clean: bool = True, dry_run: bool = False) -> None: parallel_jobs.append((kind, key)) errors: List[str] = [] - with stream.indent(" "): - for job in sequential_jobs: - kind, key = job - try: - handlers[kind](key) - except Exception as err: - errors.append(f"{kind} {stream.green(key)} failed:\n") - errors.extend( - traceback.format_exception(type(err), err, err.__traceback__) - ) + failed_jobs: List[Tuple[str, str]] = [] def update_progress(future, kind, key): if future.exception(): - errors.append(f"{kind} {stream.green(key)} failed:\n") + failed_jobs.append((kind, key)) error = future.exception() errors.extend( - traceback.format_exception(type(error), error, error.__traceback__) + [f"{kind} {stream.green(key)} failed:\n"] + + traceback.format_exception( + type(error), error, error.__traceback__ + ) ) with stream.logging("install"): - with stream.indent(" "), self.create_executor() as executor: - for job in parallel_jobs: + with stream.indent(" "): + for job in sequential_jobs: kind, key = job - future = executor.submit(handlers[kind], key) - future.add_done_callback( - partial(update_progress, kind=kind, key=key) - ) + handlers[kind](key) + for i in range(self.retry_times + 1): + with self.create_executor() as executor: + for job in parallel_jobs: + kind, key = job + future = executor.submit(handlers[kind], key) + future.add_done_callback( + functools.partial(update_progress, kind=kind, key=key) + ) + if not failed_jobs or i == self.retry_times: + break + parallel_jobs, failed_jobs = failed_jobs, [] + errors.clear() + stream.echo("Retry failed jobs") if errors: stream.echo(stream.red("\nERRORS:"))