Skip to content

Commit

Permalink
feat: upload progress recovery (AlistGo#4987)
Browse files Browse the repository at this point in the history
* feat(189pc):upload progress recovery

* fix:some err

* feat(baidu_netdisk,baidu_photo):upload progress recovery

* feat(mopan):upload progress recovery

* feat(baidu_netdisk):custom upload api
  • Loading branch information
foxxorcat authored Aug 11, 2023
1 parent c59dbb4 commit c1db3a3
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 141 deletions.
5 changes: 2 additions & 3 deletions drivers/189pc/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,8 @@ func toDesc(o string) string {
func ParseHttpHeader(str string) map[string]string {
header := make(map[string]string)
for _, value := range strings.Split(str, "&") {
i := strings.Index(value, "=")
if i > 0 {
header[strings.TrimSpace(value[0:i])] = strings.TrimSpace(value[i+1:])
if k, v, found := strings.Cut(value, "="); found {
header[k] = v
}
}
return header
Expand Down
17 changes: 14 additions & 3 deletions drivers/189pc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,25 @@ type InitMultiUploadResp struct {
} `json:"data"`
}
type UploadUrlsResp struct {
Code string `json:"code"`
UploadUrls map[string]Part `json:"uploadUrls"`
Code string `json:"code"`
Data map[string]UploadUrlsData `json:"uploadUrls"`
}
type Part struct {
type UploadUrlsData struct {
RequestURL string `json:"requestURL"`
RequestHeader string `json:"requestHeader"`
}

type UploadUrlInfo struct {
PartNumber int
Headers map[string]string
UploadUrlsData
}

type UploadProgress struct {
UploadInfo InitMultiUploadResp
UploadParts []string
}

/* 第二种上传方式 */
type CreateUploadFileResp struct {
// 上传文件请求ID
Expand Down
213 changes: 131 additions & 82 deletions drivers/189pc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"net/url"
"os"
"regexp"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -471,15 +472,21 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
return nil, err
}

threadG, upCtx := errgroup.NewGroupWithContext(ctx, y.uploadThread,
retry.Attempts(3),
retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay))

fileMd5 := md5.New()
silceMd5 := md5.New()
silceMd5Hexs := make([]string, 0, count)
byteData := make([]byte, sliceSize)

for i := 1; i <= count; i++ {
if utils.IsCanceled(ctx) {
return nil, ctx.Err()
if utils.IsCanceled(upCtx) {
break
}

byteData := make([]byte, sliceSize)
if i == count {
byteData = byteData[:lastPartSize]
}
Expand All @@ -493,36 +500,26 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
// 计算块md5并进行hex和base64编码
md5Bytes := silceMd5.Sum(nil)
silceMd5Hexs = append(silceMd5Hexs, strings.ToUpper(hex.EncodeToString(md5Bytes)))
silceMd5Base64 := base64.StdEncoding.EncodeToString(md5Bytes)

// 获取上传链接
var uploadUrl UploadUrlsResp
_, err = y.request(fullUrl+"/getMultiUploadUrls", http.MethodGet,
func(req *resty.Request) {
req.SetContext(ctx)
}, Params{
"partInfo": fmt.Sprintf("%d-%s", i, silceMd5Base64),
"uploadFileId": initMultiUpload.Data.UploadFileID,
}, &uploadUrl)
if err != nil {
return nil, err
}
partInfo := fmt.Sprintf("%d-%s", i, base64.StdEncoding.EncodeToString(md5Bytes))

// 开始上传
uploadData := uploadUrl.UploadUrls[fmt.Sprint("partNumber_", i)]
threadG.Go(func(ctx context.Context) error {
uploadUrls, err := y.GetMultiUploadUrls(ctx, initMultiUpload.Data.UploadFileID, partInfo)
if err != nil {
return err
}

err = retry.Do(func() error {
_, err := y.put(ctx, uploadData.RequestURL, ParseHttpHeader(uploadData.RequestHeader), false, bytes.NewReader(byteData))
return err
},
retry.Context(ctx),
retry.Attempts(3),
retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay))
if err != nil {
return nil, err
}
up(int(i * 100 / count))
// step.4 上传切片
uploadUrl := uploadUrls[0]
_, err = y.put(ctx, uploadUrl.RequestURL, uploadUrl.Headers, false, bytes.NewReader(byteData))
if err != nil {
return err
}
up(int(threadG.Success()) * 100 / count)
return nil
})
}
if err = threadG.Wait(); err != nil {
return nil, err
}

fileMd5Hex := strings.ToUpper(hex.EncodeToString(fileMd5.Sum(nil)))
Expand Down Expand Up @@ -564,24 +561,24 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode

var sliceSize = partSize(file.GetSize())
count := int(math.Ceil(float64(file.GetSize()) / float64(sliceSize)))
lastPartSize := file.GetSize() % sliceSize
if file.GetSize() > 0 && lastPartSize == 0 {
lastPartSize = sliceSize
lastSliceSize := file.GetSize() % sliceSize
if file.GetSize() > 0 && lastSliceSize == 0 {
lastSliceSize = sliceSize
}

//step.1 优先计算所需信息
byteSize := sliceSize
fileMd5 := md5.New()
silceMd5 := md5.New()
silceMd5Hexs := make([]string, 0, count)
silceMd5Base64s := make([]string, 0, count)
partInfos := make([]string, 0, count)
for i := 1; i <= count; i++ {
if utils.IsCanceled(ctx) {
return nil, ctx.Err()
}

if i == count {
byteSize = lastPartSize
byteSize = lastSliceSize
}

silceMd5.Reset()
Expand All @@ -590,7 +587,7 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode
}
md5Byte := silceMd5.Sum(nil)
silceMd5Hexs = append(silceMd5Hexs, strings.ToUpper(hex.EncodeToString(md5Byte)))
silceMd5Base64s = append(silceMd5Base64s, fmt.Sprint(i, "-", base64.StdEncoding.EncodeToString(md5Byte)))
partInfos = append(partInfos, fmt.Sprint(i, "-", base64.StdEncoding.EncodeToString(md5Byte)))
}

fileMd5Hex := strings.ToUpper(hex.EncodeToString(fileMd5.Sum(nil)))
Expand All @@ -599,88 +596,95 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode
sliceMd5Hex = strings.ToUpper(utils.GetMD5EncodeStr(strings.Join(silceMd5Hexs, "\n")))
}

//step.2 预上传
params := Params{
"parentFolderId": dstDir.GetID(),
"fileName": url.QueryEscape(file.GetName()),
"fileSize": fmt.Sprint(file.GetSize()),
"fileMd5": fileMd5Hex,
"sliceSize": fmt.Sprint(sliceSize),
"sliceMd5": sliceMd5Hex,
}

fullUrl := UPLOAD_URL
if y.isFamily() {
params.Set("familyId", y.FamilyID)
fullUrl += "/family"
} else {
//params.Set("extend", `{"opScene":"1","relativepath":"","rootfolderid":""}`)
fullUrl += "/person"
}

var uploadInfo InitMultiUploadResp
_, err = y.request(fullUrl+"/initMultiUpload", http.MethodGet, func(req *resty.Request) {
req.SetContext(ctx)
}, params, &uploadInfo)
if err != nil {
return nil, err
}

// 网盘中不存在该文件,开始上传
if uploadInfo.Data.FileDataExists != 1 {
// step.3 获取上传切片信息
var uploadUrls UploadUrlsResp
_, err = y.request(fullUrl+"/getMultiUploadUrls", http.MethodGet,
func(req *resty.Request) {
req.SetContext(ctx)
}, Params{
"uploadFileId": uploadInfo.Data.UploadFileID,
"partInfo": strings.Join(silceMd5Base64s, ","),
}, &uploadUrls)
// 尝试恢复进度
uploadProgress, ok := base.GetUploadProgress[*UploadProgress](y, y.tokenInfo.SessionKey, fileMd5Hex)
if !ok {
//step.2 预上传
params := Params{
"parentFolderId": dstDir.GetID(),
"fileName": url.QueryEscape(file.GetName()),
"fileSize": fmt.Sprint(file.GetSize()),
"fileMd5": fileMd5Hex,
"sliceSize": fmt.Sprint(sliceSize),
"sliceMd5": sliceMd5Hex,
}
if y.isFamily() {
params.Set("familyId", y.FamilyID)
}
var uploadInfo InitMultiUploadResp
_, err = y.request(fullUrl+"/initMultiUpload", http.MethodGet, func(req *resty.Request) {
req.SetContext(ctx)
}, params, &uploadInfo)
if err != nil {
return nil, err
}
uploadProgress = &UploadProgress{
UploadInfo: uploadInfo,
UploadParts: partInfos,
}
}

// step.4 上传切片
uploadInfo := uploadProgress.UploadInfo.Data
// 网盘中不存在该文件,开始上传
if uploadInfo.FileDataExists != 1 {
threadG, upCtx := errgroup.NewGroupWithContext(ctx, y.uploadThread,
retry.Attempts(3),
retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay))
for k, part := range uploadUrls.UploadUrls {
for i, uploadPart := range uploadProgress.UploadParts {
if utils.IsCanceled(upCtx) {
break
}
partNumber, err := strconv.Atoi(strings.TrimPrefix(k, "partNumber_"))
if err != nil {
return nil, err
}

part, byteSize, offset := part, sliceSize, int64(partNumber-1)*sliceSize
if partNumber == count {
byteSize = lastPartSize
}

i, uploadPart := i, uploadPart
threadG.Go(func(ctx context.Context) error {
_, err := y.put(ctx, part.RequestURL, ParseHttpHeader(part.RequestHeader), false, io.NewSectionReader(tempFile, offset, byteSize))
// step.3 获取上传链接
uploadUrls, err := y.GetMultiUploadUrls(ctx, uploadInfo.UploadFileID, uploadPart)
if err != nil {
return err
}
up(int(threadG.Success()) * 100 / len(uploadUrls.UploadUrls))
uploadUrl := uploadUrls[0]

byteSize, offset := sliceSize, int64(uploadUrl.PartNumber-1)*sliceSize
if uploadUrl.PartNumber == count {
byteSize = lastSliceSize
}

// step.4 上传切片
_, err = y.put(ctx, uploadUrl.RequestURL, uploadUrl.Headers, false, io.NewSectionReader(tempFile, offset, byteSize))
if err != nil {
return err
}

up(int(threadG.Success()) * 100 / len(uploadUrls))
uploadProgress.UploadParts[i] = ""
return nil
})
}
if err = threadG.Wait(); err != nil {
if errors.Is(err, context.Canceled) {
uploadProgress.UploadParts = utils.SliceFilter(uploadProgress.UploadParts, func(s string) bool { return s != "" })
base.SaveUploadProgress(y, uploadProgress, y.tokenInfo.SessionKey, fileMd5Hex)
}
return nil, err
}
}

// 提交
// step.5 提交
var resp CommitMultiUploadFileResp
_, err = y.request(fullUrl+"/commitMultiUploadFile", http.MethodGet,
func(req *resty.Request) {
req.SetContext(ctx)
}, Params{
"uploadFileId": uploadInfo.Data.UploadFileID,
"uploadFileId": uploadInfo.UploadFileID,
"isLog": "0",
"opertype": "3",
}, &resp)
Expand All @@ -690,6 +694,51 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode
return resp.toFile(), nil
}

// 获取上传切片信息
// 对http body有大小限制,分片信息太多会出错
func (y *Cloud189PC) GetMultiUploadUrls(ctx context.Context, uploadFileId string, partInfo ...string) ([]UploadUrlInfo, error) {
fullUrl := UPLOAD_URL
if y.isFamily() {
fullUrl += "/family"
} else {
fullUrl += "/person"
}

var uploadUrlsResp UploadUrlsResp
_, err := y.request(fullUrl+"/getMultiUploadUrls", http.MethodGet,
func(req *resty.Request) {
req.SetContext(ctx)
}, Params{
"uploadFileId": uploadFileId,
"partInfo": strings.Join(partInfo, ","),
}, &uploadUrlsResp)
if err != nil {
return nil, err
}
uploadUrls := uploadUrlsResp.Data

if len(uploadUrls) != len(partInfo) {
return nil, fmt.Errorf("uploadUrls get error, due to get length %d, real length %d", len(partInfo), len(uploadUrls))
}

uploadUrlInfos := make([]UploadUrlInfo, 0, len(uploadUrls))
for k, uploadUrl := range uploadUrls {
partNumber, err := strconv.Atoi(strings.TrimPrefix(k, "partNumber_"))
if err != nil {
return nil, err
}
uploadUrlInfos = append(uploadUrlInfos, UploadUrlInfo{
PartNumber: partNumber,
Headers: ParseHttpHeader(uploadUrl.RequestHeader),
UploadUrlsData: uploadUrl,
})
}
sort.Slice(uploadUrlInfos, func(i, j int) bool {
return uploadUrlInfos[i].PartNumber < uploadUrlInfos[j].PartNumber
})
return uploadUrlInfos, nil
}

// 旧版本上传,家庭云不支持覆盖
func (y *Cloud189PC) OldUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
// 需要获取完整文件md5,必须支持 io.Seek
Expand Down
Loading

0 comments on commit c1db3a3

Please sign in to comment.