Skip to content

Commit

Permalink
Merge branch 'fix/115-leak-memory'
Browse files Browse the repository at this point in the history
# Conflicts:
#	drivers/115/util.go
  • Loading branch information
Mmx233 committed Sep 7, 2024
2 parents 3ec274e + afe5d76 commit 3461bc9
Showing 1 changed file with 24 additions and 27 deletions.
51 changes: 24 additions & 27 deletions drivers/115/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,11 @@ func (d *Pan115) rapidUpload(ctx context.Context, fileSize int64, fileName, dirI
form.Set("sig", d.client.GenerateSignature(fileID, target))

signKey, signVal := "", ""
tryUpload := func() error {
tryUpload := func() (ok bool, err error) {
t := driver115.Now()

if encodedToken, err = ecdhCipher.EncodeToken(t.ToInt64()); err != nil {
return err
return
}

params := map[string]string{
Expand All @@ -179,59 +179,52 @@ func (d *Pan115) rapidUpload(ctx context.Context, fileSize int64, fileName, dirI
form.Set("sign_val", signVal)
}
if encrypted, err = ecdhCipher.Encrypt([]byte(form.Encode())); err != nil {
return err
return
}

req := d.client.NewRequest().
SetQueryParams(params).
SetBody(encrypted).
SetHeaderVerbatim("Content-Type", "application/x-www-form-urlencoded").
SetDoNotParseResponse(true)
req.SetContext(ctx)
resp, err := req.Post(driver115.ApiUploadInit)
if err != nil {
return err
return
}
data := resp.RawBody()
defer data.Close()
if bodyBytes, err = io.ReadAll(data); err != nil {
return err
return
}
if decrypted, err = ecdhCipher.Decrypt(bodyBytes); err != nil {
return err
return
}
if err = driver115.CheckErr(json.Unmarshal(decrypted, &result), &result, resp); err != nil {
return err
return
}
if result.Status == 7 {
// Update signKey & signVal
signKey = result.SignKey
signVal, err = UploadDigestRange(stream, result.SignCheck)
if err != nil {
return err
return
}
} else {
return nil
ok = true
}
result.SHA1 = fileID
return fmt.Errorf("115 rapid upload api returned status %d", result.Status)
return
}
for maxRetry := 10; ; maxRetry-- {
if maxRetry <= 0 {
return nil, fmt.Errorf("115 driver rapid upload max retry limit reached")
}

if err := tryUpload(); err == nil {
// Original logic is no error printing and no max retry times.
// Context controlling, time limiter and max retry is added at once
utils.Log.Debugln("115 driver rapid upload failed:", err)
for {
if ok, err := tryUpload(); err != nil {
return nil, err
} else if ok {
break
}

select {
case <-ctx.Done():
if ctx.Err() != nil {
return nil, ctx.Err()
case <-time.After(time.Second * 5):
// continue
}
}

Expand Down Expand Up @@ -303,6 +296,7 @@ func (d *Pan115) UploadByMultipart(ctx context.Context, params *driver115.Upload
if imur, err = bucket.InitiateMultipartUpload(params.Object,
oss.SetHeader(driver115.OssSecurityTokenHeaderName, ossToken.SecurityToken),
oss.UserAgentHeader(driver115.OSSUserAgent),
oss.WithContext(ctx),
); err != nil {
return err
}
Expand Down Expand Up @@ -349,8 +343,10 @@ func (d *Pan115) UploadByMultipart(ctx context.Context, params *driver115.Upload
}

b := bytes.NewBuffer(buf)
if part, err = bucket.UploadPart(imur, b, chunk.Size, chunk.Number, driver115.OssOption(params, ossToken)...); err == nil {
if part, err = bucket.UploadPart(imur, b, chunk.Size, chunk.Number, append(driver115.OssOption(params, ossToken), oss.WithContext(ctx))...); err == nil {
break
} else if errors.Is(err, context.Canceled) {
return
}
}
if err != nil {
Expand Down Expand Up @@ -389,13 +385,13 @@ LOOP:
}

// EOF错误是xml的Unmarshal导致的,响应其实是json格式,所以实际上上传是成功的
if _, err = bucket.CompleteMultipartUpload(imur, parts, driver115.OssOption(params, ossToken)...); err != nil && !errors.Is(err, io.EOF) {
if _, err = bucket.CompleteMultipartUpload(imur, parts, append(driver115.OssOption(params, ossToken), oss.WithContext(ctx))...); err != nil && !errors.Is(err, io.EOF) {
// 当文件名含有 &< 这两个字符之一时响应的xml解析会出现错误,实际上上传是成功的
if filename := filepath.Base(stream.GetName()); !strings.ContainsAny(filename, "&<") {
return err
}
}
return d.checkUploadStatus(dirID, params.SHA1)
return d.checkUploadStatus(ctx, dirID, params.SHA1)
}

func chunksProducer(ctx context.Context, ch chan oss.FileChunk, chunks []oss.FileChunk) {
Expand All @@ -407,9 +403,10 @@ func chunksProducer(ctx context.Context, ch chan oss.FileChunk, chunks []oss.Fil
}
}

func (d *Pan115) checkUploadStatus(dirID, sha1 string) error {
func (d *Pan115) checkUploadStatus(ctx context.Context, dirID, sha1 string) error {
// 验证上传是否成功
req := d.client.NewRequest().ForceContentType("application/json;charset=UTF-8")
req.SetContext(ctx)
opts := []driver115.GetFileOptions{
driver115.WithOrder(driver115.FileOrderByTime),
driver115.WithShowDirEnable(false),
Expand Down

0 comments on commit 3461bc9

Please sign in to comment.