Skip to content

Commit

Permalink
Fix few bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
cl-bvl committed Jan 25, 2024
1 parent 3ad83b9 commit 9e2eb7f
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 24 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ With 128 workers we get avg sync speed around 2k obj/sec (small objects 1-20 kb)
* Each object is loaded into RAM. So you need `<avg object size> * <workers count>` RAM.
If you don't have enough RAM, you can use swap. A large (32-64 Gb) swap on SSD does not affect the tool performance.
This happened because the tool was designed to synchronize billions of small files and optimized for this workload.
To avoid this you can use streaming storage drivers (now available only for S3 and FS). It's uses less RAM, but slower on small objects.

## Usage
```
Expand Down
1 change: 1 addition & 0 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type args struct {
DisableHTTP2 bool `arg:"--disable-http2" help:"Disable HTTP2 for http client"`
ListBuffer uint `arg:"--list-buffer" help:"Size of list buffer" default:"1000"`
SkipSSLVerify bool `arg:"--skip-ssl-verify" help:"Disable SSL verification for S3"`
ServerGzip bool `arg:"--server-gzip" help:"Workaround for S3 servers with enabled gzip compression for all files."`
Profiler bool `arg:"--profiler" help:"Enable profiler on :8080"`
// Rate Limit
RateLimitObjPerSec uint `arg:"--ratelimit-objects" help:"Rate limit objects per second"`
Expand Down
4 changes: 2 additions & 2 deletions cli/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func setupStorages(ctx context.Context, syncGroup *pipeline.Group, cli *argsPars
switch cli.Source.Type {
case storage.TypeS3:
sourceStorage = s3.NewS3Storage(cli.SourceNoSign, cli.SourceKey, cli.SourceSecret, cli.SourceToken, cli.SourceRegion, cli.SourceEndpoint,
cli.Source.Bucket, cli.Source.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval, cli.SkipSSLVerify,
cli.Source.Bucket, cli.Source.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval, cli.SkipSSLVerify, cli.ServerGzip,
)
case storage.TypeS3Stream:
sourceStorage = s3stream.NewS3StreamStorage(cli.SourceNoSign, cli.SourceKey, cli.SourceSecret, cli.SourceToken, cli.SourceRegion, cli.SourceEndpoint,
Expand All @@ -39,7 +39,7 @@ func setupStorages(ctx context.Context, syncGroup *pipeline.Group, cli *argsPars
switch cli.Target.Type {
case storage.TypeS3:
targetStorage = s3.NewS3Storage(cli.TargetNoSign, cli.TargetKey, cli.TargetSecret, cli.TargetToken, cli.TargetRegion, cli.TargetEndpoint,
cli.Target.Bucket, cli.Target.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval, cli.SkipSSLVerify,
cli.Target.Bucket, cli.Target.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval, cli.SkipSSLVerify, cli.ServerGzip,
)
case storage.TypeS3Stream:
targetStorage = s3stream.NewS3StreamStorage(cli.TargetNoSign, cli.TargetKey, cli.TargetSecret, cli.TargetToken, cli.TargetRegion, cli.TargetEndpoint,
Expand Down
10 changes: 6 additions & 4 deletions pipeline/collection/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package collection

import (
"github.com/larrabee/ratelimit"
"github.com/sirupsen/logrus"

"github.com/larrabee/s3sync/pipeline"
"github.com/larrabee/s3sync/storage"
"github.com/sirupsen/logrus"
)

// Terminator like a /dev/null
Expand All @@ -27,9 +28,10 @@ var Logger pipeline.StepFn = func(group *pipeline.Group, stepNum int, input <-ch
for obj := range input {
if ok {
cfg.WithFields(logrus.Fields{
"key": *obj.Key,
"size": *obj.ContentLength,
"Content-Type": *obj.ContentType,
"key": storage.ToValue(obj.Key),
"size": storage.ToValue(obj.ContentLength),
"Content-Type": storage.ToValue(obj.ContentType),
"Content-Encoding": storage.ToValue(obj.ContentEncoding),
}).Infof("Sync file")
output <- obj
}
Expand Down
11 changes: 11 additions & 0 deletions storage/s3/opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package s3

import (
"github.com/aws/aws-sdk-go/aws/request"
)

func withAcceptEncoding(e string) request.Option {
return func(r *request.Request) {
r.HTTPRequest.Header.Add("Accept-Encoding", e)
}
}
38 changes: 20 additions & 18 deletions storage/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"context"
"crypto/tls"
"errors"
"github.com/aws/aws-sdk-go/aws/request"
"io"
"net/http"
"net/url"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws/request"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
Expand All @@ -34,12 +35,13 @@ type S3Storage struct {
ctx context.Context
listMarker *string
rlBucket ratelimit.Bucket
serverGzip bool
}

// NewS3Storage return new configured S3 storage.
//
// You should always create new storage with this constructor.
func NewS3Storage(awsNoSign bool, awsAccessKey, awsSecretKey, awsToken, awsRegion, endpoint, bucketName, prefix string, keysPerReq int64, retryCnt uint, retryDelay time.Duration, skipSSLVerify bool) *S3Storage {
func NewS3Storage(awsNoSign bool, awsAccessKey, awsSecretKey, awsToken, awsRegion, endpoint, bucketName, prefix string, keysPerReq int64, retryCnt uint, retryDelay time.Duration, skipSSLVerify bool, serverGzip bool) *S3Storage {
sess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
}))
Expand Down Expand Up @@ -83,6 +85,7 @@ func NewS3Storage(awsNoSign bool, awsAccessKey, awsSecretKey, awsToken, awsRegio
retryInterval: retryDelay,
ctx: context.TODO(),
rlBucket: ratelimit.NewFakeBucket(),
serverGzip: serverGzip,
}

return &st
Expand All @@ -105,7 +108,7 @@ func (st *S3Storage) WithRateLimit(limit int) error {

// List S3 bucket and send founded objects to chan.
func (st *S3Storage) List(output chan<- *storage.Object) error {
listObjectsFn := func(p *s3.ListObjectsOutput, lastPage bool) bool {
listObjectsFn := func(p *s3.ListObjectsV2Output, lastPage bool) bool {
for _, o := range p.Contents {
key, _ := url.QueryUnescape(aws.StringValue(o.Key))
key = strings.Replace(key, st.prefix, "", 1)
Expand All @@ -117,19 +120,19 @@ func (st *S3Storage) List(output chan<- *storage.Object) error {
IsLatest: aws.Bool(true),
}
}
st.listMarker = p.Marker
st.listMarker = p.NextContinuationToken
return !lastPage // continue paging
}

input := &s3.ListObjectsInput{
Bucket: st.awsBucket,
Prefix: aws.String(st.prefix),
MaxKeys: aws.Int64(st.keysPerReq),
EncodingType: aws.String(s3.EncodingTypeUrl),
Marker: st.listMarker,
input := &s3.ListObjectsV2Input{
Bucket: st.awsBucket,
Prefix: aws.String(st.prefix),
MaxKeys: aws.Int64(st.keysPerReq),
EncodingType: aws.String(s3.EncodingTypeUrl),
ContinuationToken: st.listMarker,
}

if err := st.awsSvc.ListObjectsPagesWithContext(st.ctx, input, listObjectsFn); err != nil {
if err := st.awsSvc.ListObjectsV2PagesWithContext(st.ctx, input, listObjectsFn); err != nil {
return err
}
storage.Log.Debugf("Listing bucket finished")
Expand Down Expand Up @@ -191,12 +194,6 @@ func (st *S3Storage) PutObject(obj *storage.Object) error {
return nil
}

func withAcceptEncoding(e string) request.Option {
return func(r *request.Request) {
r.HTTPRequest.Header.Add("Accept-Encoding", e)
}
}

// GetObjectContent read object content and metadata from S3.
func (st *S3Storage) GetObjectContent(obj *storage.Object) error {
input := &s3.GetObjectInput{
Expand All @@ -205,7 +202,12 @@ func (st *S3Storage) GetObjectContent(obj *storage.Object) error {
VersionId: obj.VersionId,
}

result, err := st.awsSvc.GetObjectWithContext(st.ctx, input, withAcceptEncoding("gzip"))
opts := make([]request.Option, 0, 1)
if !st.serverGzip {
opts = append(opts, withAcceptEncoding("gzip"))
}

result, err := st.awsSvc.GetObjectWithContext(st.ctx, input, opts...)
if err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions storage/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,14 @@ func GetInsecureRandString(n int) string {

return sb.String()
}

func ToPtr[K any](val K) *K {
return &val
}

func ToValue[K any](val *K) K {
if val == nil {
return *new(K)
}
return *val
}

0 comments on commit 9e2eb7f

Please sign in to comment.