Commit

Merge pull request #199 from frostming/feature/197
retry failed jobs
frostming authored Dec 17, 2020
2 parents dc83985 + 92dde0a commit 9d1327d
Showing 3 changed files with 29 additions and 21 deletions.
1 change: 1 addition & 0 deletions news/197.feature
@@ -0,0 +1 @@
+Retry failed jobs when syncing packages.
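This news entry corresponds to the new retry_times option on Synchronizer (see the pdm/installers/synchronizers.py diff below). A minimal, hypothetical call site, assuming only the signatures visible in this commit; how pdm wires this up internally is not shown here:

# Hypothetical usage sketch; Synchronizer's constructor and synchronize()
# signatures are taken from the diff below.
from pdm.installers.synchronizers import Synchronizer

def sync_project(candidates, environment):
    # retry_times=2 would re-submit failed parallel jobs up to two extra times.
    synchronizer = Synchronizer(candidates, environment, retry_times=2)
    synchronizer.synchronize(clean=True, dry_run=False)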
4 changes: 3 additions & 1 deletion pdm/core.py
@@ -109,7 +109,9 @@ def main(self, args=None, prog_name=None, obj=None, **extra):
             etype, err, traceback = sys.exc_info()
             if stream.verbosity > stream.NORMAL:
                 raise err.with_traceback(traceback)
-            stream.echo("[{}]: {}".format(etype.__name__, err), err=True)
+            stream.echo(
+                f"{stream.red('[' + etype.__name__ + ']')}: {err}", err=True
+            )
             sys.exit(1)

     def register_command(
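The hunk above only changes how the top-level error is rendered: the exception type is now wrapped in stream.red so the [ExceptionType] prefix stands out. A rough standalone sketch of that behaviour, using a plain ANSI helper in place of pdm's stream object (assumed, not part of this diff):

import sys

def red(text: str) -> str:
    # Stand-in for stream.red(): wrap the text in ANSI escape codes for red.
    return f"\033[31m{text}\033[0m"

def report_error(verbose: bool = False) -> None:
    # Must be called from inside an `except` block so sys.exc_info() is populated.
    etype, err, tb = sys.exc_info()
    if verbose:
        # At higher verbosity, re-raise with the original traceback instead.
        raise err.with_traceback(tb)
    print(f"{red('[' + etype.__name__ + ']')}: {err}", file=sys.stderr)
    sys.exit(1)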
45 changes: 25 additions & 20 deletions pdm/installers/synchronizers.py
@@ -1,8 +1,8 @@
 import contextlib
+import functools
 import multiprocessing
 import traceback
 from concurrent.futures.thread import ThreadPoolExecutor
-from functools import partial
 from typing import Dict, List, Tuple

 from pip._vendor.pkg_resources import Distribution, safe_name
@@ -61,19 +61,20 @@ def __exit__(self, *args, **kwargs):
 class Synchronizer:
     """Synchronize the working set with given installation candidates"""

-    RETRY_TIMES = 1
     SEQUENTIAL_PACKAGES = ("pip", "setuptools", "wheel")

     def __init__(
         self,
         candidates: Dict[str, Candidate],
         environment: Environment,
+        retry_times: int = 1,
     ) -> None:
         self.candidates = candidates
         self.environment = environment
         self.parallel = environment.project.config["parallel_install"]
         self.all_candidates = environment.project.get_locked_candidates("__all__")
         self.working_set = environment.get_working_set()
+        self.retry_times = retry_times

     @contextlib.contextmanager
     def create_executor(self):
@@ -271,33 +272,37 @@ def synchronize(self, clean: bool = True, dry_run: bool = False) -> None:
                 parallel_jobs.append((kind, key))

         errors: List[str] = []
-        with stream.indent("  "):
-            for job in sequential_jobs:
-                kind, key = job
-                try:
-                    handlers[kind](key)
-                except Exception as err:
-                    errors.append(f"{kind} {stream.green(key)} failed:\n")
-                    errors.extend(
-                        traceback.format_exception(type(err), err, err.__traceback__)
-                    )
+        failed_jobs: List[Tuple[str, str]] = []

         def update_progress(future, kind, key):
             if future.exception():
-                errors.append(f"{kind} {stream.green(key)} failed:\n")
+                failed_jobs.append((kind, key))
                 error = future.exception()
                 errors.extend(
-                    traceback.format_exception(type(error), error, error.__traceback__)
+                    [f"{kind} {stream.green(key)} failed:\n"]
+                    + traceback.format_exception(
+                        type(error), error, error.__traceback__
+                    )
                 )

         with stream.logging("install"):
-            with stream.indent("  "), self.create_executor() as executor:
-                for job in parallel_jobs:
+            with stream.indent("  "):
+                for job in sequential_jobs:
                     kind, key = job
-                    future = executor.submit(handlers[kind], key)
-                    future.add_done_callback(
-                        partial(update_progress, kind=kind, key=key)
-                    )
+                    handlers[kind](key)
+                for i in range(self.retry_times + 1):
+                    with self.create_executor() as executor:
+                        for job in parallel_jobs:
+                            kind, key = job
+                            future = executor.submit(handlers[kind], key)
+                            future.add_done_callback(
+                                functools.partial(update_progress, kind=kind, key=key)
+                            )
+                    if not failed_jobs or i == self.retry_times:
+                        break
+                    parallel_jobs, failed_jobs = failed_jobs, []
+                    errors.clear()
+                    stream.echo("Retry failed jobs")

         if errors:
             stream.echo(stream.red("\nERRORS:"))
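Taken together, the new synchronize() flow is: run the sequential jobs, submit the parallel jobs to a thread pool, collect failures through the done-callback, then re-submit only the failed jobs for up to retry_times extra rounds, keeping the error output from the final round only. A simplified, self-contained sketch of that retry pattern (run_with_retries, jobs and handlers are made-up names, and pdm's stream helpers are left out):

import functools
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple

def run_with_retries(
    jobs: List[Tuple[str, str]],
    handlers: Dict[str, Callable[[str], None]],
    retry_times: int = 1,
) -> List[str]:
    """Run (kind, key) jobs in a thread pool, retrying the ones that fail."""
    errors: List[str] = []
    failed_jobs: List[Tuple[str, str]] = []

    def update_progress(future, kind, key):
        # Done-callback: record the failure and keep its formatted traceback.
        error = future.exception()
        if error:
            failed_jobs.append((kind, key))
            errors.extend(
                [f"{kind} {key} failed:\n"]
                + traceback.format_exception(type(error), error, error.__traceback__)
            )

    for i in range(retry_times + 1):
        with ThreadPoolExecutor() as executor:
            for kind, key in jobs:
                future = executor.submit(handlers[kind], key)
                future.add_done_callback(
                    functools.partial(update_progress, kind=kind, key=key)
                )
        if not failed_jobs or i == retry_times:
            break
        # Re-submit only the failed jobs and discard this round's errors.
        jobs, failed_jobs = failed_jobs, []
        errors.clear()
    return errors

In the actual diff, pip, setuptools and wheel are still installed sequentially before this loop, and all output is routed through stream.logging / stream.indent.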
