Skip to content

Commit

Permalink
Bug Fix & Add Feature: Fix the bug of path parsing error under window…
Browse files Browse the repository at this point in the history
…s, Add speed limit function
  • Loading branch information
ChloroplastYu committed Nov 16, 2021
1 parent 179519e commit f28550c
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 52 deletions.
22 changes: 14 additions & 8 deletions cmd/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,18 @@ Example:
include, _ := cmd.Flags().GetString("include")
exclude, _ := cmd.Flags().GetString("exclude")
storageClass, _ := cmd.Flags().GetString("storage-class")
rateLimiting, _ := cmd.Flags().GetFloat32("rate-limiting")
partSize, _ := cmd.Flags().GetInt64("part-size")
threadNum, _ := cmd.Flags().GetInt("thread-num")
// args[0]: 源地址
// args[1]: 目标地址
if !util.IsCosPath(args[0]) && util.IsCosPath(args[1]) {
// 上传
upload(args, recursive, include, exclude, storageClass)
upload(args, recursive, include, exclude, storageClass, rateLimiting, partSize, threadNum)
}
if util.IsCosPath(args[0]) && !util.IsCosPath(args[1]) {
// 下载
download(args, recursive, include, exclude)
download(args, recursive, include, exclude, rateLimiting, partSize, threadNum)
}
if util.IsCosPath(args[0]) && util.IsCosPath(args[1]) {
// 拷贝
Expand All @@ -63,29 +66,32 @@ func init() {
cpCmd.Flags().String("include", "", "Include files that meet the specified criteria")
cpCmd.Flags().String("exclude", "", "Exclude files that meet the specified criteria")
cpCmd.Flags().String("storage-class", "", "Specifying a storage class")
cpCmd.Flags().Float32("rate-limiting", 0, "Upload or download speed limit(MB/s)")
cpCmd.Flags().Int64("part-size", 32, "Specifies the block size(MB)")
cpCmd.Flags().Int("thread-num", 5, "Specifies the number of concurrent upload or download threads")
}

func upload(args []string, recursive bool, include string, exclude string, storageClass string) {
func upload(args []string, recursive bool, include string, exclude string, storageClass string, rateLimiting float32, partSize int64, threadNum int) {
_, localPath := util.ParsePath(args[0])
bucketName, cosPath := util.ParsePath(args[1])
c := util.NewClient(&config, bucketName)

if recursive {
util.MultiUpload(c, localPath, bucketName, cosPath, include, exclude, storageClass)
util.MultiUpload(c, localPath, bucketName, cosPath, include, exclude, storageClass, rateLimiting, partSize, threadNum)
} else {
util.SingleUpload(c, localPath, bucketName, cosPath, storageClass)
util.SingleUpload(c, localPath, bucketName, cosPath, storageClass, rateLimiting, partSize, threadNum)
}
}

func download(args []string, recursive bool, include string, exclude string) {
func download(args []string, recursive bool, include string, exclude string, rateLimiting float32, partSize int64, threadNum int) {
bucketName, cosPath := util.ParsePath(args[0])
_, localPath := util.ParsePath(args[1])
c := util.NewClient(&config, bucketName)

if recursive {
util.MultiDownload(c, bucketName, cosPath, localPath, include, exclude)
util.MultiDownload(c, bucketName, cosPath, localPath, include, exclude, rateLimiting, partSize, threadNum)
} else {
util.SingleDownload(c, bucketName, cosPath, localPath)
util.SingleDownload(c, bucketName, cosPath, localPath, rateLimiting, partSize, threadNum)
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func removeObjects1(args []string, include string, exclude string, force bool) {
bucketName, cosDir := util.ParsePath(arg)
c := util.NewClient(&config, bucketName)

if cosDir != "" && (cosDir[len(cosDir)-1] != '/' || cosDir[len(cosDir)-1] != '\\') {
if cosDir != "" && cosDir[len(cosDir)-1] != '/' {
cosDir += "/"
}

Expand Down
22 changes: 14 additions & 8 deletions cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,18 @@ Example:
include, _ := cmd.Flags().GetString("include")
exclude, _ := cmd.Flags().GetString("exclude")
storageClass, _ := cmd.Flags().GetString("storage-class")
rateLimiting, _ := cmd.Flags().GetFloat32("rate-limiting")
partSize, _ := cmd.Flags().GetInt64("part-size")
threadNum, _ := cmd.Flags().GetInt("thread-num")
// args[0]: 源地址
// args[1]: 目标地址
if !util.IsCosPath(args[0]) && util.IsCosPath(args[1]) {
// 上传
syncUpload(args, recursive, include, exclude, storageClass)
syncUpload(args, recursive, include, exclude, storageClass, rateLimiting, partSize, threadNum)
}
if util.IsCosPath(args[0]) && !util.IsCosPath(args[1]) {
// 下载
syncDownload(args, recursive, include, exclude)
syncDownload(args, recursive, include, exclude, rateLimiting, partSize, threadNum)
}
if util.IsCosPath(args[0]) && util.IsCosPath(args[1]) {
// 拷贝
Expand All @@ -64,29 +67,32 @@ func init() {
syncCmd.Flags().String("include", "", "List files that meet the specified criteria")
syncCmd.Flags().String("exclude", "", "Exclude files that meet the specified criteria")
syncCmd.Flags().String("storage-class", "", "Specifying a storage class")
syncCmd.Flags().Float32("rate-limiting", 0, "Upload or download speed limit(MB/s)")
syncCmd.Flags().Int64("part-size", 32, "Specifies the block size(MB)")
syncCmd.Flags().Int("thread-num", 5, "Specifies the number of concurrent upload or download threads")
}

func syncUpload(args []string, recursive bool, include string, exclude string, storageClass string) {
func syncUpload(args []string, recursive bool, include string, exclude string, storageClass string, rateLimiting float32, partSize int64, threadNum int) {
_, localPath := util.ParsePath(args[0])
bucketName, cosPath := util.ParsePath(args[1])
c := util.NewClient(&config, bucketName)

if recursive {
util.SyncMultiUpload(c, localPath, bucketName, cosPath, include, exclude, storageClass)
util.SyncMultiUpload(c, localPath, bucketName, cosPath, include, exclude, storageClass, rateLimiting, partSize, threadNum)
} else {
util.SyncSingleUpload(c, localPath, bucketName, cosPath, storageClass)
util.SyncSingleUpload(c, localPath, bucketName, cosPath, storageClass, rateLimiting, partSize, threadNum)
}
}

func syncDownload(args []string, recursive bool, include string, exclude string) {
func syncDownload(args []string, recursive bool, include string, exclude string, rateLimiting float32, partSize int64, threadNum int) {
bucketName, cosPath := util.ParsePath(args[0])
_, localPath := util.ParsePath(args[1])
c := util.NewClient(&config, bucketName)

if recursive {
util.SyncMultiDownload(c, bucketName, cosPath, localPath, include, exclude)
util.SyncMultiDownload(c, bucketName, cosPath, localPath, include, exclude, rateLimiting, partSize, threadNum)
} else {
util.SyncSingleDownload(c, bucketName, cosPath, localPath)
util.SyncSingleDownload(c, bucketName, cosPath, localPath, rateLimiting, partSize, threadNum)
}
}

Expand Down
16 changes: 8 additions & 8 deletions util/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"
)

func SingleDownload(c *cos.Client, bucketName string, cosPath string, localPath string) {
func SingleDownload(c *cos.Client, bucketName string, cosPath string, localPath string, rateLimiting float32, partSize int64, threadNum int) {
opt := &cos.MultiDownloadOptions{
Opt: &cos.ObjectGetOptions{
ResponseContentType: "",
Expand All @@ -24,11 +24,11 @@ func SingleDownload(c *cos.Client, bucketName string, cosPath string, localPath
XCosSSECustomerKey: "",
XCosSSECustomerKeyMD5: "",
XOptionHeader: nil,
XCosTrafficLimit: 0,
XCosTrafficLimit: (int)(rateLimiting * 1024 * 1024 * 8),
Listener: &CosListener{},
},
PartSize: 32,
ThreadPoolSize: 5,
PartSize: partSize,
ThreadPoolSize: threadNum,
CheckPoint: true,
CheckPointFile: "",
}
Expand All @@ -52,7 +52,7 @@ func SingleDownload(c *cos.Client, bucketName string, cosPath string, localPath

// 创建文件夹
var path string
if localPath[len(localPath) - 1] == '/' {
if localPath[len(localPath) - 1] == '/' || localPath[len(localPath) - 1] == '\\' {
pathList := strings.Split(cosPath, "/")
fileName := pathList[len(pathList)-1]
path = localPath
Expand All @@ -77,8 +77,8 @@ func SingleDownload(c *cos.Client, bucketName string, cosPath string, localPath
}
}

func MultiDownload(c *cos.Client, bucketName string, cosDir string, localDir string, include string, exclude string) {
if localDir != "" && localDir[len(localDir)-1] != '/' {
func MultiDownload(c *cos.Client, bucketName string, cosDir string, localDir string, include string, exclude string, rateLimiting float32, partSize int64, threadNum int) {
if localDir != "" && (localDir[len(localDir)-1] != '/' || localDir[len(localDir)-1] != '\\') {
localDir += "/"
}
if cosDir != "" && cosDir[len(cosDir)-1] != '/' {
Expand All @@ -90,6 +90,6 @@ func MultiDownload(c *cos.Client, bucketName string, cosDir string, localDir str
for _, o := range objects {
objName := o.Key[len(cosDir):]
localPath := localDir + objName
SingleDownload(c, bucketName, o.Key, localPath)
SingleDownload(c, bucketName, o.Key, localPath, rateLimiting, partSize, threadNum)
}
}
45 changes: 25 additions & 20 deletions util/synchronize.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os"
)

func SyncSingleUpload(c *cos.Client, localPath string, bucketName string, cosPath string, storageClass string) {
func SyncSingleUpload(c *cos.Client, localPath string, bucketName string, cosPath string, storageClass string, rateLimiting float32, partSize int64, threadNum int) {
headOpt := &cos.ObjectHeadOptions{
IfModifiedSince: "",
XCosSSECustomerAglo: "",
Expand All @@ -20,22 +20,27 @@ func SyncSingleUpload(c *cos.Client, localPath string, bucketName string, cosPat
if err != nil {
if resp != nil && resp.StatusCode == 404 {
// 文件不在cos上,上传
SingleUpload(c, localPath, bucketName, cosPath, storageClass)
SingleUpload(c, localPath, bucketName, cosPath, storageClass, rateLimiting, partSize, threadNum)
} else {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
} else {
if resp.StatusCode != 404 {
// 补全 localPath
if localPath[0] != '/' {
dirPath, err := os.Getwd()
if err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
localPath = dirPath + "/" + localPath
}
//isWindowsAbsolute, err := regexp.MatchString(WindowsAbsolutePattern, localPath)
//if err != nil {
// _, _ = fmt.Fprintln(os.Stderr, err)
// os.Exit(1)
//}
//// 补全 localPath
//if localPath[0] != '/' && !isWindowsAbsolute {
// dirPath, err := os.Getwd()
// if err != nil {
// _, _ = fmt.Fprintln(os.Stderr, err)
// os.Exit(1)
// }
// localPath = dirPath + "/" + localPath
//}
cosCrc := resp.Header.Get("x-cos-hash-crc64ecma")
localCrc, _ := CalculateHash(localPath, "crc64")
if cosCrc == localCrc {
Expand All @@ -44,12 +49,12 @@ func SyncSingleUpload(c *cos.Client, localPath string, bucketName string, cosPat
}
}

SingleUpload(c, localPath, bucketName, cosPath, storageClass)
SingleUpload(c, localPath, bucketName, cosPath, storageClass, rateLimiting, partSize, threadNum)
}
}

func SyncMultiUpload(c *cos.Client, localDir string, bucketName string, cosDir string, include string, exclude string, storageClass string) {
if localDir != "" && localDir[len(localDir)-1] != '/' {
func SyncMultiUpload(c *cos.Client, localDir string, bucketName string, cosDir string, include string, exclude string, storageClass string, rateLimiting float32, partSize int64, threadNum int) {
if localDir != "" && (localDir[len(localDir)-1] != '/' || localDir[len(localDir)-1] !='\\') {
localDir += "/"
}
if cosDir != "" && cosDir[len(cosDir)-1] != '/' {
Expand All @@ -62,16 +67,16 @@ func SyncMultiUpload(c *cos.Client, localDir string, bucketName string, cosDir s
localPath := localDir + f
cosPath := cosDir + f

SyncSingleUpload(c, localPath, bucketName, cosPath, storageClass)
SyncSingleUpload(c, localPath, bucketName, cosPath, storageClass, rateLimiting, partSize, threadNum)
}
}

func SyncSingleDownload(c *cos.Client, bucketName string, cosPath string, localPath string) {
func SyncSingleDownload(c *cos.Client, bucketName string, cosPath string, localPath string, rateLimiting float32, partSize int64, threadNum int) {
_, err := os.Stat(localPath)
if err != nil {
if os.IsNotExist(err) {
// 文件不在本地,下载
SingleDownload(c, bucketName, cosPath, localPath)
SingleDownload(c, bucketName, cosPath, localPath, rateLimiting, partSize, threadNum)
} else {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
Expand All @@ -84,11 +89,11 @@ func SyncSingleDownload(c *cos.Client, bucketName string, cosPath string, localP
return
}

SingleDownload(c, bucketName, cosPath, localPath)
SingleDownload(c, bucketName, cosPath, localPath, rateLimiting, partSize, threadNum)
}
}

func SyncMultiDownload(c *cos.Client, bucketName string, cosDir string, localDir string, include string, exclude string) {
func SyncMultiDownload(c *cos.Client, bucketName string, cosDir string, localDir string, include string, exclude string, rateLimiting float32, partSize int64, threadNum int) {
if localDir != "" && localDir[len(localDir)-1] != '/' {
localDir += "/"
}
Expand All @@ -101,6 +106,6 @@ func SyncMultiDownload(c *cos.Client, bucketName string, cosDir string, localDir
for _, o := range objects {
objName := o.Key[len(cosDir):]
localPath := localDir + objName
SyncSingleDownload(c, bucketName, o.Key, localPath)
SyncSingleDownload(c, bucketName, o.Key, localPath, rateLimiting, partSize, threadNum)
}
}
14 changes: 7 additions & 7 deletions util/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"
)

func SingleUpload(c *cos.Client, localPath string, bucketName string, cosPath string, storageClass string) {
func SingleUpload(c *cos.Client, localPath string, bucketName string, cosPath string, storageClass string, rateLimiting float32, partSize int64, threadNum int) {
opt := &cos.MultiUploadOptions{
OptIni: &cos.InitiateMultipartUploadOptions{
ACLHeaderOptions: &cos.ACLHeaderOptions{
Expand Down Expand Up @@ -38,12 +38,12 @@ func SingleUpload(c *cos.Client, localPath string, bucketName string, cosPath st
XCosSSECustomerKey: "",
XCosSSECustomerKeyMD5: "",
XOptionHeader: nil,
XCosTrafficLimit: 0,
XCosTrafficLimit: (int)(rateLimiting * 1024 * 1024 * 8),
Listener: &CosListener{},
},
},
PartSize: 32,
ThreadPoolSize: 5,
PartSize: partSize,
ThreadPoolSize: threadNum,
CheckPoint: true,
EnableVerification: false,
}
Expand Down Expand Up @@ -83,8 +83,8 @@ func SingleUpload(c *cos.Client, localPath string, bucketName string, cosPath st
}
}

func MultiUpload(c *cos.Client, localDir string, bucketName string, cosDir string, include string, exclude string, storageClass string) {
if localDir != "" && localDir[len(localDir)-1] != '/' {
func MultiUpload(c *cos.Client, localDir string, bucketName string, cosDir string, include string, exclude string, storageClass string, rateLimiting float32, partSize int64, threadNum int) {
if localDir != "" && (localDir[len(localDir)-1] != '/' || localDir[len(localDir)-1] != '\\') {
localDir += "/"
}
if cosDir != "" && cosDir[len(cosDir)-1] != '/' {
Expand All @@ -97,6 +97,6 @@ func MultiUpload(c *cos.Client, localDir string, bucketName string, cosDir strin
localPath := localDir + f
cosPath := cosDir + f

SingleUpload(c, localPath, bucketName, cosPath, storageClass)
SingleUpload(c, localPath, bucketName, cosPath, storageClass, rateLimiting, partSize, threadNum)
}
}

0 comments on commit f28550c

Please sign in to comment.