Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport 1.2.3 #5582

Merged
merged 8 commits into from
Jan 22, 2025
6 changes: 5 additions & 1 deletion .github/scripts/sync/sync_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ test_sync_without_mount_point(){

sudo -u juicedata meta_url=$META_URL ./juicefs sync -v jfs://meta_url/data/ minio://minioadmin:[email protected]:9000/data1/ \
--manager-addr 172.20.0.1:8081 --worker [email protected],[email protected] \
--list-threads 10 --list-depth 5 \
--list-threads 10 --list-depth 5 --check-new \
2>&1 | tee sync.log
# diff data/ /jfs/data1/
check_sync_log $file_count
Expand All @@ -86,6 +86,10 @@ test_sync_without_mount_point2(){
--list-threads 10 --list-depth 5\
2>&1 | tee sync.log
check_sync_log $file_count
sudo -u juicedata meta_url=$META_URL ./juicefs sync -v minio://minioadmin:[email protected]:9000/data/ jfs://meta_url/ \
--manager-addr 172.20.0.1:8081 --worker [email protected],[email protected] \
--list-threads 10 --list-depth 5 --check-all \
2>&1 | tee sync.log
./juicefs mount -d $META_URL /jfs
diff data/ /jfs/data/
./mc rm -r --force myminio/data
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/jcmturner/gokrb5/v8 v8.4.4
github.com/json-iterator/go v1.1.12
github.com/juicedata/godaemon v0.0.0-20210629045518-3da5144a127d
github.com/juicedata/gogfapi v0.0.0-20230626071140-fc28e5537825
github.com/juicedata/gogfapi v0.0.0-20241204082332-ecd102647f80
github.com/juju/ratelimit v1.0.2
github.com/ks3sdklib/aws-sdk-go v1.2.2
github.com/l0wl3vel/bunny-storage-go-sdk v0.0.10
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,8 @@ github.com/juicedata/go-nfs-client v0.0.0-20231018052507-dbca444fa7e8 h1:mVVipCb
github.com/juicedata/go-nfs-client v0.0.0-20231018052507-dbca444fa7e8/go.mod h1:xOMqi3lOrcGe9uZLnSzgaq94Vc3oz6VPCNDLJUnXpKs=
github.com/juicedata/godaemon v0.0.0-20210629045518-3da5144a127d h1:kpQMvNZJKGY3PTt7OSoahYc4nM0HY67SvK0YyS0GLwA=
github.com/juicedata/godaemon v0.0.0-20210629045518-3da5144a127d/go.mod h1:dlxKkLh3qAIPtgr2U/RVzsZJDuXA1ffg+Njikfmhvgw=
github.com/juicedata/gogfapi v0.0.0-20230626071140-fc28e5537825 h1:7KrwI4HPqvNLKVcfkfDMLQQmT0GrnCs8T9EX+XCdZnM=
github.com/juicedata/gogfapi v0.0.0-20230626071140-fc28e5537825/go.mod h1:Ho5G4KgrgbMKW0buAJdOmYoJcOImkzznJQaLiATrsx4=
github.com/juicedata/gogfapi v0.0.0-20241204082332-ecd102647f80 h1:EPg/f3lhbAOjE2M0WpVi47Fk62mEmmPejRuGVdOFQww=
github.com/juicedata/gogfapi v0.0.0-20241204082332-ecd102647f80/go.mod h1:Ho5G4KgrgbMKW0buAJdOmYoJcOImkzznJQaLiATrsx4=
github.com/juicedata/huaweicloud-sdk-go-obs v3.22.12-0.20230228031208-386e87b5c091+incompatible h1:2/ttSmYoX+QMegpNyAJR0Y6aHcVk57F7RJit5xN2T/s=
github.com/juicedata/huaweicloud-sdk-go-obs v3.22.12-0.20230228031208-386e87b5c091+incompatible/go.mod h1:Ukwa8ffRQLV6QRwpqGioPjn2Wnf7TBDA4DbennDOqHE=
github.com/juicedata/minio v0.0.0-20240719032536-5d15c7c0135d h1:rDGD7VqSTs2gTr8HNFgnit1xrUPUWl6v+5HsgL2QrYM=
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunk/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ func (cache *cacheStore) stage(key string, data []byte, keepCache bool) (string,
if cache.capacity > 0 && keepCache {
path := cache.cachePath(key)
cache.createDir(filepath.Dir(path))
if err := os.Link(stagingPath, path); err == nil {
if err = os.Link(stagingPath, path); err == nil {
cache.add(key, -int32(len(data)), uint32(time.Now().Unix()))
} else {
logger.Warnf("link %s to %s failed: %s", stagingPath, path, err)
Expand Down
8 changes: 7 additions & 1 deletion pkg/fuse/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package fuse
import (
"fmt"
"log"
"math"
"os"
"os/exec"
"runtime"
Expand Down Expand Up @@ -298,7 +299,12 @@ func (fs *fileSystem) Fallocate(cancel <-chan struct{}, in *fuse.FallocateIn) (c
func (fs *fileSystem) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) (written uint32, code fuse.Status) {
ctx := fs.newContext(cancel, &in.InHeader)
defer releaseContext(ctx)
copied, err := fs.v.CopyFileRange(ctx, Ino(in.NodeId), in.FhIn, in.OffIn, Ino(in.NodeIdOut), in.FhOut, in.OffOut, in.Len, uint32(in.Flags))
var len = in.Len
if len > math.MaxUint32 {
// written may overflow
len = math.MaxUint32 + 1 - meta.ChunkSize
}
copied, err := fs.v.CopyFileRange(ctx, Ino(in.NodeId), in.FhIn, in.OffIn, Ino(in.NodeIdOut), in.FhOut, in.OffOut, len, uint32(in.Flags))
if err != 0 {
return 0, fuse.Status(err)
}
Expand Down
67 changes: 30 additions & 37 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,14 +828,11 @@ func (m *redisMeta) Resolve(ctx Context, parent Ino, path string, inode *Ino, at
}

func (m *redisMeta) doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno {
return errno(m.rdb.Watch(ctx, func(tx *redis.Tx) error {
val, err := tx.Get(ctx, m.inodeKey(inode)).Bytes()
if err != nil {
return err
}
m.parseAttr(val, attr)
return nil
}, m.inodeKey(inode)))
a, err := m.rdb.Get(ctx, m.inodeKey(inode)).Bytes()
if err == nil {
m.parseAttr(a, attr)
}
return errno(err)
}

type timeoutError interface {
Expand Down Expand Up @@ -4429,29 +4426,27 @@ func (m *redisMeta) doSetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.
}

func (m *redisMeta) doGetFacl(ctx Context, ino Ino, aclType uint8, aclId uint32, rule *aclAPI.Rule) syscall.Errno {
return errno(m.rdb.Watch(ctx, func(tx *redis.Tx) error {
if aclId == aclAPI.None {
val, err := tx.Get(ctx, m.inodeKey(ino)).Bytes()
if err != nil {
return err
}
attr := &Attr{}
m.parseAttr(val, attr)
m.of.Update(ino, attr)

aclId = getAttrACLId(attr, aclType)
}

a, err := m.getACL(ctx, tx, aclId)
if aclId == aclAPI.None {
val, err := m.rdb.Get(ctx, m.inodeKey(ino)).Bytes()
if err != nil {
return err
}
if a == nil {
return ENOATTR
return errno(err)
}
*rule = *a
return nil
}, m.inodeKey(ino)))
attr := &Attr{}
m.parseAttr(val, attr)
m.of.Update(ino, attr)

aclId = getAttrACLId(attr, aclType)
}

a, err := m.getACL(ctx, nil, aclId)
if err != nil {
return errno(err)
}
if a == nil {
return ENOATTR
}
*rule = *a
return 0
}

func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32) (*aclAPI.Rule, error) {
Expand All @@ -4462,15 +4457,13 @@ func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32) (*aclAPI.Rule,
return cRule, nil
}

cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HGet(ctx, m.aclKey(), strconv.FormatUint(uint64(id), 10))
return nil
})
if err != nil {
return nil, err
var val []byte
var err error
if tx != nil {
val, err = tx.HGet(ctx, m.aclKey(), strconv.FormatUint(uint64(id), 10)).Bytes()
} else {
val, err = m.rdb.HGet(ctx, m.aclKey(), strconv.FormatUint(uint64(id), 10)).Bytes()
}

val, err := cmds[0].(*redis.StringCmd).Bytes()
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1864,8 +1864,8 @@ func (m *kvMeta) doDeleteSustainedInode(sid uint64, inode Ino) error {
if a == nil {
return nil
}
newSpace = -align4K(attr.Length)
m.parseAttr(a, &attr)
newSpace = -align4K(attr.Length)
tx.set(m.delfileKey(inode, attr.Length), m.packInt64(time.Now().Unix()))
tx.delete(m.inodeKey(inode))
tx.delete(m.sustainedKey(sid, inode))
Expand Down
2 changes: 1 addition & 1 deletion pkg/object/gluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (g *gluster) Delete(key string, getters ...AttrGetter) error {
// a sorted list of directory entries.
func (g *gluster) readDirSorted(dirname string, followLink bool) ([]*mEntry, error) {
v := g.vol()
f, err := v.Open(dirname)
f, err := v.OpenDir(dirname)
if err != nil {
return nil, err
}
Expand Down
38 changes: 36 additions & 2 deletions pkg/object/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type hdfsclient struct {
c *hdfs.Client
dfsReplication int
umask os.FileMode
closeTimeout time.Duration
closeMaxDelay time.Duration
}

func (h *hdfsclient) String() string {
Expand Down Expand Up @@ -163,8 +165,19 @@ func (h *hdfsclient) Put(key string, in io.Reader, getters ...AttrGetter) (err e
_ = f.Close()
return err
}
err = f.Close()
if err != nil && !IsErrReplicating(err) {
start := time.Now()
sleeptime := 400 * time.Millisecond
for {
err = f.Close()
if IsErrReplicating(err) && start.Add(h.closeTimeout).After(time.Now()) {
time.Sleep(sleeptime)
sleeptime = min(2*sleeptime, h.closeMaxDelay)
continue
} else {
break
}
}
if err != nil {
return err
}
if !PutInplace {
Expand All @@ -173,6 +186,13 @@ func (h *hdfsclient) Put(key string, in io.Reader, getters ...AttrGetter) (err e
return err
}

func min(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}

func IsErrReplicating(err error) bool {
pe, ok := err.(*os.PathError)
return ok && pe.Err == hdfs.ErrReplicating
Expand Down Expand Up @@ -328,13 +348,27 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) {
umask = uint16(x)
}
}
var closeTimeout = 120 * time.Second
if v, found := conf["ipc.client.rpc-timeout.ms"]; found {
if x, err := strconv.Atoi(v); err == nil {
closeTimeout = time.Duration(x) * time.Millisecond
}
}
var closeMaxDelay = 60 * time.Second
if v, found := conf["dfs.client.block.write.locateFollowingBlock.max.delay.ms"]; found {
if x, err := strconv.Atoi(v); err == nil {
closeMaxDelay = time.Duration(x) * time.Millisecond
}
}

return &hdfsclient{
addr: strings.Join(rpcAddr, ","),
basePath: basePath,
c: c,
dfsReplication: replication,
umask: os.FileMode(umask),
closeTimeout: closeTimeout,
closeMaxDelay: closeMaxDelay,
}, nil
}

Expand Down
19 changes: 17 additions & 2 deletions pkg/sync/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,16 @@ func launchWorker(address string, config *Config, wg *sync.WaitGroup) {
func marshalObjects(objs []object.Object) ([]byte, error) {
var arr []map[string]interface{}
for _, o := range objs {
arr = append(arr, object.MarshalObject(o))
obj := object.MarshalObject(o)
switch oo := o.(type) {
case *withSize:
obj["nsize"] = oo.nsize
obj["size"] = oo.Object.Size()
case *withFSize:
obj["fnsize"] = oo.nsize
obj["size"] = oo.File.Size()
}
arr = append(arr, obj)
}
return json.MarshalIndent(arr, "", " ")
}
Expand All @@ -338,7 +347,13 @@ func unmarshalObjects(d []byte) ([]object.Object, error) {
}
var objs []object.Object
for _, m := range arr {
objs = append(objs, object.UnmarshalObject(m))
obj := object.UnmarshalObject(m)
if nsize, ok := m["nsize"]; ok {
obj = &withSize{obj, int64(nsize.(float64))}
} else if fnsize, ok := m["fnsize"]; ok {
obj = &withFSize{obj.(object.File), int64(fnsize.(float64))}
}
objs = append(objs, obj)
}
return objs, nil
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/sync/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package sync

import (
"os"
"testing"
"time"

Expand All @@ -36,6 +37,9 @@ func (o *obj) Mtime() time.Time { return o.mtime }
func (o *obj) IsDir() bool { return o.isDir }
func (o *obj) IsSymlink() bool { return o.isSymlink }
func (o *obj) StorageClass() string { return "" }
func (o *obj) Owner() string { return "" }
func (o *obj) Group() string { return "" }
func (o *obj) Mode() os.FileMode { return 0 }

func TestCluster(t *testing.T) {
// manager
Expand All @@ -62,3 +66,28 @@ func TestCluster(t *testing.T) {
t.Fatalf("should end")
}
}

func TestMarshal(t *testing.T) {
var objs = []object.Object{
&obj{key: "test"},
&withSize{&obj{key: "test1", size: 100}, -4},
&withFSize{&obj{key: "test2", size: 200}, -1},
}
d, err := marshalObjects(objs)
if err != nil {
t.Fatal(err)
}
objs2, e := unmarshalObjects(d)
if e != nil {
t.Fatal(e)
}
if objs2[0].Key() != "test" {
t.Fatalf("expect test but got %s", objs2[0].Key())
}
if objs2[1].Key() != "test1" || objs2[1].Size() != -4 || objs2[1].(*withSize).Object.Size() != 100 {
t.Fatalf("expect withSize but got %s", objs2[0].Key())
}
if objs2[2].Key() != "test2" || objs2[2].Size() != -1 || objs2[2].(*withFSize).File.Size() != 200 {
t.Fatalf("expect withFSize but got %s", objs2[0].Key())
}
}
Loading
Loading