Skip to content

Commit

Permalink
hdfs: Localize SnakebiteHdfsClient.list_path
Browse files Browse the repository at this point in the history
Just trying to localize code as much as possible, to make refactoring
easier.
  • Loading branch information
Tarrasch committed May 7, 2015
1 parent a8febc0 commit 69bd225
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
2 changes: 1 addition & 1 deletion luigi/contrib/hdfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from luigi.contrib.hdfs import clients as hdfs_clients
HDFSCliError = hdfs_clients.HDFSCliError
call_check = hdfs_clients.call_check
list_path = hdfs_clients.list_path
list_path = hdfs_clients.SnakebiteHdfsClient.list_path
HdfsClient = hdfs_clients.HdfsClient
SnakebiteHdfsClient = hdfs_clients.SnakebiteHdfsClient
HdfsClientCdh3 = hdfs_clients.HdfsClientCdh3
Expand Down
36 changes: 18 additions & 18 deletions luigi/contrib/hdfs/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,6 @@ def call_check(command):
return stdout


def list_path(path):
if isinstance(path, list) or isinstance(path, tuple):
return path
if isinstance(path, str) or isinstance(path, unicode):
return [path, ]
return [str(path), ]


class HdfsClient(FileSystem):
"""
This client uses Apache 2.x syntax for file system commands, which also matched CDH4.
Expand Down Expand Up @@ -272,6 +264,14 @@ def __new__(cls):
logger.warning("Failed to load snakebite.client. Using HdfsClient.")
return HdfsClient()

@staticmethod
def list_path(path):
if isinstance(path, list) or isinstance(path, tuple):
return path
if isinstance(path, str) or isinstance(path, unicode):
return [path, ]
return [str(path), ]

def get_bite(self):
"""
If Luigi has forked, we have a different PID, and need to reconnect.
Expand Down Expand Up @@ -325,7 +325,7 @@ def rename(self, path, dest):
dir_path = '/'.join(parts[0:-1])
if not self.exists(dir_path):
self.mkdir(dir_path, parents=True)
return list(self.get_bite().rename(list_path(path), dest))
return list(self.get_bite().rename(self.list_path(path), dest))

def rename_dont_move(self, path, dest):
"""
Expand Down Expand Up @@ -357,7 +357,7 @@ def remove(self, path, recursive=True, skip_trash=False):
:type skip_trash: boolean, default is False (use trash)
:return: list of deleted items
"""
return list(self.get_bite().delete(list_path(path), recurse=recursive))
return list(self.get_bite().delete(self.list_path(path), recurse=recursive))

def chmod(self, path, permissions, recursive=False):
"""
Expand All @@ -373,7 +373,7 @@ def chmod(self, path, permissions, recursive=False):
"""
if type(permissions) == str:
permissions = int(permissions, 8)
return list(self.get_bite().chmod(list_path(path),
return list(self.get_bite().chmod(self.list_path(path),
permissions, recursive))

def chown(self, path, owner, group, recursive=False):
Expand All @@ -395,10 +395,10 @@ def chown(self, path, owner, group, recursive=False):
bite = self.get_bite()
if owner:
if group:
return all(bite.chown(list_path(path), "%s:%s" % (owner, group),
return all(bite.chown(self.list_path(path), "%s:%s" % (owner, group),
recurse=recursive))
return all(bite.chown(list_path(path), owner, recurse=recursive))
return list(bite.chgrp(list_path(path), group, recurse=recursive))
return all(bite.chown(self.list_path(path), owner, recurse=recursive))
return list(bite.chgrp(self.list_path(path), group, recurse=recursive))

def count(self, path):
"""
Expand All @@ -409,7 +409,7 @@ def count(self, path):
:return: dictionary with content_size, dir_count and file_count keys
"""
try:
res = self.get_bite().count(list_path(path)).next()
res = self.get_bite().count(self.list_path(path)).next()
dir_count = res['directoryCount']
file_count = res['fileCount']
content_size = res['spaceConsumed']
Expand All @@ -427,7 +427,7 @@ def get(self, path, local_destination):
:param local_destination: path on the system running Luigi
:type local_destination: string
"""
return list(self.get_bite().copyToLocal(list_path(path),
return list(self.get_bite().copyToLocal(self.list_path(path),
local_destination))

def mkdir(self, path, parents=True, mode=0o755, raise_if_exists=False):
Expand All @@ -444,7 +444,7 @@ def mkdir(self, path, parents=True, mode=0o755, raise_if_exists=False):
:param mode: \*nix style owner/group/other permissions
:type mode: octal, default 0755
"""
result = list(self.get_bite().mkdir(list_path(path),
result = list(self.get_bite().mkdir(self.list_path(path),
create_parent=parents, mode=mode))
if raise_if_exists and "ile exists" in result[0].get('error', ''):
raise luigi.target.FileAlreadyExists("%s exists" % (path, ))
Expand Down Expand Up @@ -474,7 +474,7 @@ def listdir(self, path, ignore_directories=False, ignore_files=False,
true, a tuple starting with the path, and include_* items in order
"""
bite = self.get_bite()
for entry in bite.ls(list_path(path), recurse=recursive):
for entry in bite.ls(self.list_path(path), recurse=recursive):
if ignore_directories and entry['file_type'] == 'd':
continue
if ignore_files and entry['file_type'] == 'f':
Expand Down

0 comments on commit 69bd225

Please sign in to comment.