From 7e6afc93e918bc5549e8bd4d4153d1a47bc32566 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Tue, 14 Feb 2023 00:01:20 +0100 Subject: [PATCH] create: implement retries for individual fs files Errors handled for backup src files: - BackupOSError (converted from OSError), e.g. I/O Error - BackupError (stat race, file changed while we backed it up) Error Handling: - retry the same file after some sleep time - sleep time starts from 1ms, increases exponentially up to 10s - 10 tries If retrying does not help: - BackupOSError: skip the file, log it with "E" status - BackupError: last try will back it up, log it with "C" status Works for: - borg create's normal (builtin) fs recursion - borg create --paths-from-command - borg create --paths-from-stdin Notes: - update stats.files_stats late (so we don't get wrong stats in case of e.g. IOErrors while reading the file). - _process_any: no changes to the big block, just indented for adding the retry loop and the try/except. - test_create_erroneous_file succeeds because we retry the file. 
--- src/borg/archive.py | 17 +-- src/borg/archiver/create_cmd.py | 146 ++++++++++++++-------- src/borg/testsuite/archiver/create_cmd.py | 8 +- 3 files changed, 110 insertions(+), 61 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 1b7343db9a..e16db7a4ee 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1448,7 +1448,7 @@ def process_pipe(self, *, path, cache, fd, mode, user=None, group=None): self.add_item(item, stats=self.stats) return status - def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal): + def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal, last_try=False): with self.create_helper(path, st, None) as (item, status, hardlinked, hl_chunks): # no status yet with OsOpen(path=path, parent_fd=parent_fd, name=name, flags=flags, noatime=True) as fd: with backup_io("fstat"): @@ -1499,9 +1499,8 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal): else: status = "M" if known else "A" # regular file, modified or added self.print_file_status(status, path) - self.stats.files_stats[status] += 1 - status = None # we already printed the status # Only chunkify the file if needed + changed_while_backup = False if "chunks" not in item: with backup_io("read"): self.process_file_chunks( @@ -1512,9 +1511,7 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal): backup_io_iter(self.chunker.chunkify(None, fd)), ) self.stats.chunking_time = self.chunker.chunking_time - if is_win32: - changed_while_backup = False # TODO - else: + if not is_win32: # TODO for win32 with backup_io("fstat2"): st2 = os.fstat(fd) # special files: @@ -1523,13 +1520,19 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal): changed_while_backup = not is_special_file and st.st_ctime_ns != st2.st_ctime_ns if changed_while_backup: # regular file changed while we backed it up, might be inconsistent/corrupt! 
- status = "C" + if last_try: + status = "C" # crap! retries did not help. + else: + raise BackupError("file changed while we read it!") if not is_special_file and not changed_while_backup: # we must not memorize special files, because the contents of e.g. a # block or char device will change without its mtime/size/inode changing. # also, we must not memorize a potentially inconsistent/corrupt file that # changed while we backed it up. cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks]) + self.stats.files_stats[status] += 1 # must be done late + if not changed_while_backup: + status = None # we already called print_file_status self.stats.nfiles += 1 item.update(self.metadata_collector.stat_ext_attrs(st, path, fd=fd)) item.get_size(memorize=True) diff --git a/src/borg/archiver/create_cmd.py b/src/borg/archiver/create_cmd.py index b3d2bf7bbc..9e4c3311b3 100644 --- a/src/borg/archiver/create_cmd.py +++ b/src/borg/archiver/create_cmd.py @@ -286,59 +286,105 @@ def _process_any(self, *, path, parent_fd, name, st, fso, cache, read_special, d if dry_run: return "+" # included - elif stat.S_ISREG(st.st_mode): - return fso.process_file(path=path, parent_fd=parent_fd, name=name, st=st, cache=cache) - elif stat.S_ISDIR(st.st_mode): - return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st) - elif stat.S_ISLNK(st.st_mode): - if not read_special: - return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st) - else: - try: - st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True) - except OSError: - special = False - else: - special = is_special(st_target.st_mode) - if special: + MAX_RETRIES = 10 # count includes the initial try (initial try == "retry 0") + for retry in range(MAX_RETRIES): + last_try = retry == MAX_RETRIES - 1 + try: + if stat.S_ISREG(st.st_mode): return fso.process_file( - path=path, parent_fd=parent_fd, name=name, st=st_target, cache=cache, flags=flags_special_follow + 
path=path, parent_fd=parent_fd, name=name, st=st, cache=cache, last_try=last_try + ) + elif stat.S_ISDIR(st.st_mode): + return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st) + elif stat.S_ISLNK(st.st_mode): + if not read_special: + return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st) + else: + try: + st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True) + except OSError: + special = False + else: + special = is_special(st_target.st_mode) + if special: + return fso.process_file( + path=path, + parent_fd=parent_fd, + name=name, + st=st_target, + cache=cache, + flags=flags_special_follow, + last_try=last_try, + ) + else: + return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st) + elif stat.S_ISFIFO(st.st_mode): + if not read_special: + return fso.process_fifo(path=path, parent_fd=parent_fd, name=name, st=st) + else: + return fso.process_file( + path=path, + parent_fd=parent_fd, + name=name, + st=st, + cache=cache, + flags=flags_special, + last_try=last_try, + ) + elif stat.S_ISCHR(st.st_mode): + if not read_special: + return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type="c") + else: + return fso.process_file( + path=path, + parent_fd=parent_fd, + name=name, + st=st, + cache=cache, + flags=flags_special, + last_try=last_try, + ) + elif stat.S_ISBLK(st.st_mode): + if not read_special: + return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type="b") + else: + return fso.process_file( + path=path, + parent_fd=parent_fd, + name=name, + st=st, + cache=cache, + flags=flags_special, + last_try=last_try, + ) + elif stat.S_ISSOCK(st.st_mode): + # Ignore unix sockets + return + elif stat.S_ISDOOR(st.st_mode): + # Ignore Solaris doors + return + elif stat.S_ISPORT(st.st_mode): + # Ignore Solaris event ports + return + else: + self.print_warning("Unknown file type: %s", path) + return + except (BackupError, BackupOSError) as err: + # 
sleep a bit, so temporary problems might go away... + sleep_s = 1000.0 / 1e6 * 10 ** (retry / 2) # retry 0: 1ms, retry 6: 1s, ... + time.sleep(sleep_s) + if retry < MAX_RETRIES - 1: + logger.warning( + f"{path}: {err}, slept {sleep_s:.3f}s, next: retry: {retry + 1} of {MAX_RETRIES - 1}..." ) else: - return fso.process_symlink(path=path, parent_fd=parent_fd, name=name, st=st) - elif stat.S_ISFIFO(st.st_mode): - if not read_special: - return fso.process_fifo(path=path, parent_fd=parent_fd, name=name, st=st) - else: - return fso.process_file( - path=path, parent_fd=parent_fd, name=name, st=st, cache=cache, flags=flags_special - ) - elif stat.S_ISCHR(st.st_mode): - if not read_special: - return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type="c") - else: - return fso.process_file( - path=path, parent_fd=parent_fd, name=name, st=st, cache=cache, flags=flags_special - ) - elif stat.S_ISBLK(st.st_mode): - if not read_special: - return fso.process_dev(path=path, parent_fd=parent_fd, name=name, st=st, dev_type="b") - else: - return fso.process_file( - path=path, parent_fd=parent_fd, name=name, st=st, cache=cache, flags=flags_special - ) - elif stat.S_ISSOCK(st.st_mode): - # Ignore unix sockets - return - elif stat.S_ISDOOR(st.st_mode): - # Ignore Solaris doors - return - elif stat.S_ISPORT(st.st_mode): - # Ignore Solaris event ports - return - else: - self.print_warning("Unknown file type: %s", path) - return + # giving up with retries, error will be dealt with (logged) by upper error handler + raise + # we better do a fresh stat on the file, just to make sure to get the current file + # mode right (which could have changed due to a race condition and is important for + # dispatching) and also to get current inode number of that file. 
+ with backup_io("stat"): + st = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False) def _rec_walk( self, diff --git a/src/borg/testsuite/archiver/create_cmd.py b/src/borg/testsuite/archiver/create_cmd.py index 63310b6b6a..760d70cdb3 100644 --- a/src/borg/testsuite/archiver/create_cmd.py +++ b/src/borg/testsuite/archiver/create_cmd.py @@ -201,20 +201,20 @@ def test_create_erroneous_file(self): out = self.cmd( f"--repo={self.repository_location}", "create", - f"--chunker-params=fail,{chunk_size},RRRERRR", + f"--chunker-params=fail,{chunk_size},rrrEEErrrr", "--paths-from-stdin", "--list", "test", input=flist.encode(), - exit_code=1, + exit_code=0, ) - assert "E input/file2" in out + assert "E input/file2" not in out # we managed to read it in the 3rd retry (after 3 failed reads) # repo looking good overall? checks for rc == 0. self.cmd(f"--repo={self.repository_location}", "check", "--debug") # check files in created archive out = self.cmd(f"--repo={self.repository_location}", "list", "test") assert "input/file1" in out - assert "input/file2" not in out + assert "input/file2" in out assert "input/file3" in out def test_create_content_from_command(self):