Skip to content

Commit

Permalink
Add StorageID to ObjectPointer (#8522)
Browse files Browse the repository at this point in the history
* Add StorageID to Repository entity

* Bug fixes

* Remove API changes

* Add StorageID to Repo endpoints

* Almost empty commit

* Revert almost empty commit

* Remove from API

* Add param to Creation

* Fix PR comments

* Fix tests

* Add basic unit-tests

* Fix param

* Update tests

* Allow only empty StorageID

* Revert "Allow only empty StorageID"

This reverts commit 298bc0d.

* Revert "Revert "Allow only empty StorageID""

This reverts commit 299c292.

* Change validation

* Remove StorageID in a test

* Update tests

* Add StorageID to ObjectPointer inits

* Add StorageID to WriteBlob

* Add StorageID to uploadFile

* Add log fields

* Add comments

* Remove go.work.sum

* Fix PR comments
  • Loading branch information
itaigilo authored Jan 22, 2025
1 parent 08411e4 commit 49becea
Show file tree
Hide file tree
Showing 24 changed files with 143 additions and 39 deletions.
35 changes: 31 additions & 4 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (c *Controller) CreatePresignMultipartUpload(w http.ResponseWriter, r *http

// create a new multipart upload
mpuResp, err := c.BlockAdapter.CreateMultiPartUpload(ctx, block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Identifier: address,
Expand All @@ -229,6 +230,7 @@ func (c *Controller) CreatePresignMultipartUpload(w http.ResponseWriter, r *http
for i := 0; i < swag.IntValue(params.Parts); i++ {
// generate a pre-signed PUT url for the given request
preSignedURL, err := c.BlockAdapter.GetPresignUploadPartURL(ctx, block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
Identifier: address,
IdentifierType: block.IdentifierTypeRelative,
Expand Down Expand Up @@ -300,6 +302,7 @@ func (c *Controller) AbortPresignMultipartUpload(w http.ResponseWriter, r *http.
}

if err := c.BlockAdapter.AbortMultiPartUpload(ctx, block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Identifier: physicalAddress,
Expand Down Expand Up @@ -372,6 +375,7 @@ func (c *Controller) CompletePresignMultipartUpload(w http.ResponseWriter, r *ht
}

mpuResp, err := c.BlockAdapter.CompleteMultiPartUpload(ctx, block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Identifier: physicalAddress,
Expand Down Expand Up @@ -702,6 +706,7 @@ func (c *Controller) GetPhysicalAddress(w http.ResponseWriter, r *http.Request,
if swag.BoolValue(params.Presign) {
// generate a pre-signed PUT url for the given request
preSignedURL, expiry, err := c.BlockAdapter.GetPreSignedURL(ctx, block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
Identifier: address,
IdentifierType: block.IdentifierTypeRelative,
Expand Down Expand Up @@ -2092,6 +2097,7 @@ func (c *Controller) ensureStorageNamespace(ctx context.Context, storageNamespac
// this serves two purposes, first, we maintain safety check for older lakeFS version.
// second, in scenarios where lakeFS shouldn't have access to the root namespace (i.e pre-sign URL only).
if c.Config.GetBaseConfig().Graveler.EnsureReadableRootNamespace {
// TODO (gilo): ObjectPointer init - add StorageID here
rootObj := block.ObjectPointer{
StorageNamespace: storageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Expand All @@ -2109,6 +2115,7 @@ func (c *Controller) ensureStorageNamespace(ctx context.Context, storageNamespac

// check if the dummy file exists
obj := block.ObjectPointer{
// TODO (gilo): ObjectPointer init - add StorageID here
StorageNamespace: storageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Identifier: dummyKey,
Expand Down Expand Up @@ -2559,6 +2566,7 @@ func (c *Controller) GetRunHookOutput(w http.ResponseWriter, r *http.Request, re

logPath := taskResult.LogPath()
reader, err := c.BlockAdapter.Get(ctx, block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Identifier: logPath,
Expand Down Expand Up @@ -3206,13 +3214,19 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi
writeError(w, r, http.StatusInternalServerError, err)
return
}
opts := block.PutOpts{StorageClass: params.StorageClass}

var blob *upload.Blob
if mediaType != "multipart/form-data" {
// handle non-multipart, direct content upload
address := c.PathProvider.NewPath()
blob, err = upload.WriteBlob(ctx, c.BlockAdapter, repo.StorageNamespace, address, r.Body, r.ContentLength,
block.PutOpts{StorageClass: params.StorageClass})
objectPointer := block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Identifier: address,
}
blob, err = upload.WriteBlob(ctx, c.BlockAdapter, objectPointer, r.Body, r.ContentLength, opts)
if err != nil {
writeError(w, r, http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -3240,8 +3254,13 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi
partName := part.FormName()
if partName == "content" {
// upload the first "content" and exit the loop
address := c.PathProvider.NewPath()
blob, err = upload.WriteBlob(ctx, c.BlockAdapter, repo.StorageNamespace, address, part, -1, block.PutOpts{StorageClass: params.StorageClass})
objectPointer := block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Identifier: c.PathProvider.NewPath(),
}
blob, err = upload.WriteBlob(ctx, c.BlockAdapter, objectPointer, part, -1, opts)
if err != nil {
_ = part.Close()
writeError(w, r, http.StatusInternalServerError, err)
Expand Down Expand Up @@ -3626,6 +3645,7 @@ func (c *Controller) PrepareGarbageCollectionCommits(w http.ResponseWriter, r *h
if c.handleAPIError(ctx, w, r, err) {
return
}
// TODO (gilo): ObjectPointer init - add StorageID here
presignedURL, _, err := c.BlockAdapter.GetPreSignedURL(ctx, block.ObjectPointer{
Identifier: gcRunMetadata.CommitsCSVLocation,
IdentifierType: block.IdentifierTypeFull,
Expand Down Expand Up @@ -3856,6 +3876,7 @@ func (c *Controller) DumpRefs(w http.ResponseWriter, r *http.Request, repository
return
}
_, err = c.BlockAdapter.Put(ctx, block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Identifier: fmt.Sprintf("%s/refs_manifest.json", c.Config.GetBaseConfig().Committed.BlockStoragePrefix),
Expand Down Expand Up @@ -4196,6 +4217,7 @@ func writeSymlink(ctx context.Context, repo *catalog.Repository, branch, path st
address := fmt.Sprintf("%s/%s/%s/%s/symlink.txt", lakeFSPrefix, repo.Name, branch, path)
data := strings.Join(addresses, "\n")
_, err := adapter.Put(ctx, block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Identifier: address,
Expand Down Expand Up @@ -4403,6 +4425,7 @@ func (c *Controller) GetMetadataObject(w http.ResponseWriter, r *http.Request, r

// if pre-sign, return a redirect
pointer := block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Identifier: objPath,
Expand Down Expand Up @@ -4482,6 +4505,7 @@ func (c *Controller) GetObject(w http.ResponseWriter, r *http.Request, repositor

// if pre-sign, return a redirect
pointer := block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: entry.AddressType.ToIdentifierType(),
Identifier: entry.PhysicalAddress,
Expand Down Expand Up @@ -4628,6 +4652,7 @@ func (c *Controller) ListObjects(w http.ResponseWriter, r *http.Request, reposit
if authResponse.Allowed {
var expiry time.Time
objStat.PhysicalAddress, expiry, err = c.BlockAdapter.GetPreSignedURL(ctx, block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: entry.AddressType.ToIdentifierType(),
Identifier: entry.PhysicalAddress,
Expand Down Expand Up @@ -4710,6 +4735,7 @@ func (c *Controller) StatObject(w http.ResponseWriter, r *http.Request, reposito
} else if swag.BoolValue(params.Presign) {
// need to pre-sign the physical address
preSignedURL, expiry, err := c.BlockAdapter.GetPreSignedURL(ctx, block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: entry.AddressType.ToIdentifierType(),
Identifier: entry.PhysicalAddress,
Expand Down Expand Up @@ -4771,6 +4797,7 @@ func (c *Controller) GetUnderlyingProperties(w http.ResponseWriter, r *http.Requ

// read object properties from underlying storage
properties, err := c.BlockAdapter.GetProperties(ctx, block.ObjectPointer{
StorageID: repo.StorageID,
StorageNamespace: repo.StorageNamespace,
IdentifierType: entry.AddressType.ToIdentifierType(),
Identifier: entry.PhysicalAddress,
Expand Down
18 changes: 14 additions & 4 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2549,8 +2549,13 @@ func TestController_ObjectsHeadObjectHandler(t *testing.T) {

buf := new(bytes.Buffer)
buf.WriteString("this is file content made up of bytes")
address := upload.DefaultPathProvider.NewPath()
blob, err := upload.WriteBlob(context.Background(), deps.blocks, onBlock(deps, "ns1"), address, buf, 37, block.PutOpts{StorageClass: &expensiveString})
objectPointer := block.ObjectPointer{
StorageID: "",
StorageNamespace: onBlock(deps, "ns1"),
IdentifierType: block.IdentifierTypeRelative,
Identifier: upload.DefaultPathProvider.NewPath(),
}
blob, err := upload.WriteBlob(context.Background(), deps.blocks, objectPointer, buf, 37, block.PutOpts{StorageClass: &expensiveString})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -2625,8 +2630,13 @@ func TestController_ObjectsGetObjectHandler(t *testing.T) {

buf := new(bytes.Buffer)
buf.WriteString("this is file content made up of bytes")
address := upload.DefaultPathProvider.NewPath()
blob, err := upload.WriteBlob(context.Background(), deps.blocks, onBlock(deps, "ns1"), address, buf, 37, block.PutOpts{StorageClass: &expensiveString})
objectPointer := block.ObjectPointer{
StorageID: "",
StorageNamespace: onBlock(deps, "ns1"),
IdentifierType: block.IdentifierTypeRelative,
Identifier: upload.DefaultPathProvider.NewPath(),
}
blob, err := upload.WriteBlob(context.Background(), deps.blocks, objectPointer, buf, 37, block.PutOpts{StorageClass: &expensiveString})
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/block/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const DefaultPreSignExpiryDuration = 15 * time.Minute
// ObjectPointer is a unique identifier of an object in the object
// store: the store is a 1:1 mapping between pointers and objects.
type ObjectPointer struct {
StorageID string
StorageNamespace string
Identifier string

Expand Down
1 change: 1 addition & 0 deletions pkg/block/azure/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func ResolveBlobURLInfoFromURL(pathURL *url.URL) (BlobURLInfo, error) {

func resolveBlobURLInfo(obj block.ObjectPointer) (BlobURLInfo, error) {
key := obj.Identifier
// we're in the context of a specific storage here, so there's no need for StorageID.
defaultNamespace := obj.StorageNamespace
var qk BlobURLInfo
// check if the key is fully qualified
Expand Down
4 changes: 4 additions & 0 deletions pkg/block/blocktest/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func testAdapterGetRange(t *testing.T, adapter block.Adapter, storageNamespace s
part1 := "this is the first part "
part2 := "this is the last part"
_, err := adapter.Put(ctx, block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: "test_file",
IdentifierType: block.IdentifierTypeRelative,
Expand All @@ -63,6 +64,7 @@ func testAdapterGetRange(t *testing.T, adapter block.Adapter, storageNamespace s
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
reader, err := adapter.GetRange(ctx, block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: "test_file",
IdentifierType: block.IdentifierTypeRelative,
Expand All @@ -89,6 +91,7 @@ func testAdapterWalker(t *testing.T, adapter block.Adapter, storageNamespace str
for i := 0; i < filesAndFolders; i++ {
for j := 0; j < filesAndFolders; j++ {
_, err := adapter.Put(ctx, block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: fmt.Sprintf("%s/folder_%d/test_file_%d", testPrefix, filesAndFolders-i-1, filesAndFolders-j-1),
IdentifierType: block.IdentifierTypeRelative,
Expand All @@ -98,6 +101,7 @@ func testAdapterWalker(t *testing.T, adapter block.Adapter, storageNamespace str
}

_, err := adapter.Put(ctx, block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: fmt.Sprintf("%s/folder_0.txt", testPrefix),
IdentifierType: block.IdentifierTypeRelative,
Expand Down
8 changes: 8 additions & 0 deletions pkg/block/blocktest/basic_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func testAdapterPutGet(t *testing.T, adapter block.Adapter, storageNamespace, ex
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
obj := block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: c.path,
IdentifierType: c.identifierType,
Expand All @@ -64,11 +65,13 @@ func testAdapterCopy(t *testing.T, adapter block.Adapter, storageNamespace strin
ctx := context.Background()
contents := "foo bar baz quux"
src := block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: "src",
IdentifierType: block.IdentifierTypeRelative,
}
dst := block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: "export/to/dst",
IdentifierType: block.IdentifierTypeRelative,
Expand Down Expand Up @@ -130,6 +133,7 @@ func testAdapterRemove(t *testing.T, adapter block.Adapter, storageNamespace str
envObjects = append(envObjects, tt.path)
for _, p := range envObjects {
obj := block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: tt.name + "/" + p,
IdentifierType: block.IdentifierTypeRelative,
Expand All @@ -140,6 +144,7 @@ func testAdapterRemove(t *testing.T, adapter block.Adapter, storageNamespace str

// test Remove
obj := block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: tt.name + "/" + tt.path,
IdentifierType: block.IdentifierTypeRelative,
Expand All @@ -165,13 +170,15 @@ func testAdapterExists(t *testing.T, adapter block.Adapter, storageNamespace str
const contents = "exists"
ctx := context.Background()
_, err := adapter.Put(ctx, block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: contents,
IdentifierType: block.IdentifierTypeRelative,
}, int64(len(contents)), strings.NewReader(contents), block.PutOpts{})
require.NoError(t, err)

_, err = adapter.Put(ctx, block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: "nested/and/" + contents,
IdentifierType: block.IdentifierTypeRelative,
Expand All @@ -192,6 +199,7 @@ func testAdapterExists(t *testing.T, adapter block.Adapter, storageNamespace str
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
ok, err := adapter.Exists(ctx, block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: tt.path,
IdentifierType: block.IdentifierTypeRelative,
Expand Down
3 changes: 3 additions & 0 deletions pkg/block/blocktest/multipart_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func testAdapterMultipartUpload(t *testing.T, adapter block.Adapter, storageName
t.Run(c.name, func(t *testing.T) {
blockstoreType := adapter.BlockstoreType()
obj := block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: c.path,
IdentifierType: block.IdentifierTypeRelative,
Expand Down Expand Up @@ -296,12 +297,14 @@ func requireEqualBigByteSlice(t *testing.T, exp, actual []byte) {

func objPointers(storageNamespace string) (block.ObjectPointer, block.ObjectPointer) {
var obj = block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: "abc",
IdentifierType: block.IdentifierTypeRelative,
}

var objCopy = block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: "abcCopy",
IdentifierType: block.IdentifierTypeRelative,
Expand Down
23 changes: 20 additions & 3 deletions pkg/block/local/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,12 @@ func (l *Adapter) UploadCopyPart(ctx context.Context, sourceObj, destinationObj
}
md5Read := block.NewHashingReader(r, block.HashFunctionMD5)
fName := uploadID + fmt.Sprintf("-%05d", partNumber)
_, err = l.Put(ctx, block.ObjectPointer{StorageNamespace: destinationObj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{})
objectPointer := block.ObjectPointer{
StorageID: destinationObj.StorageID,
StorageNamespace: destinationObj.StorageNamespace,
Identifier: fName,
}
_, err = l.Put(ctx, objectPointer, -1, md5Read, block.PutOpts{})
if err != nil {
return nil, fmt.Errorf("copy put: %w", err)
}
Expand All @@ -266,7 +271,12 @@ func (l *Adapter) UploadCopyPartRange(ctx context.Context, sourceObj, destinatio
}
md5Read := block.NewHashingReader(r, block.HashFunctionMD5)
fName := uploadID + fmt.Sprintf("-%05d", partNumber)
_, err = l.Put(ctx, block.ObjectPointer{StorageNamespace: destinationObj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{})
objectPointer := block.ObjectPointer{
StorageID: destinationObj.StorageID,
StorageNamespace: destinationObj.StorageNamespace,
Identifier: fName,
}
_, err = l.Put(ctx, objectPointer, -1, md5Read, block.PutOpts{})
if err != nil {
return nil, fmt.Errorf("copy range put: %w", err)
}
Expand Down Expand Up @@ -398,7 +408,12 @@ func (l *Adapter) UploadPart(ctx context.Context, obj block.ObjectPointer, _ int
}
md5Read := block.NewHashingReader(reader, block.HashFunctionMD5)
fName := uploadID + fmt.Sprintf("-%05d", partNumber)
_, err := l.Put(ctx, block.ObjectPointer{StorageNamespace: obj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{})
objectPointer := block.ObjectPointer{
StorageID: obj.StorageID,
StorageNamespace: obj.StorageNamespace,
Identifier: fName,
}
_, err := l.Put(ctx, objectPointer, -1, md5Read, block.PutOpts{})
etag := hex.EncodeToString(md5Read.Md5.Sum(nil))
return &block.UploadPartResponse{
ETag: etag,
Expand Down Expand Up @@ -505,6 +520,7 @@ func (l *Adapter) removePartFiles(files []string) error {

func (l *Adapter) getPartFiles(uploadID string, obj block.ObjectPointer) ([]string, error) {
newObj := block.ObjectPointer{
StorageID: obj.StorageID,
StorageNamespace: obj.StorageNamespace,
Identifier: uploadID,
}
Expand Down Expand Up @@ -544,6 +560,7 @@ func (l *Adapter) ResolveNamespace(storageNamespace, key string, identifierType
}

// Check if path allowed and return error if path is not allowed
// TODO (gilo): ObjectPointer init - add StorageID here
_, err = l.extractParamsFromObj(block.ObjectPointer{
StorageNamespace: storageNamespace,
Identifier: key,
Expand Down
Loading

0 comments on commit 49becea

Please sign in to comment.