Skip to content

Commit

Permalink
Merge pull request #1 from swansonk14/master
Browse files Browse the repository at this point in the history
Fetch upstream
  • Loading branch information
fcoclavero authored Jun 14, 2021
2 parents 5424163 + eb122f7 commit 2752ec4
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 496 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ gifs
p_tqdm.egg-info
__pycache__
*.pyc
.idea
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
language: python
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
- "3.7"
- "3.8"
install:
- pip install -r requirements.txt
- pip install -e .
script:
- nosetests
2 changes: 1 addition & 1 deletion LICENSE.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) 2017 Kyle Swanson
Copyright (c) 2020 Kyle Swanson

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
16 changes: 11 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# p_tqdm

[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/p-tqdm)](https://badge.fury.io/py/p-tqdm)
[![PyPI version](https://badge.fury.io/py/p-tqdm.svg)](https://badge.fury.io/py/p-tqdm)
[![Build Status](https://travis-ci.org/swansonk14/p_tqdm.svg?branch=master)](https://travis-ci.org/swansonk14/p_tqdm)

`p_tqdm` makes parallel processing with progress bars easy.
Expand All @@ -10,8 +12,6 @@

```pip install p_tqdm```

`p_tqdm` works with Python versions 2.7, 3.4, 3.5, 3.6.

## Example

Let's say you want to add two lists element by element. Without any parallelism, this can be done easily with a Python `map`.
Expand Down Expand Up @@ -150,19 +150,25 @@ for result in iterator:

### Arguments

All `p_tqdm` functions accept any number of lists (of the same length) as input, as long as the number of lists matches the number of arguments of the function. Additionally, if any non-list variable is passed as an input to a `p_tqdm` function, the variable will be passed to all calls of the function. See the example below.
All `p_tqdm` functions accept any number of iterables as input, as long as the number of iterables matches the number of arguments of the function.

To repeat a non-iterable argument along with the iterables, use Python's [partial](https://docs.python.org/3/library/functools.html#functools.partial) from the [functools](https://docs.python.org/3/library/functools.html) library. See the example below.

```python
from functools import partial

l1 = ['1', '2', '3']
l2 = ['a', 'b', 'c']

def add(a, b, c):
def add(a, b, c=''):
return a + b + c

added = p_map(add, l1, l2, '!')
added = p_map(partial(add, c='!'), l1, l2)
# added == ['1a!', '2b!', '3c!']
```

### CPUs

All the parallel `p_tqdm` functions can be passed the keyword `num_cpus` to indicate how many CPUs to use. The default is all CPUs. `num_cpus` can either be an integer to indicate the exact number of CPUs to use or a float to indicate the proportion of CPUs to use.

Note that the parallel Pool objects used by `p_tqdm` are automatically closed when the map finishes processing.
177 changes: 10 additions & 167 deletions p_tqdm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,167 +1,10 @@
"""Map functions with tqdm progress bars for parallel and sequential processing.
p_map: Performs a parallel ordered map.
p_imap: Returns an iterator for a parallel ordered map.
p_umap: Performs a parallel unordered map.
p_uimap: Returns an iterator for a parallel unordered map.
t_map: Performs a sequential map.
t_imap: Returns an iterator for a sequential map.
"""

from pathos.helpers import cpu_count
from pathos.multiprocessing import ProcessingPool as Pool
from tqdm import tqdm

def _parallel(ordered, function, *arrays, **kwargs):
"""Returns an iterator for a parallel map with a progress bar.
Arguments:
ordered(bool): True for an ordered map, false for an unordered map.
function(function): The function to apply to each element
of the given arrays.
arrays(tuple): One or more arrays of the same length
containing the data to be mapped. If a non-list
variable is passed, it will be repeated a number
of times equal to the lengths of the list(s). If only
non-list variables are passed, the function will be
performed num_iter times.
num_cpus(int): The number of cpus to use in parallel.
If an int, uses that many cpus.
If a float, uses that proportion of cpus.
If None, uses all available cpus.
num_iter(int): If only non-list variables are passed, the
function will be performed num_iter times on
these variables. Default: 1.
Returns:
An iterator which will apply the function
to each element of the given arrays in
parallel in order with a progress bar.
"""

# Convert tuple to list
arrays = list(arrays)

# Extract kwargs
num_cpus = kwargs.get('num_cpus', None)
num_iter = kwargs.get('num_iter', 1)

# Determine num_cpus
if num_cpus is None:
num_cpus = cpu_count()
elif type(num_cpus) == float:
num_cpus = int(round(num_cpus * cpu_count()))

# Determine num_iter when at least one list is present
if any([type(array) == list for array in arrays]):
num_iter = max([len(array) for array in arrays if type(array) == list])

# Convert single variables to lists
# and confirm lists are same length
for i, array in enumerate(arrays):
if type(array) != list:
arrays[i] = [array for _ in range(num_iter)]
else:
assert len(array) == num_iter

# Create parallel iterator
map_type = 'imap' if ordered else 'uimap'
iterator = tqdm(getattr(Pool(num_cpus), map_type)(function, *arrays),
total=num_iter)

return iterator

def p_map(function, *arrays, **kwargs):
"""Performs a parallel ordered map with a progress bar."""

ordered = True
iterator = _parallel(ordered, function, *arrays, **kwargs)
result = list(iterator)

return result

def p_imap(function, *arrays, **kwargs):
"""Returns an iterator for a parallel ordered map with a progress bar."""

ordered = True
iterator = _parallel(ordered, function, *arrays, **kwargs)

return iterator

def p_umap(function, *arrays, **kwargs):
"""Performs a parallel unordered map with a progress bar."""

ordered = False
iterator = _parallel(ordered, function, *arrays, **kwargs)
result = list(iterator)

return result

def p_uimap(function, *arrays, **kwargs):
"""Returns an iterator for a parallel unordered map with a progress bar."""

ordered = False
iterator = _parallel(ordered, function, *arrays, **kwargs)

return iterator

def _sequential(function, *arrays, **kwargs):
"""Returns an iterator for a sequential map with a progress bar.
Arguments:
function(function): The function to apply to each element
of the given arrays.
arrays(tuple): One or more arrays of the same length
containing the data to be mapped. If a non-list
variable is passed, it will be repeated a number
of times equal to the lengths of the list(s). If only
non-list variables are passed, the function will be
performed num_iter times.
num_iter(int): If only non-list variables are passed, the
function will be performed num_iter times on
these variables. Default: 1.
Returns:
An iterator which will apply the function
to each element of the given arrays sequentially
in order with a progress bar.
"""

# Convert tuple to list
arrays = list(arrays)

# Extract kwargs
num_iter = kwargs.get('num_iter', 1)

# Determine num_iter when at least one list is present
if any([type(array) == list for array in arrays]):
num_iter = max([len(array) for array in arrays if type(array) == list])

# Convert single variables to lists
# and confirm lists are same length
for i, array in enumerate(arrays):
if type(array) != list:
arrays[i] = [array for _ in range(num_iter)]
else:
assert len(array) == num_iter

# Create parallel iterator
iterator = tqdm(map(function, *arrays),
total=num_iter)

return iterator

def t_map(function, *arrays, **kwargs):
"""Performs a sequential map with a progress bar."""

iterator = _sequential(function, *arrays, **kwargs)
result = list(iterator)

return result

def t_imap(function, *arrays, **kwargs):
"""Returns an iterator for a sequential map with a progress bar."""

iterator = _sequential(function, *arrays, **kwargs)

return iterator
from p_tqdm.p_tqdm import p_map, p_imap, p_umap, p_uimap, t_map, t_imap

__all__ = [
'p_map',
'p_imap',
'p_umap',
'p_uimap',
't_map',
't_imap'
]
127 changes: 127 additions & 0 deletions p_tqdm/p_tqdm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Map functions with tqdm progress bars for parallel and sequential processing.
p_map: Performs a parallel ordered map.
p_imap: Returns an iterator for a parallel ordered map.
p_umap: Performs a parallel unordered map.
p_uimap: Returns an iterator for a parallel unordered map.
t_map: Performs a sequential map.
t_imap: Returns an iterator for a sequential map.
"""

from collections import Sized
from typing import Any, Callable, Generator, Iterable, List

from pathos.helpers import cpu_count
from pathos.multiprocessing import ProcessPool as Pool
from tqdm.auto import tqdm


def _parallel(ordered: bool, function: Callable, *iterables: Iterable, **kwargs: Any) -> Generator:
"""Returns a generator for a parallel map with a progress bar.
Arguments:
ordered(bool): True for an ordered map, false for an unordered map.
function(Callable): The function to apply to each element of the given Iterables.
iterables(Tuple[Iterable]): One or more Iterables containing the data to be mapped.
Returns:
A generator which will apply the function to each element of the given Iterables
in parallel in order with a progress bar.
"""

# Extract num_cpus
num_cpus = kwargs.pop('num_cpus', None)

# Determine num_cpus
if num_cpus is None:
num_cpus = cpu_count()
elif type(num_cpus) == float:
num_cpus = int(round(num_cpus * cpu_count()))

# Determine length of tqdm (equal to length of shortest iterable)
length = min(len(iterable) for iterable in iterables if isinstance(iterable, Sized))

# Create parallel generator
map_type = 'imap' if ordered else 'uimap'
pool = Pool(num_cpus)
map_func = getattr(pool, map_type)

for item in tqdm(map_func(function, *iterables), total=length, **kwargs):
yield item

pool.clear()


def p_map(function: Callable, *iterables: Iterable, **kwargs: Any) -> List[Any]:
"""Performs a parallel ordered map with a progress bar."""

ordered = True
generator = _parallel(ordered, function, *iterables, **kwargs)
result = list(generator)

return result


def p_imap(function: Callable, *iterables: Iterable, **kwargs: Any) -> Generator:
"""Returns a generator for a parallel ordered map with a progress bar."""

ordered = True
generator = _parallel(ordered, function, *iterables, **kwargs)

return generator


def p_umap(function: Callable, *iterables: Iterable, **kwargs: Any) -> List[Any]:
"""Performs a parallel unordered map with a progress bar."""

ordered = False
generator = _parallel(ordered, function, *iterables, **kwargs)
result = list(generator)

return result


def p_uimap(function: Callable, *iterables: Iterable, **kwargs: Any) -> Generator:
"""Returns a generator for a parallel unordered map with a progress bar."""

ordered = False
generator = _parallel(ordered, function, *iterables, **kwargs)

return generator


def _sequential(function: Callable, *iterables: Iterable, **kwargs: Any) -> Generator:
"""Returns a generator for a sequential map with a progress bar.
Arguments:
function(Callable): The function to apply to each element of the given Iterables.
iterables(Tuple[Iterable]): One or more Iterables containing the data to be mapped.
Returns:
A generator which will apply the function to each element of the given Iterables
sequentially in order with a progress bar.
"""

# Determine length of tqdm (equal to length of shortest iterable)
length = min(len(iterable) for iterable in iterables if isinstance(iterable, Sized))

# Create sequential generator
for item in tqdm(map(function, *iterables), total=length, **kwargs):
yield item


def t_map(function: Callable, *iterables: Iterable, **kwargs: Any) -> List[Any]:
"""Performs a sequential map with a progress bar."""

generator = _sequential(function, *iterables, **kwargs)
result = list(generator)

return result


def t_imap(function: Callable, *iterables: Iterable, **kwargs: Any) -> Generator:
"""Returns a generator for a sequential map with a progress bar."""

generator = _sequential(function, *iterables, **kwargs)

return generator
Empty file removed p_tqdm/tests/__init__.py
Empty file.
Loading

0 comments on commit 2752ec4

Please sign in to comment.