Backport 1.1.6 #5583

Merged 6 commits on Jan 22, 2025
Changes from all commits
6 changes: 5 additions & 1 deletion .github/scripts/command/sync_cluster.sh
@@ -61,7 +61,7 @@ test_sync_without_mount_point(){

sudo -u juicedata meta_url=$META_URL ./juicefs sync -v jfs://meta_url/data/ minio://minioadmin:[email protected]:9000/data1/ \
--manager-addr 172.20.0.1:8081 --worker [email protected],[email protected] \
- --list-threads 10 --list-depth 5 \
+ --list-threads 10 --list-depth 5 --check-new \
2>&1 | tee sync.log
# diff data/ /jfs/data1/
check_sync_log $file_count
@@ -86,6 +86,10 @@ test_sync_without_mount_point2(){
--list-threads 10 --list-depth 5\
2>&1 | tee sync.log
check_sync_log $file_count
+ sudo -u juicedata meta_url=$META_URL ./juicefs sync -v minio://minioadmin:[email protected]:9000/data/ jfs://meta_url/ \
+ --manager-addr 172.20.0.1:8081 --worker [email protected],[email protected] \
+ --list-threads 10 --list-depth 5 --check-all \
+ 2>&1 | tee sync.log
./juicefs mount -d $META_URL /jfs
diff data/ /jfs/data/
./mc rm -r --force myminio/data
2 changes: 1 addition & 1 deletion pkg/chunk/disk_cache.go
@@ -422,7 +422,7 @@ func (cache *cacheStore) stage(key string, data []byte, keepCache bool) (string,
if cache.capacity > 0 && keepCache {
path := cache.cachePath(key)
cache.createDir(filepath.Dir(path))
- if err := os.Link(stagingPath, path); err == nil {
+ if err = os.Link(stagingPath, path); err == nil {
cache.add(key, -int32(len(data)), uint32(time.Now().Unix()))
} else {
logger.Warnf("link %s to %s failed: %s", stagingPath, path, err)
8 changes: 7 additions & 1 deletion pkg/fuse/fuse.go
@@ -18,6 +18,7 @@ package fuse

import (
"fmt"
"math"
"os"
"os/exec"
"runtime"
@@ -297,7 +298,12 @@ func (fs *fileSystem) Fallocate(cancel <-chan struct{}, in *fuse.FallocateIn) (c
func (fs *fileSystem) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) (written uint32, code fuse.Status) {
ctx := fs.newContext(cancel, &in.InHeader)
defer releaseContext(ctx)
- copied, err := fs.v.CopyFileRange(ctx, Ino(in.NodeId), in.FhIn, in.OffIn, Ino(in.NodeIdOut), in.FhOut, in.OffOut, in.Len, uint32(in.Flags))
+ var len = in.Len
+ if len > math.MaxUint32 {
+ // written may overflow
+ len = math.MaxUint32 + 1 - meta.ChunkSize
+ }
+ copied, err := fs.v.CopyFileRange(ctx, Ino(in.NodeId), in.FhIn, in.OffIn, Ino(in.NodeIdOut), in.FhOut, in.OffOut, len, uint32(in.Flags))
if err != 0 {
return 0, fuse.Status(err)
}
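
The guard added above exists because the FUSE reply reports the copied byte count as a uint32 while the request length is a uint64, so a single request larger than 4 GiB could overflow the "written" field; the request is clamped to just under 4 GiB (one chunk short, keeping it chunk-aligned) and the caller can re-issue the call for the remainder. A minimal sketch of the same clamp, assuming a 64 MiB chunk size like meta.ChunkSize (names here are illustrative, not the JuiceFS API):

    package main

    import (
        "fmt"
        "math"
    )

    const chunkSize = 64 << 20 // assumed stand-in for meta.ChunkSize

    // clampCopyLen shortens a copy request so the uint32 "written" reply cannot wrap.
    func clampCopyLen(reqLen uint64) uint64 {
        if reqLen > math.MaxUint32 {
            // Copy slightly less than 4 GiB; the caller retries for the rest.
            return math.MaxUint32 + 1 - chunkSize
        }
        return reqLen
    }

    func main() {
        fmt.Println(clampCopyLen(10 << 30)) // 10 GiB request -> 4227858432
        fmt.Println(clampCopyLen(1 << 20))  // small requests pass through unchanged
    }
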
2 changes: 1 addition & 1 deletion pkg/meta/tkv.go
@@ -1863,8 +1863,8 @@ func (m *kvMeta) doDeleteSustainedInode(sid uint64, inode Ino) error {
if a == nil {
return nil
}
- newSpace = -align4K(attr.Length)
m.parseAttr(a, &attr)
+ newSpace = -align4K(attr.Length)
tx.set(m.delfileKey(inode, attr.Length), m.packInt64(time.Now().Unix()))
tx.delete(m.inodeKey(inode))
tx.delete(m.sustainedKey(sid, inode))
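
This one-line move matters because attr is only populated by m.parseAttr; computing newSpace beforehand used the zero-value Length, so the space reclaimed by the deleted inode was under-counted. A toy illustration of the ordering pitfall (hypothetical types, not the JuiceFS structs):

    package main

    import "fmt"

    type attrT struct{ Length uint64 }

    // align4K rounds a length up to a 4 KiB boundary (simplified stand-in).
    func align4K(n uint64) int64 { return int64((n + 4095) / 4096 * 4096) }

    // parseAttr stands in for decoding the stored attribute bytes into attr.
    func parseAttr(storedLength uint64, a *attrT) { a.Length = storedLength }

    func main() {
        var a attrT
        before := -align4K(a.Length) // computed before parseAttr: always 0
        parseAttr(1<<20, &a)
        after := -align4K(a.Length) // computed after parseAttr: -1048576
        fmt.Println(before, after)
    }
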
38 changes: 36 additions & 2 deletions pkg/object/hdfs.go
@@ -47,6 +47,8 @@ type hdfsclient struct {
c *hdfs.Client
dfsReplication int
umask os.FileMode
+ closeTimeout time.Duration
+ closeMaxDelay time.Duration
}

func (h *hdfsclient) String() string {
@@ -164,13 +166,31 @@ func (h *hdfsclient) Put(key string, in io.Reader) (err error) {
_ = f.Close()
return err
}
- err = f.Close()
- if err != nil && !IsErrReplicating(err) {
+ start := time.Now()
+ sleeptime := 400 * time.Millisecond
+ for {
+ err = f.Close()
+ if IsErrReplicating(err) && start.Add(h.closeTimeout).After(time.Now()) {
+ time.Sleep(sleeptime)
+ sleeptime = min(2*sleeptime, h.closeMaxDelay)
+ continue
+ } else {
+ break
+ }
+ }
+ if err != nil {
return err
}
return h.c.Rename(tmp, p)
}

+ func min(a, b time.Duration) time.Duration {
+ if a < b {
+ return a
+ }
+ return b
+ }

func IsErrReplicating(err error) bool {
pe, ok := err.(*os.PathError)
return ok && pe.Err == hdfs.ErrReplicating
@@ -326,13 +346,27 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) {
umask = uint16(x)
}
}
+ var closeTimeout = 120 * time.Second
+ if v, found := conf["ipc.client.rpc-timeout.ms"]; found {
+ if x, err := strconv.Atoi(v); err == nil {
+ closeTimeout = time.Duration(x) * time.Millisecond
+ }
+ }
+ var closeMaxDelay = 60 * time.Second
+ if v, found := conf["dfs.client.block.write.locateFollowingBlock.max.delay.ms"]; found {
+ if x, err := strconv.Atoi(v); err == nil {
+ closeMaxDelay = time.Duration(x) * time.Millisecond
+ }
+ }

return &hdfsclient{
addr: strings.Join(rpcAddr, ","),
basePath: basePath,
c: c,
dfsReplication: replication,
umask: os.FileMode(umask),
+ closeTimeout: closeTimeout,
+ closeMaxDelay: closeMaxDelay,
}, nil
}

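
The Put change above retries f.Close() for as long as the NameNode still reports the last block as replicating, doubling the sleep (starting at 400ms) up to closeMaxDelay and giving up once closeTimeout has elapsed; both limits come from the Hadoop configuration, defaulting to 120s and 60s. A self-contained sketch of the same retry shape, with a generic retriable predicate in place of the HDFS-specific IsErrReplicating (names here are illustrative, not the JuiceFS API):

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    var errReplicating = errors.New("replication in progress") // stand-in for hdfs.ErrReplicating

    // closeWithRetry keeps calling closeFn while it returns a retriable error,
    // sleeping 400ms, 800ms, ... capped at maxDelay, until timeout has elapsed.
    func closeWithRetry(closeFn func() error, retriable func(error) bool, timeout, maxDelay time.Duration) error {
        start := time.Now()
        sleep := 400 * time.Millisecond
        for {
            err := closeFn()
            if retriable(err) && time.Since(start) < timeout {
                time.Sleep(sleep)
                if sleep *= 2; sleep > maxDelay {
                    sleep = maxDelay
                }
                continue
            }
            return err
        }
    }

    func main() {
        calls := 0
        err := closeWithRetry(
            func() error {
                calls++
                if calls < 3 {
                    return errReplicating // first two attempts: block not yet replicated
                }
                return nil
            },
            func(err error) bool { return errors.Is(err, errReplicating) },
            2*time.Second,
            time.Second,
        )
        fmt.Println(calls, err) // 3 <nil>
    }
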
19 changes: 17 additions & 2 deletions pkg/sync/cluster.go
@@ -314,7 +316,16 @@ func launchWorker(address string, config *Config, wg *sync.WaitGroup) {
func marshalObjects(objs []object.Object) ([]byte, error) {
var arr []map[string]interface{}
for _, o := range objs {
- arr = append(arr, object.MarshalObject(o))
+ obj := object.MarshalObject(o)
+ switch oo := o.(type) {
+ case *withSize:
+ obj["nsize"] = oo.nsize
+ obj["size"] = oo.Object.Size()
+ case *withFSize:
+ obj["fnsize"] = oo.nsize
+ obj["size"] = oo.File.Size()
+ }
+ arr = append(arr, obj)
}
return json.MarshalIndent(arr, "", " ")
}
@@ -327,7 +336,13 @@ func unmarshalObjects(d []byte) ([]object.Object, error) {
}
var objs []object.Object
for _, m := range arr {
- objs = append(objs, object.UnmarshalObject(m))
+ obj := object.UnmarshalObject(m)
+ if nsize, ok := m["nsize"]; ok {
+ obj = &withSize{obj, int64(nsize.(float64))}
+ } else if fnsize, ok := m["fnsize"]; ok {
+ obj = &withFSize{obj.(object.File), int64(fnsize.(float64))}
+ }
+ objs = append(objs, obj)
}
return objs, nil
}
29 changes: 29 additions & 0 deletions pkg/sync/cluster_test.go
@@ -16,6 +16,7 @@
package sync

import (
"os"
"testing"
"time"

@@ -36,6 +37,9 @@ func (o *obj) Mtime() time.Time { return o.mtime }
func (o *obj) IsDir() bool { return o.isDir }
func (o *obj) IsSymlink() bool { return o.isSymlink }
func (o *obj) StorageClass() string { return "" }
+ func (o *obj) Owner() string { return "" }
+ func (o *obj) Group() string { return "" }
+ func (o *obj) Mode() os.FileMode { return 0 }

func TestCluster(t *testing.T) {
// manager
@@ -62,3 +66,28 @@
t.Fatalf("should end")
}
}

+ func TestMarshal(t *testing.T) {
+ var objs = []object.Object{
+ &obj{key: "test"},
+ &withSize{&obj{key: "test1", size: 100}, -4},
+ &withFSize{&obj{key: "test2", size: 200}, -1},
+ }
+ d, err := marshalObjects(objs)
+ if err != nil {
+ t.Fatal(err)
+ }
+ objs2, e := unmarshalObjects(d)
+ if e != nil {
+ t.Fatal(e)
+ }
+ if objs2[0].Key() != "test" {
+ t.Fatalf("expect test but got %s", objs2[0].Key())
+ }
+ if objs2[1].Key() != "test1" || objs2[1].Size() != -4 || objs2[1].(*withSize).Object.Size() != 100 {
+ t.Fatalf("expect withSize but got %s", objs2[0].Key())
+ }
+ if objs2[2].Key() != "test2" || objs2[2].Size() != -1 || objs2[2].(*withFSize).File.Size() != 200 {
+ t.Fatalf("expect withFSize but got %s", objs2[0].Key())
+ }
+ }
31 changes: 20 additions & 11 deletions pkg/sync/download.go
@@ -21,7 +21,6 @@ import (
"io"
"sync"

"github.com/juicedata/juicefs/pkg/chunk"
"github.com/juicedata/juicefs/pkg/object"
)

@@ -33,7 +32,7 @@ type parallelDownloader struct {
fsize int64
blockSize int64
concurrent chan int
- buffers map[int64]*chunk.Page
+ buffers map[int64]*[]byte
off int64
err error
}
@@ -50,6 +49,15 @@
r.err = err
}

+ const downloadBufSize = 10 << 20
+
+ var downloadBufPool = sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, 0, downloadBufSize)
+ return &buf
+ },
+ }

func (r *parallelDownloader) download() {
for off := int64(0); off < r.fsize; off += r.blockSize {
r.concurrent <- 1
@@ -73,18 +81,19 @@
r.setErr(e)
} else { //nolint:typecheck
defer in.Close()
- p := chunk.NewOffPage(int(size))
- _, e = io.ReadFull(in, p.Data)
+ p := downloadBufPool.Get().(*[]byte)
+ *p = (*p)[:size]
+ _, e = io.ReadFull(in, *p)
if e != nil {
r.setErr(e)
- p.Release()
+ downloadBufPool.Put(p)
} else {
r.Lock()
if r.buffers != nil {
r.buffers[off] = p
saved = true
} else {
- p.Release()
+ downloadBufPool.Put(p)
}
r.Unlock()
}
@@ -115,10 +124,10 @@ func (r *parallelDownloader) Read(b []byte) (int, error) {
if p == nil {
return 0, r.err
}
- n := copy(b, p.Data[r.off-off:])
+ n := copy(b, (*p)[r.off-off:])
r.off += int64(n)
- if r.off == off+int64(len(p.Data)) {
- p.Release()
+ if r.off == off+int64(len(*p)) {
+ downloadBufPool.Put(p)
r.Lock()
delete(r.buffers, off)
r.Unlock()
@@ -134,7 +143,7 @@
r.Lock()
defer r.Unlock()
for _, p := range r.buffers {
- p.Release()
+ downloadBufPool.Put(p)
}
r.buffers = nil
if r.err == nil {
@@ -152,7 +161,7 @@ func newParallelDownloader(store object.ObjectStorage, key string, size int64, b
fsize: size,
blockSize: bSize,
concurrent: concurrent,
- buffers: make(map[int64]*chunk.Page),
+ buffers: make(map[int64]*[]byte),
}
down.notify = sync.NewCond(down)
go down.download()
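
The download.go changes replace chunk.Page with plain byte slices recycled through a sync.Pool: each block borrows a buffer with a fixed 10 MiB capacity, reslices it to the block size, and hands it back to the pool once the reader has consumed or discarded it, so steady-state downloads allocate no new block buffers. A small sketch of that borrow/reslice/return pattern (a standalone example, not the JuiceFS code):

    package main

    import (
        "fmt"
        "sync"
    )

    const bufCap = 10 << 20 // pooled buffers are allocated once with a fixed capacity

    // bufPool stores *[]byte so the slice header is not re-allocated on each Put.
    var bufPool = sync.Pool{
        New: func() interface{} {
            buf := make([]byte, 0, bufCap)
            return &buf
        },
    }

    // fillBlock borrows a buffer, reslices it to n bytes and fills it.
    // The caller must return it with bufPool.Put once the data is consumed.
    func fillBlock(n int) *[]byte {
        p := bufPool.Get().(*[]byte)
        *p = (*p)[:n] // same backing array, only the length changes
        for i := range *p {
            (*p)[i] = byte(i)
        }
        return p
    }

    func main() {
        p := fillBlock(4096)
        fmt.Println(len(*p), cap(*p)) // 4096 10485760
        bufPool.Put(p)                // hand the buffer back for the next block
    }
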