Skip to content

Commit

Permalink
MAINT: adapt backend to latest joblib API
Browse files Browse the repository at this point in the history
  • Loading branch information
aabadie committed Jan 26, 2018
1 parent 05f46e3 commit b382dcc
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 66 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ test:
pytest

install:
pip install -r requirements.txt .
pip install -r requirements.txt -r requirements-tests.txt .

run-examples:
cd docker && \
Expand Down
79 changes: 52 additions & 27 deletions joblibhadoop/hdfs/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@
import re
import os.path
import hdfs3
from joblib._store_backends import (StoreBackendBase, StoreManagerMixin,
from joblib._store_backends import (StoreBackendBase, StoreBackendMixin,
CacheItemInfo)


class HDFSStoreBackend(StoreBackendBase, StoreManagerMixin):
class HDFSStoreBackend(StoreBackendBase, StoreBackendMixin):
"""A StoreBackend for Hadoop storage file system (HDFS)."""

def __init__(self):
self.mv = None
self.storage = None
self.cachedir = None
self.compress = None
self.mmap_mode = None
def _open_item(self, f, mode):
return self.storage.open(f, mode)

def _item_exists(self, path):
return self.storage.exists(path)

def _move_item(self, src, dst):
return self.storage.mv(src, dst)

def clear_location(self, location):
"""Check if object exists in store."""
Expand All @@ -26,15 +28,15 @@ def create_location(self, location):
"""Create object location on store."""
self._mkdirp(location)

def get_cache_items(self):
def get_items(self):
"""Return the whole list of items available in cache."""
cache_items = []
try:
self.storage.ls(self.cachedir)
self.storage.ls(self._location)
except IOError:
return []

for path in self.storage.walk(self.cachedir):
for path in self.storage.walk(self._location):
is_cache_hash_dir = re.match('[a-f0-9]{32}$',
os.path.basename(path))

Expand All @@ -56,27 +58,50 @@ def get_cache_items(self):

return cache_items

def configure(self, location, host='localhost', port=9000, user=None,
ticket_cache=None, token=None, pars=None, connect=True,
**kwargs):
def _prepare_options(self, store_options):
if 'host' not in store_options:
store_options['host'] = 'localhost'

if 'port' not in store_options:
store_options['port'] = 9000

if 'user' not in store_options:
store_options['user'] = None

if 'ticket_cache' not in store_options:
store_options['ticket_cache'] = None

if 'token' not in store_options:
store_options['token'] = None

if 'pars' not in store_options:
store_options['pars'] = None

if 'connect' not in store_options:
store_options['connect'] = True

return store_options

def configure(self, location, verbose=0,
store_options=dict(host='localhost', port=9000, user=None,
ticket_cache=None, token=None, pars=None,
connect=True)):
"""Configure the store backend."""
self.storage = hdfs3.HDFileSystem(host=host, port=port, user=user,
ticket_cache=ticket_cache,
token=token, pars=pars,
connect=connect)

store_options = self._prepare_options(store_options)
self.storage = hdfs3.HDFileSystem(
host=store_options['host'], port=store_options['port'],
user=store_options['user'],
ticket_cache=store_options['ticket_cache'],
token=store_options['token'], pars=store_options['pars'],
connect=store_options['connect'])
if location.startswith('/'):
location = location[1:]
self.cachedir = os.path.join(location, 'joblib')
self.storage.mkdir(self.cachedir)

# attach required methods using monkey patching trick.
self.open_object = self.storage.open
self.object_exists = self.storage.exists
self.mv = self.storage.mv
self._location = location
self.storage.mkdir(self._location)

# computation results can be stored compressed for faster I/O
self.compress = (False if 'compress' not in kwargs
else kwargs['compress'])
self.compress = store_options['compress']

# Memory map mode is not supported
self.mmap_mode = None
Expand Down
52 changes: 29 additions & 23 deletions joblibhadoop/hdfs/tests/test_hdfs_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from joblib import Memory
from joblibhadoop.hdfs import register_hdfs_store_backend

__NAMENODE = os.environ['JOBLIB_HDFS_NAMENODE']
NAMENODE = os.environ['JOBLIB_HDFS_NAMENODE']


@mark.parametrize("compress", [True, False])
Expand All @@ -32,10 +32,12 @@ def func(arg):

register_hdfs_store_backend()

mem = Memory(location=tmpdir.strpath[1:], host=__NAMENODE,
backend='hdfs', user='test', verbose=0, compress=compress)
mem = Memory(location=tmpdir.strpath[1:], backend='hdfs',
verbose=0, compress=compress,
store_options=dict(host=NAMENODE, user='test'))

assert mem.store.cachedir == os.path.join(tmpdir.strpath[1:], "joblib")
assert mem.store_backend._location == os.path.join(tmpdir.strpath[1:],
"joblib")

func = mem.cache(func)

Expand Down Expand Up @@ -69,26 +71,28 @@ def test_root_location_replacement(tmpdir):

register_hdfs_store_backend()

mem = Memory(location=location, host=__NAMENODE,
backend='hdfs', user='test', verbose=100)
mem = Memory(location=location, backend='hdfs', verbose=100,
store_options=dict(host=NAMENODE, user='test'))

assert mem.store.cachedir == os.path.join(tmpdir.strpath[1:], "joblib")
assert mem.store_backend._location == os.path.join(tmpdir.strpath[1:],
"joblib")


def test_passing_backend_base_to_memory(tmpdir):
"""Test passing a store as location in memory is correctly handled."""

register_hdfs_store_backend()

mem = Memory(location=tmpdir.strpath, host=__NAMENODE,
backend='hdfs', user='test', verbose=100)
mem = Memory(location=tmpdir.strpath, backend='hdfs', verbose=100,
store_options=dict(host=NAMENODE, user='test'))

assert mem.store.cachedir == os.path.join(tmpdir.strpath[1:], "joblib")
assert mem.store_backend._location == os.path.join(tmpdir.strpath[1:],
"joblib")

mem2 = Memory(location=mem.store, host=__NAMENODE,
backend='hdfs', user='test', verbose=100)
mem2 = Memory(location=mem.store_backend, backend='hdfs', verbose=100,
store_options=dict(host=NAMENODE, user='test'))

assert mem2.store.cachedir == mem.store.cachedir
assert mem2.store_backend._location == mem.store_backend._location


def test_clear_cache(tmpdir):
Expand All @@ -100,34 +104,36 @@ def func(arg):

register_hdfs_store_backend()

mem = Memory(location=tmpdir.strpath, host=__NAMENODE,
backend='hdfs', user='test', verbose=100, compress=False)
mem = Memory(location=tmpdir.strpath, backend='hdfs',
verbose=100, compress=False,
store_options=dict(host=NAMENODE, user='test'))
cached_func = mem.cache(func)
cached_func("test")

mem.clear()

assert not mem.store.object_exists(mem.store.cachedir)
assert not mem.store_backend._item_exists(mem.store_backend._location)


def test_get_cache_items(tmpdir):
def test_get_items(tmpdir):
"""Test cache items listing."""
def func(arg):
"""Dummy function."""
return arg

register_hdfs_store_backend()

mem = Memory(location=tmpdir.strpath, host=__NAMENODE,
backend='hdfs', user='test', verbose=100, compress=False)
assert not mem.store.get_cache_items()
mem = Memory(location=tmpdir.strpath, backend='hdfs',
verbose=100, compress=False,
store_options=dict(host=NAMENODE, user='test'))
assert not mem.store_backend.get_items()

cached_func = mem.cache(func)
for arg in ["test1", "test2", "test3"]:
cached_func(arg)

# get_cache_items always returns an empty list for the moment
assert len(mem.store.get_cache_items()) == 3
# get_items always returns an empty list for the moment
assert len(mem.store_backend.get_items()) == 3

mem.clear()
assert not mem.store.get_cache_items()
assert not mem.store_backend.get_items()
2 changes: 1 addition & 1 deletion joblibhadoop/resources/joblib-hadoop-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ dependencies:
- pip:
- setuptools
- git+https://github.com/dask/hdfs3.git@master#egg=hdfs3
- git+https://github.com/aabadie/joblib.git@storage_backend_2#egg=joblib
- git+https://github.com/joblib/joblib.git#egg=joblib
- git+https://github.com/joblib/joblib-hadoop@master#egg=joblibhadoop
10 changes: 10 additions & 0 deletions requirements-tests.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Numerical dependencies used by tests
numpy

# For tests
pytest
pytest-cov
pytest-pep8
pytest-env
codecov
tox
16 changes: 2 additions & 14 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,14 +1,2 @@
# Needs joblib specific branch for this project since it's not yet merged.
# See https://github.com/joblib/pull/397 for more details.
-e git+https://github.com/aabadie/joblib.git@storage_backend_2#egg=joblib

# Regular dependencies
numpy

# For tests
pytest
pytest-cov
pytest-pep8
pytest-env
codecov
tox
# Needs joblib master branch
-e git+https://github.com/joblib/joblib.git#egg=joblib

0 comments on commit b382dcc

Please sign in to comment.