diff --git a/internal/queue/storage/mongo_storage.go b/internal/queue/storage/mongo_storage.go index 1daff2c..44549e8 100644 --- a/internal/queue/storage/mongo_storage.go +++ b/internal/queue/storage/mongo_storage.go @@ -27,16 +27,30 @@ import ( "go.opentelemetry.io/otel/metric" ) +type MongoCollectionInterface interface { + UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) + BulkWrite(ctx context.Context, models []mongo.WriteModel, + opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) + Distinct(ctx context.Context, fieldName string, filter interface{}, + opts ...*options.DistinctOptions) ([]interface{}, error) + DeleteMany(ctx context.Context, filter interface{}, + opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) + CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error) + Find(ctx context.Context, filter interface{}, + opts ...*options.FindOptions) (cur *mongo.Cursor, err error) +} + // MongoStorage is an implementation of the Storage Interface using MongoDB. type MongoStorage struct { client *mongo.Client clientPrimaryPreference *mongo.Client - messagesCollection *mongo.Collection + messagesCollection MongoCollectionInterface messagesCollectionPrimaryRead *mongo.Collection queueConfigurationCollection *mongo.Collection } var _ Storage = &MongoStorage{} +var deleteChunkSize = 100 func NewMongoStorage(ctx context.Context) (*MongoStorage, error) { mongoSecondaryOpts := createOptions() @@ -394,27 +408,41 @@ func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...st return 0, nil } + now := dtime.Now() + defer func() { + metrics.StorageLatency.Record(ctx, dtime.ElapsedTime(now), metric.WithAttributes(attribute.String("op", "remove"))) + }() + + for i := 0; i < len(ids); i += deleteChunkSize { + res, err := attemptChunkDeletion(ctx, i, ids, queue, storage) + if err != nil { + return deleted, fmt.Errorf("error deleting storage elements: %w, ids = %v", err, ids) + } + + deleted += res.DeletedCount + } + + return deleted, nil +} + +func attemptChunkDeletion(ctx context.Context, i int, ids []string, queue string, storage *MongoStorage) (*mongo.DeleteResult, error) { + end := i + deleteChunkSize + if end > len(ids) { + end = len(ids) + } + chunk := ids[i:end] + filter := bson.M{ "queue": queue, - "id": bson.M{ - "$in": ids, + "$in": chunk, }, } logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter) - now := dtime.Now() - defer func() { - metrics.StorageLatency.Record(ctx, dtime.ElapsedTime(now), metric.WithAttributes(attribute.String("op", "remove"))) - }() - res, err := storage.messagesCollection.DeleteMany(context.Background(), filter) - if err != nil { - return 0, fmt.Errorf("rrror deleting storage elements: %w", err) - } - - return res.DeletedCount, nil + return res, err } func (storage *MongoStorage) Insert(ctx context.Context, messages ...*message.Message) (insertedCount int64, modifiedCount int64, err error) { diff --git a/internal/queue/storage/mongo_storage_test.go b/internal/queue/storage/mongo_storage_test.go index 977b0b6..07f3709 100644 --- a/internal/queue/storage/mongo_storage_test.go +++ b/internal/queue/storage/mongo_storage_test.go @@ -2,6 +2,7 @@ package storage import ( "context" + "fmt" "os" "testing" @@ -12,8 +13,54 @@ import ( "github.com/takenet/deckard/internal/queue/message" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" ) +type MockCollection struct { + deleteManyArgs []interface{} + deleteManyCalls int + errorDeleteMany error +} + +func newMockCollection() *MockCollection { + return newMockCollectionErr(nil) +} +func newMockCollectionErr(err error) *MockCollection { + return &MockCollection{ + deleteManyArgs: []interface{}{}, + deleteManyCalls: 0, + errorDeleteMany: err, + } +} + +func (this *MockCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) { + return nil, nil +} +func (this *MockCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel, + opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) { + return nil, nil +} +func (this *MockCollection) Distinct(ctx context.Context, fieldName string, filter interface{}, + opts ...*options.DistinctOptions) ([]interface{}, error) { + return nil, nil +} +func (this *MockCollection) DeleteMany(ctx context.Context, filter interface{}, + opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) { + this.deleteManyArgs = append(this.deleteManyArgs, filter) + this.deleteManyCalls++ + return &mongo.DeleteResult{ + DeletedCount: 1, + }, this.errorDeleteMany +} +func (this *MockCollection) CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error) { + return 0, nil +} +func (this *MockCollection) Find(ctx context.Context, filter interface{}, + opts ...*options.FindOptions) (cur *mongo.Cursor, err error) { + return nil, nil +} + func TestMongoStorageIntegration(t *testing.T) { if testing.Short() { t.Skip() @@ -233,3 +280,52 @@ func TestGetMongoMessageWithManyIds(t *testing.T) { message, ) } + +func TestRemove(t *testing.T) { + deleteChunkSize = 1 + defer func() { + deleteChunkSize = 100 + }() + + colMock := newMockCollection() + storage := &MongoStorage{ + messagesCollection: colMock, + } + + queue := "test_queue" + count, err := storage.Remove(context.Background(), queue, "1", "2") + require.NoError(t, err) + require.Equal(t, int64(2), count) + require.Equal(t, 2, colMock.deleteManyCalls) + require.Equal(t, []interface{}{bson.M{ + "queue": queue, + "id": bson.M{ + "$in": []string{"1"}, + }, + }, + bson.M{ + "queue": queue, + "id": bson.M{ + "$in": []string{"2"}, + }, + }}, colMock.deleteManyArgs) +} + +func TestRemoveErrors(t *testing.T) { + colMock := newMockCollectionErr(fmt.Errorf("Mocked error")) + storage := &MongoStorage{ + messagesCollection: colMock, + } + + queue := "test_queue" + count, err := storage.Remove(context.Background(), queue, "1", "2") + require.ErrorContains(t, err, "Mocked error") + require.Equal(t, int64(0), count) + require.Equal(t, 1, colMock.deleteManyCalls) + require.Equal(t, []interface{}{bson.M{ + "queue": queue, + "id": bson.M{ + "$in": []string{"1", "2"}, + }, + }}, colMock.deleteManyArgs) +}