Skip to content

Commit

Permalink
create: implement retries for individual fs files
Browse files Browse the repository at this point in the history
Errors handled for backup src files:
- BackupOSError (converted from OSError), e.g. I/O Error
- BackupError (stats race, file changed while we backed it up)

Error Handling:
- retry the same file after some sleep time
- sleep time starts from 1ms, increases exponentially up to 10s
- 10 tries

If retrying does not help:
- BackupOSError: skip the file, log it with "E" status
- BackupError: last try will back it up, log it with "C" status

Works for:
- borg create's normal (builtin) fs recursion
- borg create --paths-from-command
- borg create --paths-from-stdin

Notes:
- update stats.files_stats late (so we don't get wrong
  stats in case of e.g. IOErrors while reading the file).
- _process_any: no changes to the big block, just indented
  for adding the retry loop and the try/except.
- test_create_erroneous_file succeeds because we retry the file.
  • Loading branch information
ThomasWaldmann committed Feb 23, 2023
1 parent 6da5b7d commit 7e6afc9
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 61 deletions.
17 changes: 10 additions & 7 deletions src/borg/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -1448,7 +1448,7 @@ def process_pipe(self, *, path, cache, fd, mode, user=None, group=None):
self.add_item(item, stats=self.stats)
return status

def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal):
def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal, last_try=False):
with self.create_helper(path, st, None) as (item, status, hardlinked, hl_chunks): # no status yet
with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags, noatime=True) as fd:
with backup_io("fstat"):
Expand Down Expand Up @@ -1499,9 +1499,8 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal):
else:
status = "M" if known else "A" # regular file, modified or added
self.print_file_status(status, path)
self.stats.files_stats[status] += 1
status = None # we already printed the status
# Only chunkify the file if needed
changed_while_backup = False
if "chunks" not in item:
with backup_io("read"):
self.process_file_chunks(
Expand All @@ -1512,9 +1511,7 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal):
backup_io_iter(self.chunker.chunkify(None, fd)),
)
self.stats.chunking_time = self.chunker.chunking_time
if is_win32:
changed_while_backup = False # TODO
else:
if not is_win32: # TODO for win32
with backup_io("fstat2"):
st2 = os.fstat(fd)
# special files:
Expand All @@ -1523,13 +1520,19 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal):
changed_while_backup = not is_special_file and st.st_ctime_ns != st2.st_ctime_ns
if changed_while_backup:
# regular file changed while we backed it up, might be inconsistent/corrupt!
status = "C"
if last_try:
status = "C" # crap! retries did not help.
else:
raise BackupError("file changed while we read it!")
if not is_special_file and not changed_while_backup:
# we must not memorize special files, because the contents of e.g. a
# block or char device will change without its mtime/size/inode changing.
# also, we must not memorize a potentially inconsistent/corrupt file that
# changed while we backed it up.
cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks])
self.stats.files_stats[status] += 1 # must be done late
if not changed_while_backup:
status = None # we already called print_file_status
self.stats.nfiles += 1
item.update(self.metadata_collector.stat_ext_attrs(st, path, fd=fd))
item.get_size(memorize=True)
Expand Down
146 changes: 96 additions & 50 deletions src/borg/archiver/create_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,59 +286,105 @@ def _process_any(self, *, path, parent_fd, name, st, fso, cache, read_special, d

if dry_run:
return "+" # included
elif stat.S_ISREG(st.st_mode):
return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st, cache=cache)
elif stat.S_ISDIR(st.st_mode):
return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st)
elif stat.S_ISLNK(st.st_mode):
if not read_special:
return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st)
else:
try:
st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True)
except OSError:
special = False
else:
special = is_special(st_target.st_mode)
if special:
MAX_RETRIES = 10 # count includes the initial try (initial try == "retry 0")
for retry in range(MAX_RETRIES):
last_try = retry == MAX_RETRIES - 1
try:
if stat.S_ISREG(st.st_mode):
return fso.process_file(
path=path, parent_fd=parent_fd, name=name, st=st_target, cache=cache, flags=flags_special_follow
path=path, parent_fd=parent_fd, name=name, st=st, cache=cache, last_try=last_try
)
elif stat.S_ISDIR(st.st_mode):
return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st)
elif stat.S_ISLNK(st.st_mode):
if not read_special:
return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st)
else:
try:
st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True)
except OSError:
special = False
else:
special = is_special(st_target.st_mode)
if special:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st_target,
cache=cache,
flags=flags_special_follow,
last_try=last_try,
)
else:
return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st)
elif stat.S_ISFIFO(st.st_mode):
if not read_special:
return fso.process_fifo(path=path, parent_fd=parent_fd, name=name, st=st)
else:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
)
elif stat.S_ISCHR(st.st_mode):
if not read_special:
return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type="c")
else:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
)
elif stat.S_ISBLK(st.st_mode):
if not read_special:
return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type="b")
else:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
)
elif stat.S_ISSOCK(st.st_mode):
# Ignore unix sockets
return
elif stat.S_ISDOOR(st.st_mode):
# Ignore Solaris doors
return
elif stat.S_ISPORT(st.st_mode):
# Ignore Solaris event ports
return
else:
self.print_warning("Unknown file type: %s", path)
return
except (BackupError, BackupOSError) as err:
# sleep a bit, so temporary problems might go away...
sleep_s = 1000.0 / 1e6 * 10 ** (retry / 2) # retry 0: 1ms, retry 6: 1s, ...
time.sleep(sleep_s)
if retry < MAX_RETRIES - 1:
logger.warning(
f"{path}: {err}, slept {sleep_s:.3f}s, next: retry: {retry + 1} of {MAX_RETRIES - 1}..."
)
else:
return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st)
elif stat.S_ISFIFO(st.st_mode):
if not read_special:
return fso.process_fifo(path=path, parent_fd=parent_fd, name=name, st=st)
else:
return fso.process_file(
path=path, parent_fd=parent_fd, name=name, st=st, cache=cache, flags=flags_special
)
elif stat.S_ISCHR(st.st_mode):
if not read_special:
return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type="c")
else:
return fso.process_file(
path=path, parent_fd=parent_fd, name=name, st=st, cache=cache, flags=flags_special
)
elif stat.S_ISBLK(st.st_mode):
if not read_special:
return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type="b")
else:
return fso.process_file(
path=path, parent_fd=parent_fd, name=name, st=st, cache=cache, flags=flags_special
)
elif stat.S_ISSOCK(st.st_mode):
# Ignore unix sockets
return
elif stat.S_ISDOOR(st.st_mode):
# Ignore Solaris doors
return
elif stat.S_ISPORT(st.st_mode):
# Ignore Solaris event ports
return
else:
self.print_warning("Unknown file type: %s", path)
return
# giving up with retries, error will be dealt with (logged) by upper error handler
raise
# we better do a fresh stat on the file, just to make sure to get the current file
# mode right (which could have changed due to a race condition and is important for
# dispatching) and also to get current inode number of that file.
with backup_io("stat"):
st = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False)

def _rec_walk(
self,
Expand Down
8 changes: 4 additions & 4 deletions src/borg/testsuite/archiver/create_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,20 +201,20 @@ def test_create_erroneous_file(self):
out = self.cmd(
f"--repo={self.repository_location}",
"create",
f"--chunker-params=fail,{chunk_size},RRRERRR",
f"--chunker-params=fail,{chunk_size},rrrEEErrrr",
"--paths-from-stdin",
"--list",
"test",
input=flist.encode(),
exit_code=1,
exit_code=0,
)
assert "E input/file2" in out
assert "E input/file2" not in out # we managed to read it in the 3rd retry (after 3 failed reads)
# repo looking good overall? checks for rc == 0.
self.cmd(f"--repo={self.repository_location}", "check", "--debug")
# check files in created archive
out = self.cmd(f"--repo={self.repository_location}", "list", "test")
assert "input/file1" in out
assert "input/file2" not in out
assert "input/file2" in out
assert "input/file3" in out

def test_create_content_from_command(self):
Expand Down

0 comments on commit 7e6afc9

Please sign in to comment.