Skip to content

Commit

Permalink
MAINT: apply new joblib store API (#9)
Browse files Browse the repository at this point in the history
* MAINT: apply new joblib store API
  • Loading branch information
aabadie authored Jan 27, 2018
1 parent ba03e94 commit 85f250e
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 54 deletions.
6 changes: 3 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ Using joblib-hadoop on a Hadoop cluster
if __name__ == '__main__':
register_hdfs_store_backend()
mem = Memory(location='joblib_cache_hdfs',
backend='hdfs', host='namenode', port=8020, user='test',
verbose=100, compress=True)
mem = Memory(location='joblib_cache_hdfs', backend='hdfs',
verbose=100, compress=True,
backend_options=dict(host='namenode', port=8020, user='test'))
multiply = mem.cache(np.multiply)
array1 = np.arange(10000)
Expand Down
2 changes: 1 addition & 1 deletion examples/joblib_hdfs_multiply.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

mem = Memory(location='joblib_cache_hdfs', backend='hdfs',
verbose=100, compress=True,
store_options=dict(host='namenode', port=9000, user='test'))
backend_options=dict(host='namenode', port=9000, user='test'))
mem.clear()
multiply = mem.cache(np.multiply)
array1 = np.arange(1000)
Expand Down
55 changes: 19 additions & 36 deletions joblibhadoop/hdfs/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
from joblib._store_backends import (StoreBackendBase, StoreBackendMixin,
CacheItemInfo)

# Default values merged into the user-supplied backend options by
# ``_check_options``. ``compress`` is part of the defaults because
# ``configure`` reads ``options['compress']`` unconditionally: without an
# entry here, configuring the backend with options that omit ``compress``
# (including the default argument itself) raises KeyError.
DEFAULT_BACKEND_OPTIONS = dict(host='localhost', port=9000, user=None,
                               ticket_cache=None, token=None, pars=None,
                               connect=True, compress=False)


class HDFSStoreBackend(StoreBackendBase, StoreBackendMixin):
"""A StoreBackend for Hadoop storage file system (HDFS)."""
Expand All @@ -32,11 +36,11 @@ def get_items(self):
"""Return the whole list of items available in cache."""
cache_items = []
try:
self.storage.ls(self._location)
self.storage.ls(self.location)
except IOError:
return []

for path in self.storage.walk(self._location):
for path in self.storage.walk(self.location):
is_cache_hash_dir = re.match('[a-f0-9]{32}$',
os.path.basename(path))

Expand All @@ -58,50 +62,29 @@ def get_items(self):

return cache_items

def _prepare_options(self, store_options):
if 'host' not in store_options:
store_options['host'] = 'localhost'

if 'port' not in store_options:
store_options['port'] = 9000

if 'user' not in store_options:
store_options['user'] = None

if 'ticket_cache' not in store_options:
store_options['ticket_cache'] = None

if 'token' not in store_options:
store_options['token'] = None

if 'pars' not in store_options:
store_options['pars'] = None

if 'connect' not in store_options:
store_options['connect'] = True
def _check_options(self, options):
for k, v in DEFAULT_BACKEND_OPTIONS.items():
if k not in options:
options[k] = v

return store_options
return options

def configure(self, location, verbose=0,
store_options=dict(host='localhost', port=9000, user=None,
ticket_cache=None, token=None, pars=None,
connect=True)):
backend_options=DEFAULT_BACKEND_OPTIONS):
"""Configure the store backend."""

store_options = self._prepare_options(store_options)
options = self._check_options(backend_options.copy())
self.storage = hdfs3.HDFileSystem(
host=store_options['host'], port=store_options['port'],
user=store_options['user'],
ticket_cache=store_options['ticket_cache'],
token=store_options['token'], pars=store_options['pars'],
connect=store_options['connect'])
host=options['host'], port=options['port'], user=options['user'],
ticket_cache=options['ticket_cache'], token=options['token'],
pars=options['pars'], connect=options['connect'])
if location.startswith('/'):
location = location[1:]
self._location = location
self.storage.mkdir(self._location)
self.location = location
self.storage.mkdir(self.location)

# computation results can be stored compressed for faster I/O
self.compress = store_options['compress']
self.compress = options['compress']

# Memory map mode is not supported
self.mmap_mode = None
Expand Down
28 changes: 14 additions & 14 deletions joblibhadoop/hdfs/tests/test_hdfs_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ def func(arg):

mem = Memory(location=tmpdir.strpath[1:], backend='hdfs',
verbose=0, compress=compress,
store_options=dict(host=NAMENODE, user='test'))
backend_options=dict(host=NAMENODE, user='test'))

assert mem.store_backend._location == os.path.join(tmpdir.strpath[1:],
"joblib")
assert mem.store_backend.location == os.path.join(tmpdir.strpath[1:],
"joblib")

func = mem.cache(func)

Expand Down Expand Up @@ -72,10 +72,10 @@ def test_root_location_replacement(tmpdir):
register_hdfs_store_backend()

mem = Memory(location=location, backend='hdfs', verbose=100,
store_options=dict(host=NAMENODE, user='test'))
backend_options=dict(host=NAMENODE, user='test'))

assert mem.store_backend._location == os.path.join(tmpdir.strpath[1:],
"joblib")
assert mem.store_backend.location == os.path.join(tmpdir.strpath[1:],
"joblib")


def test_passing_backend_base_to_memory(tmpdir):
Expand All @@ -84,15 +84,15 @@ def test_passing_backend_base_to_memory(tmpdir):
register_hdfs_store_backend()

mem = Memory(location=tmpdir.strpath, backend='hdfs', verbose=100,
store_options=dict(host=NAMENODE, user='test'))
backend_options=dict(host=NAMENODE, user='test'))

assert mem.store_backend._location == os.path.join(tmpdir.strpath[1:],
"joblib")
assert mem.store_backend.location == os.path.join(tmpdir.strpath[1:],
"joblib")

mem2 = Memory(location=mem.store_backend, backend='hdfs', verbose=100,
store_options=dict(host=NAMENODE, user='test'))
backend_options=dict(host=NAMENODE, user='test'))

assert mem2.store_backend._location == mem.store_backend._location
assert mem2.store_backend.location == mem.store_backend.location


def test_clear_cache(tmpdir):
Expand All @@ -106,13 +106,13 @@ def func(arg):

mem = Memory(location=tmpdir.strpath, backend='hdfs',
verbose=100, compress=False,
store_options=dict(host=NAMENODE, user='test'))
backend_options=dict(host=NAMENODE, user='test'))
cached_func = mem.cache(func)
cached_func("test")

mem.clear()

assert not mem.store_backend._item_exists(mem.store_backend._location)
assert not mem.store_backend._item_exists(mem.store_backend.location)


def test_get_items(tmpdir):
Expand All @@ -125,7 +125,7 @@ def func(arg):

mem = Memory(location=tmpdir.strpath, backend='hdfs',
verbose=100, compress=False,
store_options=dict(host=NAMENODE, user='test'))
backend_options=dict(host=NAMENODE, user='test'))
assert not mem.store_backend.get_items()

cached_func = mem.cache(func)
Expand Down

0 comments on commit 85f250e

Please sign in to comment.