cacheutil: added cache Delete method (grafana#3515)
* cacheutil: added delete method

Signed-off-by: Miguel Ángel Ortuño <[email protected]>

* updated CHANGELOG.md

Signed-off-by: Miguel Ángel Ortuño <[email protected]>

* make linter happy

Signed-off-by: Miguel Ángel Ortuño <[email protected]>

* addressed PR feedback

Signed-off-by: Miguel Ángel Ortuño <[email protected]>

Signed-off-by: Miguel Ángel Ortuño <[email protected]>
ortuman authored and mason committed Dec 16, 2022
1 parent 5d5b76b commit 2966c35
Showing 2 changed files with 59 additions and 11 deletions.
64 changes: 53 additions & 11 deletions pkg/cacheutil/memcached_client.go
@@ -29,8 +29,8 @@ import (

const (
	opSet                 = "set"
	opSetMulti            = "setmulti"
	opGetMulti            = "getmulti"
	opDelete              = "delete"
	reasonMaxItemSize     = "max-item-size"
	reasonAsyncBufferFull = "async-buffer-full"
	reasonMalformedKey    = "malformed-key"
@@ -73,6 +73,11 @@ type RemoteCacheClient interface {
	// underlying async operation will fail, the error will be tracked/logged.
	SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) error

	// Delete deletes a key from memcached.
	// This is a synchronous operation. If an asynchronous set operation for key is still
	// pending to be processed, it will wait for it to complete before performing deletion.
	Delete(ctx context.Context, key string) error

	// Stop client and release underlying resources.
	Stop()
}
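
For orientation, here is a minimal caller-side sketch of the new Delete method. It is illustrative only and not part of this commit: the helper name invalidateKey is made up, it assumes Mimir's module path for the cacheutil import, and it only uses the RemoteCacheClient methods shown in the diff above.

package example

import (
	"context"

	"github.com/grafana/mimir/pkg/cacheutil"
)

// invalidateKey is a hypothetical helper showing how a caller might remove a
// cached entry through the interface above.
func invalidateKey(ctx context.Context, cache cacheutil.RemoteCacheClient, key string) error {
	// Delete is synchronous: per the doc comment above, it is processed on the
	// same path as a still-pending async set of the same key. If ctx is already
	// cancelled when the queued operation runs, the client returns ctx.Err()
	// instead of contacting memcached (see the implementation further down).
	return cache.Delete(ctx, key)
}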
@@ -84,6 +89,7 @@ type MemcachedClient = RemoteCacheClient
type memcachedClientBackend interface {
	GetMulti(keys []string) (map[string]*memcache.Item, error)
	Set(item *memcache.Item) error
	Delete(key string) error
}

// updatableServerSelector extends the interface used for picking a memcached server
@@ -299,21 +305,19 @@ func newMemcachedClient(
}, []string{"operation"})
c.operations.WithLabelValues(opGetMulti)
c.operations.WithLabelValues(opSet)
c.operations.WithLabelValues(opDelete)

	c.failures = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "thanos_memcached_operation_failures_total",
		Help: "Total number of operations against memcached that failed.",
	}, []string{"operation", "reason"})
	c.failures.WithLabelValues(opGetMulti, reasonTimeout)
	c.failures.WithLabelValues(opGetMulti, reasonMalformedKey)
	c.failures.WithLabelValues(opGetMulti, reasonServerError)
	c.failures.WithLabelValues(opGetMulti, reasonNetworkError)
	c.failures.WithLabelValues(opGetMulti, reasonOther)
	c.failures.WithLabelValues(opSet, reasonTimeout)
	c.failures.WithLabelValues(opSet, reasonMalformedKey)
	c.failures.WithLabelValues(opSet, reasonServerError)
	c.failures.WithLabelValues(opSet, reasonNetworkError)
	c.failures.WithLabelValues(opSet, reasonOther)
	for _, op := range []string{opGetMulti, opSet, opDelete} {
		c.failures.WithLabelValues(op, reasonTimeout)
		c.failures.WithLabelValues(op, reasonMalformedKey)
		c.failures.WithLabelValues(op, reasonServerError)
		c.failures.WithLabelValues(op, reasonNetworkError)
		c.failures.WithLabelValues(op, reasonOther)
	}

	c.skipped = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "thanos_memcached_operation_skipped_total",
@@ -330,6 +334,7 @@ }, []string{"operation"})
}, []string{"operation"})
c.duration.WithLabelValues(opGetMulti)
c.duration.WithLabelValues(opSet)
c.duration.WithLabelValues(opDelete)

	c.dataSize = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Name: "thanos_memcached_operation_data_size_bytes",
@@ -441,6 +446,43 @@ func (c *memcachedClient) GetMulti(ctx context.Context, keys []string) map[strin
	return hits
}

func (c *memcachedClient) Delete(ctx context.Context, key string) error {
	errCh := make(chan error, 1)

	enqueueErr := c.enqueueAsync(func() {
		start := time.Now()
		c.operations.WithLabelValues(opDelete).Inc()

		var err error
		select {
		case <-ctx.Done():
			err = ctx.Err()
		default:
			err = c.client.Delete(key)
		}

		if err != nil {
			level.Debug(c.logger).Log(
				"msg", "failed to delete memcached item",
				"key", key,
				"err", err,
			)
			c.trackError(opDelete, err)
		} else {
			c.duration.WithLabelValues(opDelete).Observe(time.Since(start).Seconds())
		}
		errCh <- err
	})

	if errors.Is(enqueueErr, errMemcachedAsyncBufferFull) {
		c.skipped.WithLabelValues(opDelete, reasonAsyncBufferFull).Inc()
		level.Debug(c.logger).Log("msg", "failed to delete memcached item because the async buffer is full", "err", enqueueErr, "size", len(c.asyncQueue))
		return enqueueErr
	}
	// Wait for the delete operation to complete.
	return <-errCh
}

func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([]map[string]*memcache.Item, error) {
	// Do not batch if the input keys are less than the max batch size.
	if (c.config.MaxGetMultiBatchSize <= 0) || (len(keys) <= c.config.MaxGetMultiBatchSize) {
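The new Delete reuses the client's existing async work queue but blocks on a one-element error channel, so the delete is processed through the same path as any still-pending async set while the call itself stays synchronous, and it fails fast when the async buffer is full. The standalone sketch below isolates that pattern; the asyncQueue type, the deleteSync helper, and the queue sizes are assumptions for illustration, not code from this repository.

package main

import (
	"errors"
	"fmt"
)

// errAsyncBufferFull plays the role of errMemcachedAsyncBufferFull in the real
// client: enqueueing fails fast instead of blocking when the queue is full.
var errAsyncBufferFull = errors.New("the async buffer is full")

// asyncQueue is a toy stand-in for the client's background worker queue.
type asyncQueue struct {
	ops chan func()
}

func newAsyncQueue(size, workers int) *asyncQueue {
	q := &asyncQueue{ops: make(chan func(), size)}
	for i := 0; i < workers; i++ {
		go func() {
			for op := range q.ops {
				op()
			}
		}()
	}
	return q
}

// enqueueAsync mimics the fire-and-forget path used by SetAsync.
func (q *asyncQueue) enqueueAsync(op func()) error {
	select {
	case q.ops <- op:
		return nil
	default:
		return errAsyncBufferFull
	}
}

// deleteSync shows the synchronous-over-async pattern used by Delete: enqueue
// the operation like an async set, then wait on a buffered channel for its
// result so the caller only returns once the delete has been processed.
func (q *asyncQueue) deleteSync(do func() error) error {
	errCh := make(chan error, 1)
	enqueueErr := q.enqueueAsync(func() {
		errCh <- do()
	})
	if errors.Is(enqueueErr, errAsyncBufferFull) {
		// Mirror the real client: report the full buffer instead of waiting.
		return enqueueErr
	}
	return <-errCh
}

func main() {
	q := newAsyncQueue(10, 1)
	err := q.deleteSync(func() error {
		fmt.Println("deleting key from the cache backend")
		return nil
	})
	fmt.Println("delete returned:", err)
}
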
6 changes: 6 additions & 0 deletions pkg/storegateway/indexcache/memcached_test.go
@@ -776,6 +776,12 @@ func (c *mockedMemcachedClient) SetAsync(ctx context.Context, key string, value
	return nil
}

func (c *mockedMemcachedClient) Delete(_ context.Context, key string) error {
	delete(c.cache, key)

	return nil
}

func (c *mockedMemcachedClient) Stop() {
	// Nothing to do.
}
