Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Produce map of items that need backup details entries #1892

Merged
merged 13 commits into from
Dec 22, 2022
127 changes: 107 additions & 20 deletions src/internal/kopia/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,18 @@ func (rw *restoreStreamReader) Read(p []byte) (n int, err error) {
}

type itemDetails struct {
info details.ItemInfo
info *details.ItemInfo
repoPath path.Path
prevPath path.Path
}

type corsoProgress struct {
snapshotfs.UploadProgress
pending map[string]*itemDetails
deets *details.Builder
pending map[string]*itemDetails
deets *details.Builder
// toMerge represents items that we don't have in-memory item info for. The
// item info for these items should be sourced from a base snapshot later on.
toMerge map[string]path.Path
mu sync.RWMutex
totalBytes int64
}
Expand Down Expand Up @@ -153,14 +157,30 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
return
}

// These items were sourced from a base snapshot or were cached in kopia so we
// never had to materialize their details in-memory.
if d.info == nil {
// TODO(ashmrtn): We should probably be returning an error here?
if d.prevPath == nil {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an error log here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't have access to a context here so unless I stash a logger in the struct instance there's not a whole lot we can do :/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logger will fall back to a singleton instance if given the background context. You might miss out on some logging within tests, but for the CLI it'll still write out as expected.

Copy link
Contributor Author

@ashmrtn ashmrtn Dec 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking about this more broadly, I think we should be storing errors in the struct and returning them at the end. Otherwise we could miss the fact that we failed to add some details. I'll make a follow up ticket for this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tracking with #1915

}

cp.mu.Lock()
defer cp.mu.Unlock()

cp.toMerge[d.prevPath.ShortRef()] = d.repoPath

return
}

parent := d.repoPath.ToBuilder().Dir()

cp.deets.Add(
d.repoPath.String(),
d.repoPath.ShortRef(),
parent.ShortRef(),
true,
d.info,
*d.info,
)

folders := []details.FolderEntry{}
Expand All @@ -182,7 +202,7 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
parent = nextParent
}

cp.deets.AddFoldersForItem(folders, d.info)
cp.deets.AddFoldersForItem(folders, *d.info)
}

// Kopia interface function used as a callback when kopia finishes hashing a file.
Expand Down Expand Up @@ -282,7 +302,12 @@ func collectionEntries(
// Relative path given to us in the callback is missing the root
// element. Add to pending set before calling the callback to avoid race
// conditions when the item is completed.
d := &itemDetails{info: ei.Info(), repoPath: itemPath}
//
// TODO(ashmrtn): If we want to pull item info for cached items from a
// previous snapshot then we should populate prevPath here and leave
// info nil.
itemInfo := ei.Info()
d := &itemDetails{info: &itemInfo, repoPath: itemPath}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
}

Expand All @@ -309,6 +334,8 @@ func collectionEntries(
func streamBaseEntries(
ctx context.Context,
cb func(context.Context, fs.Entry) error,
curPath path.Path,
prevPath path.Path,
dir fs.Directory,
encodedSeen map[string]struct{},
progress *corsoProgress,
Expand All @@ -334,27 +361,43 @@ func streamBaseEntries(
return nil
}

if err := cb(ctx, entry); err != nil {
entName, err := decodeElement(entry.Name())
if err != nil {
entName = entry.Name()
}
entName, err := decodeElement(entry.Name())
if err != nil {
return errors.Wrapf(err, "unable to decode entry name %s", entry.Name())
}

return errors.Wrapf(err, "executing callback on item %q", entName)
// For now assuming that item IDs don't need escaping.
itemPath, err := curPath.Append(entName, true)
if err != nil {
return errors.Wrap(err, "getting full item path for base entry")
}

return nil
})
if err != nil {
name, err := decodeElement(dir.Name())
// We need the previous path so we can find this item in the base snapshot's
// backup details. If the item moved and we had only the new path, we'd be
// unable to find it in the old backup details because we wouldn't know what
// to look for.
prevItemPath, err := prevPath.Append(entName, true)
if err != nil {
name = dir.Name()
return errors.Wrap(err, "getting previous full item path for base entry")
}

// All items have item info in the base backup. However, we need to make
// sure we have enough metadata to find those entries. To do that we add the
// item to progress and have progress aggregate everything for later.
d := &itemDetails{info: nil, repoPath: itemPath, prevPath: prevItemPath}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)

if err := cb(ctx, entry); err != nil {
return errors.Wrapf(err, "executing callback on item %q", itemPath)
}

return nil
})
if err != nil {
return errors.Wrapf(
err,
"traversing items in base snapshot directory %q",
name,
curPath,
)
}

Expand All @@ -366,6 +409,8 @@ func streamBaseEntries(
// kopia callbacks on directory entries. It binds the directory to the given
// DataCollection.
func getStreamItemFunc(
curPath path.Path,
prevPath path.Path,
staticEnts []fs.Entry,
streamedEnts data.Collection,
baseDir fs.Directory,
Expand All @@ -384,7 +429,15 @@ func getStreamItemFunc(

seen, errs := collectionEntries(ctx, cb, streamedEnts, progress)

if err := streamBaseEntries(ctx, cb, baseDir, seen, progress); err != nil {
if err := streamBaseEntries(
ctx,
cb,
curPath,
prevPath,
baseDir,
seen,
progress,
); err != nil {
errs = multierror.Append(
errs,
errors.Wrap(err, "streaming base snapshot entries"),
Expand Down Expand Up @@ -430,11 +483,25 @@ func buildKopiaDirs(dirName string, dir *treeMap, progress *corsoProgress) (fs.D

return virtualfs.NewStreamingDirectory(
encodeAsPath(dirName),
getStreamItemFunc(childDirs, dir.collection, dir.baseDir, progress),
getStreamItemFunc(
dir.currentPath,
dir.prevPath,
childDirs,
dir.collection,
dir.baseDir,
progress,
),
), nil
}

type treeMap struct {
// path.Path representing the node's path. This is passed as a parameter to
// the stream item function so that even baseDir directories can properly
// generate the full path of items.
currentPath path.Path
// Previous path this directory may have resided at if it is sourced from a
// base snapshot.
prevPath path.Path
// Child directories of this directory.
childDirs map[string]*treeMap
// Reference to data pulled from the external service. Contains only items in
Expand Down Expand Up @@ -583,6 +650,8 @@ func inflateCollectionTree(
}

node.collection = s
node.currentPath = s.FullPath()
node.prevPath = s.PreviousPath()
}

// Check that each previous path has only one of the states of deleted, moved,
Expand Down Expand Up @@ -708,7 +777,25 @@ func traverseBaseDir(
return errors.Errorf("unable to get tree node for path %s", currentPath)
}

curP, err := path.FromDataLayerPath(currentPath.String(), false)
if err != nil {
return errors.Errorf(
"unable to convert current path %s to path.Path",
currentPath,
)
}

oldP, err := path.FromDataLayerPath(oldDirPath.String(), false)
if err != nil {
return errors.Errorf(
"unable to convert old path %s to path.Path",
oldDirPath,
)
}

node.baseDir = dir
node.currentPath = curP
node.prevPath = oldP
}

return nil
Expand Down
Loading