From f28550c80fe1dd0f63d8eee508721b640b170cbd Mon Sep 17 00:00:00 2001 From: Chloroplast Yu Date: Tue, 16 Nov 2021 11:29:47 +0800 Subject: [PATCH] Bug Fix & Add Feature: Fix the bug of path parsing error under windows, Add speed limit function --- cmd/cp.go | 22 ++++++++++++++-------- cmd/rm.go | 2 +- cmd/sync.go | 22 ++++++++++++++-------- util/download.go | 16 ++++++++-------- util/synchronize.go | 45 +++++++++++++++++++++++++-------------------- util/upload.go | 14 +++++++------- 6 files changed, 69 insertions(+), 52 deletions(-) diff --git a/cmd/cp.go b/cmd/cp.go index 505ce8e..ce3bb42 100644 --- a/cmd/cp.go +++ b/cmd/cp.go @@ -39,15 +39,18 @@ Example: include, _ := cmd.Flags().GetString("include") exclude, _ := cmd.Flags().GetString("exclude") storageClass, _ := cmd.Flags().GetString("storage-class") + rateLimiting, _ := cmd.Flags().GetFloat32("rate-limiting") + partSize, _ := cmd.Flags().GetInt64("part-size") + threadNum, _ := cmd.Flags().GetInt("thread-num") // args[0]: 源地址 // args[1]: 目标地址 if !util.IsCosPath(args[0]) && util.IsCosPath(args[1]) { // 上传 - upload(args, recursive, include, exclude, storageClass) + upload(args, recursive, include, exclude, storageClass, rateLimiting, partSize, threadNum) } if util.IsCosPath(args[0]) && !util.IsCosPath(args[1]) { // 下载 - download(args, recursive, include, exclude) + download(args, recursive, include, exclude, rateLimiting, partSize, threadNum) } if util.IsCosPath(args[0]) && util.IsCosPath(args[1]) { // 拷贝 @@ -63,29 +66,32 @@ func init() { cpCmd.Flags().String("include", "", "Include files that meet the specified criteria") cpCmd.Flags().String("exclude", "", "Exclude files that meet the specified criteria") cpCmd.Flags().String("storage-class", "", "Specifying a storage class") + cpCmd.Flags().Float32("rate-limiting", 0, "Upload or download speed limit(MB/s)") + cpCmd.Flags().Int64("part-size", 32, "Specifies the block size(MB)") + cpCmd.Flags().Int("thread-num", 5, "Specifies the number of concurrent upload or download threads") } -func upload(args []string, recursive bool, include string, exclude string, storageClass string) { +func upload(args []string, recursive bool, include string, exclude string, storageClass string, rateLimiting float32, partSize int64, threadNum int) { _, localPath := util.ParsePath(args[0]) bucketName, cosPath := util.ParsePath(args[1]) c := util.NewClient(&config, bucketName) if recursive { - util.MultiUpload(c, localPath, bucketName, cosPath, include, exclude, storageClass) + util.MultiUpload(c, localPath, bucketName, cosPath, include, exclude, storageClass, rateLimiting, partSize, threadNum) } else { - util.SingleUpload(c, localPath, bucketName, cosPath, storageClass) + util.SingleUpload(c, localPath, bucketName, cosPath, storageClass, rateLimiting, partSize, threadNum) } } -func download(args []string, recursive bool, include string, exclude string) { +func download(args []string, recursive bool, include string, exclude string, rateLimiting float32, partSize int64, threadNum int) { bucketName, cosPath := util.ParsePath(args[0]) _, localPath := util.ParsePath(args[1]) c := util.NewClient(&config, bucketName) if recursive { - util.MultiDownload(c, bucketName, cosPath, localPath, include, exclude) + util.MultiDownload(c, bucketName, cosPath, localPath, include, exclude, rateLimiting, partSize, threadNum) } else { - util.SingleDownload(c, bucketName, cosPath, localPath) + util.SingleDownload(c, bucketName, cosPath, localPath, rateLimiting, partSize, threadNum) } } diff --git a/cmd/rm.go b/cmd/rm.go index 744fff0..40250b7 100644 --- a/cmd/rm.go +++ b/cmd/rm.go @@ -114,7 +114,7 @@ func removeObjects1(args []string, include string, exclude string, force bool) { bucketName, cosDir := util.ParsePath(arg) c := util.NewClient(&config, bucketName) - if cosDir != "" && (cosDir[len(cosDir)-1] != '/' || cosDir[len(cosDir)-1] != '\\') { + if cosDir != "" && cosDir[len(cosDir)-1] != '/' { cosDir += "/" } diff --git a/cmd/sync.go b/cmd/sync.go index 9e170d3..5723f64 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -40,15 +40,18 @@ Example: include, _ := cmd.Flags().GetString("include") exclude, _ := cmd.Flags().GetString("exclude") storageClass, _ := cmd.Flags().GetString("storage-class") + rateLimiting, _ := cmd.Flags().GetFloat32("rate-limiting") + partSize, _ := cmd.Flags().GetInt64("part-size") + threadNum, _ := cmd.Flags().GetInt("thread-num") // args[0]: 源地址 // args[1]: 目标地址 if !util.IsCosPath(args[0]) && util.IsCosPath(args[1]) { // 上传 - syncUpload(args, recursive, include, exclude, storageClass) + syncUpload(args, recursive, include, exclude, storageClass, rateLimiting, partSize, threadNum) } if util.IsCosPath(args[0]) && !util.IsCosPath(args[1]) { // 下载 - syncDownload(args, recursive, include, exclude) + syncDownload(args, recursive, include, exclude, rateLimiting, partSize, threadNum) } if util.IsCosPath(args[0]) && util.IsCosPath(args[1]) { // 拷贝 @@ -64,29 +67,32 @@ func init() { syncCmd.Flags().String("include", "", "List files that meet the specified criteria") syncCmd.Flags().String("exclude", "", "Exclude files that meet the specified criteria") syncCmd.Flags().String("storage-class", "", "Specifying a storage class") + syncCmd.Flags().Float32("rate-limiting", 0, "Upload or download speed limit(MB/s)") + syncCmd.Flags().Int64("part-size", 32, "Specifies the block size(MB)") + syncCmd.Flags().Int("thread-num", 5, "Specifies the number of concurrent upload or download threads") } -func syncUpload(args []string, recursive bool, include string, exclude string, storageClass string) { +func syncUpload(args []string, recursive bool, include string, exclude string, storageClass string, rateLimiting float32, partSize int64, threadNum int) { _, localPath := util.ParsePath(args[0]) bucketName, cosPath := util.ParsePath(args[1]) c := util.NewClient(&config, bucketName) if recursive { - util.SyncMultiUpload(c, localPath, bucketName, cosPath, include, exclude, storageClass) + util.SyncMultiUpload(c, localPath, bucketName, cosPath, include, exclude, storageClass, rateLimiting, partSize, threadNum) } else { - util.SyncSingleUpload(c, localPath, bucketName, cosPath, storageClass) + util.SyncSingleUpload(c, localPath, bucketName, cosPath, storageClass, rateLimiting, partSize, threadNum) } } -func syncDownload(args []string, recursive bool, include string, exclude string) { +func syncDownload(args []string, recursive bool, include string, exclude string, rateLimiting float32, partSize int64, threadNum int) { bucketName, cosPath := util.ParsePath(args[0]) _, localPath := util.ParsePath(args[1]) c := util.NewClient(&config, bucketName) if recursive { - util.SyncMultiDownload(c, bucketName, cosPath, localPath, include, exclude) + util.SyncMultiDownload(c, bucketName, cosPath, localPath, include, exclude, rateLimiting, partSize, threadNum) } else { - util.SyncSingleDownload(c, bucketName, cosPath, localPath) + util.SyncSingleDownload(c, bucketName, cosPath, localPath, rateLimiting, partSize, threadNum) } } diff --git a/util/download.go b/util/download.go index 5bfc2da..ce32529 100644 --- a/util/download.go +++ b/util/download.go @@ -9,7 +9,7 @@ import ( "strings" ) -func SingleDownload(c *cos.Client, bucketName string, cosPath string, localPath string) { +func SingleDownload(c *cos.Client, bucketName string, cosPath string, localPath string, rateLimiting float32, partSize int64, threadNum int) { opt := &cos.MultiDownloadOptions{ Opt: &cos.ObjectGetOptions{ ResponseContentType: "", @@ -24,11 +24,11 @@ func SingleDownload(c *cos.Client, bucketName string, cosPath string, localPath XCosSSECustomerKey: "", XCosSSECustomerKeyMD5: "", XOptionHeader: nil, - XCosTrafficLimit: 0, + XCosTrafficLimit: (int)(rateLimiting * 1024 * 1024 * 8), Listener: &CosListener{}, }, - PartSize: 32, - ThreadPoolSize: 5, + PartSize: partSize, + ThreadPoolSize: threadNum, CheckPoint: true, CheckPointFile: "", } @@ -52,7 +52,7 @@ func SingleDownload(c *cos.Client, bucketName string, cosPath string, localPath // 创建文件夹 var path string - if localPath[len(localPath) - 1] == '/' { + if localPath[len(localPath) - 1] == '/' || localPath[len(localPath) - 1] == '\\' { pathList := strings.Split(cosPath, "/") fileName := pathList[len(pathList)-1] path = localPath @@ -77,8 +77,8 @@ func SingleDownload(c *cos.Client, bucketName string, cosPath string, localPath } } -func MultiDownload(c *cos.Client, bucketName string, cosDir string, localDir string, include string, exclude string) { - if localDir != "" && localDir[len(localDir)-1] != '/' { +func MultiDownload(c *cos.Client, bucketName string, cosDir string, localDir string, include string, exclude string, rateLimiting float32, partSize int64, threadNum int) { + if localDir != "" && (localDir[len(localDir)-1] != '/' || localDir[len(localDir)-1] != '\\') { localDir += "/" } if cosDir != "" && cosDir[len(cosDir)-1] != '/' { @@ -90,6 +90,6 @@ func MultiDownload(c *cos.Client, bucketName string, cosDir string, localDir str for _, o := range objects { objName := o.Key[len(cosDir):] localPath := localDir + objName - SingleDownload(c, bucketName, o.Key, localPath) + SingleDownload(c, bucketName, o.Key, localPath, rateLimiting, partSize, threadNum) } } diff --git a/util/synchronize.go b/util/synchronize.go index 9dffaa4..228909d 100644 --- a/util/synchronize.go +++ b/util/synchronize.go @@ -7,7 +7,7 @@ import ( "os" ) -func SyncSingleUpload(c *cos.Client, localPath string, bucketName string, cosPath string, storageClass string) { +func SyncSingleUpload(c *cos.Client, localPath string, bucketName string, cosPath string, storageClass string, rateLimiting float32, partSize int64, threadNum int) { headOpt := &cos.ObjectHeadOptions{ IfModifiedSince: "", XCosSSECustomerAglo: "", @@ -20,22 +20,27 @@ func SyncSingleUpload(c *cos.Client, localPath string, bucketName string, cosPat if err != nil { if resp != nil && resp.StatusCode == 404 { // 文件不在cos上,上传 - SingleUpload(c, localPath, bucketName, cosPath, storageClass) + SingleUpload(c, localPath, bucketName, cosPath, storageClass, rateLimiting, partSize, threadNum) } else { _, _ = fmt.Fprintln(os.Stderr, err) os.Exit(1) } } else { if resp.StatusCode != 404 { - // 补全 localPath - if localPath[0] != '/' { - dirPath, err := os.Getwd() - if err != nil { - _, _ = fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - localPath = dirPath + "/" + localPath - } + //isWindowsAbsolute, err := regexp.MatchString(WindowsAbsolutePattern, localPath) + //if err != nil { + // _, _ = fmt.Fprintln(os.Stderr, err) + // os.Exit(1) + //} + //// 补全 localPath + //if localPath[0] != '/' && !isWindowsAbsolute { + // dirPath, err := os.Getwd() + // if err != nil { + // _, _ = fmt.Fprintln(os.Stderr, err) + // os.Exit(1) + // } + // localPath = dirPath + "/" + localPath + //} cosCrc := resp.Header.Get("x-cos-hash-crc64ecma") localCrc, _ := CalculateHash(localPath, "crc64") if cosCrc == localCrc { @@ -44,12 +49,12 @@ func SyncSingleUpload(c *cos.Client, localPath string, bucketName string, cosPat } } - SingleUpload(c, localPath, bucketName, cosPath, storageClass) + SingleUpload(c, localPath, bucketName, cosPath, storageClass, rateLimiting, partSize, threadNum) } } -func SyncMultiUpload(c *cos.Client, localDir string, bucketName string, cosDir string, include string, exclude string, storageClass string) { - if localDir != "" && localDir[len(localDir)-1] != '/' { +func SyncMultiUpload(c *cos.Client, localDir string, bucketName string, cosDir string, include string, exclude string, storageClass string, rateLimiting float32, partSize int64, threadNum int) { + if localDir != "" && (localDir[len(localDir)-1] != '/' || localDir[len(localDir)-1] !='\\') { localDir += "/" } if cosDir != "" && cosDir[len(cosDir)-1] != '/' { @@ -62,16 +67,16 @@ func SyncMultiUpload(c *cos.Client, localDir string, bucketName string, cosDir s localPath := localDir + f cosPath := cosDir + f - SyncSingleUpload(c, localPath, bucketName, cosPath, storageClass) + SyncSingleUpload(c, localPath, bucketName, cosPath, storageClass, rateLimiting, partSize, threadNum) } } -func SyncSingleDownload(c *cos.Client, bucketName string, cosPath string, localPath string) { +func SyncSingleDownload(c *cos.Client, bucketName string, cosPath string, localPath string, rateLimiting float32, partSize int64, threadNum int) { _, err := os.Stat(localPath) if err != nil { if os.IsNotExist(err) { // 文件不在本地,下载 - SingleDownload(c, bucketName, cosPath, localPath) + SingleDownload(c, bucketName, cosPath, localPath, rateLimiting, partSize, threadNum) } else { _, _ = fmt.Fprintln(os.Stderr, err) os.Exit(1) @@ -84,11 +89,11 @@ func SyncSingleDownload(c *cos.Client, bucketName string, cosPath string, localP return } - SingleDownload(c, bucketName, cosPath, localPath) + SingleDownload(c, bucketName, cosPath, localPath, rateLimiting, partSize, threadNum) } } -func SyncMultiDownload(c *cos.Client, bucketName string, cosDir string, localDir string, include string, exclude string) { +func SyncMultiDownload(c *cos.Client, bucketName string, cosDir string, localDir string, include string, exclude string, rateLimiting float32, partSize int64, threadNum int) { if localDir != "" && localDir[len(localDir)-1] != '/' { localDir += "/" } @@ -101,6 +106,6 @@ func SyncMultiDownload(c *cos.Client, bucketName string, cosDir string, localDir for _, o := range objects { objName := o.Key[len(cosDir):] localPath := localDir + objName - SyncSingleDownload(c, bucketName, o.Key, localPath) + SyncSingleDownload(c, bucketName, o.Key, localPath, rateLimiting, partSize, threadNum) } } diff --git a/util/upload.go b/util/upload.go index fa871f9..8a4871d 100644 --- a/util/upload.go +++ b/util/upload.go @@ -9,7 +9,7 @@ import ( "strings" ) -func SingleUpload(c *cos.Client, localPath string, bucketName string, cosPath string, storageClass string) { +func SingleUpload(c *cos.Client, localPath string, bucketName string, cosPath string, storageClass string, rateLimiting float32, partSize int64, threadNum int) { opt := &cos.MultiUploadOptions{ OptIni: &cos.InitiateMultipartUploadOptions{ ACLHeaderOptions: &cos.ACLHeaderOptions{ @@ -38,12 +38,12 @@ func SingleUpload(c *cos.Client, localPath string, bucketName string, cosPath st XCosSSECustomerKey: "", XCosSSECustomerKeyMD5: "", XOptionHeader: nil, - XCosTrafficLimit: 0, + XCosTrafficLimit: (int)(rateLimiting * 1024 * 1024 * 8), Listener: &CosListener{}, }, }, - PartSize: 32, - ThreadPoolSize: 5, + PartSize: partSize, + ThreadPoolSize: threadNum, CheckPoint: true, EnableVerification: false, } @@ -83,8 +83,8 @@ func SingleUpload(c *cos.Client, localPath string, bucketName string, cosPath st } } -func MultiUpload(c *cos.Client, localDir string, bucketName string, cosDir string, include string, exclude string, storageClass string) { - if localDir != "" && localDir[len(localDir)-1] != '/' { +func MultiUpload(c *cos.Client, localDir string, bucketName string, cosDir string, include string, exclude string, storageClass string, rateLimiting float32, partSize int64, threadNum int) { + if localDir != "" && (localDir[len(localDir)-1] != '/' || localDir[len(localDir)-1] != '\\') { localDir += "/" } if cosDir != "" && cosDir[len(cosDir)-1] != '/' { @@ -97,6 +97,6 @@ func MultiUpload(c *cos.Client, localDir string, bucketName string, cosDir strin localPath := localDir + f cosPath := cosDir + f - SingleUpload(c, localPath, bucketName, cosPath, storageClass) + SingleUpload(c, localPath, bucketName, cosPath, storageClass, rateLimiting, partSize, threadNum) } }