Merge remote-tracking branch 'upstream/master' into issue91-rebase
# Conflicts:
#	requirements-test.txt
#	smart_open/smart_open_lib.py
#	smart_open/tests/test_gzipstreamfile.py
#	smart_open/tests/test_smart_open.py
menshikh-iv committed Oct 18, 2017
2 parents c96e522 + ec130ca commit 8ea8387
Showing 7 changed files with 109 additions and 103 deletions.
19 changes: 10 additions & 9 deletions .travis.yml
@@ -1,13 +1,14 @@
 language: python
+
 python:
-- "2.7"
-- "2.6"
-- "3.3"
-- "3.4"
-- "3.5"
-- "3.6"
+  - "2.7"
+  - "3.3"
+  - "3.4"
+  - "3.5"
+  - "3.6"
+
 install:
-- if [[ $TRAVIS_PYTHON_VERSION == 2.6 ]]; then pip install ordereddict; fi
-- python setup.py install
-- pip freeze
+  - python setup.py install
+  - pip freeze
+
 script: python -W ignore setup.py test
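
Note: the dropped `install` line was a shim for Python 2.6, which lacks `collections.OrderedDict` (added in 2.7). The `ordereddict` package on PyPI backports it, and code supporting both versions typically used a fallback import like this sketch (not taken from this repo):

    try:
        from collections import OrderedDict  # Python >= 2.7
    except ImportError:
        from ordereddict import OrderedDict  # PyPI backport, Python 2.6 only

With 2.6 removed from the build matrix, the shim is no longer needed.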
4 changes: 1 addition & 3 deletions README.rst
@@ -114,9 +114,7 @@ There are nasty hidden gotchas when using ``boto``'s multipart upload functional

 Installation
 ------------

-The module has no dependencies beyond Python >= 2.6 (or Python >= 3.3),
-``boto`` and ``requests``::
+::

     pip install smart_open

4 changes: 0 additions & 4 deletions requirements-test.txt

This file was deleted.

3 changes: 0 additions & 3 deletions requirements.txt

This file was deleted.

38 changes: 16 additions & 22 deletions setup.py
@@ -9,40 +9,34 @@

 import io
 import os
-import sys
-
-if sys.version_info < (2, 6):
-    raise ImportError("smart_open requires python >= 2.6")


 # TODO add ez_setup?
 from setuptools import setup, find_packages


 def read(fname):
     return io.open(os.path.join(os.path.dirname(__file__), fname), encoding='utf-8').read()


 setup(
-    name = 'smart_open',
-    version = '1.5.3',
-    description = 'Utils for streaming large files (S3, HDFS, gzip, bz2...)',
-    long_description = read('README.rst'),
+    name='smart_open',
+    version='1.5.3',
+    description='Utils for streaming large files (S3, HDFS, gzip, bz2...)',
+    long_description=read('README.rst'),

     packages=find_packages(),
     package_data={"smart_open.tests": ["test_data/*gz"]},

-    author = u'Radim Řehůřek',
-    author_email = '[email protected]',
-    maintainer = u'Radim Řehůřek',
-    maintainer_email = '[email protected]',
+    author=u'Radim Řehůřek',
+    author_email='[email protected]',
+    maintainer=u'Radim Řehůřek',
+    maintainer_email='[email protected]',

-    url = 'https://github.com/piskvorky/smart_open',
-    download_url = 'http://pypi.python.org/pypi/smart_open',
+    url='https://github.com/piskvorky/smart_open',
+    download_url='http://pypi.python.org/pypi/smart_open',

-    keywords = 'file streaming, s3, hdfs',
+    keywords='file streaming, s3, hdfs',

-    license = 'MIT',
-    platforms = 'any',
+    license='MIT',
+    platforms='any',

     install_requires=[
         'boto >= 2.32',
@@ -61,17 +55,17 @@ def read(fname):

     test_suite="smart_open.tests",

-    classifiers = [ # from http://pypi.python.org/pypi?%3Aaction=list_classifiers
+    classifiers=[
         'Development Status :: 4 - Beta',
         'Environment :: Console',
         'Intended Audience :: Developers',
         'License :: OSI Approved :: MIT License',
         'Operating System :: OS Independent',
         'Programming Language :: Python :: 2.6',
         'Programming Language :: Python :: 2.7',
         'Programming Language :: Python :: 3.3',
         'Programming Language :: Python :: 3.4',
         'Programming Language :: Python :: 3.5',
         'Programming Language :: Python :: 3.6',
         'Topic :: System :: Distributed Computing',
         'Topic :: Database :: Front-Ends',
     ],
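
Note: the keyword-argument cleanup in this file follows PEP 8, which says not to put spaces around `=` when it marks a keyword argument (flake8 reports this as E251). A minimal before/after illustration, not taken from this repo:

    setup(name = 'smart_open')  # E251: unexpected spaces around keyword / parameter equals
    setup(name='smart_open')    # PEP 8-compliant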
53 changes: 32 additions & 21 deletions smart_open/smart_open_lib.py
@@ -5,8 +5,6 @@
 #
 # This code is distributed under the terms and conditions
 # from the MIT License (MIT).
-#
-# flake8: noqa


 """
@@ -30,6 +28,12 @@
 import requests
 import io

+from boto.compat import BytesIO, urlsplit, six
+import boto.s3.connection
+import boto.s3.key
+from ssl import SSLError
+
+
 IS_PY2 = (sys.version_info[0] == 2)

 if IS_PY2:
@@ -38,14 +42,11 @@

 if sys.version_info[0] == 2:
     import httplib

 elif sys.version_info[0] == 3:
     import io as StringIO
     import http.client as httplib

-from boto.compat import BytesIO, urlsplit, six
-import boto.s3.connection
-import boto.s3.key
-from ssl import SSLError

 logger = logging.getLogger(__name__)

@@ -391,12 +392,12 @@ class HdfsOpenRead(object):
"""
def __init__(self, parsed_uri):
if parsed_uri.scheme not in ("hdfs"):
if parsed_uri.scheme != "hdfs":
raise TypeError("can only process HDFS files")
self.parsed_uri = parsed_uri

def __iter__(self):
hdfs = subprocess.Popen(["hdfs", "dfs", "-cat", self.parsed_uri.uri_path], stdout=subprocess.PIPE)
hdfs = subprocess.Popen(["hdfs", "dfs", '-text', self.parsed_uri.uri_path], stdout=subprocess.PIPE)
return hdfs.stdout

def read(self, size=None):
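
Note: the scheme checks replaced in this file were subtly wrong. `("hdfs")` is not a one-element tuple, just the string "hdfs" in parentheses, so `not in` performed a substring test rather than a membership test. A quick stdlib-only illustration:

    >>> ("hdfs") == "hdfs"    # parentheses alone do not make a tuple
    True
    >>> "dfs" in ("hdfs")     # substring match: any substring passes the old check
    True
    >>> "dfs" in ("hdfs",)    # a real one-element tuple needs the trailing comma
    False

The explicit `!= "hdfs"` comparison removes the ambiguity. (And `hdfs dfs -text`, unlike `-cat`, also decodes compressed formats the Hadoop client recognizes, such as gzip.)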
@@ -418,10 +419,12 @@ class HdfsOpenWrite(object):
"""
def __init__(self, parsed_uri):
if parsed_uri.scheme not in ("hdfs"):
if parsed_uri.scheme != "hdfs":
raise TypeError("can only process HDFS files")
self.parsed_uri = parsed_uri
self.out_pipe = subprocess.Popen(["hdfs","dfs","-put","-f","-",self.parsed_uri.uri_path], stdin=subprocess.PIPE)
self.out_pipe = subprocess.Popen(
["hdfs", "dfs", "-put", "-f", "-", self.parsed_uri.uri_path], stdin=subprocess.PIPE
)

def write(self, b):
self.out_pipe.stdin.write(b)
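
Note: in the reformatted `Popen` call above, `-put -f -` tells the HDFS client to overwrite the destination (`-f`) and read from stdin (`-`), so each `write()` streams bytes straight into the child process. A self-contained sketch of the same pattern, with `wc -c` standing in for the `hdfs` binary so it runs on any POSIX system:

    import subprocess

    # Stand-in for ["hdfs", "dfs", "-put", "-f", "-", uri_path]; wc -c just counts the bytes.
    proc = subprocess.Popen(["wc", "-c"], stdin=subprocess.PIPE)
    proc.stdin.write(b"hello hdfs\n")
    proc.stdin.close()  # signal EOF so the child can finish
    proc.wait()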
@@ -446,7 +449,7 @@ class WebHdfsOpenRead(object):
"""
def __init__(self, parsed_uri):
if parsed_uri.scheme not in ("webhdfs"):
if parsed_uri.scheme != "webhdfs":
raise TypeError("can only process WebHDFS files")
self.parsed_uri = parsed_uri
self.offset = 0
@@ -469,7 +472,7 @@ def read(self, size=None):
             self.offset = 0
         else:
             payload = {"op": "OPEN", "offset": self.offset, "length": size}
-            self.offset = self.offset + size
+            self.offset += size
         response = requests.get("http://" + self.parsed_uri.uri_path, params=payload, stream=True)
         return response.content
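
Note: the `payload` here follows the WebHDFS REST API, where a read is `GET /webhdfs/v1/<path>?op=OPEN&offset=N&length=N`. The method reads one window and advances `self.offset`, so a caller can loop until an empty response. A hypothetical standalone helper (not part of smart_open) showing that loop:

    import requests

    def iter_webhdfs_chunks(url, chunk_size=64 * 1024):
        """Yield a WebHDFS file in chunk_size windows until EOF (empty body)."""
        offset = 0
        while True:
            payload = {"op": "OPEN", "offset": offset, "length": chunk_size}
            data = requests.get(url, params=payload, stream=True).content
            if not data:
                break
            offset += len(data)
            yield data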

@@ -672,7 +675,7 @@ class WebHdfsOpenWrite(object):
"""
def __init__(self, parsed_uri, min_part_size=WEBHDFS_MIN_PART_SIZE):
if parsed_uri.scheme not in ("webhdfs"):
if parsed_uri.scheme != "webhdfs":
raise TypeError("can only process WebHDFS files")
self.parsed_uri = parsed_uri
self.closed = False
@@ -721,9 +724,12 @@ def write(self, b):

         if self.chunk_bytes >= self.min_part_size:
             buff = b"".join(self.lines)
-            logger.info("uploading part #%i, %i bytes (total %.3fGB)" % (self.parts, len(buff), self.total_size / 1024.0 ** 3))
+            logger.info(
+                "uploading part #%i, %i bytes (total %.3fGB)",
+                self.parts, len(buff), self.total_size / 1024.0 ** 3
+            )
             self.upload(buff)
-            logger.debug("upload of part #%i finished" % self.parts)
+            logger.debug("upload of part #%i finished", self.parts)
             self.parts += 1
             self.lines, self.chunk_bytes = [], 0
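
Note: moving the `%` interpolation out of the message and passing the values as extra arguments to `logger.info(...)` is the standard lazy-logging idiom: the string is only built if the record is actually emitted. A minimal illustration:

    import logging

    logging.basicConfig(level=logging.WARNING)
    logger = logging.getLogger(__name__)

    logger.info("uploading part #%i" % 42)   # interpolates eagerly, then the record is filtered out
    logger.info("uploading part #%i", 42)    # the logging module skips formatting entirely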

@@ -733,9 +739,12 @@ def seek(self, offset, whence=None):
     def close(self):
         buff = b"".join(self.lines)
         if buff:
-            logger.info("uploading last part #%i, %i bytes (total %.3fGB)" % (self.parts, len(buff), self.total_size / 1024.0 ** 3))
+            logger.info(
+                "uploading last part #%i, %i bytes (total %.3fGB)",
+                self.parts, len(buff), self.total_size / 1024.0 ** 3
+            )
             self.upload(buff)
-            logger.debug("upload of last part #%i finished" % self.parts)
+            logger.debug("upload of last part #%i finished", self.parts)
         self.closed = True

     def __enter__(self):
def __enter__(self):
@@ -800,17 +809,19 @@ def s3_iter_bucket(bucket, prefix='', accept_key=lambda key: True, key_limit=Non
     keys = ({'key': key, 'retries': retries} for key in bucket.list(prefix=prefix) if accept_key(key.name))

     if MULTIPROCESSING:
-        logger.info("iterating over keys from %s with %i workers" % (bucket, workers))
+        logger.info("iterating over keys from %s with %i workers", bucket, workers)
         pool = multiprocessing.pool.Pool(processes=workers)
         iterator = pool.imap_unordered(s3_iter_bucket_process_key_with_kwargs, keys)
     else:
-        logger.info("iterating over keys from %s without multiprocessing" % bucket)
+        logger.info("iterating over keys from %s without multiprocessing", bucket)
         iterator = imap(s3_iter_bucket_process_key_with_kwargs, keys)

     for key_no, (key, content) in enumerate(iterator):
         if key_no % 1000 == 0:
-            logger.info("yielding key #%i: %s, size %i (total %.1fMB)" %
-                        (key_no, key, len(content), total_size / 1024.0 ** 2))
+            logger.info(
+                "yielding key #%i: %s, size %i (total %.1fMB)",
+                key_no, key, len(content), total_size / 1024.0 ** 2
+            )

         yield key, content
         key.close()
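
Note: `s3_iter_bucket` yields `(key, content)` pairs, optionally fanning the downloads out over a multiprocessing pool as shown above. A usage sketch based on the signature visible in this hunk (the bucket name and prefix are made up):

    import boto
    from smart_open import s3_iter_bucket

    bucket = boto.connect_s3().get_bucket('my-bucket')
    for key, content in s3_iter_bucket(bucket, prefix='logs/', workers=16):
        print(key.name, len(content))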