Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Produce map of items that need backup details entries #1892

Merged
merged 13 commits into from
Dec 22, 2022
Prev Previous commit
Next Next commit
Add progress entries for base snapshot items
Add more metadata like the previous path to progress entries and track
entries for items in the base snapshot. This will allow adding details
entries for base snapshot items eventually.
ashmrtn committed Dec 22, 2022
commit 3d70dbd4d7e5baddf1d8d36ee3876441a09d9df7
73 changes: 57 additions & 16 deletions src/internal/kopia/upload.go
Original file line number Diff line number Diff line change
@@ -118,8 +118,9 @@ func (rw *restoreStreamReader) Read(p []byte) (n int, err error) {
}

type itemDetails struct {
info details.ItemInfo
info *details.ItemInfo
repoPath path.Path
prevPath path.Path
}

type corsoProgress struct {
@@ -282,7 +283,12 @@ func collectionEntries(
// Relative path given to us in the callback is missing the root
// element. Add to pending set before calling the callback to avoid race
// conditions when the item is completed.
d := &itemDetails{info: ei.Info(), repoPath: itemPath}
//
// TODO(ashmrtn): If we want to pull item info for cached item from a
// previous snapshot then we should populate prevPath here and leave
// info nil.
itemInfo := ei.Info()
d := &itemDetails{info: &itemInfo, repoPath: itemPath}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
}

@@ -309,6 +315,8 @@ func collectionEntries(
func streamBaseEntries(
ctx context.Context,
cb func(context.Context, fs.Entry) error,
curPath path.Path,
prevPath path.Path,
dir fs.Directory,
encodedSeen map[string]struct{},
progress *corsoProgress,
@@ -334,27 +342,43 @@ func streamBaseEntries(
return nil
}

if err := cb(ctx, entry); err != nil {
entName, err := decodeElement(entry.Name())
if err != nil {
entName = entry.Name()
}
entName, err := decodeElement(entry.Name())
if err != nil {
return errors.Wrapf(err, "unable to decode entry name %s", entry.Name())
}

return errors.Wrapf(err, "executing callback on item %q", entName)
// For now assuming that item IDs don't need escaping.
itemPath, err := curPath.Append(entName, true)
if err != nil {
return errors.Wrap(err, "getting full item path for base entry")
}

return nil
})
if err != nil {
name, err := decodeElement(dir.Name())
// We need the previous path so we can find this item in the base snapshot's
// backup details. If the item moved and we had only the new path, we'd be
// unable to find it in the old backup details because we wouldn't know what
// to look for.
prevItemPath, err := prevPath.Append(entName, true)
if err != nil {
name = dir.Name()
return errors.Wrap(err, "getting previous full item path for base entry")
}

// All items have item info in the base backup. However, we need to make
// sure we have enough metadata to find those entries. Ensure that by adding
ashmrtn marked this conversation as resolved.
Show resolved Hide resolved
// the item to progress and having progress aggregate everything for later.
d := &itemDetails{info: nil, repoPath: itemPath, prevPath: prevItemPath}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)

if err := cb(ctx, entry); err != nil {
return errors.Wrapf(err, "executing callback on item %q", itemPath)
}

return nil
})
if err != nil {
return errors.Wrapf(
err,
"traversing items in base snapshot directory %q",
name,
curPath.Item(),
)
}

@@ -366,6 +390,8 @@ func streamBaseEntries(
// kopia callbacks on directory entries. It binds the directory to the given
// DataCollection.
func getStreamItemFunc(
curPath path.Path,
prevPath path.Path,
staticEnts []fs.Entry,
streamedEnts data.Collection,
baseDir fs.Directory,
@@ -384,7 +410,15 @@ func getStreamItemFunc(

seen, errs := collectionEntries(ctx, cb, streamedEnts, progress)

if err := streamBaseEntries(ctx, cb, baseDir, seen, progress); err != nil {
if err := streamBaseEntries(
ctx,
cb,
curPath,
prevPath,
baseDir,
seen,
progress,
); err != nil {
errs = multierror.Append(
errs,
errors.Wrap(err, "streaming base snapshot entries"),
@@ -430,7 +464,14 @@ func buildKopiaDirs(dirName string, dir *treeMap, progress *corsoProgress) (fs.D

return virtualfs.NewStreamingDirectory(
encodeAsPath(dirName),
getStreamItemFunc(childDirs, dir.collection, dir.baseDir, progress),
getStreamItemFunc(
dir.currentPath,
dir.prevPath,
childDirs,
dir.collection,
dir.baseDir,
progress,
),
), nil
}