From 0ff08988573672025bf84bb55b2928e9860f239d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9zar?= Date: Wed, 11 Dec 2024 15:49:33 -0300 Subject: [PATCH] fix: deletes queues by chunks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * errors were observed while the housekeeper attempt to clear queues, the biggest doc didnt have the mongo max of 16MB, so the whole query result was the problem. Signed-off-by: Cézar --- internal/queue/storage/mongo_storage.go | 53 ++++++++--- internal/queue/storage/mongo_storage_test.go | 96 ++++++++++++++++++++ 2 files changed, 134 insertions(+), 15 deletions(-) diff --git a/internal/queue/storage/mongo_storage.go b/internal/queue/storage/mongo_storage.go index 1daff2c..10e0d28 100644 --- a/internal/queue/storage/mongo_storage.go +++ b/internal/queue/storage/mongo_storage.go @@ -27,16 +27,30 @@ import ( "go.opentelemetry.io/otel/metric" ) +type MongoCollectionInterface interface { + UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) + BulkWrite(ctx context.Context, models []mongo.WriteModel, + opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) + Distinct(ctx context.Context, fieldName string, filter interface{}, + opts ...*options.DistinctOptions) ([]interface{}, error) + DeleteMany(ctx context.Context, filter interface{}, + opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) + CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error) + Find(ctx context.Context, filter interface{}, + opts ...*options.FindOptions) (cur *mongo.Cursor, err error) +} + // MongoStorage is an implementation of the Storage Interface using MongoDB. type MongoStorage struct { client *mongo.Client clientPrimaryPreference *mongo.Client - messagesCollection *mongo.Collection + messagesCollection MongoCollectionInterface messagesCollectionPrimaryRead *mongo.Collection queueConfigurationCollection *mongo.Collection } var _ Storage = &MongoStorage{} +var deleteChunkSize = 100 func NewMongoStorage(ctx context.Context) (*MongoStorage, error) { mongoSecondaryOpts := createOptions() @@ -394,27 +408,36 @@ func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...st return 0, nil } - filter := bson.M{ - "queue": queue, - - "id": bson.M{ - "$in": ids, - }, - } - - logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter) - now := dtime.Now() defer func() { metrics.StorageLatency.Record(ctx, dtime.ElapsedTime(now), metric.WithAttributes(attribute.String("op", "remove"))) }() - res, err := storage.messagesCollection.DeleteMany(context.Background(), filter) - if err != nil { - return 0, fmt.Errorf("rrror deleting storage elements: %w", err) + for i := 0; i < len(ids); i += deleteChunkSize { + end := i + deleteChunkSize + if end > len(ids) { + end = len(ids) + } + chunk := ids[i:end] + + filter := bson.M{ + "queue": queue, + "id": bson.M{ + "$in": chunk, + }, + } + + logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter) + + res, err := storage.messagesCollection.DeleteMany(context.Background(), filter) + if err != nil { + return deleted, fmt.Errorf("error deleting storage elements: %w", err) + } + + deleted += res.DeletedCount } - return res.DeletedCount, nil + return deleted, nil } func (storage *MongoStorage) Insert(ctx context.Context, messages ...*message.Message) (insertedCount int64, modifiedCount int64, err error) { diff --git a/internal/queue/storage/mongo_storage_test.go b/internal/queue/storage/mongo_storage_test.go index 977b0b6..07f3709 100644 --- a/internal/queue/storage/mongo_storage_test.go +++ b/internal/queue/storage/mongo_storage_test.go @@ -2,6 +2,7 @@ package storage import ( "context" + "fmt" "os" "testing" @@ -12,8 +13,54 @@ import ( "github.com/takenet/deckard/internal/queue/message" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" ) +type MockCollection struct { + deleteManyArgs []interface{} + deleteManyCalls int + errorDeleteMany error +} + +func newMockCollection() *MockCollection { + return newMockCollectionErr(nil) +} +func newMockCollectionErr(err error) *MockCollection { + return &MockCollection{ + deleteManyArgs: []interface{}{}, + deleteManyCalls: 0, + errorDeleteMany: err, + } +} + +func (this *MockCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) { + return nil, nil +} +func (this *MockCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel, + opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) { + return nil, nil +} +func (this *MockCollection) Distinct(ctx context.Context, fieldName string, filter interface{}, + opts ...*options.DistinctOptions) ([]interface{}, error) { + return nil, nil +} +func (this *MockCollection) DeleteMany(ctx context.Context, filter interface{}, + opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) { + this.deleteManyArgs = append(this.deleteManyArgs, filter) + this.deleteManyCalls++ + return &mongo.DeleteResult{ + DeletedCount: 1, + }, this.errorDeleteMany +} +func (this *MockCollection) CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error) { + return 0, nil +} +func (this *MockCollection) Find(ctx context.Context, filter interface{}, + opts ...*options.FindOptions) (cur *mongo.Cursor, err error) { + return nil, nil +} + func TestMongoStorageIntegration(t *testing.T) { if testing.Short() { t.Skip() @@ -233,3 +280,52 @@ func TestGetMongoMessageWithManyIds(t *testing.T) { message, ) } + +func TestRemove(t *testing.T) { + deleteChunkSize = 1 + defer func() { + deleteChunkSize = 100 + }() + + colMock := newMockCollection() + storage := &MongoStorage{ + messagesCollection: colMock, + } + + queue := "test_queue" + count, err := storage.Remove(context.Background(), queue, "1", "2") + require.NoError(t, err) + require.Equal(t, int64(2), count) + require.Equal(t, 2, colMock.deleteManyCalls) + require.Equal(t, []interface{}{bson.M{ + "queue": queue, + "id": bson.M{ + "$in": []string{"1"}, + }, + }, + bson.M{ + "queue": queue, + "id": bson.M{ + "$in": []string{"2"}, + }, + }}, colMock.deleteManyArgs) +} + +func TestRemoveErrors(t *testing.T) { + colMock := newMockCollectionErr(fmt.Errorf("Mocked error")) + storage := &MongoStorage{ + messagesCollection: colMock, + } + + queue := "test_queue" + count, err := storage.Remove(context.Background(), queue, "1", "2") + require.ErrorContains(t, err, "Mocked error") + require.Equal(t, int64(0), count) + require.Equal(t, 1, colMock.deleteManyCalls) + require.Equal(t, []interface{}{bson.M{ + "queue": queue, + "id": bson.M{ + "$in": []string{"1", "2"}, + }, + }}, colMock.deleteManyArgs) +}