
Localize hdfs methods #935

Merged · 2 commits · May 7, 2015
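Summary: the module-level helpers `call_check` and `list_path` in `luigi/contrib/hdfs/clients.py` are moved onto the client classes, `call_check` as a `@staticmethod` of `HdfsClient` and `list_path` as a `@staticmethod` of `SnakebiteHdfsClient`. Every internal call site switches to `self.call_check(...)` / `self.list_path(...)`, `luigi/contrib/hdfs/__init__.py` re-exports the new class attributes under the old names, and the one test that patched the module-level function now patches the class-qualified path.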
4 changes: 2 additions & 2 deletions luigi/contrib/hdfs/__init__.py
@@ -40,8 +40,8 @@
 # clients.py
 from luigi.contrib.hdfs import clients as hdfs_clients
 HDFSCliError = hdfs_clients.HDFSCliError
-call_check = hdfs_clients.call_check
-list_path = hdfs_clients.list_path
+call_check = hdfs_clients.HdfsClient.call_check
+list_path = hdfs_clients.SnakebiteHdfsClient.list_path
 HdfsClient = hdfs_clients.HdfsClient
 SnakebiteHdfsClient = hdfs_clients.SnakebiteHdfsClient
 HdfsClientCdh3 = hdfs_clients.HdfsClientCdh3
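Since a `@staticmethod` accessed through its class is just the underlying function, the updated aliases leave the public `luigi.contrib.hdfs` surface unchanged. A minimal sketch of that compatibility, assuming Luigi is importable (nothing here shells out to Hadoop):

```python
# Sketch: the old module-level names still resolve, now via the classes.
from luigi.contrib import hdfs
from luigi.contrib.hdfs import clients as hdfs_clients

assert hdfs.call_check is hdfs_clients.HdfsClient.call_check
assert hdfs.list_path is hdfs_clients.SnakebiteHdfsClient.list_path
```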
80 changes: 40 additions & 40 deletions luigi/contrib/hdfs/clients.py
@@ -51,29 +51,21 @@ def __init__(self, command, returncode, stdout, stderr):
         super(HDFSCliError, self).__init__(msg)


-def call_check(command):
-    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, universal_newlines=True)
-    stdout, stderr = p.communicate()
-    if p.returncode != 0:
-        raise HDFSCliError(command, p.returncode, stdout, stderr)
-    return stdout
-
-
-def list_path(path):
-    if isinstance(path, list) or isinstance(path, tuple):
-        return path
-    if isinstance(path, str) or isinstance(path, unicode):
-        return [path, ]
-    return [str(path), ]
-
-
 class HdfsClient(FileSystem):
     """
     This client uses Apache 2.x syntax for file system commands, which also matched CDH4.
     """

     recursive_listdir_cmd = ['-ls', '-R']

+    @staticmethod
+    def call_check(command):
+        p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, universal_newlines=True)
+        stdout, stderr = p.communicate()
+        if p.returncode != 0:
+            raise HDFSCliError(command, p.returncode, stdout, stderr)
+        return stdout
+
     def exists(self, path):
         """
         Use ``hadoop fs -stat`` to check file existence.
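Because every instance method now calls `self.call_check(...)`, the lookup goes through the class, giving subclasses and tests one interception point for all shelled-out commands. A hypothetical subclass, purely to illustrate the dispatch (assumes Luigi is importable; `LoggingHdfsClient` is not part of this PR):

```python
# Hypothetical subclass: overriding call_check intercepts every
# `hadoop fs ...` command issued by the inherited methods.
from luigi.contrib.hdfs import clients as hdfs_clients

class LoggingHdfsClient(hdfs_clients.HdfsClient):
    @staticmethod
    def call_check(command):
        print("running: %s" % " ".join(command))
        return hdfs_clients.HdfsClient.call_check(command)
```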
@@ -101,7 +93,7 @@ def rename(self, path, dest):
             path = [path]
         else:
             warnings.warn("Renaming multiple files at once is not atomic.")
-        call_check(load_hadoop_cmd() + ['fs', '-mv'] + path + [dest])
+        self.call_check(load_hadoop_cmd() + ['fs', '-mv'] + path + [dest])

     def rename_dont_move(self, path, dest):
         """
@@ -135,14 +127,14 @@ def remove(self, path, recursive=True, skip_trash=False):
             cmd = cmd + ['-skipTrash']

         cmd = cmd + [path]
-        call_check(cmd)
+        self.call_check(cmd)

     def chmod(self, path, permissions, recursive=False):
         if recursive:
             cmd = load_hadoop_cmd() + ['fs', '-chmod', '-R', permissions, path]
         else:
             cmd = load_hadoop_cmd() + ['fs', '-chmod', permissions, path]
-        call_check(cmd)
+        self.call_check(cmd)

     def chown(self, path, owner, group, recursive=False):
         if owner is None:
@@ -154,11 +146,11 @@ def chown(self, path, owner, group, recursive=False):
             cmd = load_hadoop_cmd() + ['fs', '-chown', '-R', ownership, path]
         else:
             cmd = load_hadoop_cmd() + ['fs', '-chown', ownership, path]
-        call_check(cmd)
+        self.call_check(cmd)

     def count(self, path):
         cmd = load_hadoop_cmd() + ['fs', '-count', path]
-        stdout = call_check(cmd)
+        stdout = self.call_check(cmd)
         lines = stdout.split('\n')
         for line in stdout.split('\n'):
             if line.startswith("OpenJDK 64-Bit Server VM warning") or line.startswith("It's highly recommended") or not line:
@@ -169,20 +161,20 @@ def count(self, path):
         return results

     def copy(self, path, destination):
-        call_check(load_hadoop_cmd() + ['fs', '-cp', path, destination])
+        self.call_check(load_hadoop_cmd() + ['fs', '-cp', path, destination])

     def put(self, local_path, destination):
-        call_check(load_hadoop_cmd() + ['fs', '-put', local_path, destination])
+        self.call_check(load_hadoop_cmd() + ['fs', '-put', local_path, destination])

     def get(self, path, local_destination):
-        call_check(load_hadoop_cmd() + ['fs', '-get', path, local_destination])
+        self.call_check(load_hadoop_cmd() + ['fs', '-get', path, local_destination])

     def getmerge(self, path, local_destination, new_line=False):
         if new_line:
             cmd = load_hadoop_cmd() + ['fs', '-getmerge', '-nl', path, local_destination]
         else:
             cmd = load_hadoop_cmd() + ['fs', '-getmerge', path, local_destination]
-        call_check(cmd)
+        self.call_check(cmd)

     def mkdir(self, path, parents=True, raise_if_exists=False):
         if (parents and raise_if_exists):
@@ -191,7 +183,7 @@ def mkdir(self, path, parents=True, raise_if_exists=False):
             cmd = (load_hadoop_cmd() + ['fs', '-mkdir'] +
                    (['-p'] if parents else []) +
                    [path])
-            call_check(cmd)
+            self.call_check(cmd)
         except HDFSCliError as ex:
             if "File exists" in ex.stderr:
                 if raise_if_exists:
@@ -208,7 +200,7 @@ def listdir(self, path, ignore_directories=False, ignore_files=False,
             cmd = load_hadoop_cmd() + ['fs'] + self.recursive_listdir_cmd + [path]
         else:
             cmd = load_hadoop_cmd() + ['fs', '-ls', path]
-        lines = call_check(cmd).split('\n')
+        lines = self.call_check(cmd).split('\n')

         for line in lines:
             if not line:
@@ -242,7 +234,7 @@ def listdir(self, path, ignore_directories=False, ignore_files=False,
                 yield file

     def touchz(self, path):
-        call_check(load_hadoop_cmd() + ['fs', '-touchz', path])
+        self.call_check(load_hadoop_cmd() + ['fs', '-touchz', path])


 class SnakebiteHdfsClient(HdfsClient):
@@ -272,6 +264,14 @@ def __new__(cls):
             logger.warning("Failed to load snakebite.client. Using HdfsClient.")
             return HdfsClient()

+    @staticmethod
+    def list_path(path):
+        if isinstance(path, list) or isinstance(path, tuple):
+            return path
+        if isinstance(path, str) or isinstance(path, unicode):
+            return [path, ]
+        return [str(path), ]
+
     def get_bite(self):
         """
         If Luigi has forked, we have a different PID, and need to reconnect.
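`list_path` just normalizes its argument into a list of strings so the snakebite calls below can always pass a list. Expected behavior per the code above, assuming Luigi is importable (the `unicode` branch is Python 2 syntax, matching the codebase at the time):

```python
from luigi.contrib.hdfs.clients import SnakebiteHdfsClient

SnakebiteHdfsClient.list_path('/tmp/x')              # ['/tmp/x']
SnakebiteHdfsClient.list_path(['/tmp/x', '/tmp/y'])  # returned as-is
SnakebiteHdfsClient.list_path(42)                    # ['42'], via str()
```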
@@ -325,7 +325,7 @@ def rename(self, path, dest):
             dir_path = '/'.join(parts[0:-1])
             if not self.exists(dir_path):
                 self.mkdir(dir_path, parents=True)
-        return list(self.get_bite().rename(list_path(path), dest))
+        return list(self.get_bite().rename(self.list_path(path), dest))

     def rename_dont_move(self, path, dest):
         """
@@ -357,7 +357,7 @@ def remove(self, path, recursive=True, skip_trash=False):
         :type skip_trash: boolean, default is False (use trash)
         :return: list of deleted items
         """
-        return list(self.get_bite().delete(list_path(path), recurse=recursive))
+        return list(self.get_bite().delete(self.list_path(path), recurse=recursive))

     def chmod(self, path, permissions, recursive=False):
         """
@@ -373,7 +373,7 @@ def chmod(self, path, permissions, recursive=False):
"""
if type(permissions) == str:
permissions = int(permissions, 8)
return list(self.get_bite().chmod(list_path(path),
return list(self.get_bite().chmod(self.list_path(path),
permissions, recursive))

def chown(self, path, owner, group, recursive=False):
@@ -395,10 +395,10 @@ def chown(self, path, owner, group, recursive=False):
         bite = self.get_bite()
         if owner:
             if group:
-                return all(bite.chown(list_path(path), "%s:%s" % (owner, group),
+                return all(bite.chown(self.list_path(path), "%s:%s" % (owner, group),
                                       recurse=recursive))
-            return all(bite.chown(list_path(path), owner, recurse=recursive))
-        return list(bite.chgrp(list_path(path), group, recurse=recursive))
+            return all(bite.chown(self.list_path(path), owner, recurse=recursive))
+        return list(bite.chgrp(self.list_path(path), group, recurse=recursive))

     def count(self, path):
         """
@@ -409,7 +409,7 @@ def count(self, path):
         :return: dictionary with content_size, dir_count and file_count keys
         """
         try:
-            res = self.get_bite().count(list_path(path)).next()
+            res = self.get_bite().count(self.list_path(path)).next()
             dir_count = res['directoryCount']
             file_count = res['fileCount']
             content_size = res['spaceConsumed']
@@ -427,7 +427,7 @@ def get(self, path, local_destination):
         :param local_destination: path on the system running Luigi
         :type local_destination: string
         """
-        return list(self.get_bite().copyToLocal(list_path(path),
+        return list(self.get_bite().copyToLocal(self.list_path(path),
                                                 local_destination))

     def mkdir(self, path, parents=True, mode=0o755, raise_if_exists=False):
@@ -444,7 +444,7 @@ def mkdir(self, path, parents=True, mode=0o755, raise_if_exists=False):
         :param mode: \*nix style owner/group/other permissions
         :type mode: octal, default 0755
         """
-        result = list(self.get_bite().mkdir(list_path(path),
+        result = list(self.get_bite().mkdir(self.list_path(path),
                                             create_parent=parents, mode=mode))
         if raise_if_exists and "ile exists" in result[0].get('error', ''):
             raise luigi.target.FileAlreadyExists("%s exists" % (path, ))
@@ -474,7 +474,7 @@ def listdir(self, path, ignore_directories=False, ignore_files=False,
         true, a tuple starting with the path, and include_* items in order
         """
         bite = self.get_bite()
-        for entry in bite.ls(list_path(path), recurse=recursive):
+        for entry in bite.ls(self.list_path(path), recurse=recursive):
             if ignore_directories and entry['file_type'] == 'd':
                 continue
             if ignore_files and entry['file_type'] == 'f':
@@ -502,7 +502,7 @@ def mkdir(self, path):
         No -p switch, so this will fail creating ancestors.
         """
         try:
-            call_check(load_hadoop_cmd() + ['fs', '-mkdir', path])
+            self.call_check(load_hadoop_cmd() + ['fs', '-mkdir', path])
         except HDFSCliError as ex:
             if "File exists" in ex.stderr:
                 raise FileAlreadyExists(ex.stderr)
@@ -519,7 +519,7 @@ def remove(self, path, recursive=True, skip_trash=False):
             cmd = cmd + ['-skipTrash']

         cmd = cmd + [path]
-        call_check(cmd)
+        self.call_check(cmd)


 class HdfsClientApache1(HdfsClientCdh3):
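`HdfsClientCdh3` and `HdfsClientApache1` inherit the staticmethod rather than defining their own, so after this change a single `call_check` serves the whole CLI client hierarchy. A quick sanity sketch, assuming Luigi is importable:

```python
# Sketch: one function object is shared down the inheritance chain.
from luigi.contrib.hdfs import clients as hdfs_clients

assert (hdfs_clients.HdfsClientApache1.call_check is
        hdfs_clients.HdfsClientCdh3.call_check is
        hdfs_clients.HdfsClient.call_check)
```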
3 changes: 2 additions & 1 deletion test/contrib/hdfs_test.py
@@ -29,6 +29,7 @@
 from luigi import six
 from minicluster import MiniClusterTestCase
 from nose.plugins.attrib import attr
+import luigi.contrib.hdfs.clients

 from target_test import FileSystemTargetTestMixin

@@ -696,7 +697,7 @@ def test_listdir_full_list_get_everything(self):
         self.assertEqual(4, len(entries[5]), msg="%r" % entries)
         self.assertEqual(path + '/sub2/file4.dat', entries[5][0], msg="%r" % entries)

-    @mock.patch('luigi.contrib.hdfs.clients.call_check')
+    @mock.patch('luigi.contrib.hdfs.clients.HdfsClient.call_check')
     def test_cdh3_client(self, call_check):
         cdh3_client = luigi.contrib.hdfs.HdfsClientCdh3()
         cdh3_client.remove("/some/path/here")
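The patch target moves with the method: mocking `HdfsClient.call_check` also covers `HdfsClientCdh3`, which inherits it. The test body is truncated above; a hypothetical continuation, only to show the shape of the assertion (not the actual code):

```python
# Hypothetical sketch: the mock records the hadoop command that the
# patched staticmethod received from HdfsClientCdh3.remove().
command = call_check.call_args[0][0]
assert command[-1] == "/some/path/here"
```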