diff --git a/cli/app.go b/cli/app.go index 8010e04fc50..5ad4780be08 100644 --- a/cli/app.go +++ b/cli/app.go @@ -23,7 +23,6 @@ import ( "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/repo/maintenance" - "github.com/kopia/kopia/snapshot/restore" "github.com/kopia/kopia/snapshot/snapshotmaintenance" ) @@ -87,7 +86,7 @@ type appServices interface { advancedCommand(ctx context.Context) repositoryConfigFileName() string getProgress() *cliProgress - getRestoreProgress() restore.Progress + getRestoreProgress() RestoreProgress stdout() io.Writer Stderr() io.Writer @@ -120,7 +119,7 @@ type App struct { enableAutomaticMaintenance bool pf profileFlags progress *cliProgress - restoreProgress restore.Progress + restoreProgress RestoreProgress initialUpdateCheckDelay time.Duration updateCheckInterval time.Duration updateAvailableNotifyInterval time.Duration @@ -186,11 +185,11 @@ func (c *App) getProgress() *cliProgress { } // SetRestoreProgress is used to set custom restore progress, purposed to be used in tests. -func (c *App) SetRestoreProgress(p restore.Progress) { +func (c *App) SetRestoreProgress(p RestoreProgress) { c.restoreProgress = p } -func (c *App) getRestoreProgress() restore.Progress { +func (c *App) getRestoreProgress() RestoreProgress { return c.restoreProgress } @@ -293,10 +292,6 @@ func (c *App) setup(app *kingpin.Application) { c.pf.setup(app) c.progress.setup(c, app) - if rp, ok := c.restoreProgress.(*cliRestoreProgress); ok { - rp.setup(c, app) - } - c.blob.setup(c, app) c.benchmark.setup(c, app) c.cache.setup(c, app) @@ -325,8 +320,7 @@ type commandParent interface { // NewApp creates a new instance of App. 
func NewApp() *App { return &App{ - progress: &cliProgress{}, - restoreProgress: &cliRestoreProgress{}, + progress: &cliProgress{}, cliStorageProviders: []StorageProvider{ {"from-config", "the provided configuration file", func() StorageFlags { return &storageFromConfigFlags{} }}, diff --git a/cli/cli_progress.go b/cli/cli_progress.go index ec4236624a4..fa5fda5d3df 100644 --- a/cli/cli_progress.go +++ b/cli/cli_progress.go @@ -259,124 +259,4 @@ func (p *cliProgress) Finish() { } } -type cliRestoreProgress struct { - restoredCount atomic.Int32 - enqueuedCount atomic.Int32 - skippedCount atomic.Int32 - ignoredErrorsCount atomic.Int32 - - restoredTotalFileSize atomic.Int64 - enqueuedTotalFileSize atomic.Int64 - skippedTotalFileSize atomic.Int64 - - progressUpdateInterval time.Duration - enableProgress bool - - svc appServices - outputThrottle timetrack.Throttle - outputMutex sync.Mutex - out textOutput - eta timetrack.Estimator - - // +checklocks:outputMutex - lastLineLength int -} - -func (p *cliRestoreProgress) setup(svc appServices, _ *kingpin.Application) { - cp := svc.getProgress() - if cp == nil { - return - } - - p.progressUpdateInterval = cp.progressUpdateInterval - p.enableProgress = cp.enableProgress - p.out = cp.out - p.svc = svc - - p.eta = timetrack.Start() -} - -func (p *cliRestoreProgress) SetCounters( - enqueuedCount, restoredCount, skippedCount, ignoredErrors int32, - enqueuedBytes, restoredBytes, skippedBytes int64, -) { - p.enqueuedCount.Store(enqueuedCount) - p.enqueuedTotalFileSize.Store(enqueuedBytes) - - p.restoredCount.Store(restoredCount) - p.restoredTotalFileSize.Store(restoredBytes) - - p.skippedCount.Store(skippedCount) - p.skippedTotalFileSize.Store(skippedBytes) - - p.ignoredErrorsCount.Store(ignoredErrors) - - p.maybeOutput() -} - -func (p *cliRestoreProgress) Flush() { - p.outputThrottle.Reset() - p.output("\n") -} - -func (p *cliRestoreProgress) maybeOutput() { - if 
p.outputThrottle.ShouldOutput(p.svc.getProgress().progressUpdateInterval) { - p.output("") - } -} - -func (p *cliRestoreProgress) output(suffix string) { - if !p.svc.getProgress().enableProgress { - return - } - - p.outputMutex.Lock() - defer p.outputMutex.Unlock() - - restoredCount := p.restoredCount.Load() - enqueuedCount := p.enqueuedCount.Load() - skippedCount := p.skippedCount.Load() - ignoredCount := p.ignoredErrorsCount.Load() - - restoredSize := p.restoredTotalFileSize.Load() - enqueuedSize := p.enqueuedTotalFileSize.Load() - skippedSize := p.skippedTotalFileSize.Load() - - if restoredSize == 0 { - return - } - - var maybeRemaining, maybeSkipped, maybeErrors string - if est, ok := p.eta.Estimate(float64(restoredSize), float64(enqueuedSize)); ok { - maybeRemaining = fmt.Sprintf(" %v (%.1f%%) remaining %v", - units.BytesPerSecondsString(est.SpeedPerSecond), - est.PercentComplete, - est.Remaining) - } - - if skippedCount > 0 { - maybeSkipped = fmt.Sprintf(", skipped %v (%v)", skippedCount, units.BytesString(skippedSize)) - } - - if ignoredCount > 0 { - maybeErrors = fmt.Sprintf(", ignored %v errors", ignoredCount) - } - - line := fmt.Sprintf("Processed %v (%v) of %v (%v)%v%v%v.", - restoredCount+skippedCount, units.BytesString(restoredSize), - enqueuedCount, units.BytesString(enqueuedSize), - maybeSkipped, maybeErrors, maybeRemaining, - ) - - var extraSpaces string - - if len(line) < p.lastLineLength { - // add extra spaces to wipe over previous line if it was longer than current - extraSpaces = strings.Repeat(" ", p.lastLineLength-len(line)) - } - - p.lastLineLength = len(line) - p.out.printStderr("\r%v%v%v", line, extraSpaces, suffix) -} - var _ snapshotfs.UploadProgress = (*cliProgress)(nil) diff --git a/cli/command_restore.go b/cli/command_restore.go index 945348348ba..e0760bad97c 100644 --- a/cli/command_restore.go +++ b/cli/command_restore.go @@ -18,6 +18,7 @@ import ( "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/localfs" 
"github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/timetrack" "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/object" @@ -95,6 +96,12 @@ followed by the path of the directory for the contents to be restored. unlimitedDepth = math.MaxInt32 ) +// RestoreProgress is invoked to report progress during a restore. +type RestoreProgress interface { + SetCounters(s restore.Stats) + Flush() +} + type restoreSourceTarget struct { source string target string @@ -366,6 +373,21 @@ func (c *commandRestore) setupPlaceholderExpansion(ctx context.Context, rep repo return rootEntry, nil } +func (c *commandRestore) getRestoreProgress() RestoreProgress { + if rp := c.svc.getRestoreProgress(); rp != nil { + return rp + } + + pf := c.svc.getProgress().progressFlags + + return &cliRestoreProgress{ + enableProgress: pf.enableProgress, + out: pf.out, + progressUpdateInterval: pf.progressUpdateInterval, + eta: timetrack.Start(), + } +} + func (c *commandRestore) run(ctx context.Context, rep repo.Repository) error { output, oerr := c.restoreOutput(ctx, rep) if oerr != nil { @@ -396,17 +418,9 @@ func (c *commandRestore) run(ctx context.Context, rep repo.Repository) error { rootEntry = re } - restoreProgress := c.svc.getRestoreProgress() + restoreProgress := c.getRestoreProgress() progressCallback := func(ctx context.Context, stats restore.Stats) { - restoreProgress.SetCounters( - stats.EnqueuedFileCount+stats.EnqueuedDirCount+stats.EnqueuedSymlinkCount, - stats.RestoredFileCount+stats.RestoredDirCount+stats.RestoredSymlinkCount, - stats.SkippedCount, - stats.IgnoredErrorCount, - stats.EnqueuedTotalFileSize, - stats.RestoredTotalFileSize, - stats.SkippedTotalFileSize, - ) + restoreProgress.SetCounters(stats) } st, err := restore.Entry(ctx, rep, output, rootEntry, restore.Options{ diff --git a/cli/restore_progress.go b/cli/restore_progress.go new file mode 100644 index 00000000000..80d5a621518 --- /dev/null +++ 
b/cli/restore_progress.go @@ -0,0 +1,116 @@ +package cli + +import ( + "fmt" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/kopia/kopia/internal/timetrack" + "github.com/kopia/kopia/internal/units" + "github.com/kopia/kopia/snapshot/restore" +) + +type cliRestoreProgress struct { + restoredCount atomic.Int32 + enqueuedCount atomic.Int32 + skippedCount atomic.Int32 + ignoredErrorsCount atomic.Int32 + + restoredTotalFileSize atomic.Int64 + enqueuedTotalFileSize atomic.Int64 + skippedTotalFileSize atomic.Int64 + + progressUpdateInterval time.Duration + enableProgress bool + + outputThrottle timetrack.Throttle + outputMutex sync.Mutex + out textOutput // +checklocksignore: outputMutex just happens to be held always. + eta timetrack.Estimator // +checklocksignore: outputMutex just happens to be held always. + + // +checklocks:outputMutex + lastLineLength int +} + +func (p *cliRestoreProgress) SetCounters(s restore.Stats) { + p.enqueuedCount.Store(s.EnqueuedFileCount + s.EnqueuedDirCount + s.EnqueuedSymlinkCount) + p.enqueuedTotalFileSize.Store(s.EnqueuedTotalFileSize) + + p.restoredCount.Store(s.RestoredFileCount + s.RestoredDirCount + s.RestoredSymlinkCount) + p.restoredTotalFileSize.Store(s.RestoredTotalFileSize) + + p.skippedCount.Store(s.SkippedCount) + p.skippedTotalFileSize.Store(s.SkippedTotalFileSize) + + p.ignoredErrorsCount.Store(s.IgnoredErrorCount) + + p.maybeOutput() +} + +func (p *cliRestoreProgress) Flush() { + p.outputThrottle.Reset() + p.output("\n") +} + +func (p *cliRestoreProgress) maybeOutput() { + if p.outputThrottle.ShouldOutput(p.progressUpdateInterval) { + p.output("") + } +} + +func (p *cliRestoreProgress) output(suffix string) { + if !p.enableProgress { + return + } + + // ensure the counters are not going back in an output line compared to the previous one + p.outputMutex.Lock() + defer p.outputMutex.Unlock() + + restoredCount := p.restoredCount.Load() + enqueuedCount := p.enqueuedCount.Load() + skippedCount := 
p.skippedCount.Load() + ignoredCount := p.ignoredErrorsCount.Load() + + restoredSize := p.restoredTotalFileSize.Load() + enqueuedSize := p.enqueuedTotalFileSize.Load() + skippedSize := p.skippedTotalFileSize.Load() + + if restoredSize == 0 { + return + } + + var maybeRemaining, maybeSkipped, maybeErrors string + if est, ok := p.eta.Estimate(float64(restoredSize), float64(enqueuedSize)); ok { + maybeRemaining = fmt.Sprintf(" %v (%.1f%%) remaining %v", + units.BytesPerSecondsString(est.SpeedPerSecond), + est.PercentComplete, + est.Remaining) + } + + if skippedCount > 0 { + maybeSkipped = fmt.Sprintf(", skipped %v (%v)", skippedCount, units.BytesString(skippedSize)) + } + + if ignoredCount > 0 { + maybeErrors = fmt.Sprintf(", ignored %v errors", ignoredCount) + } + + line := fmt.Sprintf("Processed %v (%v) of %v (%v)%v%v%v.", + restoredCount+skippedCount, units.BytesString(restoredSize), + enqueuedCount, units.BytesString(enqueuedSize), + maybeSkipped, maybeErrors, maybeRemaining, + ) + + var extraSpaces string + + if len(line) < p.lastLineLength { + // add extra spaces to wipe over previous line if it was longer than current + extraSpaces = strings.Repeat(" ", p.lastLineLength-len(line)) + } + + p.lastLineLength = len(line) + p.out.printStderr("\r%v%v%v", line, extraSpaces, suffix) +} diff --git a/go.mod b/go.mod index 33d3d7c1ccb..a05ec0ea6e1 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/minio/minio-go/v7 v7.0.69 github.com/mxk/go-vss v1.2.0 github.com/natefinch/atomic v1.0.1 + github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 github.com/pierrec/lz4 v2.6.1+incompatible github.com/pkg/errors v0.9.1 github.com/pkg/profile v1.7.0 diff --git a/go.sum b/go.sum index d0c525e6349..88e838530a4 100644 --- a/go.sum +++ b/go.sum @@ -216,6 +216,8 @@ github.com/natefinch/atomic v1.0.1 h1:ZPYKxkqQOx3KZ+RsbnP/YsgvxWQPGxjC0oBt2AhwV0 github.com/natefinch/atomic v1.0.1/go.mod h1:N/D/ELrljoqDyT3rZrsUmtsuzvHkeB/wWjHV22AZRbM= 
github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde h1:x0TT0RDC7UhAVbbWWBzr41ElhJx5tXPWkIHA2HWPRuw= github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0= +github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+Tv1WTxkukpXeMlviSxvL7SRgk= +github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= diff --git a/repo/content/index/index_builder.go b/repo/content/index/index_builder.go index b6b25ea4b78..9701fbf6a6f 100644 --- a/repo/content/index/index_builder.go +++ b/repo/content/index/index_builder.go @@ -18,6 +18,11 @@ const randomSuffixSize = 32 // number of random bytes to append at the end to ma // Builder prepares and writes content index. type Builder map[ID]Info +// BuilderCreator is an interface for caller to add indexes to builders. +type BuilderCreator interface { + Add(info Info) +} + // Clone returns a deep Clone of the Builder. func (b Builder) Clone() Builder { if b == nil { @@ -38,7 +43,7 @@ func (b Builder) Add(i Info) { cid := i.ContentID old, found := b[cid] - if !found || contentInfoGreaterThanStruct(i, old) { + if !found || contentInfoGreaterThanStruct(&i, &old) { b[cid] = i } } @@ -63,8 +68,8 @@ func init() { // sortedContents returns the list of []Info sorted lexicographically using bucket sort // sorting is optimized based on the format of content IDs (optional single-character // alphanumeric prefix (0-9a-z), followed by hexadecimal digits (0-9a-f). 
-func (b Builder) sortedContents() []Info { - var buckets [36 * 16][]Info +func (b Builder) sortedContents() []*Info { + var buckets [36 * 16][]*Info // phase 1 - bucketize into 576 (36 *16) separate lists // by first [0-9a-z] and second character [0-9a-f]. @@ -75,7 +80,7 @@ func (b Builder) sortedContents() []Info { // first: 0..35, second: 0..15 buck := first<<4 + second //nolint:gomnd - buckets[buck] = append(buckets[buck], v) + buckets[buck] = append(buckets[buck], &v) } // phase 2 - sort each non-empty bucket in parallel using goroutines @@ -104,7 +109,7 @@ func (b Builder) sortedContents() []Info { wg.Wait() // Phase 3 - merge results from all buckets. - result := make([]Info, 0, len(b)) + result := make([]*Info, 0, len(b)) for i := range len(buckets) { result = append(result, buckets[i]...) @@ -134,12 +139,16 @@ func (b Builder) Build(output io.Writer, version int) error { // BuildStable writes the pack index to the provided output. func (b Builder) BuildStable(output io.Writer, version int) error { + return buildSortedContents(b.sortedContents(), output, version) +} + +func buildSortedContents(items []*Info, output io.Writer, version int) error { switch version { case Version1: - return b.buildV1(output) + return buildV1(items, output) case Version2: - return b.buildV2(output) + return buildV2(items, output) default: return errors.Errorf("unsupported index version: %v", version) diff --git a/repo/content/index/index_v1.go b/repo/content/index/index_v1.go index 94ed49f6151..a5058dbd4e3 100644 --- a/repo/content/index/index_v1.go +++ b/repo/content/index/index_v1.go @@ -266,8 +266,7 @@ type indexBuilderV1 struct { } // buildV1 writes the pack index to the provided output. 
-func (b Builder) buildV1(output io.Writer) error { - allContents := b.sortedContents() +func buildV1(allContents []*Info, output io.Writer) error { b1 := &indexBuilderV1{ packBlobIDOffsets: map[blob.ID]uint32{}, keyLength: -1, @@ -307,7 +306,7 @@ func (b Builder) buildV1(output io.Writer) error { return errors.Wrap(w.Flush(), "error flushing index") } -func (b *indexBuilderV1) prepareExtraData(allContents []Info) []byte { +func (b *indexBuilderV1) prepareExtraData(allContents []*Info) []byte { var extraData []byte var hashBuf [maxContentIDSize]byte @@ -330,7 +329,7 @@ func (b *indexBuilderV1) prepareExtraData(allContents []Info) []byte { return extraData } -func (b *indexBuilderV1) writeEntry(w io.Writer, it Info, entry []byte) error { +func (b *indexBuilderV1) writeEntry(w io.Writer, it *Info, entry []byte) error { var hashBuf [maxContentIDSize]byte k := contentIDToBytes(hashBuf[:0], it.ContentID) @@ -362,7 +361,7 @@ func (b *indexBuilderV1) writeEntry(w io.Writer, it Info, entry []byte) error { return nil } -func (b *indexBuilderV1) formatEntry(entry []byte, it Info) error { +func (b *indexBuilderV1) formatEntry(entry []byte, it *Info) error { entryTimestampAndFlags := entry[0:8] entryPackFileOffset := entry[8:12] entryPackedOffset := entry[12:16] diff --git a/repo/content/index/index_v2.go b/repo/content/index/index_v2.go index 245508e3264..100e9a2a27a 100644 --- a/repo/content/index/index_v2.go +++ b/repo/content/index/index_v2.go @@ -373,7 +373,7 @@ type indexBuilderV2 struct { baseTimestamp int64 } -func indexV2FormatInfoFromInfo(v Info) indexV2FormatInfo { +func indexV2FormatInfoFromInfo(v *Info) indexV2FormatInfo { return indexV2FormatInfo{ formatVersion: v.FormatVersion, compressionHeaderID: v.CompressionHeaderID, @@ -382,7 +382,7 @@ func indexV2FormatInfoFromInfo(v Info) indexV2FormatInfo { } // buildUniqueFormatToIndexMap builds a map of unique indexV2FormatInfo to their numeric identifiers. 
-func buildUniqueFormatToIndexMap(sortedInfos []Info) map[indexV2FormatInfo]byte { +func buildUniqueFormatToIndexMap(sortedInfos []*Info) map[indexV2FormatInfo]byte { result := map[indexV2FormatInfo]byte{} for _, v := range sortedInfos { @@ -396,7 +396,7 @@ func buildUniqueFormatToIndexMap(sortedInfos []Info) map[indexV2FormatInfo]byte } // buildPackIDToIndexMap builds a map of unique blob IDs to their numeric identifiers. -func buildPackIDToIndexMap(sortedInfos []Info) map[blob.ID]int { +func buildPackIDToIndexMap(sortedInfos []*Info) map[blob.ID]int { result := map[blob.ID]int{} for _, v := range sortedInfos { @@ -410,7 +410,7 @@ func buildPackIDToIndexMap(sortedInfos []Info) map[blob.ID]int { } // maxContentLengths computes max content lengths in the builder. -func maxContentLengths(sortedInfos []Info) (maxPackedLength, maxOriginalLength, maxPackOffset uint32) { +func maxContentLengths(sortedInfos []*Info) (maxPackedLength, maxOriginalLength, maxPackOffset uint32) { for _, v := range sortedInfos { if l := v.PackedLength; l > maxPackedLength { maxPackedLength = l @@ -428,15 +428,7 @@ func maxContentLengths(sortedInfos []Info) (maxPackedLength, maxOriginalLength, return } -func max(a, b int) int { - if a > b { - return a - } - - return b -} - -func newIndexBuilderV2(sortedInfos []Info) (*indexBuilderV2, error) { +func newIndexBuilderV2(sortedInfos []*Info) (*indexBuilderV2, error) { entrySize := v2EntryOffsetFormatID // compute a map of unique formats to their indexes. @@ -495,9 +487,7 @@ func newIndexBuilderV2(sortedInfos []Info) (*indexBuilderV2, error) { } // buildV2 writes the pack index to the provided output. 
-func (b Builder) buildV2(output io.Writer) error { - sortedInfos := b.sortedContents() - +func buildV2(sortedInfos []*Info, output io.Writer) error { b2, err := newIndexBuilderV2(sortedInfos) if err != nil { return err @@ -509,7 +499,7 @@ func (b Builder) buildV2(output io.Writer) error { extraData := b2.prepareExtraData(sortedInfos) if b2.keyLength <= 1 { - return errors.Errorf("invalid key length: %v for %v", b2.keyLength, len(b)) + return errors.Errorf("invalid key length: %v for %v", b2.keyLength, len(sortedInfos)) } // write header @@ -566,7 +556,7 @@ func (b Builder) buildV2(output io.Writer) error { return errors.Wrap(w.Flush(), "error flushing index") } -func (b *indexBuilderV2) prepareExtraData(sortedInfos []Info) []byte { +func (b *indexBuilderV2) prepareExtraData(sortedInfos []*Info) []byte { var extraData []byte for _, it := range sortedInfos { @@ -586,7 +576,7 @@ func (b *indexBuilderV2) prepareExtraData(sortedInfos []Info) []byte { return extraData } -func (b *indexBuilderV2) writeIndexEntry(w io.Writer, it Info) error { +func (b *indexBuilderV2) writeIndexEntry(w io.Writer, it *Info) error { var hashBuf [maxContentIDSize]byte k := contentIDToBytes(hashBuf[:0], it.ContentID) @@ -629,7 +619,7 @@ func (b *indexBuilderV2) writeFormatInfoEntry(w io.Writer, f indexV2FormatInfo) return errors.Wrap(err, "error writing format info entry") } -func (b *indexBuilderV2) writeIndexValueEntry(w io.Writer, it Info) error { +func (b *indexBuilderV2) writeIndexValueEntry(w io.Writer, it *Info) error { var buf [v2EntryMaxLength]byte // 0-3: timestamp bits 0..31 (relative to base time) diff --git a/repo/content/index/merged.go b/repo/content/index/merged.go index 6bd5c4da924..603c90e4ea0 100644 --- a/repo/content/index/merged.go +++ b/repo/content/index/merged.go @@ -33,7 +33,7 @@ func (m Merged) Close() error { return errors.Wrap(err, "closing index shards") } -func contentInfoGreaterThanStruct(a, b Info) bool { +func contentInfoGreaterThanStruct(a, b *Info) bool { if 
l, r := a.TimestampSeconds, b.TimestampSeconds; l != r { // different timestamps, higher one wins return l > r @@ -66,7 +66,7 @@ func (m Merged) GetInfo(id ID, result *Info) (bool, error) { continue } - if !found || contentInfoGreaterThanStruct(tmp, *result) { + if !found || contentInfoGreaterThanStruct(&tmp, result) { *result = tmp found = true } @@ -88,7 +88,7 @@ func (h nextInfoHeap) Less(i, j int) bool { return a.less(b) } - return !contentInfoGreaterThanStruct(h[i].it, h[j].it) + return !contentInfoGreaterThanStruct(&h[i].it, &h[j].it) } func (h nextInfoHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } @@ -157,23 +157,23 @@ func (m Merged) Iterate(r IDRange, cb func(i Info) error) error { for len(minHeap) > 0 { //nolint:forcetypeassert - min := heap.Pop(&minHeap).(*nextInfo) - if !havePendingItem || pendingItem.ContentID != min.it.ContentID { + minNextInfo := heap.Pop(&minHeap).(*nextInfo) + if !havePendingItem || pendingItem.ContentID != minNextInfo.it.ContentID { if havePendingItem { if err := cb(pendingItem); err != nil { return err } } - pendingItem = min.it + pendingItem = minNextInfo.it havePendingItem = true - } else if contentInfoGreaterThanStruct(min.it, pendingItem) { - pendingItem = min.it + } else if contentInfoGreaterThanStruct(&minNextInfo.it, &pendingItem) { + pendingItem = minNextInfo.it } - it, ok := <-min.ch + it, ok := <-minNextInfo.ch if ok { - heap.Push(&minHeap, &nextInfo{it, min.ch}) + heap.Push(&minHeap, &nextInfo{it, minNextInfo.ch}) } } diff --git a/repo/content/index/one_use_index_builder.go b/repo/content/index/one_use_index_builder.go new file mode 100644 index 00000000000..56cdee1c247 --- /dev/null +++ b/repo/content/index/one_use_index_builder.go @@ -0,0 +1,144 @@ +package index + +import ( + "crypto/rand" + "hash/fnv" + "io" + + "github.com/pkg/errors" + + "github.com/petar/GoLLRB/llrb" + + "github.com/kopia/kopia/internal/gather" +) + +// Less compares with another *Info by their ContentID and return true if the current one is 
smaller.
+func (i *Info) Less(other llrb.Item) bool {
+	return i.ContentID.less(other.(*Info).ContentID) //nolint:forcetypeassert
+}
+
+// OneUseBuilder prepares and writes content index for epoch index compaction; building drains it, hence "one use".
+type OneUseBuilder struct {
+	indexStore *llrb.LLRB
+}
+
+// NewOneUseBuilder creates a new, empty instance of OneUseBuilder.
+func NewOneUseBuilder() *OneUseBuilder {
+	return &OneUseBuilder{
+		indexStore: llrb.New(),
+	}
+}
+
+// Add adds a new entry to the builder or conditionally replaces it if the timestamp is greater.
+func (b *OneUseBuilder) Add(i Info) {
+	found := b.indexStore.Get(&i)
+	if found == nil || contentInfoGreaterThanStruct(&i, found.(*Info)) { //nolint:forcetypeassert
+		_ = b.indexStore.ReplaceOrInsert(&i)
+	}
+}
+
+// Length returns the number of index entries currently held by the builder.
+func (b *OneUseBuilder) Length() int {
+	return b.indexStore.Len()
+}
+
+func (b *OneUseBuilder) sortedContents() []*Info { // removes every entry from the store, smallest ContentID first
+	result := make([]*Info, 0, b.indexStore.Len()) // final size is known, avoid regrowth
+
+	for b.indexStore.Len() > 0 {
+		item := b.indexStore.DeleteMin()
+		result = append(result, item.(*Info)) //nolint:forcetypeassert
+	}
+
+	return result
+}
+
+func (b *OneUseBuilder) shard(maxShardSize int) [][]*Info { // maxShardSize must be non-zero; empties the store
+	numShards := (b.Length() + maxShardSize - 1) / maxShardSize
+	if numShards <= 1 {
+		if b.Length() == 0 {
+			return [][]*Info{}
+		}
+
+		return [][]*Info{b.sortedContents()}
+	}
+
+	result := make([][]*Info, numShards)
+
+	for b.indexStore.Len() > 0 {
+		item := b.indexStore.DeleteMin()
+
+		h := fnv.New32a()
+		io.WriteString(h, item.(*Info).ContentID.String()) //nolint:errcheck,forcetypeassert
+
+		shard := h.Sum32() % uint32(numShards) //nolint:gosec
+
+		result[shard] = append(result[shard], item.(*Info)) //nolint:forcetypeassert
+	}
+
+	var nonEmpty [][]*Info
+
+	for _, r := range result {
+		if len(r) > 0 {
+			nonEmpty = append(nonEmpty, r)
+		}
+	}
+
+	return nonEmpty
+}
+
+// BuildStable writes the pack index to the provided output.
+func (b *OneUseBuilder) BuildStable(output io.Writer, version int) error {
+	return buildSortedContents(b.sortedContents(), output, version)
+}
+
+// BuildShards drains the builder into index shards sized around shardSize (hash-assigned, so a shard may exceed it).
+// Returns shard bytes and a cleanup function to call once the shards have been written; shardSize must be positive.
+func (b *OneUseBuilder) BuildShards(indexVersion int, stable bool, shardSize int) ([]gather.Bytes, func(), error) {
+	if shardSize <= 0 { // zero would divide by zero in shard(); a negative size would silently disable sharding
+		return nil, nil, errors.Errorf("invalid shard size")
+	}
+
+	var (
+		shardedBuilders = b.shard(shardSize)
+		dataShardsBuf   []*gather.WriteBuffer
+		dataShards      []gather.Bytes
+		randomSuffix    [32]byte
+	)
+
+	closeShards := func() {
+		for _, ds := range dataShardsBuf {
+			ds.Close()
+		}
+	}
+
+	for _, s := range shardedBuilders {
+		buf := gather.NewWriteBuffer()
+
+		dataShardsBuf = append(dataShardsBuf, buf)
+
+		if err := buildSortedContents(s, buf, indexVersion); err != nil {
+			closeShards()
+
+			return nil, nil, errors.Wrap(err, "error building index shard")
+		}
+
+		if !stable {
+			if _, err := rand.Read(randomSuffix[:]); err != nil {
+				closeShards()
+
+				return nil, nil, errors.Wrap(err, "error getting random bytes for suffix")
+			}
+
+			if _, err := buf.Write(randomSuffix[:]); err != nil {
+				closeShards()
+
+				return nil, nil, errors.Wrap(err, "error writing extra random suffix to ensure indexes are always globally unique")
+			}
+		}
+
+		dataShards = append(dataShards, buf.Bytes())
+	}
+
+	return dataShards, closeShards, nil
+}
diff --git a/repo/content/index/packindex_test.go b/repo/content/index/packindex_test.go index b50b59d2b6f..86644bfca43 100644 --- a/repo/content/index/packindex_test.go +++ b/repo/content/index/packindex_test.go @@ -144,31 +144,34 @@ func testPackIndex(t *testing.T, version int) { b1 := make(Builder) b2 := make(Builder) b3 := make(Builder) + b4 := NewOneUseBuilder() for _, info := range infos { infoMap[info.ContentID] = info b1.Add(info) b2.Add(info) b3.Add(info) +
b4.Add(info) } - var buf1, buf2, buf3 bytes.Buffer + var buf1, buf2, buf3, buf4 bytes.Buffer - if err := b1.Build(&buf1, version); err != nil { - t.Fatalf("unable to build: %v", err) - } + err := b1.Build(&buf1, version) + require.NoError(t, err) - if err := b2.Build(&buf2, version); err != nil { - t.Fatalf("unable to build: %v", err) - } + err = b2.Build(&buf2, version) + require.NoError(t, err) - if err := b3.BuildStable(&buf3, version); err != nil { - t.Fatalf("unable to build: %v", err) - } + err = b3.BuildStable(&buf3, version) + require.NoError(t, err) + + err = b4.BuildStable(&buf4, version) + require.NoError(t, err) data1 := buf1.Bytes() data2 := buf2.Bytes() data3 := buf3.Bytes() + data4 := buf4.Bytes() // each build produces exactly identical prefix except for the trailing random bytes. data1Prefix := data1[0 : len(data1)-randomSuffixSize] @@ -176,13 +179,22 @@ func testPackIndex(t *testing.T, version int) { require.Equal(t, data1Prefix, data2Prefix) require.Equal(t, data2Prefix, data3) + require.Equal(t, data2Prefix, data4) require.NotEqual(t, data1, data2) + require.Equal(t, data3, data4) t.Run("FuzzTest", func(t *testing.T) { fuzzTestIndexOpen(data1) }) - ndx, err := Open(data1, nil, func() int { return fakeEncryptionOverhead }) + verifyPackedIndexes(t, infos, infoMap, version, data1) + verifyPackedIndexes(t, infos, infoMap, version, data4) +} + +func verifyPackedIndexes(t *testing.T, infos []Info, infoMap map[ID]Info, version int, packed []byte) { + t.Helper() + + ndx, err := Open(packed, nil, func() int { return fakeEncryptionOverhead }) if err != nil { t.Fatalf("can't open index: %v", err) } @@ -276,7 +288,7 @@ func TestPackIndexPerContentLimits(t *testing.T) { var result bytes.Buffer if tc.errMsg == "" { - require.NoError(t, b.buildV2(&result)) + require.NoError(t, buildV2(b.sortedContents(), &result)) pi, err := Open(result.Bytes(), nil, func() int { return fakeEncryptionOverhead }) require.NoError(t, err) @@ -289,7 +301,7 @@ func 
TestPackIndexPerContentLimits(t *testing.T) { require.Equal(t, got, tc.info) } else { - err := b.buildV2(&result) + err := buildV2(b.sortedContents(), &result) require.Error(t, err) require.Contains(t, err.Error(), tc.errMsg) } @@ -366,6 +378,76 @@ func TestSortedContents2(t *testing.T) { } } +func TestSortedContents3(t *testing.T) { + b := NewOneUseBuilder() + + for i := range 100 { + v := deterministicContentID(t, "", i) + + b.Add(Info{ + ContentID: v, + }) + } + + got := b.sortedContents() + + var last ID + for _, info := range got { + if info.ContentID.less(last) { + t.Fatalf("not sorted %v (was %v)!", info.ContentID, last) + } + + last = info.ContentID + } +} + +func TestSortedContents4(t *testing.T) { + b := NewOneUseBuilder() + + b.Add(Info{ + ContentID: mustParseID(t, "0123"), + }) + b.Add(Info{ + ContentID: mustParseID(t, "1023"), + }) + b.Add(Info{ + ContentID: mustParseID(t, "0f23"), + }) + b.Add(Info{ + ContentID: mustParseID(t, "f023"), + }) + b.Add(Info{ + ContentID: mustParseID(t, "g0123"), + }) + b.Add(Info{ + ContentID: mustParseID(t, "g1023"), + }) + b.Add(Info{ + ContentID: mustParseID(t, "i0123"), + }) + b.Add(Info{ + ContentID: mustParseID(t, "i1023"), + }) + b.Add(Info{ + ContentID: mustParseID(t, "h0123"), + }) + b.Add(Info{ + ContentID: mustParseID(t, "h1023"), + }) + + got := b.sortedContents() + + var last ID + + for _, info := range got { + if info.ContentID.less(last) { + t.Fatalf("not sorted %v (was %v)!", info.ContentID, last) + } + + last = info.ContentID + } +} + func TestPackIndexV2TooManyUniqueFormats(t *testing.T) { b := Builder{} @@ -380,7 +462,7 @@ func TestPackIndexV2TooManyUniqueFormats(t *testing.T) { }) } - require.NoError(t, b.buildV2(io.Discard)) + require.NoError(t, buildV2(b.sortedContents(), io.Discard)) // add one more to push it over the edge b.Add(Info{ @@ -389,7 +471,7 @@ func TestPackIndexV2TooManyUniqueFormats(t *testing.T) { CompressionHeaderID: compression.HeaderID(5000), }) - err := b.buildV2(io.Discard) + err 
:= buildV2(b.sortedContents(), io.Discard) require.Error(t, err) require.Equal(t, "unsupported - too many unique formats 256 (max 255)", err.Error()) } @@ -518,6 +600,83 @@ func verifyAllShardedIDs(t *testing.T, sharded []Builder, numTotal, numShards in return lens } +func TestShard1(t *testing.T) { + // generate 10000 IDs in random order + ids := make([]int, 10000) + for i := range ids { + ids[i] = i + } + + rand.Shuffle(len(ids), func(i, j int) { + ids[i], ids[j] = ids[j], ids[i] + }) + + cases := []struct { + shardSize int + numShards int + shardLens []int + }{ + {100000, 1, nil}, + {100, 100, nil}, + {500, 20, []int{460, 472, 473, 477, 479, 483, 486, 492, 498, 499, 501, 503, 504, 505, 511, 519, 524, 528, 542, 544}}, + {1000, 10, []int{945, 964, 988, 988, 993, 1002, 1014, 1017, 1021, 1068}}, + {2000, 5, []int{1952, 1995, 2005, 2013, 2035}}, + } + + for _, tc := range cases { + b := NewOneUseBuilder() + + // add ID to the builder + for _, id := range ids { + b.Add(Info{ + ContentID: deterministicContentID(t, "", id), + }) + } + + length := b.Length() + shards := b.shard(tc.shardSize) + + // verify number of shards + lens := verifyAllShardedIDsList(t, shards, length, tc.numShards) + + require.Zero(t, b.Length()) + + // sharding will always produce stable results, verify sorted shard lengths here + if tc.shardLens != nil { + require.ElementsMatch(t, tc.shardLens, lens) + } + } +} + +func verifyAllShardedIDsList(t *testing.T, sharded [][]*Info, numTotal, numShards int) []int { + t.Helper() + + require.Len(t, sharded, numShards) + + m := map[ID]bool{} + for i := range numTotal { + m[deterministicContentID(t, "", i)] = true + } + + cnt := 0 + + var lens []int + + for _, s := range sharded { + cnt += len(s) + lens = append(lens, len(s)) + + for _, v := range s { + delete(m, v.ContentID) + } + } + + require.Equal(t, numTotal, cnt, "invalid total number of sharded elements") + require.Empty(t, m) + + return lens +} + func withOriginalLength(is Info, originalLength 
uint32) Info { // clone and override original length is.OriginalLength = originalLength diff --git a/repo/content/indexblob/index_blob_manager_v0.go b/repo/content/indexblob/index_blob_manager_v0.go index 6fa70ba2b2d..6a14f02c692 100644 --- a/repo/content/indexblob/index_blob_manager_v0.go +++ b/repo/content/indexblob/index_blob_manager_v0.go @@ -550,7 +550,7 @@ func (m *ManagerV0) dropContentsFromBuilder(bld index.Builder, opt CompactOption } } -func addIndexBlobsToBuilder(ctx context.Context, enc *EncryptionManager, bld index.Builder, indexBlobID blob.ID) error { +func addIndexBlobsToBuilder(ctx context.Context, enc *EncryptionManager, bld index.BuilderCreator, indexBlobID blob.ID) error { var data gather.WriteBuffer defer data.Close() diff --git a/repo/content/indexblob/index_blob_manager_v1.go b/repo/content/indexblob/index_blob_manager_v1.go index 04df58d0f04..edf3558089b 100644 --- a/repo/content/indexblob/index_blob_manager_v1.go +++ b/repo/content/indexblob/index_blob_manager_v1.go @@ -68,7 +68,7 @@ func (m *ManagerV1) Compact(ctx context.Context, opt CompactOptions) error { // CompactEpoch compacts the provided index blobs and writes a new set of blobs. 
func (m *ManagerV1) CompactEpoch(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error { - tmpbld := make(index.Builder) + tmpbld := index.NewOneUseBuilder() for _, indexBlob := range blobIDs { if err := addIndexBlobsToBuilder(ctx, m.enc, tmpbld, indexBlob); err != nil { diff --git a/repo/manifest/committed_manifest_manager.go b/repo/manifest/committed_manifest_manager.go index d955ce724d2..109fb6ea009 100644 --- a/repo/manifest/committed_manifest_manager.go +++ b/repo/manifest/committed_manifest_manager.go @@ -32,7 +32,7 @@ type committedManifestManager struct { // +checklocks:cmmu committedEntries map[ID]*manifestEntry // +checklocks:cmmu - committedContentIDs map[content.ID]bool + committedContentIDs map[content.ID]struct{} // autoCompactionThreshold controls the threshold after which the manager auto-compacts // manifest contents @@ -79,7 +79,7 @@ func (m *committedManifestManager) findCommittedEntries(ctx context.Context, lab return findEntriesMatchingLabels(m.committedEntries, labels), nil } -func (m *committedManifestManager) commitEntries(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) { +func (m *committedManifestManager) commitEntries(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]struct{}, error) { if len(entries) == 0 { return nil, nil } @@ -98,7 +98,7 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma // the lock via commitEntries()) and to compact existing committed entries during compaction // where the lock is already being held. 
// +checklocks:m.cmmu -func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) { +func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]struct{}, error) { if len(entries) == 0 { return nil, nil } @@ -124,12 +124,11 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri for _, e := range entries { m.committedEntries[e.ID] = e - delete(entries, e.ID) } - m.committedContentIDs[contentID] = true + m.committedContentIDs[contentID] = struct{}{} - return map[content.ID]bool{contentID: true}, nil + return map[content.ID]struct{}{contentID: {}}, nil } // +checklocks:m.cmmu @@ -192,10 +191,10 @@ func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Conte // +checklocks:m.cmmu func (m *committedManifestManager) loadManifestContentsLocked(manifests map[content.ID]manifest) { m.committedEntries = map[ID]*manifestEntry{} - m.committedContentIDs = map[content.ID]bool{} + m.committedContentIDs = map[content.ID]struct{}{} for contentID := range manifests { - m.committedContentIDs[contentID] = true + m.committedContentIDs[contentID] = struct{}{} } for _, man := range manifests { @@ -257,19 +256,14 @@ func (m *committedManifestManager) compactLocked(ctx context.Context) error { m.b.DisableIndexFlush(ctx) defer m.b.EnableIndexFlush(ctx) - tmp := map[ID]*manifestEntry{} - for k, v := range m.committedEntries { - tmp[k] = v - } - - written, err := m.writeEntriesLocked(ctx, tmp) + written, err := m.writeEntriesLocked(ctx, m.committedEntries) if err != nil { return err } // add the newly-created content to the list, could be duplicate for b := range m.committedContentIDs { - if written[b] { + if _, ok := written[b]; ok { // do not delete content that was just written. 
continue } @@ -374,7 +368,7 @@ func newCommittedManager(b contentManager, autoCompactionThreshold int) *committ b: b, debugID: debugID, committedEntries: map[ID]*manifestEntry{}, - committedContentIDs: map[content.ID]bool{}, + committedContentIDs: map[content.ID]struct{}{}, autoCompactionThreshold: autoCompactionThreshold, } } diff --git a/repo/manifest/manifest_manager.go b/repo/manifest/manifest_manager.go index 8bb2f39f433..e74a404bed6 100644 --- a/repo/manifest/manifest_manager.go +++ b/repo/manifest/manifest_manager.go @@ -215,6 +215,9 @@ func (m *Manager) Flush(ctx context.Context) error { defer m.mu.Unlock() _, err := m.committed.commitEntries(ctx, m.pendingEntries) + if err == nil { + m.pendingEntries = map[ID]*manifestEntry{} + } return err } diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index 9c567d8859c..c60fa24ccf8 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -3,6 +3,7 @@ package manifest import ( "context" "encoding/json" + "fmt" "reflect" "sort" "strings" @@ -310,8 +311,8 @@ type contentManagerOpts struct { readOnly bool } -func newContentManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.DataMap, opts contentManagerOpts) contentManager { - t.Helper() +func newContentManagerForTesting(ctx context.Context, tb testing.TB, data blobtesting.DataMap, opts contentManagerOpts) contentManager { + tb.Helper() st := blobtesting.NewMapStorage(data, nil, nil) @@ -328,12 +329,12 @@ func newContentManagerForTesting(ctx context.Context, t *testing.T, data blobtes }, }, nil) - require.NoError(t, err) + require.NoError(tb, err) bm, err := content.NewManagerForTesting(ctx, st, fop, nil, nil) - require.NoError(t, err) + require.NoError(tb, err) - t.Cleanup(func() { bm.CloseShared(ctx) }) + tb.Cleanup(func() { bm.CloseShared(ctx) }) return bm } @@ -488,3 +489,50 @@ func TestManifestAutoCompactionWithReadOnly(t *testing.T) { _, err = mgr.Find(ctx, 
map[string]string{"color": "red"}) require.NoError(t, err, "forcing reload of manifest manager") } + +func BenchmarkLargeCompaction(b *testing.B) { + item1 := map[string]int{"foo": 1, "bar": 2} + labels1 := map[string]string{"type": "item", "color": "red"} + + table := []int{10000, 100000, 1000000} + + for _, numItems := range table { + b.Run(fmt.Sprintf("%dItems", numItems), func(b *testing.B) { + for range b.N { + b.StopTimer() + // Use default context to avoid lots of log output during benchmark. + ctx := context.Background() + data := blobtesting.DataMap{} + + bm := newContentManagerForTesting(ctx, b, data, contentManagerOpts{}) + + mgr, err := NewManager( + ctx, + bm, + ManagerOptions{AutoCompactionThreshold: 2}, + nil, + ) + require.NoError(b, err, "getting initial manifest manager") + + for range numItems - 1 { + _, err = mgr.Put(ctx, labels1, item1) + require.NoError(b, err, "adding item to manifest manager") + } + + require.NoError(b, mgr.Flush(ctx)) + require.NoError(b, mgr.b.Flush(ctx)) + + _, err = mgr.Put(ctx, labels1, item1) + require.NoError(b, err, "adding item to manifest manager") + + require.NoError(b, mgr.Flush(ctx)) + require.NoError(b, mgr.b.Flush(ctx)) + + b.StartTimer() + + err = mgr.Compact(ctx) + require.NoError(b, err, "forcing reload of manifest manager") + } + }) + } +} diff --git a/snapshot/restore/local_fs_output.go b/snapshot/restore/local_fs_output.go index d78e37c40dd..bccbfa6389e 100644 --- a/snapshot/restore/local_fs_output.go +++ b/snapshot/restore/local_fs_output.go @@ -54,27 +54,16 @@ func getStreamCopier(ctx context.Context, targetpath string, sparse bool) (strea }, nil } -// progressReportingReader is just a wrapper for fs.Reader which is used to capture and pass to cb number of bytes read. +// progressReportingReader wraps fs.Reader Read function to capture the and pass +// the number of bytes read to the callback cb. 
type progressReportingReader struct { - r fs.Reader + fs.Reader cb FileWriteProgress } -func (r *progressReportingReader) Entry() (fs.Entry, error) { - return r.r.Entry() //nolint:wrapcheck -} - -func (r *progressReportingReader) Seek(offset int64, whence int) (int64, error) { - return r.r.Seek(offset, whence) //nolint:wrapcheck -} - -func (r *progressReportingReader) Close() error { - return r.r.Close() //nolint:wrapcheck -} - func (r *progressReportingReader) Read(p []byte) (int, error) { - bytesRead, err := r.r.Read(p) + bytesRead, err := r.Reader.Read(p) if err == nil && r.cb != nil { r.cb(int64(bytesRead)) } @@ -399,10 +388,8 @@ func write(targetPath string, r fs.Reader, size int64, c streamCopier) error { // close below, as close is idempotent. defer f.Close() //nolint:errcheck - name := f.Name() - if _, err := c(f, r); err != nil { - return errors.Wrap(err, "cannot write data to file %q "+name) + return errors.Wrapf(err, "cannot write data to file %q", f.Name()) } if err := f.Close(); err != nil { @@ -431,9 +418,9 @@ func (o *FilesystemOutput) copyFileContent(ctx context.Context, targetPath strin } defer r.Close() //nolint:errcheck - wr := &progressReportingReader{ - r: r, - cb: progressCb, + rr := &progressReportingReader{ + Reader: r, + cb: progressCb, } log(ctx).Debugf("copying file contents to: %v", targetPath) @@ -441,10 +428,10 @@ func (o *FilesystemOutput) copyFileContent(ctx context.Context, targetPath strin if o.WriteFilesAtomically { //nolint:wrapcheck - return atomicfile.Write(targetPath, wr) + return atomicfile.Write(targetPath, rr) } - return write(targetPath, wr, f.Size(), o.copier) + return write(targetPath, rr, f.Size(), o.copier) } func isEmptyDirectory(name string) (bool, error) { diff --git a/snapshot/restore/restore_progress.go b/snapshot/restore/restore_progress.go deleted file mode 100644 index fad030bbe27..00000000000 --- a/snapshot/restore/restore_progress.go +++ /dev/null @@ -1,10 +0,0 @@ -package restore - -// Progress is invoked by 
copier to report status of snapshot restoration. -type Progress interface { - SetCounters( - enqueuedCount, restoredCount, skippedCount, ignoredErrors int32, - enqueuedBytes, restoredBytes, skippedBytes int64, - ) - Flush() -} diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index f55ec223cb0..fee0d40e3e4 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -145,10 +145,9 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis defer u.Progress.FinishedHashingFile(relativePath, f.Size()) if pf, ok := f.(snapshot.HasDirEntryOrNil); ok { - switch de, err := pf.DirEntryOrNil(ctx); { - case err != nil: + if de, err := pf.DirEntryOrNil(ctx); err != nil { return nil, errors.Wrap(err, "can't read placeholder") - case err == nil && de != nil: + } else if de != nil { // We have read sufficient information from the shallow file's extended // attribute to construct DirEntry. _, err := u.repo.VerifyObject(ctx, de.ObjectID) @@ -1069,10 +1068,9 @@ type dirReadError struct { func uploadShallowDirInternal(ctx context.Context, directory fs.Directory, u *Uploader) (*snapshot.DirEntry, error) { if pf, ok := directory.(snapshot.HasDirEntryOrNil); ok { - switch de, err := pf.DirEntryOrNil(ctx); { - case err != nil: + if de, err := pf.DirEntryOrNil(ctx); err != nil { return nil, errors.Wrapf(err, "error reading placeholder for %q", directory.Name()) - case err == nil && de != nil: + } else if de != nil { if _, err := u.repo.VerifyObject(ctx, de.ObjectID); err != nil { return nil, errors.Wrapf(err, "invalid placeholder for %q contains foreign object.ID", directory.Name()) } diff --git a/tests/end_to_end_test/restore_test.go b/tests/end_to_end_test/restore_test.go index 04f196233d0..db29ee6315d 100644 --- a/tests/end_to_end_test/restore_test.go +++ b/tests/end_to_end_test/restore_test.go @@ -29,6 +29,7 @@ import ( "github.com/kopia/kopia/internal/stat" "github.com/kopia/kopia/internal/testlogging" 
"github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/snapshot/restore" "github.com/kopia/kopia/tests/clitestutil" "github.com/kopia/kopia/tests/testdirtree" "github.com/kopia/kopia/tests/testenv" @@ -42,33 +43,19 @@ const ( overriddenDirPermissions = 0o752 ) -type restoreProgressInvocation struct { - enqueuedCount, restoredCount, skippedCount, ignoredErrors int32 - enqueuedBytes, restoredBytes, skippedBytes int64 -} - type fakeRestoreProgress struct { mtx sync.Mutex - invocations []restoreProgressInvocation + invocations []restore.Stats flushesCount int invocationAfterFlush bool } -func (p *fakeRestoreProgress) SetCounters( - enqueuedCount, restoredCount, skippedCount, ignoredErrors int32, - enqueuedBytes, restoredBytes, skippedBytes int64, -) { +func (p *fakeRestoreProgress) SetCounters(s restore.Stats) { p.mtx.Lock() defer p.mtx.Unlock() - p.invocations = append(p.invocations, restoreProgressInvocation{ - enqueuedCount: enqueuedCount, - restoredCount: restoredCount, - skippedCount: skippedCount, - ignoredErrors: ignoredErrors, - enqueuedBytes: enqueuedBytes, - restoredBytes: restoredBytes, - skippedBytes: skippedBytes, - }) + + p.invocations = append(p.invocations, s) + if p.flushesCount > 0 { p.invocationAfterFlush = true }