Skip to content

Commit

Permalink
perf(drivers): fs operations and cache (#4965)
Browse files Browse the repository at this point in the history
* perf(baidu_photo):multi-thread upload

* perf(baidu_netdisk):multi-thread upload and cache optimization

* fix:LimitWriter

* fix(weiyun):only one login is allowed

* feat(189pc):multi threaded upload

* feat(baidu_netdisk):multi threaded upload

* feat(baidu_photo):multi threaded upload

* feat(weiyun):multi threaded upload

* perf(aliyundriver_open):optimize upload code and optimize cache

* fix(weiyun):invalid directory ID

* fix(baidu_netdisk):modified time

* fix(baidu_netdisk,baidu_photo):upload slice error

* perf(baidu_netdisk):cancel unnecessary retries

* fix(limitWriter):must return a non-nil error if it returns n < len(p)

* fix(aliyundrive_open):Name and Filename only use one

* perf(mopan):multi-thread upload
  • Loading branch information
foxxorcat authored Aug 9, 2023
1 parent 9d45718 commit df6b306
Show file tree
Hide file tree
Showing 22 changed files with 645 additions and 349 deletions.
9 changes: 9 additions & 0 deletions drivers/189pc/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package _189pc
import (
"context"
"net/http"
"strconv"
"strings"
"time"

Expand All @@ -24,6 +25,8 @@ type Cloud189PC struct {

loginParam *LoginParam
tokenInfo *AppSessionResp

uploadThread int
}

func (y *Cloud189PC) Config() driver.Config {
Expand All @@ -44,6 +47,12 @@ func (y *Cloud189PC) Init(ctx context.Context) (err error) {
y.FamilyID = ""
}

// 限制上传线程数
y.uploadThread, _ = strconv.Atoi(y.UploadThread)
if y.uploadThread < 1 || y.uploadThread > 32 {
y.uploadThread, y.UploadThread = 3, "3"
}

// 初始化请求客户端
if y.client == nil {
y.client = base.NewRestyClient().SetHeaders(map[string]string{
Expand Down
1 change: 1 addition & 0 deletions drivers/189pc/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Addition struct {
Type string `json:"type" type:"select" options:"personal,family" default:"personal"`
FamilyID string `json:"family_id"`
UploadMethod string `json:"upload_method" type:"select" options:"stream,rapid,old" default:"stream"`
UploadThread string `json:"upload_thread" default:"3" help:"1<=thread<=32"`
NoUseOcr bool `json:"no_use_ocr"`
}

Expand Down
99 changes: 60 additions & 39 deletions drivers/189pc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"net/url"
"os"
"regexp"
"strconv"
"strings"
"time"

Expand All @@ -24,6 +25,7 @@ import (
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/internal/setting"
"github.com/alist-org/alist/v3/pkg/errgroup"
"github.com/alist-org/alist/v3/pkg/utils"

"github.com/avast/retry-go"
Expand Down Expand Up @@ -436,14 +438,18 @@ func (y *Cloud189PC) refreshSession() (err error) {
// 普通上传
// 无法上传大小为0的文件
func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
var DEFAULT = partSize(file.GetSize())
var count = int(math.Ceil(float64(file.GetSize()) / float64(DEFAULT)))
var sliceSize = partSize(file.GetSize())
count := int(math.Ceil(float64(file.GetSize()) / float64(sliceSize)))
lastPartSize := file.GetSize() % sliceSize
if file.GetSize() > 0 && lastPartSize == 0 {
lastPartSize = sliceSize
}

params := Params{
"parentFolderId": dstDir.GetID(),
"fileName": url.QueryEscape(file.GetName()),
"fileSize": fmt.Sprint(file.GetSize()),
"sliceSize": fmt.Sprint(DEFAULT),
"sliceSize": fmt.Sprint(sliceSize),
"lazyCheck": "1",
}

Expand All @@ -468,17 +474,19 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
fileMd5 := md5.New()
silceMd5 := md5.New()
silceMd5Hexs := make([]string, 0, count)
byteData := bytes.NewBuffer(make([]byte, DEFAULT))
byteData := make([]byte, sliceSize)
for i := 1; i <= count; i++ {
if utils.IsCanceled(ctx) {
return nil, ctx.Err()
}

if i == count {
byteData = byteData[:lastPartSize]
}

// 读取块
byteData.Reset()
silceMd5.Reset()
_, err := io.CopyN(io.MultiWriter(fileMd5, silceMd5, byteData), file, DEFAULT)
if err != io.EOF && err != io.ErrUnexpectedEOF && err != nil {
if _, err := io.ReadFull(io.TeeReader(file, io.MultiWriter(fileMd5, silceMd5)), byteData); err != io.EOF && err != nil {
return nil, err
}

Expand All @@ -504,13 +512,13 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
uploadData := uploadUrl.UploadUrls[fmt.Sprint("partNumber_", i)]

err = retry.Do(func() error {
_, err := y.put(ctx, uploadData.RequestURL, ParseHttpHeader(uploadData.RequestHeader), false, bytes.NewReader(byteData.Bytes()))
_, err := y.put(ctx, uploadData.RequestURL, ParseHttpHeader(uploadData.RequestHeader), false, bytes.NewReader(byteData))
return err
},
retry.Context(ctx),
retry.Attempts(3),
retry.Delay(time.Second),
retry.MaxDelay(5*time.Second))
retry.DelayType(retry.BackOffDelay))
if err != nil {
return nil, err
}
Expand All @@ -519,7 +527,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo

fileMd5Hex := strings.ToUpper(hex.EncodeToString(fileMd5.Sum(nil)))
sliceMd5Hex := fileMd5Hex
if file.GetSize() > DEFAULT {
if file.GetSize() > sliceSize {
sliceMd5Hex = strings.ToUpper(utils.GetMD5EncodeStr(strings.Join(silceMd5Hexs, "\n")))
}

Expand Down Expand Up @@ -554,10 +562,15 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode
_ = os.Remove(tempFile.Name())
}()

var DEFAULT = partSize(file.GetSize())
count := int(math.Ceil(float64(file.GetSize()) / float64(DEFAULT)))
var sliceSize = partSize(file.GetSize())
count := int(math.Ceil(float64(file.GetSize()) / float64(sliceSize)))
lastPartSize := file.GetSize() % sliceSize
if file.GetSize() > 0 && lastPartSize == 0 {
lastPartSize = sliceSize
}

// 优先计算所需信息
//step.1 优先计算所需信息
byteSize := sliceSize
fileMd5 := md5.New()
silceMd5 := md5.New()
silceMd5Hexs := make([]string, 0, count)
Expand All @@ -567,31 +580,32 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode
return nil, ctx.Err()
}

if i == count {
byteSize = lastPartSize
}

silceMd5.Reset()
if _, err := io.CopyN(io.MultiWriter(fileMd5, silceMd5), tempFile, DEFAULT); err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
if _, err := io.CopyN(io.MultiWriter(fileMd5, silceMd5), tempFile, byteSize); err != nil && err != io.EOF {
return nil, err
}
md5Byte := silceMd5.Sum(nil)
silceMd5Hexs = append(silceMd5Hexs, strings.ToUpper(hex.EncodeToString(md5Byte)))
silceMd5Base64s = append(silceMd5Base64s, fmt.Sprint(i, "-", base64.StdEncoding.EncodeToString(md5Byte)))
}
if _, err = tempFile.Seek(0, io.SeekStart); err != nil {
return nil, err
}

fileMd5Hex := strings.ToUpper(hex.EncodeToString(fileMd5.Sum(nil)))
sliceMd5Hex := fileMd5Hex
if file.GetSize() > DEFAULT {
if file.GetSize() > sliceSize {
sliceMd5Hex = strings.ToUpper(utils.GetMD5EncodeStr(strings.Join(silceMd5Hexs, "\n")))
}

// 检测是否支持快传
//step.2 预上传
params := Params{
"parentFolderId": dstDir.GetID(),
"fileName": url.QueryEscape(file.GetName()),
"fileSize": fmt.Sprint(file.GetSize()),
"fileMd5": fileMd5Hex,
"sliceSize": fmt.Sprint(DEFAULT),
"sliceSize": fmt.Sprint(sliceSize),
"sliceMd5": sliceMd5Hex,
}

Expand All @@ -614,6 +628,7 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode

// 网盘中不存在该文件,开始上传
if uploadInfo.Data.FileDataExists != 1 {
// step.3 获取上传切片信息
var uploadUrls UploadUrlsResp
_, err = y.request(fullUrl+"/getMultiUploadUrls", http.MethodGet,
func(req *resty.Request) {
Expand All @@ -626,30 +641,36 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode
return nil, err
}

buf := make([]byte, DEFAULT)
for i := 1; i <= count; i++ {
if utils.IsCanceled(ctx) {
return nil, ctx.Err()
}

n, err := io.ReadFull(tempFile, buf)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return nil, err
// step.4 上传切片
threadG, upCtx := errgroup.NewGroupWithContext(ctx, y.uploadThread,
retry.Attempts(3),
retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay))
for k, part := range uploadUrls.UploadUrls {
if utils.IsCanceled(upCtx) {
break
}
uploadData := uploadUrls.UploadUrls[fmt.Sprint("partNumber_", i)]
err = retry.Do(func() error {
_, err := y.put(ctx, uploadData.RequestURL, ParseHttpHeader(uploadData.RequestHeader), false, bytes.NewReader(buf[:n]))
return err
},
retry.Context(ctx),
retry.Attempts(3),
retry.Delay(time.Second),
retry.MaxDelay(5*time.Second))
partNumber, err := strconv.Atoi(strings.TrimPrefix(k, "partNumber_"))
if err != nil {
return nil, err
}

up(int(i * 100 / count))
part, byteSize, offset := part, sliceSize, int64(partNumber-1)*sliceSize
if partNumber == count {
byteSize = lastPartSize
}

threadG.Go(func(ctx context.Context) error {
_, err := y.put(ctx, part.RequestURL, ParseHttpHeader(part.RequestHeader), false, io.NewSectionReader(tempFile, offset, byteSize))
if err != nil {
return err
}
up(int(threadG.Success()) * 100 / len(uploadUrls.UploadUrls))
return nil
})
}
if err = threadG.Wait(); err != nil {
return nil, err
}
}

Expand Down
46 changes: 36 additions & 10 deletions drivers/aliyundrive_open/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,41 +107,63 @@ func (d *AliyundriveOpen) Link(ctx context.Context, file model.Obj, args model.L
return d.limitLink(ctx, file)
}

func (d *AliyundriveOpen) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error {
func (d *AliyundriveOpen) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) (model.Obj, error) {
nowTime, _ := getNowTime()
newDir := File{CreatedAt: nowTime, UpdatedAt: nowTime}
_, err := d.request("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
req.SetBody(base.Json{
"drive_id": d.DriveId,
"parent_file_id": parentDir.GetID(),
"name": dirName,
"type": "folder",
"check_name_mode": "refuse",
})
}).SetResult(&newDir)
})
return err
if err != nil {
return nil, err
}
return fileToObj(newDir), nil
}

func (d *AliyundriveOpen) Move(ctx context.Context, srcObj, dstDir model.Obj) error {
func (d *AliyundriveOpen) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) {
var resp MoveOrCopyResp
_, err := d.request("/adrive/v1.0/openFile/move", http.MethodPost, func(req *resty.Request) {
req.SetBody(base.Json{
"drive_id": d.DriveId,
"file_id": srcObj.GetID(),
"to_parent_file_id": dstDir.GetID(),
"check_name_mode": "refuse", // optional:ignore,auto_rename,refuse
//"new_name": "newName", // The new name to use when a file of the same name exists
})
}).SetResult(&resp)
})
return err
if err != nil {
return nil, err
}
if resp.Exist {
return nil, errors.New("existence of files with the same name")
}

if srcObj, ok := srcObj.(*model.ObjThumb); ok {
srcObj.ID = resp.FileID
srcObj.Modified = time.Now()
return srcObj, nil
}
return nil, nil
}

func (d *AliyundriveOpen) Rename(ctx context.Context, srcObj model.Obj, newName string) error {
func (d *AliyundriveOpen) Rename(ctx context.Context, srcObj model.Obj, newName string) (model.Obj, error) {
var newFile File
_, err := d.request("/adrive/v1.0/openFile/update", http.MethodPost, func(req *resty.Request) {
req.SetBody(base.Json{
"drive_id": d.DriveId,
"file_id": srcObj.GetID(),
"name": newName,
})
}).SetResult(&newFile)
})
return err
if err != nil {
return nil, err
}
return fileToObj(newFile), nil
}

func (d *AliyundriveOpen) Copy(ctx context.Context, srcObj, dstDir model.Obj) error {
Expand Down Expand Up @@ -170,7 +192,7 @@ func (d *AliyundriveOpen) Remove(ctx context.Context, obj model.Obj) error {
return err
}

func (d *AliyundriveOpen) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
func (d *AliyundriveOpen) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
return d.upload(ctx, dstDir, stream, up)
}

Expand Down Expand Up @@ -199,3 +221,7 @@ func (d *AliyundriveOpen) Other(ctx context.Context, args model.OtherArgs) (inte
}

var _ driver.Driver = (*AliyundriveOpen)(nil)
var _ driver.MkdirResult = (*AliyundriveOpen)(nil)
var _ driver.MoveResult = (*AliyundriveOpen)(nil)
var _ driver.RenameResult = (*AliyundriveOpen)(nil)
var _ driver.PutResult = (*AliyundriveOpen)(nil)
38 changes: 25 additions & 13 deletions drivers/aliyundrive_open/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,28 @@ type Files struct {
}

type File struct {
DriveId string `json:"drive_id"`
FileId string `json:"file_id"`
ParentFileId string `json:"parent_file_id"`
Name string `json:"name"`
Size int64 `json:"size"`
FileExtension string `json:"file_extension"`
ContentHash string `json:"content_hash"`
Category string `json:"category"`
Type string `json:"type"`
Thumbnail string `json:"thumbnail"`
Url string `json:"url"`
CreatedAt *time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
DriveId string `json:"drive_id"`
FileId string `json:"file_id"`
ParentFileId string `json:"parent_file_id"`
Name string `json:"name"`
Size int64 `json:"size"`
FileExtension string `json:"file_extension"`
ContentHash string `json:"content_hash"`
Category string `json:"category"`
Type string `json:"type"`
Thumbnail string `json:"thumbnail"`
Url string `json:"url"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`

// create only
FileName string `json:"file_name"`
}

func fileToObj(f File) *model.ObjThumb {
if f.Name == "" {
f.Name = f.FileName
}
return &model.ObjThumb{
Object: model.Object{
ID: f.FileId,
Expand Down Expand Up @@ -67,3 +73,9 @@ type CreateResp struct {
RapidUpload bool `json:"rapid_upload"`
PartInfoList []PartInfo `json:"part_info_list"`
}

type MoveOrCopyResp struct {
Exist bool `json:"exist"`
DriveID string `json:"drive_id"`
FileID string `json:"file_id"`
}
Loading

0 comments on commit df6b306

Please sign in to comment.