Skip to content

Commit

Permalink
clean up cs3 share manager and the indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
David Christofas committed Mar 29, 2022
1 parent 41c737c commit 4671a81
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 84 deletions.
8 changes: 6 additions & 2 deletions pkg/publicshare/manager/cs3/cs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,9 @@ func (m *Manager) getWithPassword(ctx context.Context, ref *link.PublicShareRefe
}

func (m *Manager) getByID(ctx context.Context, id string) (*PublicShareWithPassword, error) {
tokens, err := m.indexer.FindBy(&link.PublicShare{}, "Id.OpaqueId", id)
tokens, err := m.indexer.FindBy(&link.PublicShare{},
indexer.NewField("Id.OpaqueId", id),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -298,7 +300,9 @@ func (m *Manager) ListPublicShares(ctx context.Context, u *user.User, filters []
return nil, err
}

tokens, err := m.indexer.FindBy(&link.PublicShare{}, "Owner", userIDToIndex(u.Id))
tokens, err := m.indexer.FindBy(&link.PublicShare{},
indexer.NewField("Owner", userIDToIndex(u.Id)),
)
if err != nil {
return nil, err
}
Expand Down
78 changes: 40 additions & 38 deletions pkg/share/manager/cs3/cs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (m *Manager) initialize() error {
}
err = m.indexer.AddIndex(&collaboration.Share{}, option.IndexByFunc{
Name: "ResourceId",
Func: indexResourceIdFunc,
Func: indexResourceIDFunc,
}, "Id.OpaqueId", "shares", "non_unique", nil, true)
if err != nil {
return err
Expand Down Expand Up @@ -252,74 +252,68 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte
return nil, errtypes.UserRequired("error getting user from context")
}

ownedShareIds, err := m.indexer.FindBy(&collaboration.Share{}, "OwnerId", userIDToIndex(user.GetId()))
if err != nil {
return nil, err
}
createdShareIds, err := m.indexer.FindBy(&collaboration.Share{}, "CreatorId", userIDToIndex(user.GetId()))
createdShareIds, err := m.indexer.FindBy(&collaboration.Share{},
indexer.NewField("OwnerId", userIDToIndex(user.Id)),
indexer.NewField("CreatorId", userIDToIndex(user.Id)),
)
if err != nil {
return nil, err
}

mem := make(map[string]struct{})
// We use shareMem as a temporary lookup store to check which shares were
// already added. This is to prevent duplicates.
shareMem := make(map[string]struct{})
result := []*collaboration.Share{}
for _, id := range ownedShareIds {
s, err := m.getShareByID(ctx, id)
if err != nil {
return nil, err
}
if share.MatchesFilters(s, filters) {
result = append(result, s)
mem[s.Id.OpaqueId] = struct{}{}
}
}
for _, id := range createdShareIds {
if _, handled := mem[id]; handled {
// We don't want to add a share multiple times when we added it
// already.
continue
}
s, err := m.getShareByID(ctx, id)
if err != nil {
return nil, err
}
if share.MatchesFilters(s, filters) {
result = append(result, s)
mem[s.Id.OpaqueId] = struct{}{}
shareMem[s.Id.OpaqueId] = struct{}{}
}
}

// If a user requests to list shares which have not been created by them
// we have to explicitly fetch these shares and check if the user is
// allowed to list the shares.
// Only then can we add these shares to the result.
grouped := share.GroupFiltersByType(filters)
idFilter, ok := grouped[collaboration.Filter_TYPE_RESOURCE_ID]
if !ok {
return result, nil
}

// shareIDsByResourceID contains the shareID as the key and the resourceID
// as the value.
shareIDsByResourceID := make(map[string]*provider.ResourceId)
for _, filter := range idFilter {
resourceID := filter.GetResourceId()
ids, err := m.indexer.FindBy(&collaboration.Share{}, "ResourceId", resourceIDToIndex(resourceID))
shareIDs, err := m.indexer.FindBy(&collaboration.Share{},
indexer.NewField("ResourceId", resourceIDToIndex(resourceID)),
)
if err != nil {
return nil, err
continue
}

for _, id := range ids {
shareIDsByResourceID[id] = resourceID
for _, shareID := range shareIDs {
shareIDsByResourceID[shareID] = resourceID
}
}

// statMem is used as a local cache to prevent statting resources which
// already have been checked.
statMem := make(map[string]struct{})
for shareID, resourceID := range shareIDsByResourceID {
if _, handled := mem[shareID]; handled {
if _, handled := shareMem[shareID]; handled {
// We don't want to add a share multiple times when we added it
// already.
continue
}

if _, checked := statMem[resourceIDToIndex(resourceID)]; !checked {
sRes, err := m.gatewayClient.Stat(ctx, &provider.StatRequest{Ref: &provider.Reference{ResourceId: resourceID}})
sReq := &provider.StatRequest{
Ref: &provider.Reference{ResourceId: resourceID},
}
sRes, err := m.gatewayClient.Stat(ctx, sReq)
if err != nil {
continue
}
Expand All @@ -338,7 +332,7 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte
}
if share.MatchesFilters(s, filters) {
result = append(result, s)
mem[s.Id.OpaqueId] = struct{}{}
shareMem[s.Id.OpaqueId] = struct{}{}
}
}

Expand Down Expand Up @@ -386,7 +380,9 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
if err != nil {
return nil, err
}
receivedIds, err := m.indexer.FindBy(&collaboration.Share{}, "GranteeId", ids)
receivedIds, err := m.indexer.FindBy(&collaboration.Share{},
indexer.NewField("GranteeId", ids),
)
if err != nil {
return nil, err
}
Expand All @@ -398,7 +394,9 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
if err != nil {
return nil, err
}
groupIds, err := m.indexer.FindBy(&collaboration.Share{}, "GranteeId", index)
groupIds, err := m.indexer.FindBy(&collaboration.Share{},
indexer.NewField("GranteeId", index),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -549,15 +547,19 @@ func (m *Manager) getShareByID(ctx context.Context, id string) (*collaboration.S
}

func (m *Manager) getShareByKey(ctx context.Context, key *collaboration.ShareKey) (*collaboration.Share, error) {
ownerIds, err := m.indexer.FindBy(&collaboration.Share{}, "OwnerId", userIDToIndex(key.Owner))
ownerIds, err := m.indexer.FindBy(&collaboration.Share{},
indexer.NewField("OwnerId", userIDToIndex(key.Owner)),
)
if err != nil {
return nil, err
}
granteeIndex, err := granteeToIndex(key.Grantee)
if err != nil {
return nil, err
}
granteeIds, err := m.indexer.FindBy(&collaboration.Share{}, "GranteeId", granteeIndex)
granteeIds, err := m.indexer.FindBy(&collaboration.Share{},
indexer.NewField("GranteeId", granteeIndex),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -622,7 +624,7 @@ func indexGranteeFunc(v interface{}) (string, error) {
return granteeToIndex(share.Grantee)
}

func indexResourceIdFunc(v interface{}) (string, error) {
func indexResourceIDFunc(v interface{}) (string, error) {
share, ok := v.(*collaboration.Share)
if !ok {
return "", fmt.Errorf("given entity is not a share")
Expand Down
58 changes: 47 additions & 11 deletions pkg/share/manager/cs3/cs3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ var _ = Describe("Manager", func() {

Describe("New", func() {
JustBeforeEach(func() {
m, err := cs3.New(storage, indexer)
m, err := cs3.New(nil, storage, indexer)
Expect(err).ToNot(HaveOccurred())
Expect(m).ToNot(BeNil())
})
Expand All @@ -163,7 +163,7 @@ var _ = Describe("Manager", func() {

JustBeforeEach(func() {
var err error
m, err = cs3.New(storage, indexer)
m, err = cs3.New(nil, storage, indexer)
Expect(err).ToNot(HaveOccurred())
data, err := json.Marshal(share)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -277,9 +277,13 @@ var _ = Describe("Manager", func() {

Context("when requesting the share by key", func() {
It("returns NotFound", func() {
indexer.On("FindBy", mock.Anything, "OwnerId", url.QueryEscape(share.Owner.Idp+":"+share.Owner.OpaqueId)).
indexer.On("FindBy", mock.Anything,
indexerpkg.NewField("OwnerId", url.QueryEscape(share.Owner.Idp+":"+share.Owner.OpaqueId)),
).
Return([]string{share.Id.OpaqueId, share2.Id.OpaqueId}, nil)
indexer.On("FindBy", mock.Anything, "GranteeId", url.QueryEscape("user:"+grantee.Id.Idp+":"+grantee.Id.OpaqueId)).
indexer.On("FindBy", mock.Anything,
indexerpkg.NewField("GranteeId", url.QueryEscape("user:"+grantee.Id.Idp+":"+grantee.Id.OpaqueId)),
).
Return([]string{}, nil)
returnedShare, err := m.GetShare(ctx, &collaboration.ShareReference{
Spec: &collaboration.ShareReference_Key{
Expand All @@ -294,9 +298,13 @@ var _ = Describe("Manager", func() {
Expect(returnedShare).To(BeNil())
})
It("gets the share", func() {
indexer.On("FindBy", mock.Anything, "OwnerId", url.QueryEscape(share.Owner.Idp+":"+share.Owner.OpaqueId)).
indexer.On("FindBy", mock.Anything,
indexerpkg.NewField("OwnerId", url.QueryEscape(share.Owner.Idp+":"+share.Owner.OpaqueId)),
).
Return([]string{share.Id.OpaqueId, share2.Id.OpaqueId}, nil)
indexer.On("FindBy", mock.Anything, "GranteeId", url.QueryEscape("user:"+grantee.Id.Idp+":"+grantee.Id.OpaqueId)).
indexer.On("FindBy", mock.Anything,
indexerpkg.NewField("GranteeId", url.QueryEscape("user:"+grantee.Id.Idp+":"+grantee.Id.OpaqueId)),
).
Return([]string{share2.Id.OpaqueId}, nil)
returnedShare, err := m.GetShare(ctx, &collaboration.ShareReference{
Spec: &collaboration.ShareReference_Key{
Expand Down Expand Up @@ -343,7 +351,19 @@ var _ = Describe("Manager", func() {

Describe("ListShares", func() {
JustBeforeEach(func() {
indexer.On("FindBy", mock.Anything, "OwnerId", mock.Anything).Return([]string{share.Id.OpaqueId, share2.Id.OpaqueId}, nil)
indexer.On("FindBy", mock.Anything,
mock.MatchedBy(func(input indexerpkg.Field) bool {
return input.Name == "OwnerId"
}),
mock.MatchedBy(func(input indexerpkg.Field) bool {
return input.Name == "CreatorId"
}),
).Return([]string{share.Id.OpaqueId, share2.Id.OpaqueId}, nil)
indexer.On("FindBy", mock.Anything,
mock.MatchedBy(func(input indexerpkg.Field) bool {
return input.Name == "ResourceId" && input.Value == "!abcd"
}),
).Return([]string{share.Id.OpaqueId}, nil)
})
It("uses the index to get the owned shares", func() {
shares, err := m.ListShares(ctx, []*collaboration.Filter{})
Expand Down Expand Up @@ -371,9 +391,17 @@ var _ = Describe("Manager", func() {
Describe("ListReceivedShares", func() {
Context("with a received user share", func() {
BeforeEach(func() {
indexer.On("FindBy", mock.Anything, "GranteeId", granteeFn).
indexer.On("FindBy", mock.Anything,
mock.MatchedBy(func(input indexerpkg.Field) bool {
return input.Name == "GranteeId" && input.Value == granteeFn
}),
).
Return([]string{share2.Id.OpaqueId}, nil)
indexer.On("FindBy", mock.Anything, "GranteeId", mock.Anything).
indexer.On("FindBy", mock.Anything,
mock.MatchedBy(func(input indexerpkg.Field) bool {
return input.Name == "GranteeId"
}),
).
Return([]string{}, nil)
})

Expand All @@ -393,9 +421,17 @@ var _ = Describe("Manager", func() {

Context("with a received group share", func() {
BeforeEach(func() {
indexer.On("FindBy", mock.Anything, "GranteeId", groupFn).
indexer.On("FindBy", mock.Anything,
mock.MatchedBy(func(input indexerpkg.Field) bool {
return input.Name == "GranteeId" && input.Value == groupFn
}),
).
Return([]string{share2.Id.OpaqueId}, nil)
indexer.On("FindBy", mock.Anything, "GranteeId", mock.Anything).
indexer.On("FindBy", mock.Anything,
mock.MatchedBy(func(input indexerpkg.Field) bool {
return input.Name == "GranteeId"
}),
).
Return([]string{}, nil)
})

Expand Down
52 changes: 33 additions & 19 deletions pkg/storage/utils/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,21 @@ import (
type Indexer interface {
AddIndex(t interface{}, indexBy option.IndexBy, pkName, entityDirName, indexType string, bound *option.Bound, caseInsensitive bool) error
Add(t interface{}) ([]IdxAddResult, error)
FindBy(t interface{}, field string, val string) ([]string, error)
FindBy(t interface{}, fields ...Field) ([]string, error)
Delete(t interface{}) error
}

// Field combines the name and value of an indexed field.
type Field struct {
Name string
Value string
}

// NewField is a utility function to create a new Field.
func NewField(name, value string) Field {
return Field{Name: name, Value: value}
}

// StorageIndexer is the indexer implementation using metadata storage
type StorageIndexer struct {
storage metadata.Storage
Expand Down Expand Up @@ -145,36 +156,38 @@ func (i *StorageIndexer) Add(t interface{}) ([]IdxAddResult, error) {
return results, nil
}

// FindBy finds a value on an index by field and value.
func (i *StorageIndexer) FindBy(t interface{}, findBy, val string) ([]string, error) {
// FindBy finds a value on an index by fields.
// If multiple fields are given then they are handled like an or condition.
func (i *StorageIndexer) FindBy(t interface{}, queryFields ...Field) ([]string, error) {
typeName := getTypeFQN(t)

i.mu.RLock(typeName)
defer i.mu.RUnlock(typeName)

resultPaths := make([]string, 0)
resultPaths := make(map[string]struct{})
if fields, ok := i.indices[typeName]; ok {
for _, idx := range fields.IndicesByField[strcase.ToCamel(findBy)] {
idxVal := val
res, err := idx.Lookup(idxVal)
if err != nil {
if _, ok := err.(errtypes.IsNotFound); ok {
continue
}

for _, field := range queryFields {
for _, idx := range fields.IndicesByField[strcase.ToCamel(field.Name)] {
res, err := idx.Lookup(field.Value)
if err != nil {
return nil, err
if _, ok := err.(errtypes.IsNotFound); ok {
continue
}

if err != nil {
return nil, err
}
}
for _, r := range res {
resultPaths[path.Base(r)] = struct{}{}
}
}

resultPaths = append(resultPaths, res...)

}
}

result := make([]string, 0, len(resultPaths))
for _, v := range resultPaths {
result = append(result, path.Base(v))
for p := range resultPaths {
result = append(result, path.Base(p))
}

return result, nil
Expand Down Expand Up @@ -340,7 +353,8 @@ func (i *StorageIndexer) resolveTree(t interface{}, tree *queryTree, partials *[
return err
}

r, err := i.FindBy(t, operand.field, operand.value)
field := Field{Name: operand.field, Value: operand.value}
r, err := i.FindBy(t, field)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 4671a81

Please sign in to comment.