Skip to content

Commit

Permalink
Merge pull request #935 from Tarrasch/localize-hdfs-methods
Browse files Browse the repository at this point in the history
Localize hdfs methods
  • Loading branch information
Tarrasch committed May 7, 2015
2 parents a8febc0 + 4855ee3 commit 9ee0191
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 43 deletions.
4 changes: 2 additions & 2 deletions luigi/contrib/hdfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
# clients.py
from luigi.contrib.hdfs import clients as hdfs_clients
HDFSCliError = hdfs_clients.HDFSCliError
call_check = hdfs_clients.call_check
list_path = hdfs_clients.list_path
call_check = hdfs_clients.HdfsClient.call_check
list_path = hdfs_clients.SnakebiteHdfsClient.list_path
HdfsClient = hdfs_clients.HdfsClient
SnakebiteHdfsClient = hdfs_clients.SnakebiteHdfsClient
HdfsClientCdh3 = hdfs_clients.HdfsClientCdh3
Expand Down
80 changes: 40 additions & 40 deletions luigi/contrib/hdfs/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,21 @@ def __init__(self, command, returncode, stdout, stderr):
super(HDFSCliError, self).__init__(msg)


def call_check(command):
    """Execute *command* (an argv list) and return its stdout as text.

    Raises HDFSCliError (carrying the command, exit code, stdout and
    stderr) when the process exits non-zero.
    """
    proc = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        universal_newlines=True,
    )
    out, err = proc.communicate()
    if proc.returncode == 0:
        return out
    raise HDFSCliError(command, proc.returncode, out, err)


def list_path(path):
# Normalize *path* into a list of path strings, the form snakebite's
# client methods expect. Lists/tuples pass through unchanged.
# NOTE(review): the or-chains below short-circuit, so the Python-2-only
# name `unicode` is only evaluated for non-str scalars; on Python 3 that
# final branch would raise NameError before reaching str(path) — appears
# to assume Python 2, confirm supported interpreters.
if isinstance(path, list) or isinstance(path, tuple):
return path
if isinstance(path, str) or isinstance(path, unicode):
return [path, ]
return [str(path), ]


class HdfsClient(FileSystem):
"""
This client uses Apache 2.x syntax for file system commands, which also matched CDH4.
"""

recursive_listdir_cmd = ['-ls', '-R']

@staticmethod
# Run *command* (an argv list) via subprocess, capturing stdout/stderr
# as text (universal_newlines=True). Returns stdout on success.
def call_check(command):
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, universal_newlines=True)
stdout, stderr = p.communicate()
# A non-zero exit code is surfaced as HDFSCliError carrying the full
# command and both output streams for diagnosis.
if p.returncode != 0:
raise HDFSCliError(command, p.returncode, stdout, stderr)
return stdout

def exists(self, path):
"""
Use ``hadoop fs -stat`` to check file existence.
Expand Down Expand Up @@ -101,7 +93,7 @@ def rename(self, path, dest):
path = [path]
else:
warnings.warn("Renaming multiple files at once is not atomic.")
call_check(load_hadoop_cmd() + ['fs', '-mv'] + path + [dest])
self.call_check(load_hadoop_cmd() + ['fs', '-mv'] + path + [dest])

def rename_dont_move(self, path, dest):
"""
Expand Down Expand Up @@ -135,14 +127,14 @@ def remove(self, path, recursive=True, skip_trash=False):
cmd = cmd + ['-skipTrash']

cmd = cmd + [path]
call_check(cmd)
self.call_check(cmd)

def chmod(self, path, permissions, recursive=False):
if recursive:
cmd = load_hadoop_cmd() + ['fs', '-chmod', '-R', permissions, path]
else:
cmd = load_hadoop_cmd() + ['fs', '-chmod', permissions, path]
call_check(cmd)
self.call_check(cmd)

def chown(self, path, owner, group, recursive=False):
if owner is None:
Expand All @@ -154,11 +146,11 @@ def chown(self, path, owner, group, recursive=False):
cmd = load_hadoop_cmd() + ['fs', '-chown', '-R', ownership, path]
else:
cmd = load_hadoop_cmd() + ['fs', '-chown', ownership, path]
call_check(cmd)
self.call_check(cmd)

def count(self, path):
cmd = load_hadoop_cmd() + ['fs', '-count', path]
stdout = call_check(cmd)
stdout = self.call_check(cmd)
lines = stdout.split('\n')
for line in stdout.split('\n'):
if line.startswith("OpenJDK 64-Bit Server VM warning") or line.startswith("It's highly recommended") or not line:
Expand All @@ -169,20 +161,20 @@ def count(self, path):
return results

def copy(self, path, destination):
call_check(load_hadoop_cmd() + ['fs', '-cp', path, destination])
self.call_check(load_hadoop_cmd() + ['fs', '-cp', path, destination])

def put(self, local_path, destination):
call_check(load_hadoop_cmd() + ['fs', '-put', local_path, destination])
self.call_check(load_hadoop_cmd() + ['fs', '-put', local_path, destination])

def get(self, path, local_destination):
call_check(load_hadoop_cmd() + ['fs', '-get', path, local_destination])
self.call_check(load_hadoop_cmd() + ['fs', '-get', path, local_destination])

def getmerge(self, path, local_destination, new_line=False):
if new_line:
cmd = load_hadoop_cmd() + ['fs', '-getmerge', '-nl', path, local_destination]
else:
cmd = load_hadoop_cmd() + ['fs', '-getmerge', path, local_destination]
call_check(cmd)
self.call_check(cmd)

def mkdir(self, path, parents=True, raise_if_exists=False):
if (parents and raise_if_exists):
Expand All @@ -191,7 +183,7 @@ def mkdir(self, path, parents=True, raise_if_exists=False):
cmd = (load_hadoop_cmd() + ['fs', '-mkdir'] +
(['-p'] if parents else []) +
[path])
call_check(cmd)
self.call_check(cmd)
except HDFSCliError as ex:
if "File exists" in ex.stderr:
if raise_if_exists:
Expand All @@ -208,7 +200,7 @@ def listdir(self, path, ignore_directories=False, ignore_files=False,
cmd = load_hadoop_cmd() + ['fs'] + self.recursive_listdir_cmd + [path]
else:
cmd = load_hadoop_cmd() + ['fs', '-ls', path]
lines = call_check(cmd).split('\n')
lines = self.call_check(cmd).split('\n')

for line in lines:
if not line:
Expand Down Expand Up @@ -242,7 +234,7 @@ def listdir(self, path, ignore_directories=False, ignore_files=False,
yield file

def touchz(self, path):
call_check(load_hadoop_cmd() + ['fs', '-touchz', path])
self.call_check(load_hadoop_cmd() + ['fs', '-touchz', path])


class SnakebiteHdfsClient(HdfsClient):
Expand Down Expand Up @@ -272,6 +264,14 @@ def __new__(cls):
logger.warning("Failed to load snakebite.client. Using HdfsClient.")
return HdfsClient()

@staticmethod
# Normalize *path* into a list of path strings for snakebite calls;
# lists/tuples pass through unchanged.
def list_path(path):
if isinstance(path, list) or isinstance(path, tuple):
return path
# NOTE(review): `unicode` is Python-2-only; the or-chain short-circuits
# so it is only evaluated for non-str inputs — on Python 3 a non-string
# scalar would raise NameError here. Confirm supported interpreters.
if isinstance(path, str) or isinstance(path, unicode):
return [path, ]
return [str(path), ]

def get_bite(self):
"""
If Luigi has forked, we have a different PID, and need to reconnect.
Expand Down Expand Up @@ -325,7 +325,7 @@ def rename(self, path, dest):
dir_path = '/'.join(parts[0:-1])
if not self.exists(dir_path):
self.mkdir(dir_path, parents=True)
return list(self.get_bite().rename(list_path(path), dest))
return list(self.get_bite().rename(self.list_path(path), dest))

def rename_dont_move(self, path, dest):
"""
Expand Down Expand Up @@ -357,7 +357,7 @@ def remove(self, path, recursive=True, skip_trash=False):
:type skip_trash: boolean, default is False (use trash)
:return: list of deleted items
"""
return list(self.get_bite().delete(list_path(path), recurse=recursive))
return list(self.get_bite().delete(self.list_path(path), recurse=recursive))

def chmod(self, path, permissions, recursive=False):
"""
Expand All @@ -373,7 +373,7 @@ def chmod(self, path, permissions, recursive=False):
"""
if type(permissions) == str:
permissions = int(permissions, 8)
return list(self.get_bite().chmod(list_path(path),
return list(self.get_bite().chmod(self.list_path(path),
permissions, recursive))

def chown(self, path, owner, group, recursive=False):
Expand All @@ -395,10 +395,10 @@ def chown(self, path, owner, group, recursive=False):
bite = self.get_bite()
if owner:
if group:
return all(bite.chown(list_path(path), "%s:%s" % (owner, group),
return all(bite.chown(self.list_path(path), "%s:%s" % (owner, group),
recurse=recursive))
return all(bite.chown(list_path(path), owner, recurse=recursive))
return list(bite.chgrp(list_path(path), group, recurse=recursive))
return all(bite.chown(self.list_path(path), owner, recurse=recursive))
return list(bite.chgrp(self.list_path(path), group, recurse=recursive))

def count(self, path):
"""
Expand All @@ -409,7 +409,7 @@ def count(self, path):
:return: dictionary with content_size, dir_count and file_count keys
"""
try:
res = self.get_bite().count(list_path(path)).next()
res = self.get_bite().count(self.list_path(path)).next()
dir_count = res['directoryCount']
file_count = res['fileCount']
content_size = res['spaceConsumed']
Expand All @@ -427,7 +427,7 @@ def get(self, path, local_destination):
:param local_destination: path on the system running Luigi
:type local_destination: string
"""
return list(self.get_bite().copyToLocal(list_path(path),
return list(self.get_bite().copyToLocal(self.list_path(path),
local_destination))

def mkdir(self, path, parents=True, mode=0o755, raise_if_exists=False):
Expand All @@ -444,7 +444,7 @@ def mkdir(self, path, parents=True, mode=0o755, raise_if_exists=False):
:param mode: \*nix style owner/group/other permissions
:type mode: octal, default 0755
"""
result = list(self.get_bite().mkdir(list_path(path),
result = list(self.get_bite().mkdir(self.list_path(path),
create_parent=parents, mode=mode))
if raise_if_exists and "ile exists" in result[0].get('error', ''):
raise luigi.target.FileAlreadyExists("%s exists" % (path, ))
Expand Down Expand Up @@ -474,7 +474,7 @@ def listdir(self, path, ignore_directories=False, ignore_files=False,
true, a tuple starting with the path, and include_* items in order
"""
bite = self.get_bite()
for entry in bite.ls(list_path(path), recurse=recursive):
for entry in bite.ls(self.list_path(path), recurse=recursive):
if ignore_directories and entry['file_type'] == 'd':
continue
if ignore_files and entry['file_type'] == 'f':
Expand Down Expand Up @@ -502,7 +502,7 @@ def mkdir(self, path):
No -p switch, so this will fail creating ancestors.
"""
try:
call_check(load_hadoop_cmd() + ['fs', '-mkdir', path])
self.call_check(load_hadoop_cmd() + ['fs', '-mkdir', path])
except HDFSCliError as ex:
if "File exists" in ex.stderr:
raise FileAlreadyExists(ex.stderr)
Expand All @@ -519,7 +519,7 @@ def remove(self, path, recursive=True, skip_trash=False):
cmd = cmd + ['-skipTrash']

cmd = cmd + [path]
call_check(cmd)
self.call_check(cmd)


class HdfsClientApache1(HdfsClientCdh3):
Expand Down
3 changes: 2 additions & 1 deletion test/contrib/hdfs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from luigi import six
from minicluster import MiniClusterTestCase
from nose.plugins.attrib import attr
import luigi.contrib.hdfs.clients

from target_test import FileSystemTargetTestMixin

Expand Down Expand Up @@ -696,7 +697,7 @@ def test_listdir_full_list_get_everything(self):
self.assertEqual(4, len(entries[5]), msg="%r" % entries)
self.assertEqual(path + '/sub2/file4.dat', entries[5][0], msg="%r" % entries)

@mock.patch('luigi.contrib.hdfs.clients.call_check')
@mock.patch('luigi.contrib.hdfs.clients.HdfsClient.call_check')
def test_cdh3_client(self, call_check):
cdh3_client = luigi.contrib.hdfs.HdfsClientCdh3()
cdh3_client.remove("/some/path/here")
Expand Down

0 comments on commit 9ee0191

Please sign in to comment.