Skip to content

Commit

Permalink
Sync lakectl download recursive summary with local sync
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder committed Feb 11, 2025
1 parent a1ee81d commit 6fe84bc
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 12 deletions.
36 changes: 24 additions & 12 deletions cmd/lakectl/cmd/fs_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/go-openapi/swag"
Expand Down Expand Up @@ -41,12 +42,12 @@ var fsDownloadCmd = &cobra.Command{
downloader := helpers.NewDownloader(client, syncFlags.Presign)
downloader.PartSize = downloadPartSize

remotePath := remote.GetPath()
if !recursive {
// if dest is a directory, add the file name
if s, _ := os.Stat(dest); s != nil && s.IsDir() {
dest += uri.PathSeparator
}
remotePath := remote.GetPath()
if remotePath != "" && strings.HasSuffix(dest, uri.PathSeparator) {
dest += filepath.Base(remotePath)
}
Expand All @@ -63,19 +64,8 @@ var fsDownloadCmd = &cobra.Command{
pw := newDownloadProgressWriter(syncFlags.NoProgress)
// ProgressRender start render progress and return callback waiting for the progress to finish.
go pw.Render()
defer func() {
for pw.IsRenderInProgress() {
// for manual-stop mode, stop when there are no more active trackers
if pw.LengthActive() == 0 {
pw.Stop()
}
const waitForRender = 100 * time.Millisecond
time.Sleep(waitForRender)
}
}()

ch := make(chan string, filesChanSize)
remotePath := remote.GetPath()
if remotePath != "" && !strings.HasSuffix(remotePath, uri.PathSeparator) {
*remote.Path += uri.PathSeparator
}
Expand Down Expand Up @@ -116,6 +106,7 @@ var fsDownloadCmd = &cobra.Command{
// download files in parallel
var wg sync.WaitGroup
wg.Add(syncFlags.Parallelism)
var downloaded int64
for i := 0; i < syncFlags.Parallelism; i++ {
go func() {
defer wg.Done()
Expand All @@ -139,11 +130,32 @@ var fsDownloadCmd = &cobra.Command{
DieErr(err)
}
tracker.MarkAsDone()
atomic.AddInt64(&downloaded, 1)
}
}()
}
// wait for all downloads to finish
wg.Wait()

// wait for progress to finish render
for pw.IsRenderInProgress() {
// for manual-stop mode, stop when there are no more active trackers
if pw.LengthActive() == 0 {
pw.Stop()
}
const waitForRender = 100 * time.Millisecond
time.Sleep(waitForRender)
}

Write(localSummaryTemplate, struct {
Operation string
Downloaded int64
Removed int
Uploaded int
}{
Operation: "Download",
Downloaded: downloaded,
})
},
}

Expand Down
8 changes: 8 additions & 0 deletions esti/lakectl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,10 @@ func TestLakectlFsDownload(t *testing.T) {
require.Contains(t, sanitizedResult, "download ro/ro_1k.2")
require.Contains(t, sanitizedResult, "download ro/ro_1k.3")
require.Contains(t, sanitizedResult, "download ro/ro_1k.4")
require.Contains(t, sanitizedResult, "Download Summary:")
require.Contains(t, sanitizedResult, "Downloaded: 5")
require.Contains(t, sanitizedResult, "Uploaded: 0")
require.Contains(t, sanitizedResult, "Removed: 0")
})

t.Run("directory_with_dest", func(t *testing.T) {
Expand All @@ -652,6 +656,10 @@ func TestLakectlFsDownload(t *testing.T) {
require.Contains(t, sanitizedResult, "download ro/ro_1k.2")
require.Contains(t, sanitizedResult, "download ro/ro_1k.3")
require.Contains(t, sanitizedResult, "download ro/ro_1k.4")
require.Contains(t, sanitizedResult, "Download Summary:")
require.Contains(t, sanitizedResult, "Downloaded: 5")
require.Contains(t, sanitizedResult, "Uploaded: 0")
require.Contains(t, sanitizedResult, "Removed: 0")
})

t.Run("directory_without_recursive", func(t *testing.T) {
Expand Down

0 comments on commit 6fe84bc

Please sign in to comment.