diff --git a/luigi/contrib/hdfs/format.py b/luigi/contrib/hdfs/format.py
index 4a52b49eec..1856abf3f4 100644
--- a/luigi/contrib/hdfs/format.py
+++ b/luigi/contrib/hdfs/format.py
@@ -3,11 +3,16 @@
 import os
 from luigi.contrib.hdfs.config import load_hadoop_cmd
 from luigi.contrib.hdfs import config as hdfs_config
-from luigi.contrib.hdfs.clients import remove, rename, mkdir
+from luigi.contrib.hdfs.clients import remove, rename, mkdir, listdir
+from luigi.contrib.hdfs.error import HDFSCliError
 
 logger = logging.getLogger('luigi-interface')
 
 
+class HdfsAtomicWriteError(IOError):
+    pass
+
+
 class HdfsReadPipe(luigi.format.InputPipeProcessWrapper):
 
     def __init__(self, path):
@@ -42,7 +47,12 @@ def abort(self):
 
     def close(self):
         super(HdfsAtomicWritePipe, self).close()
-        rename(self.tmppath, self.path)
+        try:
+            remove(self.path)
+        except HDFSCliError:
+            pass
+        if not all(result['result'] for result in rename(self.tmppath, self.path) or []):
+            raise HdfsAtomicWriteError('Atomic write to {} failed'.format(self.path))
 
 
 class HdfsAtomicWriteDirPipe(luigi.format.OutputPipeProcessWrapper):
@@ -64,7 +74,18 @@ def abort(self):
 
     def close(self):
         super(HdfsAtomicWriteDirPipe, self).close()
-        rename(self.tmppath, self.path)
+        try:
+            remove(self.path)
+        except HDFSCliError:
+            pass
+
+        # it's unlikely to fail in this way but better safe than sorry
+        if not all(result['result'] for result in rename(self.tmppath, self.path) or []):
+            raise HdfsAtomicWriteError('Atomic write to {} failed'.format(self.path))
+
+        if os.path.basename(self.tmppath) in map(os.path.basename, listdir(self.path)):
+            remove(self.path)
+            raise HdfsAtomicWriteError('Atomic write to {} failed'.format(self.path))
 
 
 class PlainFormat(luigi.format.Format):
diff --git a/test/contrib/hdfs_test.py b/test/contrib/hdfs_test.py
index 794c47120b..af48f67999 100644
--- a/test/contrib/hdfs_test.py
+++ b/test/contrib/hdfs_test.py
@@ -30,6 +30,7 @@
 from luigi import six
 from luigi.contrib.hdfs import SnakebiteHdfsClient
 from luigi.contrib.hdfs.hadoopcli_clients import HdfsClient
+from luigi.contrib.hdfs.format import HdfsAtomicWriteError, HdfsReadPipe
 from luigi.contrib.target import CascadingClient
 from minicluster import MiniClusterTestCase
 from nose.plugins.attrib import attr
@@ -169,6 +170,59 @@ def foo():
         self.assertRaises(TestException, foo)
         self.assertFalse(self.fs.exists(testpath))
 
+    def test_target_path_exists(self):
+        testpath = self._test_file()
+        try:
+            if self.fs.exists(testpath):
+                self.fs.remove(testpath, skip_trash=True)
+        except:
+            if self.fs.exists(self._test_dir()):
+                self.fs.remove(self._test_dir(), skip_trash=True)
+
+        with hdfs.HdfsAtomicWritePipe(testpath) as fobj:
+            fobj.write(b'test1')
+        with hdfs.HdfsAtomicWritePipe(testpath) as fobj:
+            fobj.write(b'test2')
+
+        with HdfsReadPipe(testpath) as read_pipe:
+            contents = read_pipe.read()
+
+        self.assertEqual(b'test2', contents)
+
+    @mock.patch('luigi.contrib.hdfs.format.remove')
+    def test_target_path_exists_rename_fails_hadoopcli(self, remove):
+        testpath = self._test_file()
+        try:
+            if self.fs.exists(testpath):
+                self.fs.remove(testpath, skip_trash=True)
+        except:
+            if self.fs.exists(self._test_dir()):
+                self.fs.remove(self._test_dir(), skip_trash=True)
+
+        with hdfs.HdfsAtomicWritePipe(testpath) as fobj:
+            fobj.write(b'test1')
+        fobj = hdfs.HdfsAtomicWritePipe(testpath)
+        self.assertRaises(hdfs.HDFSCliError, fobj.close)
+
+    @unittest.skipIf(six.PY3, "snakebite doesn't work on Python 3 yet.")
+    @helpers.with_config({"hdfs": {"client": "snakebite"}})
+    @mock.patch('luigi.contrib.hdfs.format.rename')
+    @mock.patch('luigi.contrib.hdfs.format.remove')
+    def test_target_path_exists_rename_fails_snakebite(self, remove, rename):
+        rename.side_effect = hdfs.get_autoconfig_client(threading.local()).rename
+        testpath = self._test_file()
+        try:
+            if self.fs.exists(testpath):
+                self.fs.remove(testpath, skip_trash=True)
+        except:
+            if self.fs.exists(self._test_dir()):
+                self.fs.remove(self._test_dir(), skip_trash=True)
+
+        with hdfs.HdfsAtomicWritePipe(testpath) as fobj:
+            fobj.write(b'test1')
+        fobj = hdfs.HdfsAtomicWritePipe(testpath)
+        self.assertRaises(HdfsAtomicWriteError, fobj.close)
+
 
 @attr('minicluster')
 class HdfsAtomicWriteDirPipeTests(MiniClusterTestCase):
@@ -212,6 +266,42 @@ def foo():
         self.assertRaises(TestException, foo)
         self.assertFalse(self.fs.exists(self.path))
 
+    def test_target_path_exists(self):
+        with hdfs.HdfsAtomicWriteDirPipe(self.path) as fobj:
+            fobj.write(b'test1')
+        with hdfs.HdfsAtomicWritePipe(self.path) as fobj:
+            fobj.write(b'test2')
+
+        with HdfsReadPipe(self.path) as read_pipe:
+            contents = read_pipe.read()
+
+        self.assertEqual(b'test2', contents)
+
+    @mock.patch('luigi.contrib.hdfs.format.remove')
+    def test_rename_into_existing_subdir_after_failed_remove(self, remove):
+        with hdfs.HdfsAtomicWriteDirPipe(self.path) as fobj:
+            fobj.write(b'test1')
+        fobj = hdfs.HdfsAtomicWriteDirPipe(self.path)
+        self.assertRaises(HdfsAtomicWriteError, fobj.close)
+
+    @mock.patch('luigi.contrib.hdfs.format.remove')
+    def test_target_path_exists_rename_fails_hadoopcli(self, remove):
+        with hdfs.HdfsAtomicWritePipe(self.path) as fobj:
+            fobj.write(b'test1')
+        fobj = hdfs.HdfsAtomicWriteDirPipe(self.path)
+        self.assertRaises(hdfs.HDFSCliError, fobj.close)
+
+    @unittest.skipIf(six.PY3, "snakebite doesn't work on Python 3 yet.")
+    @helpers.with_config({"hdfs": {"client": "snakebite"}})
+    @mock.patch('luigi.contrib.hdfs.format.rename')
+    @mock.patch('luigi.contrib.hdfs.format.remove')
+    def test_target_path_exists_rename_fails_snakebite(self, remove, rename):
+        rename.side_effect = hdfs.get_autoconfig_client(threading.local()).rename
+        with hdfs.HdfsAtomicWritePipe(self.path) as fobj:
+            fobj.write(b'test1')
+        fobj = hdfs.HdfsAtomicWriteDirPipe(self.path)
+        self.assertRaises(HdfsAtomicWriteError, fobj.close)
+
 
 # This class is a mixin, and does not inherit from TestCase, in order to avoid running the base class as a test case.
 @attr('minicluster')