diff --git a/luigi/contrib/hdfs/__init__.py b/luigi/contrib/hdfs/__init__.py
index 571da1a15f..bea64fb4f7 100644
--- a/luigi/contrib/hdfs/__init__.py
+++ b/luigi/contrib/hdfs/__init__.py
@@ -40,8 +40,8 @@
 # clients.py
 from luigi.contrib.hdfs import clients as hdfs_clients
 HDFSCliError = hdfs_clients.HDFSCliError
-call_check = hdfs_clients.call_check
-list_path = hdfs_clients.list_path
+call_check = hdfs_clients.HdfsClient.call_check
+list_path = hdfs_clients.SnakebiteHdfsClient.list_path
 HdfsClient = hdfs_clients.HdfsClient
 SnakebiteHdfsClient = hdfs_clients.SnakebiteHdfsClient
 HdfsClientCdh3 = hdfs_clients.HdfsClientCdh3
diff --git a/luigi/contrib/hdfs/clients.py b/luigi/contrib/hdfs/clients.py
index 367050c5b9..d87aedcd39 100644
--- a/luigi/contrib/hdfs/clients.py
+++ b/luigi/contrib/hdfs/clients.py
@@ -51,22 +51,6 @@ def __init__(self, command, returncode, stdout, stderr):
         super(HDFSCliError, self).__init__(msg)
 
 
-def call_check(command):
-    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, universal_newlines=True)
-    stdout, stderr = p.communicate()
-    if p.returncode != 0:
-        raise HDFSCliError(command, p.returncode, stdout, stderr)
-    return stdout
-
-
-def list_path(path):
-    if isinstance(path, list) or isinstance(path, tuple):
-        return path
-    if isinstance(path, str) or isinstance(path, unicode):
-        return [path, ]
-    return [str(path), ]
-
-
 class HdfsClient(FileSystem):
     """
     This client uses Apache 2.x syntax for file system commands, which also matched CDH4.
@@ -74,6 +58,14 @@ class HdfsClient(FileSystem):
 
     recursive_listdir_cmd = ['-ls', '-R']
 
+    @staticmethod
+    def call_check(command):
+        p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, universal_newlines=True)
+        stdout, stderr = p.communicate()
+        if p.returncode != 0:
+            raise HDFSCliError(command, p.returncode, stdout, stderr)
+        return stdout
+
     def exists(self, path):
         """
         Use ``hadoop fs -stat`` to check file existence.
@@ -101,7 +93,7 @@ def rename(self, path, dest):
             path = [path]
         else:
             warnings.warn("Renaming multiple files at once is not atomic.")
-        call_check(load_hadoop_cmd() + ['fs', '-mv'] + path + [dest])
+        self.call_check(load_hadoop_cmd() + ['fs', '-mv'] + path + [dest])
 
     def rename_dont_move(self, path, dest):
         """
@@ -135,14 +127,14 @@ def remove(self, path, recursive=True, skip_trash=False):
             cmd = cmd + ['-skipTrash']
 
         cmd = cmd + [path]
-        call_check(cmd)
+        self.call_check(cmd)
 
     def chmod(self, path, permissions, recursive=False):
         if recursive:
             cmd = load_hadoop_cmd() + ['fs', '-chmod', '-R', permissions, path]
         else:
             cmd = load_hadoop_cmd() + ['fs', '-chmod', permissions, path]
-        call_check(cmd)
+        self.call_check(cmd)
 
     def chown(self, path, owner, group, recursive=False):
         if owner is None:
@@ -154,11 +146,11 @@ def chown(self, path, owner, group, recursive=False):
             cmd = load_hadoop_cmd() + ['fs', '-chown', '-R', ownership, path]
         else:
             cmd = load_hadoop_cmd() + ['fs', '-chown', ownership, path]
-        call_check(cmd)
+        self.call_check(cmd)
 
     def count(self, path):
         cmd = load_hadoop_cmd() + ['fs', '-count', path]
-        stdout = call_check(cmd)
+        stdout = self.call_check(cmd)
         lines = stdout.split('\n')
         for line in stdout.split('\n'):
             if line.startswith("OpenJDK 64-Bit Server VM warning") or line.startswith("It's highly recommended") or not line:
@@ -169,20 +161,20 @@ def count(self, path):
         return results
 
     def copy(self, path, destination):
-        call_check(load_hadoop_cmd() + ['fs', '-cp', path, destination])
+        self.call_check(load_hadoop_cmd() + ['fs', '-cp', path, destination])
 
     def put(self, local_path, destination):
-        call_check(load_hadoop_cmd() + ['fs', '-put', local_path, destination])
+        self.call_check(load_hadoop_cmd() + ['fs', '-put', local_path, destination])
 
     def get(self, path, local_destination):
-        call_check(load_hadoop_cmd() + ['fs', '-get', path, local_destination])
+        self.call_check(load_hadoop_cmd() + ['fs', '-get', path, local_destination])
 
     def getmerge(self, path, local_destination, new_line=False):
         if new_line:
             cmd = load_hadoop_cmd() + ['fs', '-getmerge', '-nl', path, local_destination]
         else:
             cmd = load_hadoop_cmd() + ['fs', '-getmerge', path, local_destination]
-        call_check(cmd)
+        self.call_check(cmd)
 
     def mkdir(self, path, parents=True, raise_if_exists=False):
         if (parents and raise_if_exists):
@@ -191,7 +183,7 @@ def mkdir(self, path, parents=True, raise_if_exists=False):
             cmd = (load_hadoop_cmd() + ['fs', '-mkdir'] +
                    (['-p'] if parents else []) +
                    [path])
-            call_check(cmd)
+            self.call_check(cmd)
         except HDFSCliError as ex:
             if "File exists" in ex.stderr:
                 if raise_if_exists:
@@ -208,7 +200,7 @@ def listdir(self, path, ignore_directories=False, ignore_files=False,
             cmd = load_hadoop_cmd() + ['fs'] + self.recursive_listdir_cmd + [path]
         else:
             cmd = load_hadoop_cmd() + ['fs', '-ls', path]
-        lines = call_check(cmd).split('\n')
+        lines = self.call_check(cmd).split('\n')
 
         for line in lines:
             if not line:
@@ -242,7 +234,7 @@ def listdir(self, path, ignore_directories=False, ignore_files=False,
                 yield file
 
     def touchz(self, path):
-        call_check(load_hadoop_cmd() + ['fs', '-touchz', path])
+        self.call_check(load_hadoop_cmd() + ['fs', '-touchz', path])
 
 
 class SnakebiteHdfsClient(HdfsClient):
@@ -272,6 +264,14 @@ def __new__(cls):
             logger.warning("Failed to load snakebite.client. Using HdfsClient.")
             return HdfsClient()
 
+    @staticmethod
+    def list_path(path):
+        if isinstance(path, list) or isinstance(path, tuple):
+            return path
+        if isinstance(path, str) or isinstance(path, unicode):
+            return [path, ]
+        return [str(path), ]
+
     def get_bite(self):
         """
         If Luigi has forked, we have a different PID, and need to reconnect.
@@ -325,7 +325,7 @@ def rename(self, path, dest):
             dir_path = '/'.join(parts[0:-1])
             if not self.exists(dir_path):
                 self.mkdir(dir_path, parents=True)
-        return list(self.get_bite().rename(list_path(path), dest))
+        return list(self.get_bite().rename(self.list_path(path), dest))
 
     def rename_dont_move(self, path, dest):
         """
@@ -357,7 +357,7 @@ def remove(self, path, recursive=True, skip_trash=False):
        :type skip_trash: boolean, default is False (use trash)
        :return: list of deleted items
         """
-        return list(self.get_bite().delete(list_path(path), recurse=recursive))
+        return list(self.get_bite().delete(self.list_path(path), recurse=recursive))
 
     def chmod(self, path, permissions, recursive=False):
         """
@@ -373,7 +373,7 @@ def chmod(self, path, permissions, recursive=False):
         """
         if type(permissions) == str:
             permissions = int(permissions, 8)
-        return list(self.get_bite().chmod(list_path(path),
+        return list(self.get_bite().chmod(self.list_path(path),
                                           permissions, recursive))
 
     def chown(self, path, owner, group, recursive=False):
@@ -395,10 +395,10 @@ def chown(self, path, owner, group, recursive=False):
         bite = self.get_bite()
         if owner:
             if group:
-                return all(bite.chown(list_path(path), "%s:%s" % (owner, group),
+                return all(bite.chown(self.list_path(path), "%s:%s" % (owner, group),
                                       recurse=recursive))
-            return all(bite.chown(list_path(path), owner, recurse=recursive))
-        return list(bite.chgrp(list_path(path), group, recurse=recursive))
+            return all(bite.chown(self.list_path(path), owner, recurse=recursive))
+        return list(bite.chgrp(self.list_path(path), group, recurse=recursive))
 
     def count(self, path):
         """
@@ -409,7 +409,7 @@ def count(self, path):
         :return: dictionary with content_size, dir_count and file_count keys
         """
         try:
-            res = self.get_bite().count(list_path(path)).next()
+            res = self.get_bite().count(self.list_path(path)).next()
             dir_count = res['directoryCount']
             file_count = res['fileCount']
             content_size = res['spaceConsumed']
@@ -427,7 +427,7 @@ def get(self, path, local_destination):
         :param local_destination: path on the system running Luigi
         :type local_destination: string
         """
-        return list(self.get_bite().copyToLocal(list_path(path),
+        return list(self.get_bite().copyToLocal(self.list_path(path),
                                                 local_destination))
 
     def mkdir(self, path, parents=True, mode=0o755, raise_if_exists=False):
@@ -444,7 +444,7 @@ def mkdir(self, path, parents=True, mode=0o755, raise_if_exists=False):
         :param mode: \*nix style owner/group/other permissions
         :type mode: octal, default 0755
         """
-        result = list(self.get_bite().mkdir(list_path(path),
+        result = list(self.get_bite().mkdir(self.list_path(path),
                                             create_parent=parents, mode=mode))
         if raise_if_exists and "ile exists" in result[0].get('error', ''):
             raise luigi.target.FileAlreadyExists("%s exists" % (path, ))
@@ -474,7 +474,7 @@ def listdir(self, path, ignore_directories=False, ignore_files=False,
         true, a tuple starting with the path, and include_* items in order
         """
         bite = self.get_bite()
-        for entry in bite.ls(list_path(path), recurse=recursive):
+        for entry in bite.ls(self.list_path(path), recurse=recursive):
             if ignore_directories and entry['file_type'] == 'd':
                 continue
             if ignore_files and entry['file_type'] == 'f':
@@ -502,7 +502,7 @@ def mkdir(self, path):
         No -p switch, so this will fail creating ancestors.
         """
         try:
-            call_check(load_hadoop_cmd() + ['fs', '-mkdir', path])
+            self.call_check(load_hadoop_cmd() + ['fs', '-mkdir', path])
         except HDFSCliError as ex:
             if "File exists" in ex.stderr:
                 raise FileAlreadyExists(ex.stderr)
@@ -519,7 +519,7 @@ def remove(self, path, recursive=True, skip_trash=False):
             cmd = cmd + ['-skipTrash']
 
         cmd = cmd + [path]
-        call_check(cmd)
+        self.call_check(cmd)
 
 
 class HdfsClientApache1(HdfsClientCdh3):
diff --git a/test/contrib/hdfs_test.py b/test/contrib/hdfs_test.py
index 883fee9635..a4c7d2d5d4 100644
--- a/test/contrib/hdfs_test.py
+++ b/test/contrib/hdfs_test.py
@@ -29,6 +29,7 @@
 from luigi import six
 from minicluster import MiniClusterTestCase
 from nose.plugins.attrib import attr
+import luigi.contrib.hdfs.clients
 from target_test import FileSystemTargetTestMixin
 
 
@@ -696,7 +697,7 @@ def test_listdir_full_list_get_everything(self):
         self.assertEqual(4, len(entries[5]), msg="%r" % entries)
         self.assertEqual(path + '/sub2/file4.dat', entries[5][0], msg="%r" % entries)
 
-    @mock.patch('luigi.contrib.hdfs.clients.call_check')
+    @mock.patch('luigi.contrib.hdfs.clients.HdfsClient.call_check')
     def test_cdh3_client(self, call_check):
         cdh3_client = luigi.contrib.hdfs.HdfsClientCdh3()
         cdh3_client.remove("/some/path/here")
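A rough sketch of what this refactor enables, not part of the diff itself: because the CLI commands now dispatch through self.call_check instead of a module-level function, a client subclass can override the static method once and every inherited command picks up the change. The LoggingHdfsClient name below is invented for the example.

import logging

from luigi.contrib.hdfs.clients import HdfsClient

logger = logging.getLogger(__name__)


class LoggingHdfsClient(HdfsClient):
    """Hypothetical subclass: log each hadoop CLI invocation, then delegate."""

    @staticmethod
    def call_check(command):
        logger.info("running: %s", ' '.join(command))
        # Delegate to the base implementation, which runs the command
        # via subprocess and raises HDFSCliError on a nonzero exit code.
        return HdfsClient.call_check(command)


client = LoggingHdfsClient()
# remove() is inherited unchanged; since it now calls self.call_check,
# the override above sees the hadoop fs removal command before it runs.
client.remove("/tmp/some/path")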