Skip to content

Commit

Permalink
cmd/sync: allocate memory from sync.Pool instead of Chunk.NewOffPage …
Browse files Browse the repository at this point in the history
…to avoid stall problem (#5497)
  • Loading branch information
YinhaoHu authored and jiefenghuang committed Jan 20, 2025
1 parent 336275f commit 705561b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 28 deletions.
31 changes: 20 additions & 11 deletions pkg/sync/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"io"
"sync"

"github.com/juicedata/juicefs/pkg/chunk"
"github.com/juicedata/juicefs/pkg/object"
)

Expand All @@ -33,7 +32,7 @@ type parallelDownloader struct {
fsize int64
blockSize int64
concurrent chan int
buffers map[int64]*chunk.Page
buffers map[int64]*[]byte
off int64
err error
}
Expand All @@ -50,6 +49,15 @@ func (r *parallelDownloader) setErr(err error) {
r.err = err
}

const downloadBufSize = 10 << 20

var downloadBufPool = sync.Pool{
New: func() interface{} {
buf := make([]byte, 0, downloadBufSize)
return &buf
},
}

func (r *parallelDownloader) download() {
for off := int64(0); off < r.fsize; off += r.blockSize {
r.concurrent <- 1
Expand All @@ -73,18 +81,19 @@ func (r *parallelDownloader) download() {
r.setErr(e)
} else { //nolint:typecheck
defer in.Close()
p := chunk.NewOffPage(int(size))
_, e = io.ReadFull(in, p.Data)
p := downloadBufPool.Get().(*[]byte)
*p = (*p)[:size]
_, e = io.ReadFull(in, *p)
if e != nil {
r.setErr(e)
p.Release()
downloadBufPool.Put(p)
} else {
r.Lock()
if r.buffers != nil {
r.buffers[off] = p
saved = true
} else {
p.Release()
downloadBufPool.Put(p)
}
r.Unlock()
}
Expand Down Expand Up @@ -115,10 +124,10 @@ func (r *parallelDownloader) Read(b []byte) (int, error) {
if p == nil {
return 0, r.err
}
n := copy(b, p.Data[r.off-off:])
n := copy(b, (*p)[r.off-off:])
r.off += int64(n)
if r.off == off+int64(len(p.Data)) {
p.Release()
if r.off == off+int64(len(*p)) {
downloadBufPool.Put(p)
r.Lock()
delete(r.buffers, off)
r.Unlock()
Expand All @@ -134,7 +143,7 @@ func (r *parallelDownloader) Close() {
r.Lock()
defer r.Unlock()
for _, p := range r.buffers {
p.Release()
downloadBufPool.Put(p)
}
r.buffers = nil
if r.err == nil {
Expand All @@ -152,7 +161,7 @@ func newParallelDownloader(store object.ObjectStorage, key string, size int64, b
fsize: size,
blockSize: bSize,
concurrent: concurrent,
buffers: make(map[int64]*chunk.Page),
buffers: make(map[int64]*[]byte),
}
down.notify = sync.NewCond(down)
go down.download()
Expand Down
32 changes: 16 additions & 16 deletions pkg/sync/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestDownload(t *testing.T) {
os.RemoveAll("/tmp/download/")
})
type config struct {
blockSize int64
concurrent int
fsize int64
}
Expand All @@ -43,7 +42,7 @@ func TestDownload(t *testing.T) {
}

tcases := []tcase{
{config: config{fsize: 1110, concurrent: 4, blockSize: 300}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
{config: config{fsize: downloadBufSize*3 + 100, concurrent: 4}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
defer pr.Close()
res, err := io.ReadAll(pr)
if err != nil {
Expand All @@ -54,7 +53,7 @@ func TestDownload(t *testing.T) {
}
}},

{config: config{fsize: 97340326, concurrent: 4, blockSize: 5 << 20}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
{config: config{fsize: 97340326, concurrent: 4}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
defer pr.Close()
res, err := io.ReadAll(pr)
if err != nil {
Expand All @@ -65,7 +64,7 @@ func TestDownload(t *testing.T) {
}
}},

{config: config{fsize: 1110, concurrent: 5, blockSize: 300}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
{config: config{fsize: downloadBufSize*3 + 100, concurrent: 5}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
defer pr.Close()
res, err := io.ReadAll(pr)
if err != nil {
Expand All @@ -76,7 +75,7 @@ func TestDownload(t *testing.T) {
}
}},

{config: config{fsize: 1, concurrent: 5, blockSize: 10}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
{config: config{fsize: 1, concurrent: 5}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
defer pr.Close()
res := make([]byte, 1)
n, err := pr.Read(res)
Expand All @@ -89,7 +88,7 @@ func TestDownload(t *testing.T) {
}
}},

{config: config{fsize: 2, concurrent: 5, blockSize: 10}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
{config: config{fsize: 2, concurrent: 5}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
defer pr.Close()
res := make([]byte, 1)
n, err := pr.Read(res)
Expand All @@ -106,7 +105,7 @@ func TestDownload(t *testing.T) {
}
}},

{config: config{fsize: 2, concurrent: 1, blockSize: 10}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
{config: config{fsize: 2, concurrent: 1}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
defer pr.Close()
res := make([]byte, 1)
n, err := pr.Read(res)
Expand All @@ -124,17 +123,18 @@ func TestDownload(t *testing.T) {
}
}},

{config: config{fsize: 1000, concurrent: 3, blockSize: 5}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
{config: config{fsize: downloadBufSize * 20, concurrent: 3}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
defer pr.Close()
res := make([]byte, 20)
resSize := 4 * downloadBufSize
res := make([]byte, 4*downloadBufSize)
n, err := io.ReadFull(pr, res)

if err != nil || n != 20 || res[0] != content[0] {
t.Fatalf("read 20 byte should succeed, but got %d, %s", n, err)
if err != nil || n != resSize || res[0] != content[0] {
t.Fatalf("read %v byte should succeed, but got %d, %s", resSize, n, err)
}
n, err = io.ReadFull(pr, res)
if err != nil || n != 20 || res[0] != content[20] {
t.Fatalf("read 20 byte should succeed, but got %d, %s", n, err)
if err != nil || n != resSize || res[0] != content[resSize] {
t.Fatalf("read %v byte should succeed, but got %d, %s", resSize, n, err)
}
_ = a.Delete(key)
n, err = io.ReadFull(pr, res)
Expand All @@ -144,7 +144,7 @@ func TestDownload(t *testing.T) {
}
}},

{config: config{fsize: 0, concurrent: 5, blockSize: 10}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
{config: config{fsize: 0, concurrent: 5}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
defer pr.Close()
res := make([]byte, 1)
n, err := pr.Read(res)
Expand All @@ -153,7 +153,7 @@ func TestDownload(t *testing.T) {
}
}},

{config: config{fsize: 100, concurrent: 5, blockSize: 10}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
{config: config{fsize: 10 * downloadBufSize, concurrent: 5}, tfunc: func(t *testing.T, pr *parallelDownloader, content []byte) {
defer pr.Close()
res := make([]byte, 1)
pr.key = "notExist"
Expand All @@ -168,6 +168,6 @@ func TestDownload(t *testing.T) {
content := make([]byte, c.config.fsize)
utils.RandRead(content)
_ = a.Put(key, bytes.NewReader(content))
c.tfunc(t, newParallelDownloader(a, key, c.config.fsize, c.blockSize, make(chan int, c.concurrent)), content)
c.tfunc(t, newParallelDownloader(a, key, c.config.fsize, downloadBufSize, make(chan int, c.concurrent)), content)
}
}
2 changes: 1 addition & 1 deletion pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func doCopySingle(src, dst object.ObjectStorage, key string, size int64) error {
if size > maxBlock && !inMap(dst, readInMem) && !inMap(src, fastStreamRead) {
var err error
var in io.Reader
downer := newParallelDownloader(src, key, size, 10<<20, concurrent)
downer := newParallelDownloader(src, key, size, downloadBufSize, concurrent)
defer downer.Close()
if inMap(dst, streamWrite) {
in = downer
Expand Down

0 comments on commit 705561b

Please sign in to comment.