diff --git a/changelog/unreleased/delete-stale-shares.md b/changelog/unreleased/delete-stale-shares.md index 323867b85f..87b23995cd 100644 --- a/changelog/unreleased/delete-stale-shares.md +++ b/changelog/unreleased/delete-stale-shares.md @@ -2,4 +2,5 @@ Bugfix: Delete stale shares in the jsoncs3 share manager The jsoncs3 share manager now properly deletes all references to removed shares and shares that belong to a space that was deleted +https://github.com/cs3org/reva/pull/4983 https://github.com/cs3org/reva/pull/4975 diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 12c17075a0..6e67ae2b2a 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -936,7 +936,13 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati } for shareID, state := range w.rspace.States { s, err := m.Cache.Get(ctx, storageID, spaceID, shareID, true) - if err != nil || s == nil { + if err != nil { + sublogr.Error().Err(err).Msg("could not retrieve share") + continue + } + if s == nil { + sublogr.Warn().Str("shareid", shareID).Msg("share not found. cleaning up") + _ = m.UserReceivedStates.Remove(ctx, user.Id.OpaqueId, w.ssid, shareID) continue } sublogr = sublogr.With().Str("shareid", shareID).Logger() @@ -1227,3 +1233,51 @@ func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share, skipS return eg.Wait() } + +func (m *Manager) CleanupStaleShares(ctx context.Context) { + log := appctx.GetLogger(ctx) + + if err := m.initialize(ctx); err != nil { + return + } + + // list all shares + providers, err := m.Cache.All(ctx) + if err != nil { + log.Error().Err(err).Msg("error listing all shares") + return + } + + client, err := m.gatewaySelector.Next() + if err != nil { + log.Error().Err(err).Msg("could not get gateway client") + } + + providers.Range(func(storage string, spaces *providercache.Spaces) bool { + log.Info().Str("storage", storage).Interface("spaceCount", spaces.Spaces.Count()).Msg("checking storage") + + spaces.Spaces.Range(func(space string, shares *providercache.Shares) bool { + log.Info().Str("storage", storage).Str("space", space).Interface("shareCount", len(shares.Shares)).Msg("checking space") + + for _, s := range shares.Shares { + req := &provider.StatRequest{ + Ref: &provider.Reference{ResourceId: s.ResourceId, Path: "."}, + } + res, err := client.Stat(ctx, req) + if err != nil { + log.Error().Err(err).Str("storage", storage).Str("space", space).Msg("could not stat shared resource") + } + if res.Status.Code == rpcv1beta1.Code_CODE_NOT_FOUND { + log.Info().Str("storage", storage).Str("space", space).Msg("shared resource does not exist anymore. cleaning up shares") + if err := m.removeShare(ctx, s, false); err != nil { + log.Error().Err(err).Str("storage", storage).Str("space", space).Msg("could not remove share") + } + } + } + + return true + }) + + return true + }) +} diff --git a/pkg/share/manager/jsoncs3/providercache/providercache.go b/pkg/share/manager/jsoncs3/providercache/providercache.go index bb30c1d3f6..3631b96c56 100644 --- a/pkg/share/manager/jsoncs3/providercache/providercache.go +++ b/pkg/share/manager/jsoncs3/providercache/providercache.go @@ -25,6 +25,7 @@ import ( "os" "path" "path/filepath" + "strings" "sync" "time" @@ -319,6 +320,36 @@ func (c *Cache) Get(ctx context.Context, storageID, spaceID, shareID string, ski return space.Shares[shareID], nil } +// All returns all entries in the storage +func (c *Cache) All(ctx context.Context) (*mtimesyncedcache.Map[string, *Spaces], error) { + ctx, span := tracer.Start(ctx, "All") + defer span.End() + + providers, err := c.storage.ListDir(ctx, "/storages") + if err != nil { + return nil, err + } + for _, provider := range providers { + storageID := provider.Name + spaces, err := c.storage.ListDir(ctx, path.Join("/storages", storageID)) + if err != nil { + return nil, err + } + for _, space := range spaces { + spaceID := strings.TrimSuffix(space.Name, ".json") + + unlock := c.LockSpace(spaceID) + span.AddEvent("got lock for space " + spaceID) + if err := c.syncWithLock(ctx, storageID, spaceID); err != nil { + return nil, err + } + unlock() + } + } + + return &c.Providers, nil +} + // ListSpace returns the list of shares in a given space func (c *Cache) ListSpace(ctx context.Context, storageID, spaceID string) (*Shares, error) { ctx, span := tracer.Start(ctx, "ListSpace") @@ -438,7 +469,11 @@ func (c *Cache) PurgeSpace(ctx context.Context, storageID, spaceID string) error if !ok { return nil } - spaces.Spaces.Store(spaceID, &Shares{}) + newShares := &Shares{} + if space, ok := spaces.Spaces.Load(spaceID); ok { + newShares.Etag = space.Etag // keep the etag to allow overwriting the state on the server + } + spaces.Spaces.Store(spaceID, newShares) return c.Persist(ctx, storageID, spaceID) } diff --git a/pkg/share/manager/jsoncs3/providercache/providercache_test.go b/pkg/share/manager/jsoncs3/providercache/providercache_test.go index f1e15a470b..f63c351646 100644 --- a/pkg/share/manager/jsoncs3/providercache/providercache_test.go +++ b/pkg/share/manager/jsoncs3/providercache/providercache_test.go @@ -191,5 +191,13 @@ var _ = Describe("Cache", func() { Expect(s).To(BeNil()) }) }) + + Describe("All", func() { + It("returns all entries", func() { + entries, err := c.All(ctx) + Expect(err).ToNot(HaveOccurred()) + Expect(entries.Count()).To(Equal(1)) + }) + }) }) }) diff --git a/pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go b/pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go index 830634fc23..aa1d0cdd24 100644 --- a/pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go +++ b/pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go @@ -34,3 +34,12 @@ func (m *Map[K, V]) Range(f func(key K, value V) bool) { } func (m *Map[K, V]) Store(key K, value V) { m.m.Store(key, value) } + +func (m *Map[K, V]) Count() int { + l := 0 + m.Range(func(_ K, _ V) bool { + l++ + return true + }) + return l +} diff --git a/pkg/storage/utils/metadata/cs3.go b/pkg/storage/utils/metadata/cs3.go index aa71502442..263976afd3 100644 --- a/pkg/storage/utils/metadata/cs3.go +++ b/pkg/storage/utils/metadata/cs3.go @@ -552,7 +552,11 @@ func (cs3 *CS3) getAuthContext(ctx context.Context) (context.Context, error) { authCtx, span := tracer.Start(authCtx, "getAuthContext", trace.WithLinks(trace.LinkFromContext(ctx))) defer span.End() - client, err := pool.GetGatewayServiceClient(cs3.gatewayAddr) + selector, err := pool.GatewaySelector(cs3.gatewayAddr) + if err != nil { + return nil, err + } + client, err := selector.Next() if err != nil { return nil, err }