Skip to content

Commit

Permalink
fix file upload PartNotSequential error #445
Browse files Browse the repository at this point in the history
  • Loading branch information
tickstep committed Aug 30, 2024
1 parent 0b5bd60 commit 47baab4
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 41 deletions.
8 changes: 7 additions & 1 deletion internal/file/uploader/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -30,6 +30,7 @@ type (
io.Seeker
Range() transfer.Range
Left() int64
ResetReader(readerAt io.ReaderAt)
}

fileBlock struct {
Expand Down Expand Up @@ -147,3 +148,8 @@ func (fb *fileBlock) Range() transfer.Range {
func (fb *fileBlock) Readed() int64 {
return fb.readed
}

func (fb *fileBlock) ResetReader(readerAt io.ReaderAt) {
fb.readerAt = readerAt
fb.readed = 0
}
9 changes: 7 additions & 2 deletions internal/file/uploader/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,8 +18,10 @@ import "fmt"
var (
UploadUrlExpired = fmt.Errorf("UrlExpired")
UploadPartNotSeq = fmt.Errorf("PartNotSequential")
UploadNoSuchUpload = fmt.Errorf("NoSuchUpload")
UploadTerminate = fmt.Errorf("UploadErrorTerminate")
UploadPartAlreadyExist = fmt.Errorf("PartAlreadyExist")
UploadHttpError = fmt.Errorf("HttpError")
)

type (
Expand All @@ -33,5 +35,8 @@ type (
)

func (me *MultiError) Error() string {
return me.Err.Error()
if me.Err != nil {
return me.Err.Error()
}
return ""
}
18 changes: 7 additions & 11 deletions internal/file/uploader/instance_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -20,9 +20,9 @@ import (
type (
// BlockState 文件区块信息
BlockState struct {
ID int `json:"id"`
Range transfer.Range `json:"range"`
UploadDone bool `json:"upload_done"`
ID int `json:"id"`
Range transfer.Range `json:"range"`
UploadDone bool `json:"upload_done"`
}

// InstanceState 上传断点续传信息
Expand All @@ -39,18 +39,14 @@ func (muer *MultiUploader) getWorkerListByInstanceState(is *InstanceState) worke
id: blockState.ID,
partOffset: blockState.Range.Begin,
splitUnit: NewBufioSplitUnit(muer.file, blockState.Range, muer.speedsStat, muer.rateLimit, muer.globalSpeedsStat),
uploadDone: false,
uploadDone: false,
})
} else {
// 已经完成的, 也要加入 (可继续优化)
// 已经完成的, 也要加入
workers = append(workers, &worker{
id: blockState.ID,
partOffset: blockState.Range.Begin,
splitUnit: &fileBlock{
readRange: blockState.Range,
readed: blockState.Range.End - blockState.Range.Begin,
readerAt: muer.file,
},
splitUnit: NewBufioSplitUnit(muer.file, blockState.Range, muer.speedsStat, muer.rateLimit, muer.globalSpeedsStat),
uploadDone: true,
})
}
Expand Down
23 changes: 13 additions & 10 deletions internal/file/uploader/multiuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -16,6 +16,7 @@ package uploader
import (
"context"
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan/internal/config"
"github.com/tickstep/aliyunpan/internal/utils"
"github.com/tickstep/library-go/converter"
"github.com/tickstep/library-go/logger"
Expand Down Expand Up @@ -61,6 +62,7 @@ type (

// 网盘上传参数
UploadOpEntity *aliyunpan.CreateFileUploadResult `json:"uploadOpEntity"`
panClient *config.PanClient
}

// MultiUploaderConfig 多线程上传配置
Expand All @@ -72,12 +74,13 @@ type (
)

// NewMultiUploader 初始化上传
func NewMultiUploader(multiUpload MultiUpload, file rio.ReaderAtLen64, config *MultiUploaderConfig, uploadOpEntity *aliyunpan.CreateFileUploadResult, globalSpeedsStat *speeds.Speeds) *MultiUploader {
func NewMultiUploader(multiUpload MultiUpload, file rio.ReaderAtLen64, config *MultiUploaderConfig, uploadOpEntity *aliyunpan.CreateFileUploadResult, panClient *config.PanClient, globalSpeedsStat *speeds.Speeds) *MultiUploader {
return &MultiUploader{
multiUpload: multiUpload,
file: file,
config: config,
UploadOpEntity: uploadOpEntity,
panClient: panClient,
globalSpeedsStat: globalSpeedsStat,
}
}
Expand Down Expand Up @@ -137,13 +140,13 @@ func (muer *MultiUploader) Execute() error {
// 分配任务
if muer.instanceState != nil {
muer.workers = muer.getWorkerListByInstanceState(muer.instanceState)
logger.Verboseln("upload task CREATED from instance state\n")
logger.Verbosef("upload task CREATED from instance state\n")
} else {
muer.workers = muer.getWorkerListByInstanceState(&InstanceState{
BlockList: SplitBlock(muer.file.Len(), muer.config.BlockSize),
})

logger.Verboseln("upload task CREATED: block size: %d, num: %d\n", muer.config.BlockSize, len(muer.workers))
logger.Verbosef("upload task CREATED: block size: %d, num: %d\n", muer.config.BlockSize, len(muer.workers))
}

// 开始上传
Expand Down Expand Up @@ -195,32 +198,32 @@ func (muer *MultiUploader) Cancel() {
close(muer.canceled)
}

//OnExecute 设置开始上传事件
// OnExecute 设置开始上传事件
func (muer *MultiUploader) OnExecute(onExecuteEvent requester.Event) {
muer.onExecuteEvent = onExecuteEvent
}

//OnSuccess 设置成功上传事件
// OnSuccess 设置成功上传事件
func (muer *MultiUploader) OnSuccess(onSuccessEvent requester.Event) {
muer.onSuccessEvent = onSuccessEvent
}

//OnFinish 设置结束上传事件
// OnFinish 设置结束上传事件
func (muer *MultiUploader) OnFinish(onFinishEvent requester.Event) {
muer.onFinishEvent = onFinishEvent
}

//OnCancel 设置取消上传事件
// OnCancel 设置取消上传事件
func (muer *MultiUploader) OnCancel(onCancelEvent requester.Event) {
muer.onCancelEvent = onCancelEvent
}

//OnError 设置上传发生错误事件
// OnError 设置上传发生错误事件
func (muer *MultiUploader) OnError(onErrorEvent requester.EventOnError) {
muer.onErrorEvent = onErrorEvent
}

//OnUploadStatusEvent 设置上传状态事件
// OnUploadStatusEvent 设置上传状态事件
func (muer *MultiUploader) OnUploadStatusEvent(f UploadStatusFunc) {
muer.onUploadStatusEvent = f
}
32 changes: 21 additions & 11 deletions internal/file/uploader/multiworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -15,11 +15,12 @@ package uploader

import (
"context"
"errors"
"github.com/oleiade/lane"
"github.com/tickstep/aliyunpan/internal/waitgroup"
"github.com/tickstep/library-go/logger"
"github.com/tickstep/library-go/requester"
"os"
"io"
"strconv"
)

Expand Down Expand Up @@ -89,7 +90,7 @@ func (muer *MultiUploader) upload() (uperr error) {
)
go func() {
if !wer.uploadDone {
logger.Verboseln("begin to upload part: " + strconv.Itoa(wer.id))
logger.Verboseln("begin to upload part num: " + strconv.Itoa(wer.id+1))
uploadDone, terr = muer.multiUpload.UploadFile(ctx, int(wer.id), wer.partOffset, wer.splitUnit.Range().End, wer.splitUnit, uploadClient)
} else {
uploadDone = true
Expand All @@ -102,11 +103,11 @@ func (muer *MultiUploader) upload() (uperr error) {
return
case <-doneChan:
// continue
logger.Verboseln("multiUpload worker upload file done")
logger.Verboseln("multiUpload worker upload file http action done")
}
cancel()
if terr != nil {
logger.Verboseln("upload file part err: %+v", terr)
logger.Verbosef("upload file part err: %+v\n", terr)
if me, ok := terr.(*MultiError); ok {
if me.Terminated { // 终止
muer.closeCanceledOnce.Do(func() { // 只关闭一次
Expand All @@ -115,18 +116,21 @@ func (muer *MultiUploader) upload() (uperr error) {
uperr = me.Err
return
} else if me.NeedStartOver {
logger.Verboseln("upload start over: %d\n", wer.id)
logger.Verbosef("upload start over: %d\n", wer.id)
// 从头开始上传
muer.closeCanceledOnce.Do(func() { // 只关闭一次
close(muer.canceled)
})
uperr = me.Err
return
} else {
uperr = me.Err
return
}
}

logger.Verboseln("upload err: %s, id: %d\n", terr, wer.id)
wer.splitUnit.Seek(0, os.SEEK_SET)
logger.Verbosef("upload worker err: %s, id: %d\n", terr, wer.id)
wer.splitUnit.Seek(0, io.SeekStart)
uploadDeque.Prepend(wer) // 放回上传队列首位
return
}
Expand All @@ -139,9 +143,10 @@ func (muer *MultiUploader) upload() (uperr error) {
}()
wg.Wait()
if uperr != nil {
if uperr == UploadPartNotSeq {
// 分片出现乱序,需要重新上传,取消本次所有剩余的分片的上传
break
if errors.Is(uperr, UploadPartNotSeq) {
// 分片出现乱序,停止上传
// 清空数据,准备重新上传
uploadDeque = lane.NewDeque() // 清空待上传列表
}
}
// 没有任务了
Expand All @@ -153,6 +158,11 @@ func (muer *MultiUploader) upload() (uperr error) {
// 释放链路
uploadClient.CloseIdleConnections()

// 返回错误,通知上层客户端
if errors.Is(uperr, UploadPartNotSeq) {
return uperr
}

select {
case <-muer.canceled:
if uperr != nil {
Expand Down
22 changes: 19 additions & 3 deletions internal/functions/panupload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,20 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
errResp := &apierror.ErrorXmlResp{}
if err := xml.Unmarshal(buf, errResp); err == nil {
if errResp.Code != "" {
if "PartNotSequential" == errResp.Code || "NoSuchUpload" == errResp.Code {
if "PartNotSequential" == errResp.Code {
respError = uploader.UploadPartNotSeq
respErr = &uploader.MultiError{
Err: uploader.UploadPartNotSeq,
Terminated: false,
NeedStartOver: true,
NeedStartOver: false,
}
return resp, respError
} else if "NoSuchUpload" == errResp.Code {
respError = uploader.UploadNoSuchUpload
respErr = &uploader.MultiError{
Err: uploader.UploadNoSuchUpload,
Terminated: true,
NeedStartOver: false,
}
return resp, respError
} else if "AccessDenied" == errResp.Code && "Request has expired." == errResp.Message {
Expand All @@ -153,6 +161,14 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
return resp, respError
}
}
} else {
respError = uploader.UploadHttpError
respErr = &uploader.MultiError{
Err: uploader.UploadHttpError,
Terminated: false,
NeedStartOver: false,
}
return resp, respError
}
}
} else {
Expand Down Expand Up @@ -204,7 +220,7 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
// success
return true, nil
} else if respErr.Err == uploader.UploadPartNotSeq {
// 上传分片乱序了,需要重新从0分片开始上传
// 上传分片乱序了
// 先直接返回,后续再优化
return false, respErr
} else {
Expand Down
Loading

0 comments on commit 47baab4

Please sign in to comment.