8422 list multipart uploads #8531

Open
wants to merge 50 commits into
base: master
e7fb7b8
Revert "docs: clarify auditing availble on enterprise (#8289)"
ItamarYuran Oct 23, 2024
cd5b80d
Merge branch 'master' of github.com:treeverse/lakeFS
ItamarYuran Jan 2, 2025
299032a
Merge branch 'master' of github.com:treeverse/lakeFS
ItamarYuran Jan 19, 2025
d9ea540
no work no mess
ItamarYuran Jan 20, 2025
7b1500d
got it
ItamarYuran Jan 21, 2025
08ad93c
f**** auditing
ItamarYuran Jan 21, 2025
1f7bf56
with tests
ItamarYuran Jan 21, 2025
2fbe42a
slight adjustment
ItamarYuran Jan 21, 2025
11f8d46
cross ya fingaz
ItamarYuran Jan 21, 2025
08977ec
yalla
ItamarYuran Jan 21, 2025
749f305
yalla
ItamarYuran Jan 21, 2025
7d121bd
tests dont yet work
ItamarYuran Jan 22, 2025
e04cd1b
check output
ItamarYuran Jan 22, 2025
dfc6699
check output
ItamarYuran Jan 22, 2025
8aa34f5
clean
ItamarYuran Jan 22, 2025
f8c8215
no tests
ItamarYuran Jan 22, 2025
cff37f2
tests
ItamarYuran Jan 23, 2025
219d4f4
ooooo baby
ItamarYuran Jan 23, 2025
0a37251
moving further
ItamarYuran Jan 23, 2025
52ef352
check errr
ItamarYuran Jan 23, 2025
ea30b22
check errr
ItamarYuran Jan 23, 2025
62f9e44
err not found
ItamarYuran Jan 23, 2025
1bdd16f
err not found
ItamarYuran Jan 23, 2025
644a390
yalla
ItamarYuran Jan 23, 2025
22c1756
check
ItamarYuran Jan 23, 2025
3d9225e
check
ItamarYuran Jan 23, 2025
b1d588b
schlafen
ItamarYuran Jan 23, 2025
f6d95c7
adapter
ItamarYuran Jan 23, 2025
9a4c5e1
namespace
ItamarYuran Jan 26, 2025
033784a
thank yoou
ItamarYuran Jan 26, 2025
2542855
names
ItamarYuran Jan 26, 2025
c74b714
refactor
ItamarYuran Jan 27, 2025
deb1fd3
refactored now
ItamarYuran Jan 27, 2025
5922ef8
no error
ItamarYuran Jan 27, 2025
2f740f4
trimmming
ItamarYuran Jan 27, 2025
f91771d
corrections
ItamarYuran Jan 27, 2025
b3e5884
fixin
ItamarYuran Jan 27, 2025
5a20568
yalla
ItamarYuran Jan 27, 2025
62f8bbb
testing key marker
ItamarYuran Jan 28, 2025
8f55605
is truncated
ItamarYuran Jan 28, 2025
645e5cf
with error not implemented
ItamarYuran Jan 28, 2025
4ecb691
testts will now pass
ItamarYuran Jan 28, 2025
58961d1
truncated for real
ItamarYuran Jan 28, 2025
fa64a69
tests
ItamarYuran Jan 29, 2025
82f74a3
final
ItamarYuran Jan 29, 2025
dfbee5c
kadima
ItamarYuran Jan 30, 2025
d21b831
final adjusments
ItamarYuran Jan 30, 2025
8ea2f8e
max uploads added
ItamarYuran Jan 30, 2025
eaf8972
kadima
ItamarYuran Jan 30, 2025
ad126b7
nu
ItamarYuran Jan 30, 2025
134 changes: 134 additions & 0 deletions esti/s3_gateway_test.go
@@ -289,6 +289,140 @@ func TestS3IfNoneMatch(t *testing.T) {
}
}

func TestListMultipartUploads(t *testing.T) {
Contributor

This test should run only on S3 - I assume that the tests didn't pass for azure.

Contributor Author

As @idanovo asked, these tests run for the other adapters as well, but only until the first list MPU request, which is expected to fail. After that they are skipped.

Contributor

@nopcoder I wanted to make sure we get the right error for other block stores

Contributor

Sure, but I don't understand how it works as we currently do not support this flow for all adapters.

Contributor

	if blockStoreType != "s3" {
		require.Contains(t, err.Error(), "NotImplemented")
		return
	}

Contributor

@nopcoder nopcoder Jan 30, 2025

OK, I see the check further down in this function, but why do we even start an upload if we can't test it on a non-S3 implementation?
We have a different test that checks for missing functionality.

Contributor

It can be tested in another test.
@ItamarYuran, my suggestion is to:

  1. Change TestListMultipartUploads and TestListMultipartUploadsUnsupported back to t.Skip with a proper message if blockStoreType != "s3"
  2. Rename TestListMultipartUploadsUnsupported to indicate that it checks only unsupported parameters
  3. Add another test that only checks that ListMultipartUploads returns the right error if blockStoreType != "s3"

@nopcoder WDYT?

Contributor

I think the latest change includes the above.

Contributor

It doesn't:

  1. We are returning and not skipping
  2. We don't test the error we get for object stores that are not S3
blockStoreType := viper.GetString(ViperBlockstoreType)
if blockStoreType != "s3" {
return
}
ctx, logger, repo := setupTest(t)
defer tearDownTest(repo)
s3Endpoint := viper.GetString("s3_endpoint")
s3Client := createS3Client(s3Endpoint, t)
multipartNumberOfParts := 3
multipartPartSize := 5 * 1024 * 1024

// create two objects for two mpus
obj1 := "object1"
obj2 := "object2"
keysPrefix := "main/"
key1 := keysPrefix + obj1
key2 := keysPrefix + obj2

input1 := &s3.CreateMultipartUploadInput{
Bucket: aws.String(repo),
Key: aws.String(key1),
}
input2 := &s3.CreateMultipartUploadInput{
Bucket: aws.String(repo),
Key: aws.String(key2),
}
// create first mpu
resp1, err := s3Client.CreateMultipartUpload(ctx, input1)
require.NoError(t, err, "failed to create multipart upload")
parts := make([][]byte, multipartNumberOfParts)
for i := 0; i < multipartNumberOfParts; i++ {
parts[i] = randstr.Bytes(multipartPartSize + i)
}

completedParts1 := uploadMultipartParts(t, ctx, s3Client, logger, resp1, parts, 0)

completeInput1 := &s3.CompleteMultipartUploadInput{
Bucket: resp1.Bucket,
Key: resp1.Key,
UploadId: resp1.UploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: completedParts1,
},
}
// check first mpu appears
output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket})
require.NoError(t, err, "error listing multiparts")
keys := extractUploadKeys(output)
require.Contains(t, keys, obj1)

// create second mpu check both appear
_, err = s3Client.CreateMultipartUpload(ctx, input2)
require.NoError(t, err, "failed to create multipart upload")
output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket})
require.NoError(t, err, "error listing multiparts")
keys = extractUploadKeys(output)
require.Contains(t, keys, obj1)
require.Contains(t, keys, obj2)

// testing maxuploads - only first upload should return
maxUploads := aws.Int32(1)
output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, MaxUploads: maxUploads})
require.NoError(t, err, "failed to list multipart uploads")
keys = extractUploadKeys(output)
require.Contains(t, keys, obj1)
require.NotContains(t, keys, obj2)

// testing key marker and upload id marker for pagination. only records after marker should return
keyMarker := output.NextKeyMarker
uploadIDMarker := output.NextUploadIdMarker
output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, MaxUploads: maxUploads, KeyMarker: keyMarker, UploadIdMarker: uploadIDMarker})
require.NoError(t, err, "failed to list multipart uploads")
keys = extractUploadKeys(output)
require.NotContains(t, keys, obj1)
require.Contains(t, keys, obj2)

// finish first mpu check only second appear
_, err = s3Client.CompleteMultipartUpload(ctx, completeInput1)
require.NoError(t, err, "failed to complete multipart upload")
output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket})
require.NoError(t, err, "error listing multiparts")
keys = extractUploadKeys(output)
require.NotContains(t, keys, obj1)
require.Contains(t, keys, obj2)

}
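
The marker-based pagination exercised by this test can be sketched with an in-memory stand-in. Everything here is an assumption made for illustration: `upload`, `page`, `listPage`, and `listAll` are hypothetical names, and the ordering rule (sort by key, then upload ID, return entries strictly after the marker pair) is a simplified model rather than the gateway's or S3's exact behavior.

```go
package main

import (
	"fmt"
	"sort"
)

// upload is a stand-in for one in-progress multipart upload.
type upload struct {
	Key      string
	UploadID string
}

// page mirrors the fields the test reads from a listing response.
type page struct {
	Uploads            []upload
	NextKeyMarker      string
	NextUploadIDMarker string
	IsTruncated        bool
}

// listPage returns up to maxUploads entries that sort strictly after the
// (keyMarker, uploadIDMarker) pair.
func listPage(all []upload, keyMarker, uploadIDMarker string, maxUploads int) page {
	if maxUploads < 1 {
		maxUploads = 1 // guard so a caller following markers always makes progress
	}
	sorted := append([]upload(nil), all...)
	sort.Slice(sorted, func(i, j int) bool {
		if sorted[i].Key != sorted[j].Key {
			return sorted[i].Key < sorted[j].Key
		}
		return sorted[i].UploadID < sorted[j].UploadID
	})
	var out page
	for _, u := range sorted {
		after := u.Key > keyMarker || (u.Key == keyMarker && u.UploadID > uploadIDMarker)
		if !after {
			continue
		}
		if len(out.Uploads) == maxUploads {
			out.IsTruncated = true // at least one more entry exists
			break
		}
		out.Uploads = append(out.Uploads, u)
	}
	if n := len(out.Uploads); n > 0 {
		out.NextKeyMarker = out.Uploads[n-1].Key
		out.NextUploadIDMarker = out.Uploads[n-1].UploadID
	}
	return out
}

// listAll follows the markers until a page is no longer truncated.
func listAll(all []upload, maxUploads int) []string {
	var keys []string
	keyMarker, uploadIDMarker := "", ""
	for {
		p := listPage(all, keyMarker, uploadIDMarker, maxUploads)
		for _, u := range p.Uploads {
			keys = append(keys, u.Key)
		}
		if !p.IsTruncated {
			return keys
		}
		keyMarker, uploadIDMarker = p.NextKeyMarker, p.NextUploadIDMarker
	}
}

func main() {
	all := []upload{{Key: "object2", UploadID: "b"}, {Key: "object1", UploadID: "a"}}
	fmt.Println(listAll(all, 1))
}
```

The test above performs the first two iterations of this loop by hand: one request with MaxUploads set to 1, then a second request that passes NextKeyMarker and NextUploadIdMarker back in.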

func TestListMultipartUploadsUnsupported(t *testing.T) {
blockStoreType := viper.GetString(ViperBlockstoreType)
if blockStoreType != "s3" {
return
}
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
s3Endpoint := viper.GetString("s3_endpoint")
s3Client := createS3Client(s3Endpoint, t)
Bucket := aws.String(repo)

delimiter := aws.String("/")
prefix := aws.String("prefix")
encodingType := types.EncodingTypeUrl

t.Run("Delimiter", func(t *testing.T) {
_, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: Bucket, Delimiter: delimiter})
require.Error(t, err)
require.Contains(t, err.Error(), "NotImplemented")
})

t.Run("Prefix", func(t *testing.T) {
_, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: Bucket, Prefix: prefix})
require.Error(t, err)
require.Contains(t, err.Error(), "NotImplemented")
})

t.Run("EncodingType", func(t *testing.T) {
_, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: Bucket, EncodingType: encodingType})
require.Error(t, err)
require.Contains(t, err.Error(), "NotImplemented")
})
}
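
The three subtests expect a NotImplemented error when Delimiter, Prefix, or EncodingType is set. The gateway code that produces the error is not part of this diff, so the following is only a guess at the shape such a check could take: `checkListMPUQuery`, `errNotImplemented`, and `isNotImplemented` are hypothetical names, and the query parameter names are the ones the S3 ListMultipartUploads API defines.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// errNotImplemented is a stand-in sentinel; the real gateway error type is not shown in this diff.
var errNotImplemented = errors.New("NotImplemented")

// unsupportedListMPUParams lists the query parameters the tests expect to be rejected.
var unsupportedListMPUParams = []string{"delimiter", "prefix", "encoding-type"}

// checkListMPUQuery returns an error wrapping errNotImplemented if the request
// carries any parameter from unsupportedListMPUParams.
func checkListMPUQuery(q url.Values) error {
	for _, name := range unsupportedListMPUParams {
		if q.Has(name) {
			return fmt.Errorf("%w: query parameter %q is not supported", errNotImplemented, name)
		}
	}
	return nil
}

// isNotImplemented reports whether err wraps errNotImplemented.
func isNotImplemented(err error) bool {
	return errors.Is(err, errNotImplemented)
}

func main() {
	fmt.Println(checkListMPUQuery(url.Values{"max-uploads": {"1"}}))
	fmt.Println(checkListMPUQuery(url.Values{"prefix": {"main/"}}))
}
```

Keeping the rejected names in one slice mirrors the one-subtest-per-parameter layout of the test and makes it straightforward to drop an entry once a parameter becomes supported.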

func extractUploadKeys(output *s3.ListMultipartUploadsOutput) []string {
if output == nil {
return nil
}
keys := make([]string, 0, len(output.Uploads))
for _, upload := range output.Uploads {
if upload.Key != nil {
keys = append(keys, *upload.Key)
}
}
return keys
}

func verifyObjectInfo(t *testing.T, got minio.ObjectInfo, expectedSize int) {
if got.Err != nil {
t.Errorf("%s: %s", got.Key, got.Err)
19 changes: 18 additions & 1 deletion pkg/block/adapter.go
@@ -6,6 +6,8 @@ import (
"net/http"
"net/url"
"time"

"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// MultipartPart single multipart information
@@ -121,6 +123,14 @@ type ListPartsResponse struct {
IsTruncated bool
}

type ListMultipartUploadsResponse struct {
Uploads []types.MultipartUpload
NextUploadIDMarker *string
NextKeyMarker *string
IsTruncated bool
MaxUploads *int32
}

// CreateMultiPartUploadOpts contains optional arguments for
// CreateMultiPartUpload. These should be analogous to options on
// some underlying storage layer. Missing arguments are mapped to the
@@ -139,6 +149,12 @@ type ListPartsOpts struct {
PartNumberMarker *string
}

type ListMultipartUploadsOpts struct {
MaxUploads *int32
UploadIDMarker *string
KeyMarker *string
}
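
All three option fields are pointers, so a nil field can stand for "not specified by the caller". A minimal sketch of filling such a struct from raw request values, assuming that convention: `listOpts` is a local copy of the struct above so the example is self-contained, and `optsFromQuery` is a hypothetical helper that is not part of the PR.

```go
package main

import (
	"fmt"
	"strconv"
)

// listOpts is a local copy of block.ListMultipartUploadsOpts for illustration.
type listOpts struct {
	MaxUploads     *int32
	UploadIDMarker *string
	KeyMarker      *string
}

// optsFromQuery converts raw query values into listOpts. Empty strings are
// treated as "not provided" and leave the corresponding field nil.
func optsFromQuery(maxUploads, keyMarker, uploadIDMarker string) (listOpts, error) {
	var opts listOpts
	if maxUploads != "" {
		n, err := strconv.ParseInt(maxUploads, 10, 32)
		if err != nil {
			return listOpts{}, fmt.Errorf("invalid max-uploads %q: %w", maxUploads, err)
		}
		v := int32(n)
		opts.MaxUploads = &v
	}
	if keyMarker != "" {
		opts.KeyMarker = &keyMarker
	}
	if uploadIDMarker != "" {
		opts.UploadIDMarker = &uploadIDMarker
	}
	return opts, nil
}

func main() {
	opts, err := optsFromQuery("1", "main/object1", "")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(*opts.MaxUploads, *opts.KeyMarker, opts.UploadIDMarker == nil)
}
```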

// Properties of an object stored on the underlying block store.
// Refer to the actual underlying Adapter for which properties are
// actually reported.
@@ -187,8 +203,9 @@ type Adapter interface {

CreateMultiPartUpload(ctx context.Context, obj ObjectPointer, r *http.Request, opts CreateMultiPartUploadOpts) (*CreateMultiPartUploadResponse, error)
UploadPart(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, uploadID string, partNumber int) (*UploadPartResponse, error)
- ListParts(ctx context.Context, obj ObjectPointer, uploadID string, opts ListPartsOpts) (*ListPartsResponse, error)
UploadCopyPart(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int) (*UploadPartResponse, error)
+ ListParts(ctx context.Context, obj ObjectPointer, uploadID string, opts ListPartsOpts) (*ListPartsResponse, error)
+ ListMultipartUploads(ctx context.Context, obj ObjectPointer, opts ListMultipartUploadsOpts) (*ListMultipartUploadsResponse, error)
UploadCopyPartRange(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int, startPosition, endPosition int64) (*UploadPartResponse, error)
AbortMultiPartUpload(ctx context.Context, obj ObjectPointer, uploadID string) error
CompleteMultiPartUpload(ctx context.Context, obj ObjectPointer, uploadID string, multipartList *MultipartUploadCompletion) (*CompleteMultiPartUploadResponse, error)
3 changes: 3 additions & 0 deletions pkg/block/azure/adapter.go
@@ -644,6 +644,9 @@ func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint
func (a *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) {
return nil, block.ErrOperationNotSupported
}
func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer, _ block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) {
return nil, block.ErrOperationNotSupported
}

// ParseURL - parses url and extracts account name and domain. If either are not found returns an error
func ParseURL(uri *url.URL) (accountName string, domain string, err error) {
3 changes: 3 additions & 0 deletions pkg/block/gs/adapter.go
@@ -717,3 +717,6 @@ func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint
func (a *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) {
return nil, block.ErrOperationNotSupported
}
func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer, _ block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) {
return nil, block.ErrOperationNotSupported
}
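
The non-S3 adapters all return the same sentinel, `block.ErrOperationNotSupported`, and the esti tests look for "NotImplemented" in the client-side error. How the gateway translates one into the other is not shown in this diff; the sketch below assumes a simple `errors.Is` mapping. `errOperationNotSupported`, `stubAdapter`, and `s3ErrorCode` are stand-ins defined only for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// errOperationNotSupported stands in for block.ErrOperationNotSupported.
var errOperationNotSupported = errors.New("operation not supported")

// stubAdapter mimics an adapter without multipart upload listing.
type stubAdapter struct{}

// ListMultipartUploads always fails with the sentinel, like the stubs above.
func (stubAdapter) ListMultipartUploads() ([]string, error) {
	return nil, errOperationNotSupported
}

// s3ErrorCode maps an adapter error to an S3-style error code (assumed mapping).
func s3ErrorCode(err error) string {
	switch {
	case err == nil:
		return ""
	case errors.Is(err, errOperationNotSupported):
		return "NotImplemented"
	default:
		return "InternalError"
	}
}

func main() {
	_, err := stubAdapter{}.ListMultipartUploads()
	// Wrapping with %w keeps the sentinel discoverable through errors.Is.
	wrapped := fmt.Errorf("list multipart uploads: %w", err)
	fmt.Println(s3ErrorCode(wrapped))
}
```

Comparing with `errors.Is` rather than `==` means the mapping keeps working if an intermediate layer wraps the adapter error with extra context.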
3 changes: 3 additions & 0 deletions pkg/block/local/adapter.go
@@ -600,3 +600,6 @@ func (l *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint
func (l *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) {
return nil, block.ErrOperationNotSupported
}
func (l *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer, _ block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) {
return nil, block.ErrOperationNotSupported
}
3 changes: 3 additions & 0 deletions pkg/block/mem/adapter.go
@@ -369,3 +369,6 @@ func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint
func (a *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) {
return nil, block.ErrOperationNotSupported
}
func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer, _ block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) {
return nil, block.ErrOperationNotSupported
}
4 changes: 4 additions & 0 deletions pkg/block/metrics.go
@@ -85,6 +85,10 @@ func (m *MetricsAdapter) ListParts(ctx context.Context, obj ObjectPointer, uploa
ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType())
return m.adapter.ListParts(ctx, obj, uploadID, opts)
}
func (m *MetricsAdapter) ListMultipartUploads(ctx context.Context, obj ObjectPointer, opts ListMultipartUploadsOpts) (*ListMultipartUploadsResponse, error) {
ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType())
return m.adapter.ListMultipartUploads(ctx, obj, opts)
}

func (m *MetricsAdapter) UploadCopyPart(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int) (*UploadPartResponse, error) {
ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType())
38 changes: 38 additions & 0 deletions pkg/block/s3/adapter.go
@@ -849,6 +849,44 @@ func (a *Adapter) ListParts(ctx context.Context, obj block.ObjectPointer, upload
return &partsResp, nil
}

func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPointer, opts block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) {
var err error
defer reportMetrics("ListMultipartUploads", time.Now(), nil, &err)
bucket, key, qualifiedKey, err := a.extractParamsFromObj(obj)
if err != nil {
return nil, err
}
input := &s3.ListMultipartUploadsInput{
Bucket: aws.String(bucket),
Prefix: aws.String(key),
MaxUploads: opts.MaxUploads,
UploadIdMarker: opts.UploadIDMarker,
KeyMarker: opts.KeyMarker,
}

lg := a.log(ctx).WithFields(logging.Fields{
"qualified_ns": qualifiedKey.GetStorageNamespace(),
"qualified_key": qualifiedKey.GetKey(),
"key": obj.Identifier,
})

client := a.clients.Get(ctx, bucket)
resp, err := client.ListMultipartUploads(ctx, input)
if err != nil {
lg.WithError(err).Error("List multipart uploads failed")
return nil, err
}

mpuResp := block.ListMultipartUploadsResponse{
Uploads: resp.Uploads,
NextUploadIDMarker: resp.NextUploadIdMarker,
NextKeyMarker: resp.NextKeyMarker,
IsTruncated: aws.ToBool(resp.IsTruncated),
MaxUploads: resp.MaxUploads,
}
return &mpuResp, nil
}
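
The function above declares `err` up front and passes `&err` to a deferred `reportMetrics` call. A self-contained sketch of why that pattern lets the deferred call see the final error: `report`, `countUploads`, and `listWithMetrics` are hypothetical stand-ins, and the real `reportMetrics` signature is only partly visible in this diff.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// lastReport records what the most recent deferred report observed.
var lastReport string

// report is a stand-in for the adapter's reportMetrics. It dereferences err
// only when it runs, which is when the enclosing function returns.
func report(name string, start time.Time, err *error) {
	status := "ok"
	if *err != nil {
		status = "error: " + (*err).Error()
	}
	lastReport = name + " " + status
	fmt.Printf("%s took %s (%s)\n", name, time.Since(start), status)
}

// countUploads is a stand-in for the client call that may fail.
func countUploads(fail bool) (int, error) {
	if fail {
		return 0, errors.New("listing failed")
	}
	return 2, nil
}

func listWithMetrics(fail bool) (int, error) {
	var err error
	// The arguments, including &err and time.Now(), are evaluated here, but
	// the value err points to is read later, when the deferred call runs.
	defer report("ListMultipartUploads", time.Now(), &err)

	// n is new and err already exists in this scope, so := assigns to the
	// same err variable that the deferred call points at.
	n, err := countUploads(fail)
	if err != nil {
		return 0, err
	}
	return n, nil
}

func main() {
	listWithMetrics(false)
	listWithMetrics(true)
}
```

Passing `err` by value instead would capture the nil it holds at the time of the `defer` statement, so every call would be reported as successful.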

func (a *Adapter) BlockstoreType() string {
return block.BlockstoreTypeS3
}
3 changes: 3 additions & 0 deletions pkg/block/transient/adapter.go
@@ -178,3 +178,6 @@ func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint
func (a *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) {
return nil, block.ErrOperationNotSupported
}
func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer, _ block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) {
return nil, block.ErrOperationNotSupported
}