Implement option to write to a custom buffer during S3 writes #547

Merged 10 commits on Feb 15, 2021
11 changes: 9 additions & 2 deletions help.txt
@@ -165,6 +165,13 @@ FUNCTIONS
If set to `True` on a file opened for reading, GetObject will not be
called until the first seek() or read().
Avoids redundant API queries when seeking before reading.
writebuffer: IO[bytes], optional
By default, this module will buffer data in memory using io.BytesIO
when writing. Pass another binary IO instance here to use it instead.
For example, you may pass a file object to buffer to local disk instead
of in RAM. Use this to keep RAM usage low at the expense of additional
disk IO. If you pass in an open file, then you are responsible for
cleaning it up after writing completes.
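As an aside (not part of this diff), a minimal sketch of that last point, with a placeholder bucket/key and data: the caller opens the buffer file itself and therefore also closes and removes it afterwards.

```python
import os
import tempfile

from smart_open import open  # assumes a smart_open version with writebuffer support (this PR)

fd, path = tempfile.mkstemp()
buf = os.fdopen(fd, 'rb+')        # an open binary file to use as the write buffer
try:
    tp = {'writebuffer': buf}
    with open('s3://bucket/key', 'wb', transport_params=tp) as fout:
        fout.write(b'hello world')
finally:
    buf.close()                   # cleanup is the caller's responsibility, per the note above
    os.remove(path)
```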

scp (smart_open/ssh.py)
~~~~~~~~~~~~~~~~~~~~~~~
@@ -324,9 +331,9 @@ DATA
__all__ = ['open', 'parse_uri', 'register_compressor', 's3_iter_bucket...

VERSION
-    2.1.1
+    2.2.1

FILE
-    /home/misha/git/smart_open/smart_open/__init__.py
+    /Users/misha/git/smart_open/smart_open/__init__.py
Contributor: osx migration, again :)

Collaborator (author): Migration between home and office :)



29 changes: 29 additions & 0 deletions howto.md
@@ -197,6 +197,35 @@ It is possible to save both CPU time and memory by sharing the same resource acr

The above sharing is safe because it is all happening in the same thread and subprocess (see below for details).

By default, `smart_open` buffers the most recent part of a multipart upload in memory.
The default part size is 50MB.
If you're concerned about memory usage, then you have two options.
The first option is to use smaller part sizes (e.g. 5MB, the lowest value permitted by AWS):

```python
import boto3
from smart_open import open
tp = {'min_part_size': 5 * 1024**2}
with open('s3://bucket/key', 'w', transport_params=tp) as fout:
    fout.write(lots_of_data)
```

This will split your upload into smaller parts.
Be warned that AWS enforces a [limit](https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html) of 10,000 parts per upload.
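As a quick back-of-the-envelope check (not from the PR, just arithmetic on the two numbers above), the part size and the 10,000-part cap together bound the largest object a single upload can produce:

```python
min_part_size = 5 * 1024**2        # 5 MiB, the smallest part size AWS accepts
max_parts = 10_000                 # AWS cap on parts per multipart upload
max_object_bytes = min_part_size * max_parts
print(max_object_bytes / 1024**3)  # ~48.8 GiB; larger objects need a bigger part size
```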

The second option is to use a temporary file as a buffer instead.

```python
import tempfile

import boto3
from smart_open import open

with tempfile.NamedTemporaryFile() as tmp:
    tp = {'writebuffer': tmp}
    with open('s3://bucket/key', 'w', transport_params=tp) as fout:
        fout.write(lots_of_data)
```

This option reduces memory usage at the expense of additional disk I/O (writing to and reading from a hard disk is slower).
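A possible middle ground, sketched here on the assumption that any seekable binary file object works as a `writebuffer` (it is not shown in this pull request), is `tempfile.SpooledTemporaryFile`, which stays in RAM until it grows past a threshold and only then spills to disk:

```python
import tempfile

from smart_open import open

# Hypothetical variant: buffer up to 16 MB in memory, spill larger buffers to disk.
with tempfile.SpooledTemporaryFile(max_size=16 * 1024**2) as tmp:
    tp = {'writebuffer': tmp}
    with open('s3://bucket/key', 'w', transport_params=tp) as fout:
        fout.write(lots_of_data)
```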

## How to Work in a Parallelized Environment

Under the covers, `smart_open` uses the [boto3 resource API](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html) to read from S3.
45 changes: 33 additions & 12 deletions smart_open/s3.py
@@ -186,6 +186,7 @@ def open(
singlepart_upload_kwargs=None,
object_kwargs=None,
defer_seek=False,
writebuffer=None,
):
"""Open an S3 object for reading or writing.

@@ -234,6 +235,13 @@ def open(
If set to `True` on a file opened for reading, GetObject will not be
called until the first seek() or read().
Avoids redundant API queries when seeking before reading.
writebuffer: IO[bytes], optional
By default, this module will buffer data in memory using io.BytesIO
when writing. Pass another binary IO instance here to use it instead.
For example, you may pass a file object to buffer to local disk instead
of in RAM. Use this to keep RAM usage low at the expense of additional
disk IO. If you pass in an open file, then you are responsible for
cleaning it up after writing completes.
"""
logger.debug('%r', locals())
if mode not in constants.BINARY_MODES:
@@ -264,6 +272,7 @@ def open(
resource=resource,
upload_kwargs=multipart_upload_kwargs,
resource_kwargs=resource_kwargs,
writebuffer=writebuffer,
)
else:
fileobj = SinglepartWriter(
@@ -273,6 +282,7 @@ def open(
resource=resource,
upload_kwargs=singlepart_upload_kwargs,
resource_kwargs=resource_kwargs,
writebuffer=writebuffer,
)
else:
assert False, 'unexpected mode: %r' % mode
@@ -743,6 +753,7 @@ def __init__(
resource=None,
resource_kwargs=None,
upload_kwargs=None,
writebuffer=None,
):
if min_part_size < MIN_MIN_PART_SIZE:
logger.warning("S3 requires minimum part size >= 5MB; \
@@ -767,7 +778,11 @@ def __init__(
)
) from error

-        self._buf = io.BytesIO()
+        if writebuffer is None:
+            self._buf = io.BytesIO()
+        else:
+            self._buf = writebuffer

self._total_bytes = 0
self._total_parts = 0
self._parts = []
@@ -898,7 +913,9 @@ def _upload_next_part(self):
logger.debug("%s: upload of part_num #%i finished", self, part_num)

self._total_parts += 1
-        self._buf = io.BytesIO()
+
+        self._buf.seek(0)
+        self._buf.truncate(0)

def __enter__(self):
return self
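The hunk above replaces `self._buf = io.BytesIO()` with an in-place reset: rebinding to a fresh `io.BytesIO` would silently stop using a caller-supplied `writebuffer` after the first part, while `seek(0)` plus `truncate(0)` empties whatever buffer was passed in and keeps reusing it. A small illustrative sketch of the reuse pattern (not the library's code):

```python
import io
import tempfile

def reset(buf):
    # Empty the buffer in place so the same object keeps being reused.
    buf.seek(0)
    buf.truncate(0)

for buf in (io.BytesIO(), tempfile.TemporaryFile()):
    buf.write(b'part one')
    reset(buf)
    buf.write(b'part two')
    buf.seek(0)
    assert buf.read() == b'part two'  # previous contents are gone, same buffer object
    buf.close()
```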
@@ -937,15 +954,15 @@ class SinglepartWriter(io.BufferedIOBase):
the data be written to S3 and the buffer is released."""

def __init__(
-            self,
-            bucket,
-            key,
-            session=None,
-            resource=None,
-            resource_kwargs=None,
-            upload_kwargs=None,
-    ):
-
+        self,
+        bucket,
+        key,
+        session=None,
+        resource=None,
+        resource_kwargs=None,
+        upload_kwargs=None,
+        writebuffer=None,
+    ):
_initialize_boto3(self, session, resource, resource_kwargs)

if upload_kwargs is None:
@@ -959,7 +976,11 @@ def __init__(
except botocore.client.ClientError as e:
raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket) from e

-        self._buf = io.BytesIO()
+        if writebuffer is None:
+            self._buf = io.BytesIO()
+        else:
+            self._buf = writebuffer

self._total_bytes = 0

#
27 changes: 27 additions & 0 deletions smart_open/tests/test_s3.py
@@ -10,6 +10,7 @@
import io
import logging
import os
import tempfile
import time
import unittest
import warnings
@@ -601,6 +602,19 @@ def test_to_boto3(self):
boto3_body = returned_obj.get()['Body'].read()
self.assertEqual(contents, boto3_body)

def test_writebuffer(self):
    """Does the MultipartWriter support writing to a custom buffer?"""
    contents = b'get ready for a surprise'

    with tempfile.NamedTemporaryFile(mode='rb+') as f:
        with smart_open.s3.MultipartWriter(BUCKET_NAME, WRITE_KEY_NAME, writebuffer=f) as fout:
            fout.write(contents)

    with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb') as fin:
        actual = fin.read()

    assert actual == contents


@moto.mock_s3
class SinglepartWriterTest(unittest.TestCase):
@@ -696,6 +710,19 @@ def test_flush_close(self):
fout.flush()
fout.close()

def test_writebuffer(self):
    """Does the SinglepartWriter support writing to a custom buffer?"""
    contents = b'get ready for a surprise'

    with tempfile.NamedTemporaryFile(mode='rb+') as f:
        with smart_open.s3.SinglepartWriter(BUCKET_NAME, WRITE_KEY_NAME, writebuffer=f) as fout:
            fout.write(contents)

    with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb') as fin:
        actual = fin.read()

    assert actual == contents


ARBITRARY_CLIENT_ERROR = botocore.client.ClientError(error_response={}, operation_name='bar')
