From cde3eeb1d43cfe891dbcab5df7590a114093c815 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Tue, 6 Oct 2020 23:50:40 +0900 Subject: [PATCH 1/9] Implement option to buffer to disk for S3 writes Currently, we buffer in-memory. This works well in most cases, but can be a problem when performing many uploads in parallel. Each upload needs to buffer at least 5MB of data (that is the minimum part size for AWS S3). When performing hundreds or thousands of uploads, memory can become a bottleneck. This PR relieves that bottleneck by buffering to temporary files instead. Each part gets saved to a separate temporary file and discarded after upload. This reduces memory usage at the cost of increased IO with the local disk. --- smart_open/s3.py | 31 ++++++++++++++++++++++++++++--- smart_open/tests/test_s3.py | 24 ++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index c8c28a15..69666de4 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -10,6 +10,7 @@ import io import functools import logging +import tempfile import time try: @@ -184,6 +185,7 @@ def open( singlepart_upload_kwargs=None, object_kwargs=None, defer_seek=False, + diskbuffer=False, ): """Open an S3 object for reading or writing. @@ -227,6 +229,10 @@ def open( If set to `True` on a file opened for reading, GetObject will not be called until the first seek() or read(). Avoids redundant API queries when seeking before reading. + diskbuffer: boolean, optional + Default: `False` + If set to `True`, buffers to local disk instead of in RAM. + Use this to keep RAM usage low at the expense of additional disk IO. """ logger.debug('%r', locals()) if mode not in constants.BINARY_MODES: @@ -255,6 +261,7 @@ def open( session=session, upload_kwargs=multipart_upload_kwargs, resource_kwargs=resource_kwargs, + diskbuffer=diskbuffer, ) else: fileobj = SinglepartWriter( @@ -650,6 +657,17 @@ def __repr__(self): ) +def _open_buffer(diskbuffer=False): + if diskbuffer: + # + # NB We will be both writing _and_ reading (in that order, but that's + # purely and implementation detail) from the buffer. + # + return tempfile.NamedTemporaryFile(prefix='smart_open.s3.', mode='wb+') + else: + return io.BytesIO() + + class MultipartWriter(io.BufferedIOBase): """Writes bytes to S3 using the multi part API. 
@@ -663,6 +681,7 @@ def __init__( session=None, resource_kwargs=None, upload_kwargs=None, + diskbuffer=False, ): if min_part_size < MIN_MIN_PART_SIZE: logger.warning("S3 requires minimum part size >= 5MB; \ @@ -678,6 +697,7 @@ def __init__( self._session = session self._resource_kwargs = resource_kwargs self._upload_kwargs = upload_kwargs + self._diskbuffer = diskbuffer s3 = session.resource('s3', **resource_kwargs) try: @@ -692,7 +712,7 @@ def __init__( ) ) from error - self._buf = io.BytesIO() + self._buf = _open_buffer(self._diskbuffer) self._total_bytes = 0 self._total_parts = 0 self._parts = [] @@ -807,7 +827,8 @@ def _upload_next_part(self): logger.debug("upload of part #%i finished" % part_num) self._total_parts += 1 - self._buf = io.BytesIO() + self._buf.close() + self._buf = _open_buffer(self._diskbuffer) def __enter__(self): return self @@ -852,10 +873,12 @@ def __init__( session=None, resource_kwargs=None, upload_kwargs=None, + diskbuffer=False, ): self._session = session self._resource_kwargs = resource_kwargs + self._diskbuffer = diskbuffer if session is None: session = boto3.Session() @@ -873,7 +896,7 @@ def __init__( except botocore.client.ClientError as e: raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e - self._buf = io.BytesIO() + self._buf = _open_buffer(self._diskbuffer) self._total_bytes = 0 # @@ -900,6 +923,8 @@ def close(self): 'the bucket %r does not exist, or is forbidden for access' % self._object.bucket_name) from e logger.debug("direct upload finished") + + self._buf.close() self._buf = None @property diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index 4d3f31f0..bbb9f1c6 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -494,6 +494,18 @@ def test_to_boto3(self): boto3_body = returned_obj.get()['Body'].read() self.assertEqual(contents, boto3_body) + def test_diskbuffer(self): + """Does the MultipartWriter support the diskbuffer feature?""" + contents = b'get ready for a surprise' + + with smart_open.s3.MultipartWriter(BUCKET_NAME, WRITE_KEY_NAME, diskbuffer=True) as fout: + fout.write(contents) + + with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb') as fin: + actual = fin.read() + + assert actual == contents + @moto.mock_s3 class SinglepartWriterTest(unittest.TestCase): @@ -589,6 +601,18 @@ def test_flush_close(self): fout.flush() fout.close() + def test_diskbuffer(self): + """Does the SinglepartWriter support the diskbuffer feature?""" + contents = b'get ready for a surprise' + + with smart_open.s3.SinglepartWriter(BUCKET_NAME, WRITE_KEY_NAME, diskbuffer=True) as fout: + fout.write(contents) + + with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb') as fin: + actual = fin.read() + + assert actual == contents + ARBITRARY_CLIENT_ERROR = botocore.client.ClientError(error_response={}, operation_name='bar') From c66550cf7229dc8e38287c9a6a1b208b1714ace7 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Tue, 6 Oct 2020 23:56:22 +0900 Subject: [PATCH 2/9] update help.txt --- help.txt | 8 ++++++-- smart_open/s3.py | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/help.txt b/help.txt index 653c096a..e4a1b046 100644 --- a/help.txt +++ b/help.txt @@ -165,6 +165,10 @@ FUNCTIONS If set to `True` on a file opened for reading, GetObject will not be called until the first seek() or read(). Avoids redundant API queries when seeking before reading. 
+ diskbuffer: boolean, optional + Default: `False` + If set to `True`, buffers to local disk instead of in RAM. + Use this to keep RAM usage low at the expense of additional disk IO. scp (smart_open/ssh.py) ~~~~~~~~~~~~~~~~~~~~~~~ @@ -324,9 +328,9 @@ DATA __all__ = ['open', 'parse_uri', 'register_compressor', 's3_iter_bucket... VERSION - 2.1.1 + 2.2.1 FILE - /home/misha/git/smart_open/smart_open/__init__.py + /Users/misha/git/smart_open/smart_open/__init__.py diff --git a/smart_open/s3.py b/smart_open/s3.py index 69666de4..4caefec3 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -661,7 +661,7 @@ def _open_buffer(diskbuffer=False): if diskbuffer: # # NB We will be both writing _and_ reading (in that order, but that's - # purely and implementation detail) from the buffer. + # purely an implementation detail) from the buffer. # return tempfile.NamedTemporaryFile(prefix='smart_open.s3.', mode='wb+') else: From 6bf628a18d1ac6990cba84315ee3a60a79feade4 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 7 Oct 2020 00:05:21 +0900 Subject: [PATCH 3/9] improve logging message --- smart_open/s3.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index 4caefec3..ee2ee679 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -810,8 +810,14 @@ def to_boto3(self): # def _upload_next_part(self): part_num = self._total_parts + 1 - logger.info("uploading part #%i, %i bytes (total %.3fGB)", - part_num, self._buf.tell(), self._total_bytes / 1024.0 ** 3) + logger.info( + "uploading bucket %r key %r part #%i, %i bytes (total %.3fGB)", + self.object.bucket_name, + self.object.key, + part_num, + self._buf.tell(), + self._total_bytes / 1024.0 ** 3, + ) self._buf.seek(0) part = self._mp.Part(part_num) From e0ff3d1f96969e53d4940522101690a9567efa2a Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 7 Oct 2020 09:55:59 +0900 Subject: [PATCH 4/9] simplify implementation re-use the same buffer in the same writer instance --- smart_open/s3.py | 62 ++++++++++++++++++------------------- smart_open/tests/test_s3.py | 31 ++++++++++--------- 2 files changed, 47 insertions(+), 46 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index ee2ee679..f66504ec 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -185,8 +185,8 @@ def open( singlepart_upload_kwargs=None, object_kwargs=None, defer_seek=False, - diskbuffer=False, - ): + writebuffer=None, + ): """Open an S3 object for reading or writing. Parameters @@ -229,10 +229,13 @@ def open( If set to `True` on a file opened for reading, GetObject will not be called until the first seek() or read(). Avoids redundant API queries when seeking before reading. - diskbuffer: boolean, optional - Default: `False` - If set to `True`, buffers to local disk instead of in RAM. - Use this to keep RAM usage low at the expense of additional disk IO. + writebuffer: IO[bytes], optional + By default, this module will buffer data in memory using io.BytesIO + when writing. Pass another binary IO instance here to use it instead. + For example, you may pass a file object to buffer to local disk instead + of in RAM. Use this to keep RAM usage low at the expense of additional + disk IO. If you pass in an open file, then you are responsible for + cleaning it up after writing completes. 
""" logger.debug('%r', locals()) if mode not in constants.BINARY_MODES: @@ -261,7 +264,7 @@ def open( session=session, upload_kwargs=multipart_upload_kwargs, resource_kwargs=resource_kwargs, - diskbuffer=diskbuffer, + writebuffer=writebuffer, ) else: fileobj = SinglepartWriter( @@ -270,6 +273,7 @@ def open( session=session, upload_kwargs=singlepart_upload_kwargs, resource_kwargs=resource_kwargs, + writebuffer=writebuffer, ) else: assert False, 'unexpected mode: %r' % mode @@ -657,17 +661,6 @@ def __repr__(self): ) -def _open_buffer(diskbuffer=False): - if diskbuffer: - # - # NB We will be both writing _and_ reading (in that order, but that's - # purely an implementation detail) from the buffer. - # - return tempfile.NamedTemporaryFile(prefix='smart_open.s3.', mode='wb+') - else: - return io.BytesIO() - - class MultipartWriter(io.BufferedIOBase): """Writes bytes to S3 using the multi part API. @@ -681,8 +674,8 @@ def __init__( session=None, resource_kwargs=None, upload_kwargs=None, - diskbuffer=False, - ): + writebuffer=None, + ): if min_part_size < MIN_MIN_PART_SIZE: logger.warning("S3 requires minimum part size >= 5MB; \ multipart upload may fail") @@ -697,7 +690,6 @@ def __init__( self._session = session self._resource_kwargs = resource_kwargs self._upload_kwargs = upload_kwargs - self._diskbuffer = diskbuffer s3 = session.resource('s3', **resource_kwargs) try: @@ -712,7 +704,11 @@ def __init__( ) ) from error - self._buf = _open_buffer(self._diskbuffer) + if writebuffer is None: + self._buf = io.BytesIO() + else: + self._buf = writebuffer + self._total_bytes = 0 self._total_parts = 0 self._parts = [] @@ -812,8 +808,8 @@ def _upload_next_part(self): part_num = self._total_parts + 1 logger.info( "uploading bucket %r key %r part #%i, %i bytes (total %.3fGB)", - self.object.bucket_name, - self.object.key, + self._object.bucket_name, + self._object.key, part_num, self._buf.tell(), self._total_bytes / 1024.0 ** 3, @@ -833,8 +829,9 @@ def _upload_next_part(self): logger.debug("upload of part #%i finished" % part_num) self._total_parts += 1 - self._buf.close() - self._buf = _open_buffer(self._diskbuffer) + + self._buf.seek(0) + self._buf.truncate(0) def __enter__(self): return self @@ -879,12 +876,11 @@ def __init__( session=None, resource_kwargs=None, upload_kwargs=None, - diskbuffer=False, - ): + writebuffer=None, + ): self._session = session self._resource_kwargs = resource_kwargs - self._diskbuffer = diskbuffer if session is None: session = boto3.Session() @@ -902,7 +898,11 @@ def __init__( except botocore.client.ClientError as e: raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e - self._buf = _open_buffer(self._diskbuffer) + if writebuffer is None: + self._buf = io.BytesIO() + else: + self._buf = writebuffer + self._total_bytes = 0 # @@ -929,8 +929,6 @@ def close(self): 'the bucket %r does not exist, or is forbidden for access' % self._object.bucket_name) from e logger.debug("direct upload finished") - - self._buf.close() self._buf = None @property diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index bbb9f1c6..0188e390 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -10,6 +10,7 @@ import io import logging import os +import tempfile import time import unittest import warnings @@ -494,17 +495,18 @@ def test_to_boto3(self): boto3_body = returned_obj.get()['Body'].read() self.assertEqual(contents, boto3_body) - def test_diskbuffer(self): - """Does the MultipartWriter support the diskbuffer 
feature?""" + def test_writebuffer(self): + """Does the MultipartWriter support writing to a custom buffer?""" contents = b'get ready for a surprise' - with smart_open.s3.MultipartWriter(BUCKET_NAME, WRITE_KEY_NAME, diskbuffer=True) as fout: - fout.write(contents) + with tempfile.NamedTemporaryFile(mode='rb+') as f: + with smart_open.s3.MultipartWriter(BUCKET_NAME, WRITE_KEY_NAME, writebuffer=f) as fout: + fout.write(contents) - with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb') as fin: - actual = fin.read() + with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb') as fin: + actual = fin.read() - assert actual == contents + assert actual == contents @moto.mock_s3 @@ -601,17 +603,18 @@ def test_flush_close(self): fout.flush() fout.close() - def test_diskbuffer(self): - """Does the SinglepartWriter support the diskbuffer feature?""" + def test_writebuffer(self): + """Does the SinglepartWriter support writing to a custom buffer?""" contents = b'get ready for a surprise' - with smart_open.s3.SinglepartWriter(BUCKET_NAME, WRITE_KEY_NAME, diskbuffer=True) as fout: - fout.write(contents) + with tempfile.NamedTemporaryFile(mode='rb+') as f: + with smart_open.s3.SinglepartWriter(BUCKET_NAME, WRITE_KEY_NAME, writebuffer=f) as fout: + fout.write(contents) - with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb') as fin: - actual = fin.read() + with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb') as fin: + actual = fin.read() - assert actual == contents + assert actual == contents ARBITRARY_CLIENT_ERROR = botocore.client.ClientError(error_response={}, operation_name='bar') From 9f512dc2d8e00fc70f0369ae07f70a4a9d5109ed Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 7 Oct 2020 09:57:21 +0900 Subject: [PATCH 5/9] dedent --- smart_open/s3.py | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index f66504ec..2f7de3e8 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -667,15 +667,15 @@ class MultipartWriter(io.BufferedIOBase): Implements the io.BufferedIOBase interface of the standard library.""" def __init__( - self, - bucket, - key, - min_part_size=DEFAULT_MIN_PART_SIZE, - session=None, - resource_kwargs=None, - upload_kwargs=None, - writebuffer=None, - ): + self, + bucket, + key, + min_part_size=DEFAULT_MIN_PART_SIZE, + session=None, + resource_kwargs=None, + upload_kwargs=None, + writebuffer=None, + ): if min_part_size < MIN_MIN_PART_SIZE: logger.warning("S3 requires minimum part size >= 5MB; \ multipart upload may fail") @@ -870,15 +870,14 @@ class SinglepartWriter(io.BufferedIOBase): the data be written to S3 and the buffer is released.""" def __init__( - self, - bucket, - key, - session=None, - resource_kwargs=None, - upload_kwargs=None, - writebuffer=None, - ): - + self, + bucket, + key, + session=None, + resource_kwargs=None, + upload_kwargs=None, + writebuffer=None, + ): self._session = session self._resource_kwargs = resource_kwargs From fad32263c08aaac768987d05b668038eb3214647 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 7 Oct 2020 10:05:50 +0900 Subject: [PATCH 6/9] update help.txt --- help.txt | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/help.txt b/help.txt index e4a1b046..ec5ebf0b 100644 --- a/help.txt +++ b/help.txt @@ -165,10 +165,13 @@ FUNCTIONS If set to `True` on a file opened for reading, GetObject will not be called until the first seek() or read(). Avoids redundant API queries when seeking before reading. 
- diskbuffer: boolean, optional - Default: `False` - If set to `True`, buffers to local disk instead of in RAM. - Use this to keep RAM usage low at the expense of additional disk IO. + writebuffer: IO[bytes], optional + By default, this module will buffer data in memory using io.BytesIO + when writing. Pass another binary IO instance here to use it instead. + For example, you may pass a file object to buffer to local disk instead + of in RAM. Use this to keep RAM usage low at the expense of additional + disk IO. If you pass in an open file, then you are responsible for + cleaning it up after writing completes. scp (smart_open/ssh.py) ~~~~~~~~~~~~~~~~~~~~~~~ From d9f5f54bb25c53f09d86c72cbeb220174d22198a Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 7 Oct 2020 10:08:28 +0900 Subject: [PATCH 7/9] fixup --- smart_open/s3.py | 1 - 1 file changed, 1 deletion(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index 2f7de3e8..bb9f5c68 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -10,7 +10,6 @@ import io import functools import logging -import tempfile import time try: From 1e958ecf8be68ab4cf0e9bf20251851859bc87b1 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Mon, 15 Feb 2021 15:23:50 +0900 Subject: [PATCH 8/9] update howto.md --- howto.md | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/howto.md b/howto.md index b96b9238..a6505de1 100644 --- a/howto.md +++ b/howto.md @@ -197,6 +197,35 @@ It is possible to save both CPU time and memory by sharing the same resource acr The above sharing is safe because it is all happening in the same thread and subprocess (see below for details). +By default, `smart_open` buffers the most recent part of a multipart upload in memory. +The default part size is 50MB. +If you're concerned about memory usage, then you have two options. +The first option is to use smaller part sizes (e.g. 5MB, the lowest value permitted by AWS): + +```python +import boto3 +from smart_open import open +tp = {'min_part_size': 5 * 1024**2} +with open('s3://bucket/key', 'w', transport_params=tp) as fout: + fout.write(lots_of_data) +``` + +This will split your upload into smaller parts. +Be warned that AWS enforces a [limit](https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html) of a maximum of 10,000 parts per upload. + +The second option is to use a temporary file as a buffer instead. + +```python +import boto3 +from smart_open import open +with tempfile.NamedTemporaryFile() as tmp: + tp = {'writebuffer': tmp} + with open('s3://bucket/key', 'w', transport_params=tp) as fout: + fout.write(lots_of_data) +``` + +This option reduces memory usage at the expense of additional disk I/O (writing to a reading from a hard disk is slower). + ## How to Work in a Parallelized Environment Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3. 
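For reference, a self-contained sketch of the temporary-file option documented in the howto hunk above. The bucket, key, and payload are placeholders rather than anything from this series, and it assumes working AWS credentials plus a `smart_open` build that includes this change:

```python
import tempfile

from smart_open import open

# Stage the upload in a temporary file instead of an in-memory io.BytesIO
# buffer. The caller owns the file object, so here it is cleaned up by the
# NamedTemporaryFile context manager once the upload has completed.
with tempfile.NamedTemporaryFile() as tmp:
    tp = {'writebuffer': tmp}
    with open('s3://my-bucket/my-key.bin', 'wb', transport_params=tp) as fout:
        fout.write(b'x' * (20 * 1024 ** 2))  # 20MB of dummy data, buffered on disk
```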
From 5ee2a68959939419b526796674c68deab6c3fafa Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Mon, 15 Feb 2021 15:25:32 +0900 Subject: [PATCH 9/9] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bfc88a8c..781a81b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Unreleased - Support tell() for text mode write on s3/gcs/azure (PR [#582](https://github.com/RaRe-Technologies/smart_open/pull/582), [@markopy](https://github.com/markopy)) +- Implement option to use a custom buffer during S3 writes (PR [#547](https://github.com/RaRe-Technologies/smart_open/pull/547), [@mpenkov](https://github.com/mpenkov)) # 4.1.2, 18 Jan 2021
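Below is a rough usage sketch at the writer level, mirroring the unit tests added in this series. The bucket and key names are placeholders, and running it against real S3 requires credentials and an existing bucket (the tests mock S3 with moto instead):

```python
import tempfile

import smart_open.s3

payload = b'get ready for a surprise'

# Default behaviour: each part is buffered in memory (io.BytesIO).
with smart_open.s3.MultipartWriter('my-bucket', 'in-memory.bin') as fout:
    fout.write(payload)

# With writebuffer: pass any binary, seekable file object and the parts are
# staged there instead. The writer reuses the same buffer between parts, and
# the caller remains responsible for closing it afterwards.
with tempfile.NamedTemporaryFile(mode='rb+') as f:
    with smart_open.s3.MultipartWriter('my-bucket', 'on-disk.bin', writebuffer=f) as fout:
        fout.write(payload)
```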