Skip to content

Commit

Permalink
Refactor changes
Browse files: browse the repository at this point in the history
  • Loading branch information
kidrahahjo committed Sep 1, 2020
1 parent 0d3402d commit 5493b4b
Showing 1 changed file with 22 additions and 14 deletions.
36 changes: 22 additions & 14 deletions flow/project.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -1886,19 +1886,20 @@ def _print_status(iterable, fetch_status, description):

singleton_groups = [op for op in self.operations]

def _generate_results_with_tqdm(iterable, desc):
return list(tqdm(
iterable=iterable, desc=desc,
total=len(iterable), file=err))

with self._potentially_buffered():
try:
if status_parallelization == 'thread':
with contextlib.closing(ThreadPool()) as pool:
# First attempt at parallelized status determination.
# This may fail on systems that don't allow threads.
label_results = _generate_results_with_tqdm('job-labels', pool.imap)
op_results = _generate_results_with_tqdm('groups', pool.imap)
label_results = list(tqdm(
iterable=pool.imap(_get_job_labels, distinct_jobs),
desc="Collecting job label info", total=len(distinct_jobs),
file=err))
op_results = list(tqdm(
iterable=pool.imap(_get_group_status, singleton_groups),
desc="Collecting operation status", total=len(singleton_groups),
file=err))
elif status_parallelization == 'process':
with contextlib.closing(Pool()) as pool:
try:
Expand Down Expand Up @@ -1928,14 +1929,21 @@ def _generate_results_with_tqdm(iterable, desc):
raise RuntimeError(
"Unable to parallelize execution due to a pickling "
"error: {}.".format(error))
label_results = _generate_results_with_tqdm(
l_results, desc="Collecting job label info", len_itr=len(distinct_jobs))
op_results = _generate_results_with_tqdm(
g_results, desc="Collecting operation status",
len_itr=len(singleton_groups))
label_results = list(tqdm(
iterable=l_results, desc="Collecting job label info",
total=len(distinct_jobs), file=err))
op_results = list(tqdm(
iterable=g_results, desc="Collecting operation status",
total=len(singleton_groups), file=err))
elif status_parallelization == 'none':
label_results = _generate_results_with_tqdm('job-labels')
op_results = _generate_results_with_tqdm('groups')
label_results = list(tqdm(
iterable=map(_get_job_labels, distinct_jobs),
desc="Collecting job label info", total=len(distinct_jobs),
file=err))
op_results = list(tqdm(
iterable=map(_get_group_status, singleton_groups),
desc="Collecting operation status", total=len(singleton_groups),
file=err))
else:
raise RuntimeError("Configuration value status_parallelization is invalid. "
"You can set it to 'thread', 'parallel', or 'none'")
Expand Down

0 comments on commit 5493b4b

Please sign in to comment.