Skip to content

Commit

Permalink
hdfs: Localize HdfsClient.call_check
Browse files Browse the repository at this point in the history
Again to simplify later refactorings
  • Loading branch information
Tarrasch committed May 7, 2015
1 parent 69bd225 commit 4855ee3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 24 deletions.
2 changes: 1 addition & 1 deletion luigi/contrib/hdfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
# clients.py
from luigi.contrib.hdfs import clients as hdfs_clients
HDFSCliError = hdfs_clients.HDFSCliError
call_check = hdfs_clients.call_check
call_check = hdfs_clients.HdfsClient.call_check
list_path = hdfs_clients.SnakebiteHdfsClient.list_path
HdfsClient = hdfs_clients.HdfsClient
SnakebiteHdfsClient = hdfs_clients.SnakebiteHdfsClient
Expand Down
44 changes: 22 additions & 22 deletions luigi/contrib/hdfs/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,21 @@ def __init__(self, command, returncode, stdout, stderr):
super(HDFSCliError, self).__init__(msg)


def call_check(command):
    """Run *command* as a subprocess and return its stdout as text.

    stdout and stderr are both captured in text mode; if the child exits
    with a non-zero status, an ``HDFSCliError`` carrying the command,
    return code and both output streams is raised.
    """
    completed = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        universal_newlines=True,
    )
    if completed.returncode != 0:
        raise HDFSCliError(command, completed.returncode,
                           completed.stdout, completed.stderr)
    return completed.stdout


class HdfsClient(FileSystem):
"""
This client uses Apache 2.x syntax for file system commands, which also matched CDH4.
"""

recursive_listdir_cmd = ['-ls', '-R']

@staticmethod
def call_check(command):
    """Execute *command* and return its stdout as text.

    Raises ``HDFSCliError`` (with the command, return code, stdout and
    stderr attached) when the command exits with a non-zero status.
    """
    # Text-mode pipes (universal_newlines) so callers get str, not bytes;
    # close_fds avoids leaking inherited descriptors into the CLI child.
    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, universal_newlines=True)
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        raise HDFSCliError(command, p.returncode, stdout, stderr)
    return stdout

def exists(self, path):
"""
Use ``hadoop fs -stat`` to check file existence.
Expand Down Expand Up @@ -93,7 +93,7 @@ def rename(self, path, dest):
path = [path]
else:
warnings.warn("Renaming multiple files at once is not atomic.")
call_check(load_hadoop_cmd() + ['fs', '-mv'] + path + [dest])
self.call_check(load_hadoop_cmd() + ['fs', '-mv'] + path + [dest])

def rename_dont_move(self, path, dest):
"""
Expand Down Expand Up @@ -127,14 +127,14 @@ def remove(self, path, recursive=True, skip_trash=False):
cmd = cmd + ['-skipTrash']

cmd = cmd + [path]
call_check(cmd)
self.call_check(cmd)

def chmod(self, path, permissions, recursive=False):
if recursive:
cmd = load_hadoop_cmd() + ['fs', '-chmod', '-R', permissions, path]
else:
cmd = load_hadoop_cmd() + ['fs', '-chmod', permissions, path]
call_check(cmd)
self.call_check(cmd)

def chown(self, path, owner, group, recursive=False):
if owner is None:
Expand All @@ -146,11 +146,11 @@ def chown(self, path, owner, group, recursive=False):
cmd = load_hadoop_cmd() + ['fs', '-chown', '-R', ownership, path]
else:
cmd = load_hadoop_cmd() + ['fs', '-chown', ownership, path]
call_check(cmd)
self.call_check(cmd)

def count(self, path):
cmd = load_hadoop_cmd() + ['fs', '-count', path]
stdout = call_check(cmd)
stdout = self.call_check(cmd)
lines = stdout.split('\n')
for line in stdout.split('\n'):
if line.startswith("OpenJDK 64-Bit Server VM warning") or line.startswith("It's highly recommended") or not line:
Expand All @@ -161,20 +161,20 @@ def count(self, path):
return results

def copy(self, path, destination):
call_check(load_hadoop_cmd() + ['fs', '-cp', path, destination])
self.call_check(load_hadoop_cmd() + ['fs', '-cp', path, destination])

def put(self, local_path, destination):
call_check(load_hadoop_cmd() + ['fs', '-put', local_path, destination])
self.call_check(load_hadoop_cmd() + ['fs', '-put', local_path, destination])

def get(self, path, local_destination):
call_check(load_hadoop_cmd() + ['fs', '-get', path, local_destination])
self.call_check(load_hadoop_cmd() + ['fs', '-get', path, local_destination])

def getmerge(self, path, local_destination, new_line=False):
if new_line:
cmd = load_hadoop_cmd() + ['fs', '-getmerge', '-nl', path, local_destination]
else:
cmd = load_hadoop_cmd() + ['fs', '-getmerge', path, local_destination]
call_check(cmd)
self.call_check(cmd)

def mkdir(self, path, parents=True, raise_if_exists=False):
if (parents and raise_if_exists):
Expand All @@ -183,7 +183,7 @@ def mkdir(self, path, parents=True, raise_if_exists=False):
cmd = (load_hadoop_cmd() + ['fs', '-mkdir'] +
(['-p'] if parents else []) +
[path])
call_check(cmd)
self.call_check(cmd)
except HDFSCliError as ex:
if "File exists" in ex.stderr:
if raise_if_exists:
Expand All @@ -200,7 +200,7 @@ def listdir(self, path, ignore_directories=False, ignore_files=False,
cmd = load_hadoop_cmd() + ['fs'] + self.recursive_listdir_cmd + [path]
else:
cmd = load_hadoop_cmd() + ['fs', '-ls', path]
lines = call_check(cmd).split('\n')
lines = self.call_check(cmd).split('\n')

for line in lines:
if not line:
Expand Down Expand Up @@ -234,7 +234,7 @@ def listdir(self, path, ignore_directories=False, ignore_files=False,
yield file

def touchz(self, path):
call_check(load_hadoop_cmd() + ['fs', '-touchz', path])
self.call_check(load_hadoop_cmd() + ['fs', '-touchz', path])


class SnakebiteHdfsClient(HdfsClient):
Expand Down Expand Up @@ -502,7 +502,7 @@ def mkdir(self, path):
No -p switch, so this will fail creating ancestors.
"""
try:
call_check(load_hadoop_cmd() + ['fs', '-mkdir', path])
self.call_check(load_hadoop_cmd() + ['fs', '-mkdir', path])
except HDFSCliError as ex:
if "File exists" in ex.stderr:
raise FileAlreadyExists(ex.stderr)
Expand All @@ -519,7 +519,7 @@ def remove(self, path, recursive=True, skip_trash=False):
cmd = cmd + ['-skipTrash']

cmd = cmd + [path]
call_check(cmd)
self.call_check(cmd)


class HdfsClientApache1(HdfsClientCdh3):
Expand Down
3 changes: 2 additions & 1 deletion test/contrib/hdfs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from luigi import six
from minicluster import MiniClusterTestCase
from nose.plugins.attrib import attr
import luigi.contrib.hdfs.clients

from target_test import FileSystemTargetTestMixin

Expand Down Expand Up @@ -696,7 +697,7 @@ def test_listdir_full_list_get_everything(self):
self.assertEqual(4, len(entries[5]), msg="%r" % entries)
self.assertEqual(path + '/sub2/file4.dat', entries[5][0], msg="%r" % entries)

@mock.patch('luigi.contrib.hdfs.clients.call_check')
@mock.patch('luigi.contrib.hdfs.clients.HdfsClient.call_check')
def test_cdh3_client(self, call_check):
cdh3_client = luigi.contrib.hdfs.HdfsClientCdh3()
cdh3_client.remove("/some/path/here")
Expand Down

0 comments on commit 4855ee3

Please sign in to comment.