Skip to content

Commit

Permalink
Support for reading and writing files directly to/from ftp
Browse files Browse the repository at this point in the history
  • Loading branch information
RachitSharma2001 committed Oct 2, 2022
1 parent 4c6bc38 commit 46bc025
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 14 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ jobs:

- run: bash ci_helpers/helpers.sh enable_moto_server
if: ${{ matrix.moto_server }}

- run: bash ci_helpers/helpers.sh create_ftp_server

- name: Run integration tests
run: python ci_helpers/run_integration_tests.py
Expand All @@ -146,6 +148,8 @@ jobs:

- run: bash ci_helpers/helpers.sh disable_moto_server
if: ${{ matrix.moto_server }}

- run: bash ci_helpers/helpers.sh delete_ftp_server

benchmarks:
needs: [linters,unit_tests]
Expand Down
9 changes: 9 additions & 0 deletions ci_helpers/helpers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,17 @@ enable_moto_server(){
moto_server -p5000 2>/dev/null&
}

create_ftp_server(){
docker run -d -p 21:21 -p 21000-21010:21000-21010 -e USERS="user|123|/home/user/dir" -e ADDRESS=localhost --name my-ftp-server delfer/alpine-ftp-server
}

disable_moto_server(){
lsof -i tcp:5000 | tail -n1 | cut -f2 -d" " | xargs kill -9
}

delete_ftp_server(){
docker kill my-ftp-server
docker rm my-ftp-server
}

"$@"
1 change: 1 addition & 0 deletions ci_helpers/run_integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
'pytest',
'integration-tests/test_207.py',
'integration-tests/test_http.py',
'integration-tests/test_ftp.py'
]
)

Expand Down
71 changes: 71 additions & 0 deletions integration-tests/test_ftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from __future__ import unicode_literals

from smart_open import open

def test_nonbinary():
file_contents = "Test Test \n new test \n another tests"
appended_content1 = "Added \n to end"

with open("ftp://user:123@localhost:21/home/user/dir/file", "w") as f:
f.write(file_contents)

with open("ftp://user:123@localhost:21/home/user/dir/file", "r") as f:
read_contents = f.read()
assert read_contents == file_contents

with open("ftp://user:123@localhost:21/home/user/dir/file", "a") as f:
f.write(appended_content1)

with open("ftp://user:123@localhost:21/home/user/dir/file", "r") as f:
read_contents = f.read()
assert read_contents == file_contents + appended_content1

def test_binary():
file_contents = b"Test Test \n new test \n another tests"
appended_content1 = b"Added \n to end"

with open("ftp://user:123@localhost:21/home/user/dir/file2", "wb") as f:
f.write(file_contents)

with open("ftp://user:123@localhost:21/home/user/dir/file2", "rb") as f:
read_contents = f.read()
assert read_contents == file_contents

with open("ftp://user:123@localhost:21/home/user/dir/file2", "ab") as f:
f.write(appended_content1)

with open("ftp://user:123@localhost:21/home/user/dir/file2", "rb") as f:
read_contents = f.read()
assert read_contents == file_contents + appended_content1

def test_line_endings_non_binary():
B_CLRF = b'\r\n'
CLRF = '\r\n'
file_contents = f"Test Test {CLRF} new test {CLRF} another tests{CLRF}"

with open("ftp://user:123@localhost:21/home/user/dir/file3", "w") as f:
f.write(file_contents)

with open("ftp://user:123@localhost:21/home/user/dir/file3", "r") as f:
for line in f:
assert not CLRF in line

with open("ftp://user:123@localhost:21/home/user/dir/file3", "rb") as f:
for line in f:
assert B_CLRF in line

def test_line_endings_binary():
B_CLRF = b'\r\n'
CLRF = '\r\n'
file_contents = f"Test Test {CLRF} new test {CLRF} another tests{CLRF}".encode('utf-8')

with open("ftp://user:123@localhost:21/home/user/dir/file4", "wb") as f:
f.write(file_contents)

with open("ftp://user:123@localhost:21/home/user/dir/file4", "r") as f:
for line in f:
assert not CLRF in line

with open("ftp://user:123@localhost:21/home/user/dir/file4", "rb") as f:
for line in f:
assert B_CLRF in line
132 changes: 132 additions & 0 deletions smart_open/ftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Radim Rehurek <[email protected]>
#
# This code is distributed under the terms and conditions
# from the MIT License (MIT).
#

"""Implements I/O streams over FTP.
"""

import getpass
import logging
import urllib.parse
import smart_open.utils
from ftplib import FTP, error_reply
import types
logger = logging.getLogger(__name__)

SCHEME = "ftp"

"""Supported URL schemes."""

DEFAULT_PORT = 21

URI_EXAMPLES = (
"ftp://username@host/path/file",
"ftp://username:password@host/path/file",
"ftp://username:password@host:port/path/file",
)


def _unquote(text):
return text and urllib.parse.unquote(text)


def parse_uri(uri_as_string):
split_uri = urllib.parse.urlsplit(uri_as_string)
assert split_uri.scheme in SCHEME
return dict(
scheme=split_uri.scheme,
uri_path=_unquote(split_uri.path),
user=_unquote(split_uri.username),
host=split_uri.hostname,
port=int(split_uri.port or DEFAULT_PORT),
password=_unquote(split_uri.password),
)


def open_uri(uri, mode, transport_params):
smart_open.utils.check_kwargs(open, transport_params)
parsed_uri = parse_uri(uri)
uri_path = parsed_uri.pop("uri_path")
parsed_uri.pop("scheme")
return open(uri_path, mode, transport_params=transport_params, **parsed_uri)


def convert_transport_params_to_args(transport_params):
supported_keywords = [
"timeout",
"source_address",
"encoding",
]
unsupported_keywords = [k for k in transport_params if k not in supported_keywords]
kwargs = {k: v for (k, v) in transport_params.items() if k in supported_keywords}

if unsupported_keywords:
logger.warning(
"ignoring unsupported ftp keyword arguments: %r", unsupported_keywords
)

return kwargs


def _connect(hostname, username, port, password, transport_params):
kwargs = convert_transport_params_to_args(transport_params)
ftp = FTP(**kwargs)
try:
ftp.connect(hostname, port)
except Exception as e:
logger.error("Unable to connect to FTP server: try checking the host and port!")
raise error_reply(e)
try:
ftp.login(username, password)
except Exception as e:
logger.error("Unable to login to FTP server: try checking the username and password!")
raise error_reply(e)
return ftp


# transport paramaters can include any extra parameters that you want to be passed into FTP_TLS
def open(
path,
mode="r",
host=None,
user=None,
password=None,
port=DEFAULT_PORT,
transport_params=None,
):
if not host:
raise ValueError("you must specify the host to connect to")
if not user:
user = getpass.getuser()
if not transport_params:
transport_params = {}
conn = _connect(host, user, port, password, transport_params)
mode_to_ftp_cmds = {
"r": ("RETR", "r"),
"rb": ("RETR", "rb"),
"w": ("STOR", "w"),
"wb": ("STOR", "wb"),
"a": ("APPE", "w"),
"ab": ("APPE", "wb")
}
try:
ftp_mode, file_obj_mode = mode_to_ftp_cmds[mode]
except KeyError:
raise ValueError(f"unsupported mode: {mode}")
ftp_mode, file_obj_mode = mode_to_ftp_cmds[mode]
socket = conn.transfercmd(f"{ftp_mode} {path}")
fobj = socket.makefile(file_obj_mode)

def full_close(self):
self.orig_close()
self.socket.close()
self.conn.close()
fobj.orig_close = fobj.close
fobj.socket = socket
fobj.conn = conn
fobj.close = types.MethodType(full_close, fobj)
return fobj
31 changes: 17 additions & 14 deletions smart_open/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

logger = logging.getLogger(__name__)

NO_SCHEME = ''
NO_SCHEME = ""

_REGISTRY = {NO_SCHEME: smart_open.local_file}
_ERRORS = {}
Expand Down Expand Up @@ -54,15 +54,15 @@ def register_transport(submodule):
# Save only the last module name piece
module_name = module_name.rsplit(".")[-1]

if hasattr(submodule, 'SCHEME'):
if hasattr(submodule, "SCHEME"):
schemes = [submodule.SCHEME]
elif hasattr(submodule, 'SCHEMES'):
elif hasattr(submodule, "SCHEMES"):
schemes = submodule.SCHEMES
else:
raise ValueError('%r does not have a .SCHEME or .SCHEMES attribute' % submodule)
raise ValueError("%r does not have a .SCHEME or .SCHEMES attribute" % submodule)

for f in ('open', 'open_uri', 'parse_uri'):
assert hasattr(submodule, f), '%r is missing %r' % (submodule, f)
for f in ("open", "open_uri", "parse_uri"):
assert hasattr(submodule, f), "%r is missing %r" % (submodule, f)

for scheme in schemes:
assert scheme not in _REGISTRY
Expand All @@ -80,7 +80,9 @@ def get_transport(scheme):
"""
global _ERRORS, _MISSING_DEPS_ERROR, _REGISTRY, SUPPORTED_SCHEMES
expected = SUPPORTED_SCHEMES
readme_url = 'https://github.com/RaRe-Technologies/smart_open/blob/master/README.rst'
readme_url = (
"https://github.com/RaRe-Technologies/smart_open/blob/master/README.rst"
)
message = (
"Unable to handle scheme %(scheme)r, expected one of %(expected)r. "
"Extra dependencies required by %(scheme)r may be missing. "
Expand All @@ -94,13 +96,14 @@ def get_transport(scheme):


register_transport(smart_open.local_file)
register_transport('smart_open.azure')
register_transport('smart_open.gcs')
register_transport('smart_open.hdfs')
register_transport('smart_open.http')
register_transport('smart_open.s3')
register_transport('smart_open.ssh')
register_transport('smart_open.webhdfs')
register_transport("smart_open.azure")
register_transport("smart_open.ftp")
register_transport("smart_open.gcs")
register_transport("smart_open.hdfs")
register_transport("smart_open.http")
register_transport("smart_open.s3")
register_transport("smart_open.ssh")
register_transport("smart_open.webhdfs")

SUPPORTED_SCHEMES = tuple(sorted(_REGISTRY.keys()))
"""The transport schemes that the local installation of ``smart_open`` supports."""

0 comments on commit 46bc025

Please sign in to comment.