Fix problems from 1.5.4 release. Fix #153, #154, partial fix #152 (#155)

* use boto3 instead of boto in s3 tests

Looks like moto doesn't work with boto anymore.
Works fine with boto3.
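
A minimal sketch of the pattern this switch enables (bucket and key names are illustrative): moto's `mock_s3` decorator patches boto3, so the tests run against an in-memory S3.

```
import boto3
from moto import mock_s3


@mock_s3
def test_s3_roundtrip():
    # All boto3 calls below hit moto's in-memory S3, not real AWS.
    s3 = boto3.resource('s3')
    s3.create_bucket(Bucket='mybucket')
    s3.Object('mybucket', 'mykey').put(Body=b'hello world')
    body = s3.Object('mybucket', 'mykey').get()['Body'].read()
    assert body == b'hello world'
```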

* Override .readline() in s3.BufferedInputBase, increase buffer size

The existing BufferedInputBase didn't override the .readline() method,
forcing the superclass implementation to use .read() to read one byte at
a time. This slowed reading down significantly.

Also increased the buffer size to 256 kB; this is consistent with
s3transfer.

http://boto3.readthedocs.io/en/latest/_modules/boto3/s3/transfer.html
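
A rough sketch of the difference, not the library code: without the override, `io.BufferedIOBase` assembles each line from one-byte reads, while the override scans a large local buffer for the newline.

```
def readline_byte_at_a_time(reader):
    # Roughly what the superclass fallback does: one .read() call per byte,
    # which over a network connection means one round trip per byte.
    line = bytearray()
    while True:
        b = reader.read(1)
        line += b
        if not b or b == b'\n':
            break
    return bytes(line)


def readline_buffered(reader, buffer_size=256 * 1024):
    # Pull large chunks and locate the newline locally.  A real
    # implementation keeps the leftover bytes for the next call.
    buf = bytearray()
    while b'\n' not in buf:
        chunk = reader.read(buffer_size)
        if not chunk:
            break
        buf += chunk
    end = buf.find(b'\n')
    return bytes(buf if end == -1 else buf[:end + 1])
```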

* rewrite try..except as an if-else; it is faster that way
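
A micro-benchmark sketch of this claim for the miss case (no newline in the buffer), where the exception path additionally pays for a raised ValueError on every call:

```
import timeit

buf = b'x' * (256 * 1024)  # a buffer that contains no newline


def via_exception():
    try:
        return buf.index(b'\n')
    except ValueError:
        return -1


def via_membership():
    if b'\n' in buf:
        return buf.index(b'\n')
    return -1


print('try/except:', timeit.timeit(via_exception, number=10000))
print('if-else:   ', timeit.timeit(via_membership, number=10000))
```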

* get rid of unused and incorrect TEXT_NEWLINE

* Resolve Issue #154: don't create buckets if they don't exist

When writing, check if the bucket exists and raise a ValueError if it
doesn't.
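
A sketch of the new caller-visible behaviour (the bucket name is illustrative; the error text follows this commit's diff):

```
import smart_open

try:
    with smart_open.smart_open('s3://no-such-bucket-xyz/key.txt', 'wb') as fout:
        fout.write(b'data')
except ValueError as err:
    # "the bucket 'no-such-bucket-xyz' does not exist, or is forbidden
    # for access"
    print(err)
```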

* fixup for d453af2: create the correct bucket in top-level unit tests

* Resolve Issue #153: don't wrap GzipFile in contextlib.closing

This is no longer necessary since we dropped support for Python 2.6.
The GzipFile from Py2.7 and above is already a context manager, so the
closing is not required.

https://docs.python.org/2/library/gzip.html
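
An illustration of the simplification (the filename is illustrative); on Python 2.7+, GzipFile is its own context manager, so the closing wrapper is redundant:

```
import contextlib
import gzip

# Before: required on Python 2.6, where GzipFile had no __enter__/__exit__.
with contextlib.closing(gzip.GzipFile('data.gz', 'rb')) as fin:
    payload = fin.read()

# After: Python 2.7 and later.
with gzip.GzipFile('data.gz', 'rb') as fin:
    payload = fin.read()
```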

* Support errors keyword
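
A usage sketch, assuming the keyword is forwarded to the text decoder the same way the built-in open() handles it (the URL is illustrative):

```
import smart_open

with smart_open.smart_open('s3://bucket/key.txt', 'r',
                           encoding='utf-8', errors='replace') as fin:
    text = fin.read()  # undecodable bytes become U+FFFD instead of raising
```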

* add some integration tests, focusing on S3 only for now

* Specify utf-8 encoding explicitly in tests

If we don't specify it, smart_open will use the system default encoding.
That may be ASCII on some systems (e.g. Py2.7), which would fail the test
because the test data contains non-ASCII characters.
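
A sketch of the fix pattern, assuming the unit tests pass the encoding explicitly (the key is illustrative):

```
import smart_open

key = 's3://bucket/encoding-test.txt'
text = 'не чайки здесь запели'  # non-ASCII test data
with smart_open.smart_open(key, 'w', encoding='utf-8') as fout:
    fout.write(text)
with smart_open.smart_open(key, 'r', encoding='utf-8') as fin:
    assert fin.read() == text
```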

* Refactored S3 subsystem to disable seeking

Renamed RawReader to SeekableRawReader.
Renamed BufferedInputBase to SeekableBufferedInputBase.
Introduced new, non-seekable RawReader and BufferedInputBase.
Seeking functionality was strictly necessary while we were supporting
Py2.6, because the gzip reader required it back then.  The gzip reader
from 2.7 onwards does not require seeking, so neither do we.

Seeking is convenient, but appears to be slower, so disabling it for now
is the right thing to do.

* Re-enable seeking for S3

It appears Py2.7 gzip still requires seeking.  Py3 gzip does not.
We're still supporting Py2.7, so we need seeking to work if we continue
to use the existing gzip reader.
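
An illustration of the constraint, using the class introduced in this commit's diff (bucket, key, and credentials assumed): Py2.7's gzip calls seek()/tell() on the underlying fileobj, so the reader we hand it must report itself as seekable.

```
import io

from smart_open import s3

reader = s3.SeekableBufferedInputBase('bucket', 'key.bin')
assert reader.seekable()

reader.seek(0, io.SEEK_END)  # whence=2; the position is clamped to the
size = reader.tell()         # object's content length
reader.seek(0)               # rewind before reading
```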

* fixup for 3e93fb1: point unit tests at seekable S3 object
mpenkov authored and menshikh-iv committed Dec 6, 2017
1 parent d10166c commit 170e295
Showing 6 changed files with 450 additions and 128 deletions.
47 changes: 47 additions & 0 deletions integration-tests/README.md
@@ -0,0 +1,47 @@
This directory contains integration tests for smart_open.
To run the tests, you need read/write access to an S3 bucket.
Also, you need to install py.test and its benchmark add-on:

pip install pytest pytest_benchmark

Then, to run the tests, run:

SMART_OPEN_S3_URL=s3://bucket/smart_open_test py.test integration-tests/test_s3.py

You may use any key name instead of "smart_open_test".
It does not have to be an existing key.
**The tests will remove the key prior to each test, so be sure the key doesn't contain anything important.**

The tests will take several minutes to complete.
Each test will run several times to obtain summary statistics such as min, max, mean and median.
This allows us to detect regressions in performance.
Here is some example output (you need a wide screen to get the most out of it):

```
(smartopen)sergeyich:smart_open misha$ SMART_OPEN_S3_URL=s3://bucket/smart_open_test py.test integration-tests/test_s3.py
=============================================== test session starts ================================================
platform darwin -- Python 3.6.3, pytest-3.3.0, py-1.5.2, pluggy-0.6.0
benchmark: 3.1.1 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0 calibration_precision=10 warmup=False warmup_iterations=100000)
rootdir: /Users/misha/git/smart_open, inifile:
plugins: benchmark-3.1.1
collected 6 items
integration-tests/test_s3.py ...... [100%]
--------------------------------------------------------------------------------------- benchmark: 6 tests --------------------------------------------------------------------------------------
Name (time in s) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_s3_readwrite_text 2.7593 (1.0) 3.4935 (1.0) 3.2203 (1.0) 0.3064 (1.0) 3.3202 (1.04) 0.4730 (1.0) 1;0 0.3105 (1.0) 5 1
test_s3_readwrite_text_gzip 3.0242 (1.10) 4.6782 (1.34) 3.7079 (1.15) 0.8531 (2.78) 3.2001 (1.0) 1.5850 (3.35) 2;0 0.2697 (0.87) 5 1
test_s3_readwrite_binary 3.0549 (1.11) 3.9062 (1.12) 3.5399 (1.10) 0.3516 (1.15) 3.4721 (1.09) 0.5532 (1.17) 2;0 0.2825 (0.91) 5 1
test_s3_performance_gz 3.1885 (1.16) 5.2845 (1.51) 3.9298 (1.22) 0.8197 (2.68) 3.6974 (1.16) 0.9693 (2.05) 1;0 0.2545 (0.82) 5 1
test_s3_readwrite_binary_gzip 3.3756 (1.22) 5.0423 (1.44) 4.1763 (1.30) 0.6381 (2.08) 4.0722 (1.27) 0.9209 (1.95) 2;0 0.2394 (0.77) 5 1
test_s3_performance 7.6758 (2.78) 29.5266 (8.45) 18.8346 (5.85) 10.3003 (33.62) 21.1854 (6.62) 19.6234 (41.49) 3;0 0.0531 (0.17) 5 1
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Legend:
Outliers: 1 Standard Deviation from Mean; 1.5 IQR (InterQuartile Range) from 1st Quartile and 3rd Quartile.
OPS: Operations Per Second, computed as 1 / Mean
============================================ 6 passed in 285.14 seconds ============================================
```
83 changes: 83 additions & 0 deletions integration-tests/test_s3.py
@@ -0,0 +1,83 @@
from __future__ import unicode_literals
import io
import os
import subprocess

import smart_open

_S3_URL = os.environ.get('SMART_OPEN_S3_URL')
assert _S3_URL is not None, 'please set the SMART_OPEN_S3_URL environment variable'


def initialize_bucket():
subprocess.check_call(['aws', 's3', 'rm', '--recursive', _S3_URL])


def write_read(key, content, write_mode, read_mode):
with smart_open.smart_open(key, write_mode) as fout:
fout.write(content)
with smart_open.smart_open(key, read_mode) as fin:
actual = fin.read()
return actual


def test_s3_readwrite_text(benchmark):
initialize_bucket()

key = _S3_URL + '/sanity.txt'
text = 'с гранатою в кармане, с чекою в руке'
actual = benchmark(write_read, key, text, 'w', 'r')
assert actual == text


def test_s3_readwrite_text_gzip(benchmark):
initialize_bucket()

key = _S3_URL + '/sanity.txt.gz'
text = 'не чайки здесь запели на знакомом языке'
actual = benchmark(write_read, key, text, 'w', 'r')
assert actual == text


def test_s3_readwrite_binary(benchmark):
initialize_bucket()

key = _S3_URL + '/sanity.txt'
binary = b'this is a test'
actual = benchmark(write_read, key, binary, 'wb', 'rb')
assert actual == binary


def test_s3_readwrite_binary_gzip(benchmark):
initialize_bucket()

key = _S3_URL + '/sanity.txt.gz'
binary = b'this is a test'
actual = benchmark(write_read, key, binary, 'wb', 'rb')
assert actual == binary


def test_s3_performance(benchmark):
initialize_bucket()

one_megabyte = io.BytesIO()
for _ in range(1024*128):
one_megabyte.write(b'01234567')
one_megabyte = one_megabyte.getvalue()

key = _S3_URL + '/performance.txt'
actual = benchmark(write_read, key, one_megabyte, 'wb', 'rb')
assert actual == one_megabyte


def test_s3_performance_gz(benchmark):
initialize_bucket()

one_megabyte = io.BytesIO()
for _ in range(1024*128):
one_megabyte.write(b'01234567')
one_megabyte = one_megabyte.getvalue()

key = _S3_URL + '/performance.txt.gz'
actual = benchmark(write_read, key, one_megabyte, 'wb', 'rb')
assert actual == one_megabyte
179 changes: 126 additions & 53 deletions smart_open/s3.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
"""Implements file-like objects for reading and writing from/to S3."""
import boto3
import botocore.client

import io
import logging
@@ -28,6 +29,9 @@
MODES = (READ, READ_BINARY, WRITE, WRITE_BINARY)
"""Allowed I/O modes for working with S3."""

BINARY_NEWLINE = b'\n'
DEFAULT_BUFFER_SIZE = 256 * 1024


def _range_string(start, stop=None):
#
@@ -47,15 +51,14 @@ def open(bucket_id, key_id, mode, **kwargs):
if mode not in MODES:
raise NotImplementedError('bad mode: %r expected one of %r' % (mode, MODES))

buffer_size = kwargs.pop("buffer_size", io.DEFAULT_BUFFER_SIZE)
encoding = kwargs.pop("encoding", "utf-8")
errors = kwargs.pop("errors", None)
newline = kwargs.pop("newline", None)
line_buffering = kwargs.pop("line_buffering", False)
s3_min_part_size = kwargs.pop("s3_min_part_size", DEFAULT_MIN_PART_SIZE)

if mode in (READ, READ_BINARY):
fileobj = BufferedInputBase(bucket_id, key_id, **kwargs)
fileobj = SeekableBufferedInputBase(bucket_id, key_id, **kwargs)
elif mode in (WRITE, WRITE_BINARY):
fileobj = BufferedOutputBase(bucket_id, key_id, min_part_size=s3_min_part_size, **kwargs)
else:
@@ -72,6 +75,21 @@ def open(bucket_id, key_id, mode, **kwargs):

class RawReader(object):
"""Read an S3 object."""
def __init__(self, s3_object):
self.position = 0
self._object = s3_object
self._body = s3_object.get()['Body']

def read(self, size=-1):
if size == -1:
return self._body.read()
return self._body.read(size)


class SeekableRawReader(object):
"""Read an S3 object.
Supports seeking, but is slower than RawReader."""
def __init__(self, s3_object):
self.position = 0
self._object = s3_object
@@ -92,11 +110,8 @@ def read(self, size=-1):


class BufferedInputBase(io.BufferedIOBase):
"""Reads bytes from S3.
Implements the io.BufferedIOBase interface of the standard library."""

def __init__(self, bucket, key, **kwargs):
def __init__(self, bucket, key, buffer_size=DEFAULT_BUFFER_SIZE,
line_terminator=BINARY_NEWLINE, **kwargs):
session = boto3.Session(profile_name=kwargs.pop('profile_name', None))
s3 = session.resource('s3', **kwargs)
self._object = s3.Object(bucket, key)
@@ -105,6 +120,8 @@ def __init__(self, bucket, key, **kwargs):
self._current_pos = 0
self._buffer = b''
self._eof = False
self._buffer_size = buffer_size
self._line_terminator = line_terminator

#
# This member is part of the io.BufferedIOBase interface.
@@ -124,43 +141,7 @@ def readable(self):
return True

def seekable(self):
"""If False, seek(), tell() and truncate() will raise IOError.
We offer only seek support, and no truncate support."""
return True

def seek(self, offset, whence=START):
"""Seek to the specified position.
:param int offset: The offset in bytes.
:param int whence: Where the offset is from.
Returns the position after seeking."""
logger.debug('seeking to offset: %r whence: %r', offset, whence)
if whence not in WHENCE_CHOICES:
raise ValueError('invalid whence, expected one of %r' % WHENCE_CHOICES)

if whence == START:
new_position = offset
elif whence == CURRENT:
new_position = self._current_pos + offset
else:
new_position = self._content_length + offset
new_position = _clamp(new_position, 0, self._content_length)

logger.debug('new_position: %r', new_position)
self._current_pos = self._raw_reader.position = new_position
self._buffer = b""
self._eof = self._current_pos == self._content_length
return self._current_pos

def tell(self):
"""Return the current position within the file."""
return self._current_pos

def truncate(self, size=None):
"""Unsupported."""
raise io.UnsupportedOperation
return False

#
# io.BufferedIOBase methods.
@@ -195,14 +176,7 @@ def read(self, size=-1):
# Fill our buffer to the required size.
#
# logger.debug('filling %r byte-long buffer up to %r bytes', len(self._buffer), size)
while len(self._buffer) < size and not self._eof:
raw = self._raw_reader.read(size=io.DEFAULT_BUFFER_SIZE)
if len(raw):
self._buffer += raw
else:
logger.debug('reached EOF while filling buffer')
self._eof = True

self._fill_buffer(size)
return self._read_from_buffer(size)

def read1(self, size=-1):
@@ -218,6 +192,30 @@ def readinto(self, b):
b[:len(data)] = data
return len(data)

def readline(self, limit=-1):
"""Read up to and including the next newline. Returns the bytes read."""
if limit != -1:
raise NotImplementedError('limits other than -1 not implemented yet')
the_line = io.BytesIO()
while not (self._eof and len(self._buffer) == 0):
#
# In the worst case, we're reading self._buffer twice here, once in
# the if condition, and once when calling index.
#
# This is sub-optimal, but better than the alternative: wrapping
# .index in a try..except, because that is slower.
#
if self._line_terminator in self._buffer:
next_newline = self._buffer.index(self._line_terminator)
the_line.write(self._buffer[:next_newline + 1])
self._buffer = self._buffer[next_newline + 1:]
break
else:
the_line.write(self._buffer)
self._buffer = b''
self._fill_buffer(self._buffer_size)
return the_line.getvalue()

def terminate(self):
"""Do nothing."""
pass
@@ -235,6 +233,78 @@ def _read_from_buffer(self, size):
# logger.debug('part: %r', part)
return part

def _fill_buffer(self, size):
while len(self._buffer) < size and not self._eof:
raw = self._raw_reader.read(size=self._buffer_size)
if len(raw):
self._buffer += raw
else:
logger.debug('reached EOF while filling buffer')
self._eof = True


class SeekableBufferedInputBase(BufferedInputBase):
"""Reads bytes from S3.
Implements the io.BufferedIOBase interface of the standard library."""

def __init__(self, bucket, key, buffer_size=DEFAULT_BUFFER_SIZE,
line_terminator=BINARY_NEWLINE, **kwargs):
session = boto3.Session(profile_name=kwargs.pop('profile_name', None))
s3 = session.resource('s3', **kwargs)
self._object = s3.Object(bucket, key)
self._raw_reader = SeekableRawReader(self._object)
self._content_length = self._object.content_length
self._current_pos = 0
self._buffer = b''
self._eof = False
self._buffer_size = buffer_size
self._line_terminator = line_terminator

#
# This member is part of the io.BufferedIOBase interface.
#
self.raw = None

def seekable(self):
"""If False, seek(), tell() and truncate() will raise IOError.
We offer only seek support, and no truncate support."""
return True

def seek(self, offset, whence=START):
"""Seek to the specified position.
:param int offset: The offset in bytes.
:param int whence: Where the offset is from.
Returns the position after seeking."""
logger.debug('seeking to offset: %r whence: %r', offset, whence)
if whence not in WHENCE_CHOICES:
raise ValueError('invalid whence, expected one of %r' % WHENCE_CHOICES)

if whence == START:
new_position = offset
elif whence == CURRENT:
new_position = self._current_pos + offset
else:
new_position = self._content_length + offset
new_position = _clamp(new_position, 0, self._content_length)

logger.debug('new_position: %r', new_position)
self._current_pos = self._raw_reader.position = new_position
self._buffer = b""
self._eof = self._current_pos == self._content_length
return self._current_pos

def tell(self):
"""Return the current position within the file."""
return self._current_pos

def truncate(self, size=None):
"""Unsupported."""
raise io.UnsupportedOperation


class BufferedOutputBase(io.BufferedIOBase):
"""Writes bytes to S3.
@@ -252,7 +322,10 @@ def __init__(self, bucket, key, min_part_size=DEFAULT_MIN_PART_SIZE, **kwargs):
#
# https://stackoverflow.com/questions/26871884/how-can-i-easily-determine-if-a-boto-3-s3-bucket-resource-exists
#
s3.create_bucket(Bucket=bucket)
try:
s3.meta.client.head_bucket(Bucket=bucket)
except botocore.client.ClientError:
raise ValueError('the bucket %r does not exist, or is forbidden for access' % bucket)
self._object = s3.Object(bucket, key)
self._min_part_size = min_part_size
self._mp = self._object.initiate_multipart_upload()