From 47baab44a701aae2c0ffa5d795baf9052260c289 Mon Sep 17 00:00:00 2001
From: tickstep <tickstep@outlook.com>
Date: Fri, 30 Aug 2024 11:49:43 +0800
Subject: [PATCH] fix file upload PartNotSequential error #445

---
 internal/file/uploader/block.go               |  8 ++-
 internal/file/uploader/error.go               |  9 ++-
 internal/file/uploader/instance_state.go      | 18 ++----
 internal/file/uploader/multiuploader.go       | 23 ++++---
 internal/file/uploader/multiworker.go         | 32 ++++++----
 internal/functions/panupload/upload.go        | 22 ++++++-
 .../functions/panupload/upload_task_unit.go   | 64 ++++++++++++++++++-
 7 files changed, 135 insertions(+), 41 deletions(-)

diff --git a/internal/file/uploader/block.go b/internal/file/uploader/block.go
index d043f3d7..77f22bd9 100644
--- a/internal/file/uploader/block.go
+++ b/internal/file/uploader/block.go
@@ -4,7 +4,7 @@
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 //
-//     http://www.apache.org/licenses/LICENSE-2.0
+//	http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
@@ -30,6 +30,7 @@ type (
 		io.Seeker
 		Range() transfer.Range
 		Left() int64
+		ResetReader(readerAt io.ReaderAt)
 	}
 
 	fileBlock struct {
@@ -147,3 +148,8 @@ func (fb *fileBlock) Range() transfer.Range {
 func (fb *fileBlock) Readed() int64 {
 	return fb.readed
 }
+
+func (fb *fileBlock) ResetReader(readerAt io.ReaderAt) {
+	fb.readerAt = readerAt
+	fb.readed = 0
+}
diff --git a/internal/file/uploader/error.go b/internal/file/uploader/error.go
index 3cf5551e..16c418fd 100644
--- a/internal/file/uploader/error.go
+++ b/internal/file/uploader/error.go
@@ -4,7 +4,7 @@
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 //
-//     http://www.apache.org/licenses/LICENSE-2.0
+//	http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,8 +18,10 @@ import "fmt"
 var (
 	UploadUrlExpired       = fmt.Errorf("UrlExpired")
 	UploadPartNotSeq       = fmt.Errorf("PartNotSequential")
+	UploadNoSuchUpload     = fmt.Errorf("NoSuchUpload")
 	UploadTerminate        = fmt.Errorf("UploadErrorTerminate")
 	UploadPartAlreadyExist = fmt.Errorf("PartAlreadyExist")
+	UploadHttpError        = fmt.Errorf("HttpError")
 )
 
 type (
@@ -33,5 +35,8 @@ type (
 )
 
 func (me *MultiError) Error() string {
-	return me.Err.Error()
+	if me.Err != nil {
+		return me.Err.Error()
+	}
+	return ""
 }
diff --git a/internal/file/uploader/instance_state.go b/internal/file/uploader/instance_state.go
index 0f24c142..3d1d1e08 100644
--- a/internal/file/uploader/instance_state.go
+++ b/internal/file/uploader/instance_state.go
@@ -4,7 +4,7 @@
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 //
-//     http://www.apache.org/licenses/LICENSE-2.0
+//	http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,9 +20,9 @@ import (
 type (
 	// BlockState 文件区块信息
 	BlockState struct {
-		ID       int            `json:"id"`
-		Range    transfer.Range `json:"range"`
-		UploadDone bool `json:"upload_done"`
+		ID         int            `json:"id"`
+		Range      transfer.Range `json:"range"`
+		UploadDone bool           `json:"upload_done"`
 	}
 
 	// InstanceState 上传断点续传信息
@@ -39,18 +39,14 @@ func (muer *MultiUploader) getWorkerListByInstanceState(is *InstanceState) worke
 				id:         blockState.ID,
 				partOffset: blockState.Range.Begin,
 				splitUnit:  NewBufioSplitUnit(muer.file, blockState.Range, muer.speedsStat, muer.rateLimit, muer.globalSpeedsStat),
-				uploadDone:   false,
+				uploadDone: false,
 			})
 		} else {
-			// 已经完成的, 也要加入 (可继续优化)
+			// 已经完成的, 也要加入
 			workers = append(workers, &worker{
 				id:         blockState.ID,
 				partOffset: blockState.Range.Begin,
-				splitUnit: &fileBlock{
-					readRange: blockState.Range,
-					readed:    blockState.Range.End - blockState.Range.Begin,
-					readerAt:  muer.file,
-				},
+				splitUnit:  NewBufioSplitUnit(muer.file, blockState.Range, muer.speedsStat, muer.rateLimit, muer.globalSpeedsStat),
 				uploadDone: true,
 			})
 		}
diff --git a/internal/file/uploader/multiuploader.go b/internal/file/uploader/multiuploader.go
index f71f4768..7a9c4644 100644
--- a/internal/file/uploader/multiuploader.go
+++ b/internal/file/uploader/multiuploader.go
@@ -4,7 +4,7 @@
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 //
-//     http://www.apache.org/licenses/LICENSE-2.0
+//	http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,6 +16,7 @@ package uploader
 import (
 	"context"
 	"github.com/tickstep/aliyunpan-api/aliyunpan"
+	"github.com/tickstep/aliyunpan/internal/config"
 	"github.com/tickstep/aliyunpan/internal/utils"
 	"github.com/tickstep/library-go/converter"
 	"github.com/tickstep/library-go/logger"
@@ -61,6 +62,7 @@ type (
 
 		// 网盘上传参数
 		UploadOpEntity *aliyunpan.CreateFileUploadResult `json:"uploadOpEntity"`
+		panClient      *config.PanClient
 	}
 
 	// MultiUploaderConfig 多线程上传配置
@@ -72,12 +74,13 @@ type (
 )
 
 // NewMultiUploader 初始化上传
-func NewMultiUploader(multiUpload MultiUpload, file rio.ReaderAtLen64, config *MultiUploaderConfig, uploadOpEntity *aliyunpan.CreateFileUploadResult, globalSpeedsStat *speeds.Speeds) *MultiUploader {
+func NewMultiUploader(multiUpload MultiUpload, file rio.ReaderAtLen64, config *MultiUploaderConfig, uploadOpEntity *aliyunpan.CreateFileUploadResult, panClient *config.PanClient, globalSpeedsStat *speeds.Speeds) *MultiUploader {
 	return &MultiUploader{
 		multiUpload:      multiUpload,
 		file:             file,
 		config:           config,
 		UploadOpEntity:   uploadOpEntity,
+		panClient:        panClient,
 		globalSpeedsStat: globalSpeedsStat,
 	}
 }
@@ -137,13 +140,13 @@ func (muer *MultiUploader) Execute() error {
 	// 分配任务
 	if muer.instanceState != nil {
 		muer.workers = muer.getWorkerListByInstanceState(muer.instanceState)
-		logger.Verboseln("upload task CREATED from instance state\n")
+		logger.Verbosef("upload task CREATED from instance state\n")
 	} else {
 		muer.workers = muer.getWorkerListByInstanceState(&InstanceState{
 			BlockList: SplitBlock(muer.file.Len(), muer.config.BlockSize),
 		})
 
-		logger.Verboseln("upload task CREATED: block size: %d, num: %d\n", muer.config.BlockSize, len(muer.workers))
+		logger.Verbosef("upload task CREATED: block size: %d, num: %d\n", muer.config.BlockSize, len(muer.workers))
 	}
 
 	// 开始上传
@@ -195,32 +198,32 @@ func (muer *MultiUploader) Cancel() {
 	close(muer.canceled)
 }
 
-//OnExecute 设置开始上传事件
+// OnExecute 设置开始上传事件
 func (muer *MultiUploader) OnExecute(onExecuteEvent requester.Event) {
 	muer.onExecuteEvent = onExecuteEvent
 }
 
-//OnSuccess 设置成功上传事件
+// OnSuccess 设置成功上传事件
 func (muer *MultiUploader) OnSuccess(onSuccessEvent requester.Event) {
 	muer.onSuccessEvent = onSuccessEvent
 }
 
-//OnFinish 设置结束上传事件
+// OnFinish 设置结束上传事件
 func (muer *MultiUploader) OnFinish(onFinishEvent requester.Event) {
 	muer.onFinishEvent = onFinishEvent
 }
 
-//OnCancel 设置取消上传事件
+// OnCancel 设置取消上传事件
 func (muer *MultiUploader) OnCancel(onCancelEvent requester.Event) {
 	muer.onCancelEvent = onCancelEvent
 }
 
-//OnError 设置上传发生错误事件
+// OnError 设置上传发生错误事件
 func (muer *MultiUploader) OnError(onErrorEvent requester.EventOnError) {
 	muer.onErrorEvent = onErrorEvent
 }
 
-//OnUploadStatusEvent 设置上传状态事件
+// OnUploadStatusEvent 设置上传状态事件
 func (muer *MultiUploader) OnUploadStatusEvent(f UploadStatusFunc) {
 	muer.onUploadStatusEvent = f
 }
diff --git a/internal/file/uploader/multiworker.go b/internal/file/uploader/multiworker.go
index 1c29ba7b..18b1f509 100644
--- a/internal/file/uploader/multiworker.go
+++ b/internal/file/uploader/multiworker.go
@@ -4,7 +4,7 @@
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 //
-//     http://www.apache.org/licenses/LICENSE-2.0
+//	http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,11 +15,12 @@ package uploader
 
 import (
 	"context"
+	"errors"
 	"github.com/oleiade/lane"
 	"github.com/tickstep/aliyunpan/internal/waitgroup"
 	"github.com/tickstep/library-go/logger"
 	"github.com/tickstep/library-go/requester"
-	"os"
+	"io"
 	"strconv"
 )
 
@@ -89,7 +90,7 @@ func (muer *MultiUploader) upload() (uperr error) {
 			)
 			go func() {
 				if !wer.uploadDone {
-					logger.Verboseln("begin to upload part: " + strconv.Itoa(wer.id))
+					logger.Verboseln("begin to upload part num: " + strconv.Itoa(wer.id+1))
 					uploadDone, terr = muer.multiUpload.UploadFile(ctx, int(wer.id), wer.partOffset, wer.splitUnit.Range().End, wer.splitUnit, uploadClient)
 				} else {
 					uploadDone = true
@@ -102,11 +103,11 @@ func (muer *MultiUploader) upload() (uperr error) {
 				return
 			case <-doneChan:
 				// continue
-				logger.Verboseln("multiUpload worker upload file done")
+				logger.Verboseln("multiUpload worker upload file http action done")
 			}
 			cancel()
 			if terr != nil {
-				logger.Verboseln("upload file part err: %+v", terr)
+				logger.Verbosef("upload file part err: %+v\n", terr)
 				if me, ok := terr.(*MultiError); ok {
 					if me.Terminated { // 终止
 						muer.closeCanceledOnce.Do(func() { // 只关闭一次
@@ -115,18 +116,21 @@ func (muer *MultiUploader) upload() (uperr error) {
 						uperr = me.Err
 						return
 					} else if me.NeedStartOver {
-						logger.Verboseln("upload start over: %d\n", wer.id)
+						logger.Verbosef("upload start over: %d\n", wer.id)
 						// 从头开始上传
 						muer.closeCanceledOnce.Do(func() { // 只关闭一次
 							close(muer.canceled)
 						})
 						uperr = me.Err
 						return
+					} else {
+						uperr = me.Err
+						return
 					}
 				}
 
-				logger.Verboseln("upload err: %s, id: %d\n", terr, wer.id)
-				wer.splitUnit.Seek(0, os.SEEK_SET)
+				logger.Verbosef("upload worker err: %s, id: %d\n", terr, wer.id)
+				wer.splitUnit.Seek(0, io.SeekStart)
 				uploadDeque.Prepend(wer) // 放回上传队列首位
 				return
 			}
@@ -139,9 +143,10 @@ func (muer *MultiUploader) upload() (uperr error) {
 		}()
 		wg.Wait()
 		if uperr != nil {
-			if uperr == UploadPartNotSeq {
-				// 分片出现乱序,需要重新上传,取消本次所有剩余的分片的上传
-				break
+			if errors.Is(uperr, UploadPartNotSeq) {
+				// 分片出现乱序,停止上传
+				// 清空数据,准备重新上传
+				uploadDeque = lane.NewDeque() // 清空待上传列表
 			}
 		}
 		// 没有任务了
@@ -153,6 +158,11 @@ func (muer *MultiUploader) upload() (uperr error) {
 	// 释放链路
 	uploadClient.CloseIdleConnections()
 
+	// 返回错误,通知上层客户端
+	if errors.Is(uperr, UploadPartNotSeq) {
+		return uperr
+	}
+
 	select {
 	case <-muer.canceled:
 		if uperr != nil {
diff --git a/internal/functions/panupload/upload.go b/internal/functions/panupload/upload.go
index d195d8c6..1640e1ef 100644
--- a/internal/functions/panupload/upload.go
+++ b/internal/functions/panupload/upload.go
@@ -129,12 +129,20 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
 					errResp := &apierror.ErrorXmlResp{}
 					if err := xml.Unmarshal(buf, errResp); err == nil {
 						if errResp.Code != "" {
-							if "PartNotSequential" == errResp.Code || "NoSuchUpload" == errResp.Code {
+							if "PartNotSequential" == errResp.Code {
 								respError = uploader.UploadPartNotSeq
 								respErr = &uploader.MultiError{
 									Err:           uploader.UploadPartNotSeq,
 									Terminated:    false,
-									NeedStartOver: true,
+									NeedStartOver: false,
+								}
+								return resp, respError
+							} else if "NoSuchUpload" == errResp.Code {
+								respError = uploader.UploadNoSuchUpload
+								respErr = &uploader.MultiError{
+									Err:           uploader.UploadNoSuchUpload,
+									Terminated:    true,
+									NeedStartOver: false,
 								}
 								return resp, respError
 							} else if "AccessDenied" == errResp.Code && "Request has expired." == errResp.Message {
@@ -153,6 +161,14 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
 								return resp, respError
 							}
 						}
+					} else {
+						respError = uploader.UploadHttpError
+						respErr = &uploader.MultiError{
+							Err:           uploader.UploadHttpError,
+							Terminated:    false,
+							NeedStartOver: false,
+						}
+						return resp, respError
 					}
 				}
 			} else {
@@ -204,7 +220,7 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
 			// success
 			return true, nil
 		} else if respErr.Err == uploader.UploadPartNotSeq {
-			// 上传分片乱序了,需要重新从0分片开始上传
+			// 上传分片乱序了
 			// 先直接返回,后续再优化
 			return false, respErr
 		} else {
diff --git a/internal/functions/panupload/upload_task_unit.go b/internal/functions/panupload/upload_task_unit.go
index fed648f9..53ccd94f 100644
--- a/internal/functions/panupload/upload_task_unit.go
+++ b/internal/functions/panupload/upload_task_unit.go
@@ -14,6 +14,7 @@
 package panupload
 
 import (
+	"errors"
 	"fmt"
 	"github.com/tickstep/aliyunpan/internal/log"
 	"github.com/tickstep/aliyunpan/internal/plugins"
@@ -167,7 +168,7 @@ func (utu *UploadTaskUnit) upload() (result *taskframework.TaskUnitRunResult) {
 			Parallel:  utu.Parallel,
 			BlockSize: utu.BlockSize,
 			MaxRate:   config.Config.MaxUploadRate,
-		}, utu.LocalFileChecksum.UploadOpEntity, utu.GlobalSpeedsStat)
+		}, utu.LocalFileChecksum.UploadOpEntity, utu.PanClient, utu.GlobalSpeedsStat)
 
 	// 设置断点续传
 	if utu.state != nil {
@@ -240,6 +241,10 @@ func (utu *UploadTaskUnit) upload() (result *taskframework.TaskUnitRunResult) {
 	if er != nil {
 		result.ResultMessage = StrUploadFailed
 		result.NeedRetry = true
+		if errors.Is(er, uploader.UploadNoSuchUpload) {
+			// do not need retry
+			result.NeedRetry = false
+		}
 		result.Err = er
 	}
 	return
@@ -555,14 +560,67 @@ stepUploadUpload:
 	// 正常上传流程
 	uploadResult := utu.upload()
 	if uploadResult != nil && uploadResult.Err != nil {
-		if uploadResult.Err == uploader.UploadPartNotSeq {
-			fmt.Printf("[%s] %s 文件分片上传顺序错误,开始重新上传文件\n", utu.taskInfo.Id(), time.Now().Format("2006-01-02 15:04:06"))
+		// 处理上传错误
+		if errors.Is(uploadResult.Err, uploader.UploadPartNotSeq) {
+			// 分片乱序错误
+			utu.amendFileUploadPartNum()
+			goto stepUploadUpload
+		}
+		if errors.Is(uploadResult.Err, uploader.UploadNoSuchUpload) {
+			// 上传任务过期
+			fmt.Printf("[%s] %s 网盘上传任务不存在,创建新任务重新上传文件\n", utu.taskInfo.Id(), time.Now().Format("2006-01-02 15:04:06"))
 			// 需要重新从0开始上传
 			uploadResult = nil
 			utu.LocalFileChecksum.UploadOpEntity = nil
 			utu.state = nil
 			goto StepUploadPrepareUpload
 		}
+		var apier *apierror.ApiError
+		if errors.As(uploadResult.Err, &apier) {
+			// 上传任务过期
+			if apier.Code == apierror.ApiCodeUploadIdNotFound {
+				fmt.Printf("[%s] %s 网盘上传任务已失效,创建新任务重新上传文件\n", utu.taskInfo.Id(), time.Now().Format("2006-01-02 15:04:06"))
+				uploadResult = nil
+				utu.LocalFileChecksum.UploadOpEntity = nil
+				utu.state = nil
+				goto StepUploadPrepareUpload
+			}
+		}
 	}
 	return uploadResult
 }
+
+// amendFileUploadPartNum 修正文件分片上传顺序错误
+func (utu *UploadTaskUnit) amendFileUploadPartNum() {
+	if utu.LocalFileChecksum.LocalFileMeta.UploadOpEntity == nil || utu.state == nil {
+		return
+	}
+	logger.Verbosef("adjust the uploaded parts num error\n")
+	// 分片出现乱序
+	// 获取的已上传分片信息,修正正确的分片顺序
+	uploadedParts, uper := utu.PanClient.OpenapiPanClient().GetUploadedPartInfoAllItem(&aliyunpan.GetUploadedPartsParam{
+		DriveId:  utu.LocalFileChecksum.LocalFileMeta.UploadOpEntity.DriveId,
+		FileId:   utu.LocalFileChecksum.LocalFileMeta.UploadOpEntity.FileId,
+		UploadId: utu.LocalFileChecksum.LocalFileMeta.UploadOpEntity.UploadId,
+	})
+	if uper != nil {
+		logger.Verbosef("get uploaded parts info error: %+v\n", uper)
+		return
+	}
+	// 获取最后上传的分片编号
+	lastUploadedPartNum := -1
+	if len(uploadedParts.UploadedParts) > 0 {
+		lastUploadedPartNum = uploadedParts.UploadedParts[len(uploadedParts.UploadedParts)-1].PartNumber
+	}
+	// 修正分片上传的标识
+	if lastUploadedPartNum > 0 {
+		logger.Verbosef("get the right uploaded parts num: %d\n", lastUploadedPartNum)
+		for _, w := range utu.state.BlockList {
+			if (w.ID + 1) <= lastUploadedPartNum { // 分片的编号从1开始,BlockList的id是从0开始
+				w.UploadDone = true
+			} else {
+				w.UploadDone = false
+			}
+		}
+	}
+}