diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 7e3169e7..071b31b8 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -137,6 +137,8 @@ jobs: - run: bash ci_helpers/helpers.sh enable_moto_server if: ${{ matrix.moto_server }} + + - run: bash ci_helpers/helpers.sh create_ftp_server - name: Run integration tests run: python ci_helpers/run_integration_tests.py @@ -146,6 +148,8 @@ jobs: - run: bash ci_helpers/helpers.sh disable_moto_server if: ${{ matrix.moto_server }} + + - run: bash ci_helpers/helpers.sh delete_ftp_server benchmarks: needs: [linters,unit_tests] diff --git a/ci_helpers/helpers.sh b/ci_helpers/helpers.sh index 02301933..c24567a9 100755 --- a/ci_helpers/helpers.sh +++ b/ci_helpers/helpers.sh @@ -7,8 +7,17 @@ enable_moto_server(){ moto_server -p5000 2>/dev/null& } +create_ftp_server(){ + docker run -d -p 21:21 -p 21000-21010:21000-21010 -e USERS="user|123|/home/user/dir" -e ADDRESS=localhost --name my-ftp-server delfer/alpine-ftp-server +} + disable_moto_server(){ lsof -i tcp:5000 | tail -n1 | cut -f2 -d" " | xargs kill -9 } +delete_ftp_server(){ + docker kill my-ftp-server + docker rm my-ftp-server +} + "$@" diff --git a/ci_helpers/run_integration_tests.py b/ci_helpers/run_integration_tests.py index 51726cb1..5417ca8c 100644 --- a/ci_helpers/run_integration_tests.py +++ b/ci_helpers/run_integration_tests.py @@ -9,6 +9,7 @@ 'pytest', 'integration-tests/test_207.py', 'integration-tests/test_http.py', + 'integration-tests/test_ftp.py' ] ) diff --git a/integration-tests/test_ftp.py b/integration-tests/test_ftp.py new file mode 100644 index 00000000..79129dd8 --- /dev/null +++ b/integration-tests/test_ftp.py @@ -0,0 +1,71 @@ +from __future__ import unicode_literals + +from smart_open import open + +def test_nonbinary(): + file_contents = "Test Test \n new test \n another tests" + appended_content1 = "Added \n to end" + + with open("ftp://user:123@localhost:21/home/user/dir/file", "w") as f: + f.write(file_contents) + + with open("ftp://user:123@localhost:21/home/user/dir/file", "r") as f: + read_contents = f.read() + assert read_contents == file_contents + + with open("ftp://user:123@localhost:21/home/user/dir/file", "a") as f: + f.write(appended_content1) + + with open("ftp://user:123@localhost:21/home/user/dir/file", "r") as f: + read_contents = f.read() + assert read_contents == file_contents + appended_content1 + +def test_binary(): + file_contents = b"Test Test \n new test \n another tests" + appended_content1 = b"Added \n to end" + + with open("ftp://user:123@localhost:21/home/user/dir/file2", "wb") as f: + f.write(file_contents) + + with open("ftp://user:123@localhost:21/home/user/dir/file2", "rb") as f: + read_contents = f.read() + assert read_contents == file_contents + + with open("ftp://user:123@localhost:21/home/user/dir/file2", "ab") as f: + f.write(appended_content1) + + with open("ftp://user:123@localhost:21/home/user/dir/file2", "rb") as f: + read_contents = f.read() + assert read_contents == file_contents + appended_content1 + +def test_line_endings_non_binary(): + B_CLRF = b'\r\n' + CLRF = '\r\n' + file_contents = f"Test Test {CLRF} new test {CLRF} another tests{CLRF}" + + with open("ftp://user:123@localhost:21/home/user/dir/file3", "w") as f: + f.write(file_contents) + + with open("ftp://user:123@localhost:21/home/user/dir/file3", "r") as f: + for line in f: + assert not CLRF in line + + with open("ftp://user:123@localhost:21/home/user/dir/file3", "rb") as f: + for line in f: + assert B_CLRF in line + +def test_line_endings_binary(): + B_CLRF = b'\r\n' + CLRF = '\r\n' + file_contents = f"Test Test {CLRF} new test {CLRF} another tests{CLRF}".encode('utf-8') + + with open("ftp://user:123@localhost:21/home/user/dir/file4", "wb") as f: + f.write(file_contents) + + with open("ftp://user:123@localhost:21/home/user/dir/file4", "r") as f: + for line in f: + assert not CLRF in line + + with open("ftp://user:123@localhost:21/home/user/dir/file4", "rb") as f: + for line in f: + assert B_CLRF in line \ No newline at end of file diff --git a/smart_open/ftp.py b/smart_open/ftp.py new file mode 100644 index 00000000..2d89fb0e --- /dev/null +++ b/smart_open/ftp.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2019 Radim Rehurek +# +# This code is distributed under the terms and conditions +# from the MIT License (MIT). +# + +"""Implements I/O streams over FTP. +""" + +import getpass +import logging +import urllib.parse +import smart_open.utils +from ftplib import FTP, error_reply +import types +logger = logging.getLogger(__name__) + +SCHEME = "ftp" + +"""Supported URL schemes.""" + +DEFAULT_PORT = 21 + +URI_EXAMPLES = ( + "ftp://username@host/path/file", + "ftp://username:password@host/path/file", + "ftp://username:password@host:port/path/file", +) + + +def _unquote(text): + return text and urllib.parse.unquote(text) + + +def parse_uri(uri_as_string): + split_uri = urllib.parse.urlsplit(uri_as_string) + assert split_uri.scheme in SCHEME + return dict( + scheme=split_uri.scheme, + uri_path=_unquote(split_uri.path), + user=_unquote(split_uri.username), + host=split_uri.hostname, + port=int(split_uri.port or DEFAULT_PORT), + password=_unquote(split_uri.password), + ) + + +def open_uri(uri, mode, transport_params): + smart_open.utils.check_kwargs(open, transport_params) + parsed_uri = parse_uri(uri) + uri_path = parsed_uri.pop("uri_path") + parsed_uri.pop("scheme") + return open(uri_path, mode, transport_params=transport_params, **parsed_uri) + + +def convert_transport_params_to_args(transport_params): + supported_keywords = [ + "timeout", + "source_address", + "encoding", + ] + unsupported_keywords = [k for k in transport_params if k not in supported_keywords] + kwargs = {k: v for (k, v) in transport_params.items() if k in supported_keywords} + + if unsupported_keywords: + logger.warning( + "ignoring unsupported ftp keyword arguments: %r", unsupported_keywords + ) + + return kwargs + + +def _connect(hostname, username, port, password, transport_params): + kwargs = convert_transport_params_to_args(transport_params) + ftp = FTP(**kwargs) + try: + ftp.connect(hostname, port) + except Exception as e: + logger.error("Unable to connect to FTP server: try checking the host and port!") + raise error_reply(e) + try: + ftp.login(username, password) + except Exception as e: + logger.error("Unable to login to FTP server: try checking the username and password!") + raise error_reply(e) + return ftp + + +# transport paramaters can include any extra parameters that you want to be passed into FTP_TLS +def open( + path, + mode="r", + host=None, + user=None, + password=None, + port=DEFAULT_PORT, + transport_params=None, +): + if not host: + raise ValueError("you must specify the host to connect to") + if not user: + user = getpass.getuser() + if not transport_params: + transport_params = {} + conn = _connect(host, user, port, password, transport_params) + mode_to_ftp_cmds = { + "r": ("RETR", "r"), + "rb": ("RETR", "rb"), + "w": ("STOR", "w"), + "wb": ("STOR", "wb"), + "a": ("APPE", "w"), + "ab": ("APPE", "wb") + } + try: + ftp_mode, file_obj_mode = mode_to_ftp_cmds[mode] + except KeyError: + raise ValueError(f"unsupported mode: {mode}") + ftp_mode, file_obj_mode = mode_to_ftp_cmds[mode] + socket = conn.transfercmd(f"{ftp_mode} {path}") + fobj = socket.makefile(file_obj_mode) + + def full_close(self): + self.orig_close() + self.socket.close() + self.conn.close() + fobj.orig_close = fobj.close + fobj.socket = socket + fobj.conn = conn + fobj.close = types.MethodType(full_close, fobj) + return fobj diff --git a/smart_open/transport.py b/smart_open/transport.py index 00fb27d7..ffdedadf 100644 --- a/smart_open/transport.py +++ b/smart_open/transport.py @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) -NO_SCHEME = '' +NO_SCHEME = "" _REGISTRY = {NO_SCHEME: smart_open.local_file} _ERRORS = {} @@ -54,15 +54,15 @@ def register_transport(submodule): # Save only the last module name piece module_name = module_name.rsplit(".")[-1] - if hasattr(submodule, 'SCHEME'): + if hasattr(submodule, "SCHEME"): schemes = [submodule.SCHEME] - elif hasattr(submodule, 'SCHEMES'): + elif hasattr(submodule, "SCHEMES"): schemes = submodule.SCHEMES else: - raise ValueError('%r does not have a .SCHEME or .SCHEMES attribute' % submodule) + raise ValueError("%r does not have a .SCHEME or .SCHEMES attribute" % submodule) - for f in ('open', 'open_uri', 'parse_uri'): - assert hasattr(submodule, f), '%r is missing %r' % (submodule, f) + for f in ("open", "open_uri", "parse_uri"): + assert hasattr(submodule, f), "%r is missing %r" % (submodule, f) for scheme in schemes: assert scheme not in _REGISTRY @@ -80,7 +80,9 @@ def get_transport(scheme): """ global _ERRORS, _MISSING_DEPS_ERROR, _REGISTRY, SUPPORTED_SCHEMES expected = SUPPORTED_SCHEMES - readme_url = 'https://github.com/RaRe-Technologies/smart_open/blob/master/README.rst' + readme_url = ( + "https://github.com/RaRe-Technologies/smart_open/blob/master/README.rst" + ) message = ( "Unable to handle scheme %(scheme)r, expected one of %(expected)r. " "Extra dependencies required by %(scheme)r may be missing. " @@ -94,13 +96,14 @@ def get_transport(scheme): register_transport(smart_open.local_file) -register_transport('smart_open.azure') -register_transport('smart_open.gcs') -register_transport('smart_open.hdfs') -register_transport('smart_open.http') -register_transport('smart_open.s3') -register_transport('smart_open.ssh') -register_transport('smart_open.webhdfs') +register_transport("smart_open.azure") +register_transport("smart_open.ftp") +register_transport("smart_open.gcs") +register_transport("smart_open.hdfs") +register_transport("smart_open.http") +register_transport("smart_open.s3") +register_transport("smart_open.ssh") +register_transport("smart_open.webhdfs") SUPPORTED_SCHEMES = tuple(sorted(_REGISTRY.keys())) """The transport schemes that the local installation of ``smart_open`` supports."""