Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for viewfs:// URLs. #665

Merged
merged 9 commits into from
Feb 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- Support container client and blob client for azure blob storage (PR [#652](https://github.com/RaRe-Technologies/smart_open/pull/652), [@cbare](https://github.com/cbare))
- Support working directly with file descriptors (PR [#659](https://github.com/RaRe-Technologies/smart_open/pull/659), [@mpenkov](https://github.com/mpenkov))
- Fix AttributeError when reading passthrough zstandard (PR [#658](https://github.com/RaRe-Technologies/smart_open/pull/658), [@mpenkov](https://github.com/mpenkov))
- Added support for viewfs:// URLs (PR [#665](https://github.com/RaRe-Technologies/smart_open/pull/665), [@ChandanChainani](https://github.com/ChandanChainani))
- Drop support for passing buffers to smart_open.open (PR [#660](https://github.com/RaRe-Technologies/smart_open/pull/660), [@mpenkov](https://github.com/mpenkov))
- Pin google-cloud-storage to >=1.31.1 in extras (PR [#687](https://github.com/RaRe-Technologies/smart_open/pull/687), [@PLPeeters](https://github.com/PLPeeters))

Expand Down
8 changes: 5 additions & 3 deletions smart_open/hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,26 @@

logger = logging.getLogger(__name__)

SCHEME = 'hdfs'
SCHEMES = ('hdfs', 'viewfs')

URI_EXAMPLES = (
'hdfs:///path/file',
'hdfs://path/file',
'viewfs:///path/file',
'viewfs://path/file',
)


def parse_uri(uri_as_string):
split_uri = urllib.parse.urlsplit(uri_as_string)
assert split_uri.scheme == SCHEME
assert split_uri.scheme in SCHEMES

uri_path = split_uri.netloc + split_uri.path
uri_path = "/" + uri_path.lstrip("/")
if not uri_path:
raise RuntimeError("invalid HDFS URI: %r" % uri_as_string)

return dict(scheme=SCHEME, uri_path=uri_path)
return dict(scheme=split_uri.scheme, uri_path=uri_path)


def open_uri(uri, mode, transport_params):
Expand Down
153 changes: 74 additions & 79 deletions smart_open/tests/test_hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,18 @@
import os
import os.path as P
import subprocess
import unittest
from unittest import mock
import sys

import smart_open.hdfs
import pytest

#
# Workaround for https://bugs.python.org/issue37380
#
if sys.version_info[:2] == (3, 6):
subprocess._cleanup = lambda: None
import smart_open.hdfs

CURR_DIR = P.dirname(P.abspath(__file__))

if sys.platform.startswith("win"):
pytest.skip("these tests don't work under Windows", allow_module_level=True)


#
# We want our mocks to emulate the real implementation as close as possible,
Expand All @@ -43,100 +41,97 @@ def cat(path=None):
return subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE)


class CliRawInputBaseTest(unittest.TestCase):
def setUp(self):
self.path = P.join(CURR_DIR, 'test_data', 'crime-and-punishment.txt')
CAP_PATH = P.join(CURR_DIR, 'test_data', 'crime-and-punishment.txt')
with open(CAP_PATH, encoding='utf-8') as fin:
CRIME_AND_PUNISHMENT = fin.read()

#
# We have to specify the encoding explicitly, because different
# platforms like Windows may be using something other than unicode
# by default.
#
with open(self.path, encoding='utf-8') as fin:
self.expected = fin.read()
self.cat = cat(self.path)

def test_read(self):
with mock.patch('subprocess.Popen', return_value=self.cat):
reader = smart_open.hdfs.CliRawInputBase('hdfs://dummy/url')
as_bytes = reader.read()
def test_sanity_read_bytes():
with open(CAP_PATH, 'rb') as fin:
lines = [line for line in fin]
assert len(lines) == 3

#
# Not 100% sure why this is necessary on Windows platforms, but the
# tests fail without it. It may be a bug, but I don't have time to
# investigate right now.
#
as_text = as_bytes.decode('utf-8').replace(os.linesep, '\n')
assert as_text == self.expected

def test_read_75(self):
with mock.patch('subprocess.Popen', return_value=self.cat):
reader = smart_open.hdfs.CliRawInputBase('hdfs://dummy/url')
as_bytes = reader.read(75)
def test_sanity_read_text():
with open(CAP_PATH, 'r', encoding='utf-8') as fin:
text = fin.read()

as_text = as_bytes.decode('utf-8').replace(os.linesep, '\n')
assert as_text == self.expected[:len(as_text)]
expected = 'В начале июля, в чрезвычайно жаркое время'
assert text[:len(expected)] == expected

def test_unzip(self):
path = P.join(CURR_DIR, 'test_data', 'crime-and-punishment.txt.gz')

with mock.patch('subprocess.Popen', return_value=cat(path)):
with gzip.GzipFile(fileobj=smart_open.hdfs.CliRawInputBase('hdfs://dummy/url')) as fin:
as_bytes = fin.read()
@pytest.mark.parametrize('schema', [('hdfs', ), ('viewfs', )])
def test_read(schema):
with mock.patch('subprocess.Popen', return_value=cat(CAP_PATH)):
reader = smart_open.hdfs.CliRawInputBase(f'{schema}://dummy/url')
as_bytes = reader.read()

as_text = as_bytes.decode('utf-8')
assert as_text == self.expected
#
# Not 100% sure why this is necessary on Windows platforms, but the
# tests fail without it. It may be a bug, but I don't have time to
# investigate right now.
#
as_text = as_bytes.decode('utf-8').replace(os.linesep, '\n')
assert as_text == CRIME_AND_PUNISHMENT

def test_context_manager(self):
with mock.patch('subprocess.Popen', return_value=self.cat):
with smart_open.hdfs.CliRawInputBase('hdfs://dummy/url') as fin:
as_bytes = fin.read()

as_text = as_bytes.decode('utf-8').replace('\r\n', '\n')
assert as_text == self.expected
@pytest.mark.parametrize('schema', [('hdfs', ), ('viewfs', )])
def test_read_75(schema):
with mock.patch('subprocess.Popen', return_value=cat(CAP_PATH)):
reader = smart_open.hdfs.CliRawInputBase(f'{schema}://dummy/url')
as_bytes = reader.read(75)

as_text = as_bytes.decode('utf-8').replace(os.linesep, '\n')
assert as_text == CRIME_AND_PUNISHMENT[:len(as_text)]

class SanityTest(unittest.TestCase):
def test_read_bytes(self):
path = P.join(CURR_DIR, 'test_data', 'crime-and-punishment.txt')
with open(path, 'rb') as fin:
lines = [line for line in fin]
assert len(lines) == 3

def test_read_text(self):
path = P.join(CURR_DIR, 'test_data', 'crime-and-punishment.txt')
with open(path, 'r', encoding='utf-8') as fin:
text = fin.read()
@pytest.mark.parametrize('schema', [('hdfs', ), ('viewfs', )])
def test_unzip(schema):
with mock.patch('subprocess.Popen', return_value=cat(CAP_PATH + '.gz')):
with gzip.GzipFile(fileobj=smart_open.hdfs.CliRawInputBase(f'{schema}://dummy/url')) as fin:
as_bytes = fin.read()

as_text = as_bytes.decode('utf-8')
assert as_text == CRIME_AND_PUNISHMENT


@pytest.mark.parametrize('schema', [('hdfs', ), ('viewfs', )])
def test_context_manager(schema):
with mock.patch('subprocess.Popen', return_value=cat(CAP_PATH)):
with smart_open.hdfs.CliRawInputBase(f'{schema}://dummy/url') as fin:
as_bytes = fin.read()

as_text = as_bytes.decode('utf-8').replace('\r\n', '\n')
assert as_text == CRIME_AND_PUNISHMENT

expected = 'В начале июля, в чрезвычайно жаркое время'
assert text[:len(expected)] == expected

@pytest.mark.parametrize('schema', [('hdfs', ), ('viewfs', )])
def test_write(schema):
expected = 'мы в ответе за тех, кого приручили'
mocked_cat = cat()

class CliRawOutputBaseTest(unittest.TestCase):
def test_write(self):
expected = 'мы в ответе за тех, кого приручили'
mocked_cat = cat()
with mock.patch('subprocess.Popen', return_value=mocked_cat):
with smart_open.hdfs.CliRawOutputBase(f'{schema}://dummy/url') as fout:
fout.write(expected.encode('utf-8'))

with mock.patch('subprocess.Popen', return_value=mocked_cat):
with smart_open.hdfs.CliRawOutputBase('hdfs://dummy/url') as fout:
fout.write(expected.encode('utf-8'))
actual = mocked_cat.stdout.read().decode('utf-8')
assert actual == expected

actual = mocked_cat.stdout.read().decode('utf-8')
assert actual == expected

def test_zip(self):
expected = 'мы в ответе за тех, кого приручили'
mocked_cat = cat()
@pytest.mark.parametrize('schema', [('hdfs', ), ('viewfs', )])
def test_write_zip(schema):
expected = 'мы в ответе за тех, кого приручили'
mocked_cat = cat()

with mock.patch('subprocess.Popen', return_value=mocked_cat):
with smart_open.hdfs.CliRawOutputBase('hdfs://dummy/url') as fout:
with gzip.GzipFile(fileobj=fout, mode='wb') as gz_fout:
gz_fout.write(expected.encode('utf-8'))
with mock.patch('subprocess.Popen', return_value=mocked_cat):
with smart_open.hdfs.CliRawOutputBase(f'{schema}://dummy/url') as fout:
with gzip.GzipFile(fileobj=fout, mode='wb') as gz_fout:
gz_fout.write(expected.encode('utf-8'))

with gzip.GzipFile(fileobj=mocked_cat.stdout) as fin:
actual = fin.read().decode('utf-8')
with gzip.GzipFile(fileobj=mocked_cat.stdout) as fin:
actual = fin.read().decode('utf-8')

assert actual == expected
assert actual == expected


def main():
Expand Down