Skip to content

Commit

Permalink
feat(aliyundrive_open): rapid upload (close #4766)
Browse files Browse the repository at this point in the history
  • Loading branch information
xhofe committed Jul 15, 2023
1 parent a4511c1 commit 3f7882b
Show file tree
Hide file tree
Showing 4 changed files with 279 additions and 132 deletions.
71 changes: 1 addition & 70 deletions drivers/aliyundrive_open/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package aliyundrive_open
import (
"context"
"fmt"
"io"
"math"
"net/http"
"time"

Expand Down Expand Up @@ -153,74 +151,7 @@ func (d *AliyundriveOpen) Remove(ctx context.Context, obj model.Obj) error {
}

func (d *AliyundriveOpen) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
// rapid_upload is not currently supported
// 1. create
// Part Size Unit: Bytes, Default: 20MB,
// Maximum number of slices 10,000, ≈195.3125GB
var partSize int64 = 20 * 1024 * 1024
createData := base.Json{
"drive_id": d.DriveId,
"parent_file_id": dstDir.GetID(),
"name": stream.GetName(),
"type": "file",
"check_name_mode": "ignore",
}
count := 1
if stream.GetSize() > partSize {
if stream.GetSize() > 1*1024*1024*1024*1024 { // file Size over 1TB
partSize = 5 * 1024 * 1024 * 1024 // file part size 5GB
} else if stream.GetSize() > 768*1024*1024*1024 { // over 768GB
partSize = 109951163 // ≈ 104.8576MB, split 1TB into 10,000 part
} else if stream.GetSize() > 512*1024*1024*1024 { // over 512GB
partSize = 82463373 // ≈ 78.6432MB
} else if stream.GetSize() > 384*1024*1024*1024 { // over 384GB
partSize = 54975582 // ≈ 52.4288MB
} else if stream.GetSize() > 256*1024*1024*1024 { // over 256GB
partSize = 41231687 // ≈ 39.3216MB
} else if stream.GetSize() > 128*1024*1024*1024 { // over 128GB
partSize = 27487791 // ≈ 26.2144MB
}
count = int(math.Ceil(float64(stream.GetSize()) / float64(partSize)))
createData["part_info_list"] = makePartInfos(count)
}
var createResp CreateResp
_, err := d.request("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
req.SetBody(createData).SetResult(&createResp)
})
if err != nil {
return err
}
// 2. upload
preTime := time.Now()
for i := 1; i <= len(createResp.PartInfoList); i++ {
if utils.IsCanceled(ctx) {
return ctx.Err()
}
err = d.uploadPart(ctx, i, count, utils.NewMultiReadable(io.LimitReader(stream, partSize)), &createResp, true)
if err != nil {
return err
}
if count > 0 {
up(i * 100 / count)
}
// refresh upload url if 50 minutes passed
if time.Since(preTime) > 50*time.Minute {
createResp.PartInfoList, err = d.getUploadUrl(count, createResp.FileId, createResp.UploadId)
if err != nil {
return err
}
preTime = time.Now()
}
}
// 3. complete
_, err = d.request("/adrive/v1.0/openFile/complete", http.MethodPost, func(req *resty.Request) {
req.SetBody(base.Json{
"drive_id": d.DriveId,
"file_id": createResp.FileId,
"upload_id": createResp.UploadId,
})
})
return err
return d.upload(ctx, dstDir, stream, up)
}

func (d *AliyundriveOpen) Other(ctx context.Context, args model.OtherArgs) (interface{}, error) {
Expand Down
1 change: 1 addition & 0 deletions drivers/aliyundrive_open/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Addition struct {
ClientID string `json:"client_id" required:"false" help:"Keep it empty if you don't have one"`
ClientSecret string `json:"client_secret" required:"false" help:"Keep it empty if you don't have one"`
RemoveWay string `json:"remove_way" required:"true" type:"select" options:"trash,delete"`
RapidUpload bool `json:"rapid_upload" help:"If you enable this option, the file will be uploaded to the server first, so the progress will be incorrect"`
InternalUpload bool `json:"internal_upload" help:"If you are using Aliyun ECS is located in Beijing, you can turn it on to boost the upload speed"`
AccessToken string
}
Expand Down
267 changes: 267 additions & 0 deletions drivers/aliyundrive_open/upload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package aliyundrive_open

import (
"bytes"
"context"
"crypto/sha1"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"math"
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/go-resty/resty/v2"
log "github.com/sirupsen/logrus"
)

func makePartInfos(size int) []base.Json {
partInfoList := make([]base.Json, size)
for i := 0; i < size; i++ {
partInfoList[i] = base.Json{"part_number": 1 + i}
}
return partInfoList
}

func calPartSize(fileSize int64) int64 {
var partSize int64 = 20 * 1024 * 1024
if fileSize > partSize {
if fileSize > 1*1024*1024*1024*1024 { // file Size over 1TB
partSize = 5 * 1024 * 1024 * 1024 // file part size 5GB
} else if fileSize > 768*1024*1024*1024 { // over 768GB
partSize = 109951163 // ≈ 104.8576MB, split 1TB into 10,000 part
} else if fileSize > 512*1024*1024*1024 { // over 512GB
partSize = 82463373 // ≈ 78.6432MB
} else if fileSize > 384*1024*1024*1024 { // over 384GB
partSize = 54975582 // ≈ 52.4288MB
} else if fileSize > 256*1024*1024*1024 { // over 256GB
partSize = 41231687 // ≈ 39.3216MB
} else if fileSize > 128*1024*1024*1024 { // over 128GB
partSize = 27487791 // ≈ 26.2144MB
}
}
return partSize
}

func (d *AliyundriveOpen) getUploadUrl(count int, fileId, uploadId string) ([]PartInfo, error) {
partInfoList := makePartInfos(count)
var resp CreateResp
_, err := d.request("/adrive/v1.0/openFile/getUploadUrl", http.MethodPost, func(req *resty.Request) {
req.SetBody(base.Json{
"drive_id": d.DriveId,
"file_id": fileId,
"part_info_list": partInfoList,
"upload_id": uploadId,
}).SetResult(&resp)
})
return resp.PartInfoList, err
}

func (d *AliyundriveOpen) uploadPart(ctx context.Context, i, count int, reader *utils.MultiReadable, resp *CreateResp, retry bool) error {
partInfo := resp.PartInfoList[i-1]
uploadUrl := partInfo.UploadUrl
if d.InternalUpload {
uploadUrl = strings.ReplaceAll(uploadUrl, "https://cn-beijing-data.aliyundrive.net/", "http://ccp-bj29-bj-1592982087.oss-cn-beijing-internal.aliyuncs.com/")
}
req, err := http.NewRequest("PUT", uploadUrl, reader)
if err != nil {
return err
}
req = req.WithContext(ctx)
res, err := base.HttpClient.Do(req)
if err != nil {
if retry {
reader.Reset()
return d.uploadPart(ctx, i, count, reader, resp, false)
}
return err
}
res.Body.Close()
if retry && res.StatusCode == http.StatusForbidden {
resp.PartInfoList, err = d.getUploadUrl(count, resp.FileId, resp.UploadId)
if err != nil {
return err
}
reader.Reset()
return d.uploadPart(ctx, i, count, reader, resp, false)
}
if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusConflict {
return fmt.Errorf("upload status: %d", res.StatusCode)
}
return nil
}

func (d *AliyundriveOpen) normalUpload(ctx context.Context, stream model.FileStreamer, up driver.UpdateProgress, createResp CreateResp, count int, partSize int64) error {
log.Debugf("[aliyundive_open] normal upload")
// 2. upload
preTime := time.Now()
for i := 1; i <= len(createResp.PartInfoList); i++ {
if utils.IsCanceled(ctx) {
return ctx.Err()
}
err := d.uploadPart(ctx, i, count, utils.NewMultiReadable(io.LimitReader(stream, partSize)), &createResp, true)
if err != nil {
return err
}
if count > 0 {
up(i * 100 / count)
}
// refresh upload url if 50 minutes passed
if time.Since(preTime) > 50*time.Minute {
createResp.PartInfoList, err = d.getUploadUrl(count, createResp.FileId, createResp.UploadId)
if err != nil {
return err
}
preTime = time.Now()
}
}
// 3. complete
_, err := d.request("/adrive/v1.0/openFile/complete", http.MethodPost, func(req *resty.Request) {
req.SetBody(base.Json{
"drive_id": d.DriveId,
"file_id": createResp.FileId,
"upload_id": createResp.UploadId,
})
})
return err
}

type ProofRange struct {
Start int64
End int64
}

func getProofRange(input string, size int64) (*ProofRange, error) {
if size == 0 {
return &ProofRange{}, nil
}
tmpStr := utils.GetMD5EncodeStr(input)[0:16]
tmpInt, err := strconv.ParseUint(tmpStr, 16, 64)
if err != nil {
return nil, err
}
index := tmpInt % uint64(size)
pr := &ProofRange{
Start: int64(index),
End: int64(index) + 8,
}
if pr.End >= size {
pr.End = size
}
return pr, nil
}

func (d *AliyundriveOpen) calProofCode(file *os.File, fileSize int64) (string, error) {
proofRange, err := getProofRange(d.AccessToken, fileSize)
if err != nil {
return "", err
}
buf := make([]byte, proofRange.End-proofRange.Start)
_, err = file.ReadAt(buf, proofRange.Start)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(buf), nil
}

func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
// 1. create
// Part Size Unit: Bytes, Default: 20MB,
// Maximum number of slices 10,000, ≈195.3125GB
var partSize = calPartSize(stream.GetSize())
createData := base.Json{
"drive_id": d.DriveId,
"parent_file_id": dstDir.GetID(),
"name": stream.GetName(),
"type": "file",
"check_name_mode": "ignore",
}
count := int(math.Ceil(float64(stream.GetSize()) / float64(partSize)))
createData["part_info_list"] = makePartInfos(count)
// rapid upload
rapidUpload := stream.GetSize() > 100*1024 && d.RapidUpload
if rapidUpload {
log.Debugf("[aliyundrive_open] start cal pre_hash")
// read 1024 bytes to calculate pre hash
buf := bytes.NewBuffer(make([]byte, 0, 1024))
_, err := io.CopyN(buf, stream, 1024)
if err != nil {
return err
}
createData["size"] = stream.GetSize()
createData["pre_hash"] = utils.GetSHA1Encode(buf.Bytes())
// if support seek, seek to start
if localFile, ok := stream.(io.Seeker); ok {
if _, err := localFile.Seek(0, io.SeekStart); err != nil {
return err
}
} else {
// Put spliced head back to stream
stream.SetReadCloser(struct {
io.Reader
io.Closer
}{
Reader: io.MultiReader(buf, stream.GetReadCloser()),
Closer: stream.GetReadCloser(),
})
}
}
var createResp CreateResp
_, err, e := d.requestReturnErrResp("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
req.SetBody(createData).SetResult(&createResp)
})
if err != nil {
if e.Code != "PreHashMatched" || !rapidUpload {
return err
}
log.Debugf("[aliyundrive_open] pre_hash matched, start rapid upload")
// convert to local file
file, err := utils.CreateTempFile(stream)
if err != nil {
return err
}
// calculate full hash
h := sha1.New()
_, err = io.Copy(h, file)
if err != nil {
return err
}
delete(createData, "pre_hash")
createData["proof_version"] = "v1"
createData["content_hash_name"] = "sha1"
createData["content_hash"] = hex.EncodeToString(h.Sum(nil))
// seek to start
if _, err = file.Seek(0, io.SeekStart); err != nil {
return err
}
createData["proof_code"], err = d.calProofCode(file, stream.GetSize())
if err != nil {
return fmt.Errorf("cal proof code error: %s", err.Error())
}
_, err = d.request("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
req.SetBody(createData).SetResult(&createResp)
})
if err != nil {
return err
}
if createResp.RapidUpload {
log.Debugf("[aliyundrive_open] rapid upload success, file id: %s", createResp.FileId)
return nil
}
// failed to rapid upload, try normal upload
if _, err = file.Seek(0, io.SeekStart); err != nil {
return err
}
stream.SetReadCloser(file)
}
log.Debugf("[aliyundrive_open] create file success, resp: %+v", createResp)
return d.normalUpload(ctx, stream, up, createResp, count, partSize)
}
Loading

2 comments on commit 3f7882b

@anwen-anyi
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

秒传成功后会留在temp临时文件留存等下次重启AList时才会释放,是预期行为么?
image

@xhofe
Copy link
Collaborator Author

@xhofe xhofe commented on 3f7882b Jul 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

秒传成功后会留在temp临时文件留存等下次重启AList时才会释放,是预期行为么? image

不是 我忘记删了…… 你可以提个issue

Please sign in to comment.