Add logic for handling large files in MultipartWriter uploads to s3 #796

Merged (11 commits) on Feb 22, 2024
114 changes: 87 additions & 27 deletions smart_open/s3.py
@@ -6,6 +6,7 @@
# from the MIT License (MIT).
#
"""Implements file-like objects for reading and writing from/to AWS S3."""
from __future__ import annotations
Review comment on this line:
@mpenkov this breaks compatibility for py3.6

@mpenkov (Collaborator) replied on Mar 7, 2024:
we no longer support Py3.6

python_requires=">=3.7,<4.0",
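For context on why the future import matters here: the PR's new annotations (e.g. `writebuffer: io.BytesIO | None`) use PEP 604 union syntax, which Python 3.7-3.9 only accept when annotation evaluation is deferred. A minimal illustration (an editor's sketch, not part of the diff):

from __future__ import annotations  # defers annotation evaluation (PEP 563), available since 3.7

import io

# With the future import, the "io.BytesIO | None" annotation is stored as a
# string and never evaluated, so this parses and runs on Python 3.7+.
# Without it, Python 3.7-3.9 raise TypeError when the function is defined,
# and on 3.6 the future import itself is a SyntaxError.
def make_buffer(writebuffer: io.BytesIO | None = None) -> io.BytesIO:
    return writebuffer if writebuffer is not None else io.BytesIO()

print(make_buffer())  # <_io.BytesIO object at ...>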


import http
import io
@@ -14,6 +15,12 @@
import time
import warnings

from typing import (
Callable,
List,
TYPE_CHECKING,
)

try:
import boto3
import botocore.client
@@ -28,18 +35,34 @@

from smart_open import constants

from typing import (
Callable,
List,
)

if TYPE_CHECKING:
from mypy_boto3_s3.client import S3Client
from typing_extensions import Buffer

logger = logging.getLogger(__name__)

DEFAULT_MIN_PART_SIZE = 50 * 1024**2
"""Default minimum part size for S3 multipart uploads"""
MIN_MIN_PART_SIZE = 5 * 1024 ** 2
#
# AWS puts restrictions on the part size for multipart uploads.
# Each part must be more than 5MB, and less than 5GB.
#
# On top of that, our MultipartWriter has a min_part_size option.
# In retrospect, it's an unfortunate name, because it conflicts with the
# minimum allowable part size (5MB), but it's too late to change it, because
# people are using that parameter (unlike the MIN, DEFAULT, MAX constants).
# It really just means "part size": as soon as you have this many bytes,
# write a part to S3 (see the MultipartWriter.write method).
#

MIN_PART_SIZE = 5 * 1024 ** 2
"""The absolute minimum permitted by Amazon."""

DEFAULT_PART_SIZE = 50 * 1024**2
"""The default part size for S3 multipart uploads, chosen carefully by smart_open"""

MAX_PART_SIZE = 5 * 1024 ** 3
"""The absolute maximum permitted by Amazon."""

SCHEMES = ("s3", "s3n", 's3u', "s3a")
DEFAULT_PORT = 443
DEFAULT_HOST = 's3.amazonaws.com'
@@ -286,7 +309,7 @@ def open(
mode,
version_id=None,
buffer_size=DEFAULT_BUFFER_SIZE,
min_part_size=DEFAULT_MIN_PART_SIZE,
min_part_size=DEFAULT_PART_SIZE,
multipart_upload=True,
defer_seek=False,
client=None,
@@ -306,12 +329,29 @@
buffer_size: int, optional
The buffer size to use when performing I/O.
min_part_size: int, optional
The minimum part size for multipart uploads. For writing only.
The minimum part size for multipart uploads, in bytes.

When the writebuffer contains this many bytes, smart_open will upload
the bytes to S3 as a single part of a multi-part upload, freeing the
buffer either partially or entirely. When you close the writer, it
will assemble the parts together.

The value determines the upper limit for the writebuffer. If buffer
space is short (e.g. you are buffering to memory), then use a smaller
value for min_part_size, or consider buffering to disk instead (see
the writebuffer option).

The value must be between 5MB and 5GB. If you specify a value outside
of this range, smart_open will adjust it for you, because otherwise the
upload _will_ fail.

For writing only. Does not apply if you set multipart_upload=False.
multipart_upload: bool, optional
Default: `True`
If set to `True`, will use multipart upload for writing to S3. If set
to `False`, the upload will use the S3 single-part upload API, which
is better suited to small files.

For writing only.
version_id: str, optional
Version of the object, used when reading object.
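To make the parameters above concrete, here is a usage sketch with the top-level smart_open.open API, where min_part_size and multipart_upload are passed through as S3 transport parameters; the bucket and key names are hypothetical.

import smart_open

# Hypothetical bucket and key, for illustration only.
url = "s3://my-bucket/big-file.bin"

# Multipart upload (the default): a part is sent to S3 each time the write
# buffer reaches min_part_size bytes; the parts are assembled on close.
with smart_open.open(url, "wb", transport_params={"min_part_size": 5 * 1024 ** 2}) as fout:
    fout.write(b"x" * (12 * 1024 ** 2))  # ~12MB: two 5MB parts, plus a smaller final part

# Single-part upload: everything is buffered and sent in one request,
# which is better suited to small files.
with smart_open.open(url, "wb", transport_params={"multipart_upload": False}) as fout:
    fout.write(b"small payload")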
@@ -358,10 +398,10 @@ def open(
fileobj = MultipartWriter(
bucket_id,
key_id,
min_part_size=min_part_size,
client=client,
client_kwargs=client_kwargs,
writebuffer=writebuffer,
part_size=min_part_size,
)
else:
fileobj = SinglepartWriter(
@@ -829,17 +869,21 @@ def __init__(
self,
bucket,
key,
min_part_size=DEFAULT_MIN_PART_SIZE,
part_size=DEFAULT_PART_SIZE,
client=None,
client_kwargs=None,
writebuffer=None,
writebuffer: io.BytesIO | None = None,
):
if min_part_size < MIN_MIN_PART_SIZE:
logger.warning("S3 requires minimum part size >= 5MB; \
multipart upload may fail")
self._min_part_size = min_part_size
adjusted_ps = smart_open.utils.clamp(part_size, MIN_PART_SIZE, MAX_PART_SIZE)
if part_size != adjusted_ps:
logger.warning(f"adjusting part_size from {part_size} to {adjusted_ps}")
part_size = adjusted_ps
self._part_size = part_size

_initialize_boto3(self, client, client_kwargs, bucket, key)
self._client: S3Client
self._bucket: str
self._key: str

try:
partial = functools.partial(
@@ -862,12 +906,12 @@

self._total_bytes = 0
self._total_parts = 0
self._parts = []
self._parts: list[dict[str, object]] = []

#
# This member is part of the io.BufferedIOBase interface.
#
self.raw = None
self.raw = None # type: ignore[assignment]

def flush(self):
pass
@@ -943,22 +987,38 @@ def tell(self):
def detach(self):
raise io.UnsupportedOperation("detach() not supported")

def write(self, b):
def write(self, b: Buffer) -> int:
"""Write the given buffer (bytes, bytearray, memoryview or any buffer
interface implementation) to the S3 file.

For more information about buffers, see https://docs.python.org/3/c-api/buffer.html

There's buffering happening under the covers, so this may not actually
do any HTTP transfer right away."""
offset = 0
mv = memoryview(b)
self._total_bytes += len(mv)

length = self._buf.write(b)
self._total_bytes += length
#
# botocore does not accept memoryview, otherwise we could have avoided
# copying into the buffer, except when b is smaller than part_size.
#
while offset < len(mv):
start = offset
end = offset + self._part_size - self._buf.tell()
self._buf.write(mv[start:end])
if self._buf.tell() < self._part_size:
#
# Not enough data to write a new part just yet. The assert
# ensures that we've consumed all of the input buffer.
#
assert end >= len(mv)
return len(mv)

if self._buf.tell() >= self._min_part_size:
self._upload_next_part()

return length
offset = end
return len(mv)
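As a standalone illustration of the loop above (an editor's sketch, not the library's code): fill a part-sized staging buffer from the incoming bytes and hand off a full part whenever the buffer reaches part_size.

import io

def chunked_write_sketch(data: bytes, buf: io.BytesIO, part_size: int, upload_part) -> int:
    # Mirror of the loop above: fill the staging buffer up to part_size bytes,
    # hand a full part to upload_part, then keep consuming the input.
    mv = memoryview(data)
    offset = 0
    while offset < len(mv):
        end = offset + part_size - buf.tell()
        buf.write(mv[offset:end])
        if buf.tell() < part_size:
            return len(mv)           # input exhausted before filling a part
        upload_part(buf.getvalue())  # one full part's worth of bytes
        buf.seek(0)
        buf.truncate()
        offset = end
    return len(mv)

parts = []
buf = io.BytesIO()
chunked_write_sketch(b"a" * 12, buf, part_size=5, upload_part=parts.append)
assert [len(p) for p in parts] == [5, 5]  # two full parts handed off
assert buf.getvalue() == b"aa"            # two bytes remain buffered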

def terminate(self):
"""Cancel the underlying multipart upload."""
@@ -984,7 +1044,7 @@ def to_boto3(self, resource):
#
# Internal methods.
#
def _upload_next_part(self):
def _upload_next_part(self) -> None:
part_num = self._total_parts + 1
logger.info(
"%s: uploading part_num: %i, %i bytes (total %.3fGB)",
@@ -1033,10 +1093,10 @@ def __str__(self):
return "smart_open.s3.MultipartWriter(%r, %r)" % (self._bucket, self._key)

def __repr__(self):
return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, min_part_size=%r)" % (
return "smart_open.s3.MultipartWriter(bucket=%r, key=%r, part_size=%r)" % (
self._bucket,
self._key,
self._min_part_size,
self._part_size,
)


74 changes: 55 additions & 19 deletions smart_open/tests/test_s3.py
@@ -200,8 +200,8 @@ def test_every_second_read_fails(self):
class ReaderTest(BaseTest):
def setUp(self):
# lower the multipart upload size, to speed up these tests
self.old_min_part_size = smart_open.s3.DEFAULT_MIN_PART_SIZE
smart_open.s3.DEFAULT_MIN_PART_SIZE = 5 * 1024**2
self.old_min_part_size = smart_open.s3.DEFAULT_PART_SIZE
smart_open.s3.DEFAULT_PART_SIZE = 5 * 1024**2

ignore_resource_warnings()

@@ -214,7 +214,7 @@ def setUp(self):
s3.Object(BUCKET_NAME, KEY_NAME).put(Body=self.body)

def tearDown(self):
smart_open.s3.DEFAULT_MIN_PART_SIZE = self.old_min_part_size
smart_open.s3.DEFAULT_PART_SIZE = self.old_min_part_size

def test_iter(self):
"""Are S3 files iterated over correctly?"""
@@ -461,27 +461,64 @@ def test_write_02(self):
fout.write(u"testžížáč".encode("utf-8"))
self.assertEqual(fout.tell(), 14)

#
# Nb. Under Windows, the byte offsets are different for some reason
#
@pytest.mark.skipif(condition=sys.platform == 'win32', reason="does not run on windows")
def test_write_03(self):
"""Does s3 multipart chunking work correctly?"""
# write
smart_open_write = smart_open.s3.MultipartWriter(
BUCKET_NAME, WRITE_KEY_NAME, min_part_size=10
)
with smart_open_write as fout:
fout.write(b"test")
self.assertEqual(fout._buf.tell(), 4)

fout.write(b"test\n")
self.assertEqual(fout._buf.tell(), 9)
self.assertEqual(fout._total_parts, 0)
#
# generate enough test data for a single multipart upload part.
# We need this because moto behaves like S3; it refuses to upload
# parts smaller than 5MB.
#
data_dir = os.path.join(os.path.dirname(__file__), "test_data")
with open(os.path.join(data_dir, "crime-and-punishment.txt"), "rb") as fin:
crime = fin.read()
data = b''
ps = 5 * 1024 * 1024
while len(data) < ps:
data += crime

title = "Преступление и наказание\n\n".encode()
to_be_continued = "\n\n... продолжение следует ...\n\n".encode()

with smart_open.s3.MultipartWriter(BUCKET_NAME, WRITE_KEY_NAME, part_size=ps) as fout:
#
# Write some data without triggering an upload
#
fout.write(title)
assert fout._total_parts == 0
assert fout._buf.tell() == 48

fout.write(b"test")
self.assertEqual(fout._buf.tell(), 0)
self.assertEqual(fout._total_parts, 1)
#
# Trigger a part upload
#
fout.write(data)
assert fout._total_parts == 1
assert fout._buf.tell() == 661

#
# Write _without_ triggering a part upload
#
fout.write(to_be_continued)
assert fout._total_parts == 1
assert fout._buf.tell() == 710

#
# We closed the writer, so the final part must have been uploaded
#
assert fout._buf.tell() == 0
assert fout._total_parts == 2

#
# read back the same key and check its content
output = list(smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb'))
self.assertEqual(output, [b"testtest\n", b"test"])
#
with smart_open.s3.open(BUCKET_NAME, WRITE_KEY_NAME, 'rb') as fin:
got = fin.read()
want = title + data + to_be_continued
assert want == got

def test_write_04(self):
"""Does writing no data cause key with an empty value to be created?"""
@@ -586,7 +623,6 @@ def test_write_gz_with_error(self):
) as fout:
fout.write(b"test12345678test12345678")
fout.write(b"test\n")

raise ValueError("some error")

# no multipart upload was committed: