
Commit

fix: replace allocating download pages by using chunk.NewOffPage to sync.Pool.Get
YinhaoHu committed Jan 6, 2025
1 parent 8fc8cb9 commit 0e12449
Showing 2 changed files with 20 additions and 12 deletions.
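
The change swaps a freshly allocated page per downloaded block (chunk.NewOffPage) for fixed-size buffers recycled through a sync.Pool. As a quick orientation, here is a minimal sketch of that pattern using only the standard library; names such as bufPool are illustrative and not taken from the commit:

package main

import (
	"fmt"
	"sync"
)

// bufSize mirrors the 10 MiB block size used in the commit; the surrounding
// program is illustrative, not code from the repository.
const bufSize = 10 << 20

// The pool stores *[]byte rather than []byte: boxing a slice into the
// interface{} that Put takes would otherwise allocate on every call.
var bufPool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, bufSize)
		return &buf
	},
}

func main() {
	p := bufPool.Get().(*[]byte) // reuses a previously Put buffer when one is available
	n := copy(*p, "hello")       // fill (part of) the pooled buffer
	fmt.Println(n, len(*p))      // 5 10485760
	bufPool.Put(p)               // hand the buffer back for the next Get
}

Returning a pointer to the slice rather than the slice itself is the usual way to keep Put from allocating, which is why the pool's New in the diff below returns &buf.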
30 changes: 19 additions & 11 deletions pkg/sync/download.go
@@ -21,7 +21,6 @@ import (
 	"io"
 	"sync"
 
-	"github.com/juicedata/juicefs/pkg/chunk"
 	"github.com/juicedata/juicefs/pkg/object"
 )
 
@@ -33,7 +32,7 @@ type parallelDownloader struct {
 	fsize      int64
 	blockSize  int64
 	concurrent chan int
-	buffers    map[int64]*chunk.Page
+	buffers    map[int64]*[]byte
 	off        int64
 	err        error
 }
@@ -50,6 +49,15 @@ func (r *parallelDownloader) setErr(err error) {
 	r.err = err
 }
 
+const downloadBufSize = 10 << 20
+
+var downloadBufPool = sync.Pool{
+	New: func() interface{} {
+		buf := make([]byte, downloadBufSize)
+		return &buf
+	},
+}
+
 func (r *parallelDownloader) download() {
 	for off := int64(0); off < r.fsize; off += r.blockSize {
 		r.concurrent <- 1
@@ -73,18 +81,18 @@ func (r *parallelDownloader) download() {
 				r.setErr(e)
 			} else { //nolint:typecheck
 				defer in.Close()
-				p := chunk.NewOffPage(int(size))
-				_, e = io.ReadFull(in, p.Data)
+				p := downloadBufPool.Get().(*[]byte)
+				_, e = io.ReadFull(in, *p)
 				if e != nil {
 					r.setErr(e)
-					p.Release()
+					downloadBufPool.Put(p)
 				} else {
 					r.Lock()
 					if r.buffers != nil {
 						r.buffers[off] = p
 						saved = true
 					} else {
-						p.Release()
+						downloadBufPool.Put(p)
 					}
 					r.Unlock()
 				}
@@ -115,10 +123,10 @@ func (r *parallelDownloader) Read(b []byte) (int, error) {
 	if p == nil {
 		return 0, r.err
 	}
-	n := copy(b, p.Data[r.off-off:])
+	n := copy(b, (*p)[r.off-off:])
 	r.off += int64(n)
-	if r.off == off+int64(len(p.Data)) {
-		p.Release()
+	if r.off == off+int64(len(*p)) {
+		downloadBufPool.Put(&p)
 		r.Lock()
 		delete(r.buffers, off)
 		r.Unlock()
@@ -134,7 +142,7 @@ func (r *parallelDownloader) Close() {
 	r.Lock()
 	defer r.Unlock()
 	for _, p := range r.buffers {
-		p.Release()
+		downloadBufPool.Put(&p)
 	}
 	r.buffers = nil
 	if r.err == nil {
@@ -152,7 +160,7 @@ func newParallelDownloader(store object.ObjectStorage, key string, size int64, b
 		fsize:      size,
 		blockSize:  bSize,
 		concurrent: concurrent,
-		buffers:    make(map[int64]*chunk.Page),
+		buffers:    make(map[int64]*[]byte),
 	}
 	down.notify = sync.NewCond(down)
 	go down.download()
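
Taken together, the download.go hunks give each block a pooled buffer with a simple lifecycle: the download goroutine Gets a buffer, fills it, and parks it in buffers keyed by offset; Read copies bytes out and returns the buffer to the pool once that offset is fully consumed; Close drains whatever remains. A stripped-down sketch of that lifecycle follows (hypothetical names and a tiny 4-byte block size, not code from the repository):

package main

import (
	"fmt"
	"sync"
)

const blockSize = 4 // tiny block size so the example stays readable

var pool = sync.Pool{
	New: func() interface{} { b := make([]byte, blockSize); return &b },
}

type blockReader struct {
	sync.Mutex
	buffers map[int64]*[]byte // blocks parked by the producer, keyed by offset
	off     int64             // current read position
}

// park simulates the download goroutine: fill a pooled buffer and park it.
func (r *blockReader) park(off int64, data []byte) {
	p := pool.Get().(*[]byte)
	copy(*p, data)
	r.Lock()
	r.buffers[off] = p
	r.Unlock()
}

// Read copies out of the parked block and recycles it once fully consumed.
// The real Read waits until the producer has parked this offset and handles
// errors; both are omitted here.
func (r *blockReader) Read(b []byte) (int, error) {
	off := r.off / blockSize * blockSize // start of the current block
	r.Lock()
	p := r.buffers[off]
	r.Unlock()
	n := copy(b, (*p)[r.off-off:])
	r.off += int64(n)
	if r.off == off+int64(len(*p)) { // block fully consumed: recycle it
		pool.Put(p)
		r.Lock()
		delete(r.buffers, off)
		r.Unlock()
	}
	return n, nil
}

func main() {
	r := &blockReader{buffers: make(map[int64]*[]byte)}
	r.park(0, []byte("abcd"))
	buf := make([]byte, blockSize)
	n, _ := r.Read(buf)
	fmt.Println(n, string(buf[:n])) // 4 abcd
}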
2 changes: 1 addition & 1 deletion pkg/sync/sync.go
@@ -359,7 +359,7 @@ func doCopySingle(src, dst object.ObjectStorage, key string, size int64) error {
 	if size > maxBlock && !inMap(dst, readInMem) && !inMap(src, fastStreamRead) {
 		var err error
 		var in io.Reader
-		downer := newParallelDownloader(src, key, size, 10<<20, concurrent)
+		downer := newParallelDownloader(src, key, size, downloadBufSize, concurrent)
 		defer downer.Close()
 		if inMap(dst, streamWrite) {
 			in = downer

0 comments on commit 0e12449
