Skip to content

Commit

Permalink
Updated iter_bucket to use concurrent futures. (#368)
Browse files Browse the repository at this point in the history
* Updated iter_bucket to use concurrent futures.

This commit addresses issue #340.
AWS Lambda environments do not support multiprocessing.Queue or
multiprocessing.Pool, which are used by iter_bucket to optimize the
pulling of files from s3.

Solution: Switch to using concurrent.futures.ThreadPoolExecutor instead.
This still parallelizes the downloading of files from S3, but does so with
threads rather than new processes.

* disable test_old when mocks are disabled

* favor multiprocessing over concurrent.futures

* make imap_unordered return an iterator instead of a list

* skip tests when their respective features are unavailable

* Revert "disable test_old when mocks are disabled"

This reverts commit 6506562.

* tweak imap_unordered

* remove tests_require pins

Co-authored-by: Michael Penkov <[email protected]>
  • Loading branch information
derpferd and mpenkov authored Mar 11, 2020
1 parent 68a39d9 commit b0418fd
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 10 deletions.
8 changes: 2 additions & 6 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,10 @@ def read(fname):

tests_require = [
'mock',
'moto==1.3.4',
'moto[server]',
'pathlib2',
'responses',
# Temporary pin boto3 & botocore, because moto doesn't work with new version
# See https://github.com/spulec/moto/issues/1793 and
# https://github.com/RaRe-Technologies/smart_open/issues/227
'boto3 < 1.8.0',
# 'botocore < 1.11.0'
'boto3',
# Not used directly but allows boto GCE plugins to load.
# https://github.com/GoogleCloudPlatform/compute-image-packages/issues/262
'google-compute-engine==2.8.12',
Expand Down
29 changes: 28 additions & 1 deletion smart_open/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@

logger = logging.getLogger(__name__)

# AWS Lambda environments do not support multiprocessing.Queue or multiprocessing.Pool.
# However they do support Threads and therefore concurrent.futures's ThreadPoolExecutor.
# We use this flag to allow python 2 backward compatibility, where concurrent.futures doesn't exist.
_CONCURRENT_FUTURES = False
try:
import concurrent.futures
_CONCURRENT_FUTURES = True
except ImportError:
warnings.warn("concurrent.futures could not be imported and won't be used")

# Multiprocessing is unavailable in App Engine (and possibly other sandboxes).
# The only method currently relying on it is iter_bucket, which is instructed
# whether to use it by the MULTIPROCESSING flag.
Expand Down Expand Up @@ -832,11 +842,28 @@ def terminate(self):
pass


class ConcurrentFuturesPool(object):
    """Mimic the subset of the multiprocessing.pool.Pool interface used by
    iter_bucket, backed by a concurrent.futures.ThreadPoolExecutor.

    Threads (rather than worker processes) make this usable in sandboxed
    environments, e.g. AWS Lambda, where multiprocessing is unavailable.
    """

    def __init__(self, max_workers):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers)

    def imap_unordered(self, function, items):
        """Apply *function* to every element of *items*, yielding each result
        as soon as it becomes available (completion order, not input order)."""
        pending = [self.executor.submit(function, entry) for entry in items]
        for completed in concurrent.futures.as_completed(pending):
            yield completed.result()

    def terminate(self):
        """Wait for all submitted work to finish and release the threads."""
        self.executor.shutdown(wait=True)


@contextlib.contextmanager
def _create_process_pool(processes=1):
if _MULTIPROCESSING and processes:
logger.info("creating pool with %i workers", processes)
logger.info("creating multiprocessing pool with %i workers", processes)
pool = multiprocessing.pool.Pool(processes=processes)
elif _CONCURRENT_FUTURES and processes:
logger.info("creating concurrent futures pool with %i workers", processes)
pool = ConcurrentFuturesPool(max_workers=processes)
else:
logger.info("creating dummy pool")
pool = DummyPool()
Expand Down
53 changes: 50 additions & 3 deletions smart_open/tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ def test_old(self):

# first, create some keys in the bucket
expected = {}
for key_no in range(200):
for key_no in range(42):
key_name = "mykey%s" % key_no
with smart_open.smart_open("s3://%s/%s" % (BUCKET_NAME, key_name), 'wb') as fout:
content = '\n'.join("line%i%i" % (key_no, line_no) for line_no in range(10)).encode('utf8')
Expand Down Expand Up @@ -542,16 +542,63 @@ def test_old(self):
self.assertEqual(result, expected)


@maybe_mock_s3
@unittest.skipIf(not smart_open.s3._CONCURRENT_FUTURES, 'concurrent.futures unavailable')
class IterBucketConcurrentFuturesTest(unittest.TestCase):
    """Exercise iter_bucket with multiprocessing switched off, so the
    concurrent.futures thread-pool code path is the one under test."""

    def setUp(self):
        # Force iter_bucket away from multiprocessing; restored in tearDown.
        self._saved_multiprocessing = smart_open.s3._MULTIPROCESSING
        smart_open.s3._MULTIPROCESSING = False
        ignore_resource_warnings()

    def tearDown(self):
        smart_open.s3._MULTIPROCESSING = self._saved_multiprocessing
        cleanup_bucket()

    def test(self):
        num_keys = 101
        populate_bucket(num_keys=num_keys)
        actual = list(smart_open.s3.iter_bucket(BUCKET_NAME))
        self.assertEqual(len(actual), num_keys)

        expected = [('key_%d' % idx, b'%d' % idx) for idx in range(num_keys)]
        self.assertEqual(sorted(actual), sorted(expected))


@maybe_mock_s3
@unittest.skipIf(not smart_open.s3._MULTIPROCESSING, 'multiprocessing unavailable')
class IterBucketMultiprocessingTest(unittest.TestCase):
    """Exercise iter_bucket with concurrent.futures switched off, so the
    multiprocessing pool code path is the one under test."""

    def setUp(self):
        # Force iter_bucket away from concurrent.futures; restored in tearDown.
        self._saved_concurrent_futures = smart_open.s3._CONCURRENT_FUTURES
        smart_open.s3._CONCURRENT_FUTURES = False
        ignore_resource_warnings()

    def tearDown(self):
        smart_open.s3._CONCURRENT_FUTURES = self._saved_concurrent_futures
        cleanup_bucket()

    def test(self):
        num_keys = 101
        populate_bucket(num_keys=num_keys)
        actual = list(smart_open.s3.iter_bucket(BUCKET_NAME))
        self.assertEqual(len(actual), num_keys)

        expected = [('key_%d' % idx, b'%d' % idx) for idx in range(num_keys)]
        self.assertEqual(sorted(actual), sorted(expected))


@maybe_mock_s3
class IterBucketSingleProcessTest(unittest.TestCase):
def setUp(self):
self.old_flag = smart_open.s3._MULTIPROCESSING
self.old_flag_multi = smart_open.s3._MULTIPROCESSING
self.old_flag_concurrent = smart_open.s3._CONCURRENT_FUTURES
smart_open.s3._MULTIPROCESSING = False
smart_open.s3._CONCURRENT_FUTURES = False

ignore_resource_warnings()

def tearDown(self):
smart_open.s3._MULTIPROCESSING = self.old_flag
smart_open.s3._MULTIPROCESSING = self.old_flag_multi
smart_open.s3._CONCURRENT_FUTURES = self.old_flag_concurrent
cleanup_bucket()

def test(self):
Expand Down

0 comments on commit b0418fd

Please sign in to comment.