Check whether renames worked in atomic hdfs pipes (#2119)
Using the snakebite client, renaming a file onto an existing file fails
without raising an exception; instead, rename returns a list of moves and
whether each one succeeded. As a result, atomic writes to existing files
fail silently with the snakebite client, whereas the hadoopcli client
raises an error.
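
For illustration, a rough sketch of the behavioral difference (the paths are hypothetical; the per-move dicts with a boolean 'result' key are what the format.py change below relies on):

    # Sketch only -- assumes a configured HDFS client reachable through luigi.
    from luigi.contrib.hdfs.clients import rename

    # hadoopcli raises HDFSCliError when the rename fails;
    # snakebite instead returns an iterable of per-move results and raises nothing.
    results = rename('/tmp/some-file-luigitmp', '/tmp/existing-target')
    for move in results or []:
        # each move reports whether it succeeded via a boolean 'result' key
        if not move['result']:
            print('rename silently failed: {}'.format(move))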

There are two ways to fix this: remove the target file if it already
exists, or inspect the snakebite rename result and raise an error when a
move fails. This PR does both.
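
Roughly, the combined pattern for plain files looks like the following sketch (the real change is in HdfsAtomicWritePipe.close in the diff below; atomic_promote is just an illustrative name):

    from luigi.contrib.hdfs.clients import remove, rename
    from luigi.contrib.hdfs.error import HDFSCliError

    def atomic_promote(tmppath, path):
        # clear an existing target first; ignore the error the CLI client
        # raises when there is nothing to remove
        try:
            remove(path)
        except HDFSCliError:
            pass
        # snakebite yields per-move result dicts; the `or []` guard keeps the
        # check a no-op for clients whose rename returns no per-move results
        if not all(result['result'] for result in rename(tmppath, path) or []):
            # the real code defines and raises HdfsAtomicWriteError, an IOError subclass
            raise IOError('Atomic write to {} failed'.format(path))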

For atomic directory writes, this also adds a check for whether the
directory was renamed to the target or moved inside an existing target
directory.
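
The reason for the extra check: when the rename target already exists as a directory, HDFS moves the source inside it (producing target/basename(tmp)) rather than replacing it, so the write is not atomic. A sketch of that post-rename check (the real version lives in HdfsAtomicWriteDirPipe.close below; renamed_into_target is an illustrative name):

    import os
    from luigi.contrib.hdfs.clients import listdir

    def renamed_into_target(tmppath, path):
        # if the temp directory's basename now shows up *inside* the target,
        # the rename moved it into an existing directory instead of replacing it
        return os.path.basename(tmppath) in map(os.path.basename, listdir(path))

When that check fires, the new code removes the target and raises HdfsAtomicWriteError instead of reporting success.
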
daveFNbuck authored and Tarrasch committed May 22, 2017
1 parent 50ec7fe commit a2f63c6
Showing 2 changed files with 114 additions and 3 deletions.
27 changes: 24 additions & 3 deletions luigi/contrib/hdfs/format.py
@@ -3,11 +3,16 @@
 import os
 from luigi.contrib.hdfs.config import load_hadoop_cmd
 from luigi.contrib.hdfs import config as hdfs_config
-from luigi.contrib.hdfs.clients import remove, rename, mkdir
+from luigi.contrib.hdfs.clients import remove, rename, mkdir, listdir
+from luigi.contrib.hdfs.error import HDFSCliError
 
 logger = logging.getLogger('luigi-interface')
 
 
+class HdfsAtomicWriteError(IOError):
+    pass
+
+
 class HdfsReadPipe(luigi.format.InputPipeProcessWrapper):
 
     def __init__(self, path):
@@ -42,7 +47,12 @@ def abort(self):
 
     def close(self):
         super(HdfsAtomicWritePipe, self).close()
-        rename(self.tmppath, self.path)
+        try:
+            remove(self.path)
+        except HDFSCliError:
+            pass
+        if not all(result['result'] for result in rename(self.tmppath, self.path) or []):
+            raise HdfsAtomicWriteError('Atomic write to {} failed'.format(self.path))
 
 
 class HdfsAtomicWriteDirPipe(luigi.format.OutputPipeProcessWrapper):
@@ -64,7 +74,18 @@ def abort(self):
 
     def close(self):
         super(HdfsAtomicWriteDirPipe, self).close()
-        rename(self.tmppath, self.path)
+        try:
+            remove(self.path)
+        except HDFSCliError:
+            pass
+
+        # it's unlikely to fail in this way but better safe than sorry
+        if not all(result['result'] for result in rename(self.tmppath, self.path) or []):
+            raise HdfsAtomicWriteError('Atomic write to {} failed'.format(self.path))
+
+        if os.path.basename(self.tmppath) in map(os.path.basename, listdir(self.path)):
+            remove(self.path)
+            raise HdfsAtomicWriteError('Atomic write to {} failed'.format(self.path))
 
 
 class PlainFormat(luigi.format.Format):
90 changes: 90 additions & 0 deletions test/contrib/hdfs_test.py
@@ -30,6 +30,7 @@
 from luigi import six
 from luigi.contrib.hdfs import SnakebiteHdfsClient
 from luigi.contrib.hdfs.hadoopcli_clients import HdfsClient
+from luigi.contrib.hdfs.format import HdfsAtomicWriteError, HdfsReadPipe
 from luigi.contrib.target import CascadingClient
 from minicluster import MiniClusterTestCase
 from nose.plugins.attrib import attr
@@ -169,6 +170,59 @@ def foo():
         self.assertRaises(TestException, foo)
         self.assertFalse(self.fs.exists(testpath))
 
+    def test_target_path_exists(self):
+        testpath = self._test_file()
+        try:
+            if self.fs.exists(testpath):
+                self.fs.remove(testpath, skip_trash=True)
+        except:
+            if self.fs.exists(self._test_dir()):
+                self.fs.remove(self._test_dir(), skip_trash=True)
+
+        with hdfs.HdfsAtomicWritePipe(testpath) as fobj:
+            fobj.write(b'test1')
+        with hdfs.HdfsAtomicWritePipe(testpath) as fobj:
+            fobj.write(b'test2')
+
+        with HdfsReadPipe(testpath) as read_pipe:
+            contents = read_pipe.read()
+
+        self.assertEqual(b'test2', contents)
+
+    @mock.patch('luigi.contrib.hdfs.format.remove')
+    def test_target_path_exists_rename_fails_hadoopcli(self, remove):
+        testpath = self._test_file()
+        try:
+            if self.fs.exists(testpath):
+                self.fs.remove(testpath, skip_trash=True)
+        except:
+            if self.fs.exists(self._test_dir()):
+                self.fs.remove(self._test_dir(), skip_trash=True)
+
+        with hdfs.HdfsAtomicWritePipe(testpath) as fobj:
+            fobj.write(b'test1')
+        fobj = hdfs.HdfsAtomicWritePipe(testpath)
+        self.assertRaises(hdfs.HDFSCliError, fobj.close)
+
+    @unittest.skipIf(six.PY3, "snakebite doesn't work on Python 3 yet.")
+    @helpers.with_config({"hdfs": {"client": "snakebite"}})
+    @mock.patch('luigi.contrib.hdfs.format.rename')
+    @mock.patch('luigi.contrib.hdfs.format.remove')
+    def test_target_path_exists_rename_fails_snakebite(self, remove, rename):
+        rename.side_effect = hdfs.get_autoconfig_client(threading.local()).rename
+        testpath = self._test_file()
+        try:
+            if self.fs.exists(testpath):
+                self.fs.remove(testpath, skip_trash=True)
+        except:
+            if self.fs.exists(self._test_dir()):
+                self.fs.remove(self._test_dir(), skip_trash=True)
+
+        with hdfs.HdfsAtomicWritePipe(testpath) as fobj:
+            fobj.write(b'test1')
+        fobj = hdfs.HdfsAtomicWritePipe(testpath)
+        self.assertRaises(HdfsAtomicWriteError, fobj.close)
+
 
 @attr('minicluster')
 class HdfsAtomicWriteDirPipeTests(MiniClusterTestCase):
@@ -212,6 +266,42 @@ def foo():
         self.assertRaises(TestException, foo)
         self.assertFalse(self.fs.exists(self.path))
 
+    def test_target_path_exists(self):
+        with hdfs.HdfsAtomicWriteDirPipe(self.path) as fobj:
+            fobj.write(b'test1')
+        with hdfs.HdfsAtomicWritePipe(self.path) as fobj:
+            fobj.write(b'test2')
+
+        with HdfsReadPipe(self.path) as read_pipe:
+            contents = read_pipe.read()
+
+        self.assertEqual(b'test2', contents)
+
+    @mock.patch('luigi.contrib.hdfs.format.remove')
+    def test_rename_into_existing_subdir_after_failed_remove(self, remove):
+        with hdfs.HdfsAtomicWriteDirPipe(self.path) as fobj:
+            fobj.write(b'test1')
+        fobj = hdfs.HdfsAtomicWriteDirPipe(self.path)
+        self.assertRaises(HdfsAtomicWriteError, fobj.close)
+
+    @mock.patch('luigi.contrib.hdfs.format.remove')
+    def test_target_path_exists_rename_fails_hadoopcli(self, remove):
+        with hdfs.HdfsAtomicWritePipe(self.path) as fobj:
+            fobj.write(b'test1')
+        fobj = hdfs.HdfsAtomicWriteDirPipe(self.path)
+        self.assertRaises(hdfs.HDFSCliError, fobj.close)
+
+    @unittest.skipIf(six.PY3, "snakebite doesn't work on Python 3 yet.")
+    @helpers.with_config({"hdfs": {"client": "snakebite"}})
+    @mock.patch('luigi.contrib.hdfs.format.rename')
+    @mock.patch('luigi.contrib.hdfs.format.remove')
+    def test_target_path_exists_rename_fails_snakebite(self, remove, rename):
+        rename.side_effect = hdfs.get_autoconfig_client(threading.local()).rename
+        with hdfs.HdfsAtomicWritePipe(self.path) as fobj:
+            fobj.write(b'test1')
+        fobj = hdfs.HdfsAtomicWriteDirPipe(self.path)
+        self.assertRaises(HdfsAtomicWriteError, fobj.close)
+
 
 # This class is a mixin, and does not inherit from TestCase, in order to avoid running the base class as a test case.
 @attr('minicluster')
