Skip to content

Commit

Permalink
fix: deletes queue messages by chunks (#58)
Browse files Browse the repository at this point in the history
* errors were observed while the housekeeper attempt to clear queues,
the biggest doc didnt have the mongo max of 16MB, so the whole query
result was the problem.

"queue/queue.go:413","message":"Error removing elements from storage:
rrror deleting storage elements: an inserted document is too large"

---------

Signed-off-by: Cézar <[email protected]>
  • Loading branch information
cezar-tech authored Jan 13, 2025
1 parent a738047 commit 5ab3556
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 13 deletions.
54 changes: 41 additions & 13 deletions internal/queue/storage/mongo_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,30 @@ import (
"go.opentelemetry.io/otel/metric"
)

type MongoCollectionInterface interface {
UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
BulkWrite(ctx context.Context, models []mongo.WriteModel,
opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error)
Distinct(ctx context.Context, fieldName string, filter interface{},
opts ...*options.DistinctOptions) ([]interface{}, error)
DeleteMany(ctx context.Context, filter interface{},
opts ...*options.DeleteOptions) (*mongo.DeleteResult, error)
CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error)
Find(ctx context.Context, filter interface{},
opts ...*options.FindOptions) (cur *mongo.Cursor, err error)
}

// MongoStorage is an implementation of the Storage Interface using MongoDB.
type MongoStorage struct {
client *mongo.Client
clientPrimaryPreference *mongo.Client
messagesCollection *mongo.Collection
messagesCollection MongoCollectionInterface
messagesCollectionPrimaryRead *mongo.Collection
queueConfigurationCollection *mongo.Collection
}

var _ Storage = &MongoStorage{}
var deleteChunkSize = 100

func NewMongoStorage(ctx context.Context) (*MongoStorage, error) {
mongoSecondaryOpts := createOptions()
Expand Down Expand Up @@ -394,27 +408,41 @@ func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...st
return 0, nil
}

now := dtime.Now()
defer func() {
metrics.StorageLatency.Record(ctx, dtime.ElapsedTime(now), metric.WithAttributes(attribute.String("op", "remove")))
}()

for i := 0; i < len(ids); i += deleteChunkSize {
res, err := attemptChunkDeletion(ctx, i, ids, queue, storage)
if err != nil {
return deleted, fmt.Errorf("error deleting storage elements: %w, ids = %v", err, ids)
}

deleted += res.DeletedCount
}

return deleted, nil
}

func attemptChunkDeletion(ctx context.Context, i int, ids []string, queue string, storage *MongoStorage) (*mongo.DeleteResult, error) {
end := i + deleteChunkSize
if end > len(ids) {
end = len(ids)
}
chunk := ids[i:end]

filter := bson.M{
"queue": queue,

"id": bson.M{
"$in": ids,
"$in": chunk,
},
}

logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter)

now := dtime.Now()
defer func() {
metrics.StorageLatency.Record(ctx, dtime.ElapsedTime(now), metric.WithAttributes(attribute.String("op", "remove")))
}()

res, err := storage.messagesCollection.DeleteMany(context.Background(), filter)
if err != nil {
return 0, fmt.Errorf("rrror deleting storage elements: %w", err)
}

return res.DeletedCount, nil
return res, err
}

func (storage *MongoStorage) Insert(ctx context.Context, messages ...*message.Message) (insertedCount int64, modifiedCount int64, err error) {
Expand Down
96 changes: 96 additions & 0 deletions internal/queue/storage/mongo_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storage

import (
"context"
"fmt"
"os"
"testing"

Expand All @@ -12,8 +13,54 @@ import (
"github.com/takenet/deckard/internal/queue/message"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type MockCollection struct {
deleteManyArgs []interface{}
deleteManyCalls int
errorDeleteMany error
}

func newMockCollection() *MockCollection {
return newMockCollectionErr(nil)
}
func newMockCollectionErr(err error) *MockCollection {
return &MockCollection{
deleteManyArgs: []interface{}{},
deleteManyCalls: 0,
errorDeleteMany: err,
}
}

func (this *MockCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
return nil, nil
}
func (this *MockCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel,
opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) {
return nil, nil
}
func (this *MockCollection) Distinct(ctx context.Context, fieldName string, filter interface{},
opts ...*options.DistinctOptions) ([]interface{}, error) {
return nil, nil
}
func (this *MockCollection) DeleteMany(ctx context.Context, filter interface{},
opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) {
this.deleteManyArgs = append(this.deleteManyArgs, filter)
this.deleteManyCalls++
return &mongo.DeleteResult{
DeletedCount: 1,
}, this.errorDeleteMany
}
func (this *MockCollection) CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error) {
return 0, nil
}
func (this *MockCollection) Find(ctx context.Context, filter interface{},
opts ...*options.FindOptions) (cur *mongo.Cursor, err error) {
return nil, nil
}

func TestMongoStorageIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down Expand Up @@ -233,3 +280,52 @@ func TestGetMongoMessageWithManyIds(t *testing.T) {
message,
)
}

func TestRemove(t *testing.T) {
deleteChunkSize = 1
defer func() {
deleteChunkSize = 100
}()

colMock := newMockCollection()
storage := &MongoStorage{
messagesCollection: colMock,
}

queue := "test_queue"
count, err := storage.Remove(context.Background(), queue, "1", "2")
require.NoError(t, err)
require.Equal(t, int64(2), count)
require.Equal(t, 2, colMock.deleteManyCalls)
require.Equal(t, []interface{}{bson.M{
"queue": queue,
"id": bson.M{
"$in": []string{"1"},
},
},
bson.M{
"queue": queue,
"id": bson.M{
"$in": []string{"2"},
},
}}, colMock.deleteManyArgs)
}

func TestRemoveErrors(t *testing.T) {
colMock := newMockCollectionErr(fmt.Errorf("Mocked error"))
storage := &MongoStorage{
messagesCollection: colMock,
}

queue := "test_queue"
count, err := storage.Remove(context.Background(), queue, "1", "2")
require.ErrorContains(t, err, "Mocked error")
require.Equal(t, int64(0), count)
require.Equal(t, 1, colMock.deleteManyCalls)
require.Equal(t, []interface{}{bson.M{
"queue": queue,
"id": bson.M{
"$in": []string{"1", "2"},
},
}}, colMock.deleteManyArgs)
}

0 comments on commit 5ab3556

Please sign in to comment.