diff --git a/internal/queue/storage/mongo_storage.go b/internal/queue/storage/mongo_storage.go index 1daff2c..10e0d28 100644 --- a/internal/queue/storage/mongo_storage.go +++ b/internal/queue/storage/mongo_storage.go @@ -27,16 +27,30 @@ import ( "go.opentelemetry.io/otel/metric" ) +type MongoCollectionInterface interface { + UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) + BulkWrite(ctx context.Context, models []mongo.WriteModel, + opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) + Distinct(ctx context.Context, fieldName string, filter interface{}, + opts ...*options.DistinctOptions) ([]interface{}, error) + DeleteMany(ctx context.Context, filter interface{}, + opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) + CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error) + Find(ctx context.Context, filter interface{}, + opts ...*options.FindOptions) (cur *mongo.Cursor, err error) +} + // MongoStorage is an implementation of the Storage Interface using MongoDB. type MongoStorage struct { client *mongo.Client clientPrimaryPreference *mongo.Client - messagesCollection *mongo.Collection + messagesCollection MongoCollectionInterface messagesCollectionPrimaryRead *mongo.Collection queueConfigurationCollection *mongo.Collection } var _ Storage = &MongoStorage{} +var deleteChunkSize = 100 func NewMongoStorage(ctx context.Context) (*MongoStorage, error) { mongoSecondaryOpts := createOptions() @@ -394,27 +408,36 @@ func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...st return 0, nil } - filter := bson.M{ - "queue": queue, - - "id": bson.M{ - "$in": ids, - }, - } - - logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter) - now := dtime.Now() defer func() { metrics.StorageLatency.Record(ctx, dtime.ElapsedTime(now), metric.WithAttributes(attribute.String("op", "remove"))) }() - res, err := storage.messagesCollection.DeleteMany(context.Background(), filter) - if err != nil { - return 0, fmt.Errorf("rrror deleting storage elements: %w", err) + for i := 0; i < len(ids); i += deleteChunkSize { + end := i + deleteChunkSize + if end > len(ids) { + end = len(ids) + } + chunk := ids[i:end] + + filter := bson.M{ + "queue": queue, + "id": bson.M{ + "$in": chunk, + }, + } + + logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter) + + res, err := storage.messagesCollection.DeleteMany(context.Background(), filter) + if err != nil { + return deleted, fmt.Errorf("error deleting storage elements: %w", err) + } + + deleted += res.DeletedCount } - return res.DeletedCount, nil + return deleted, nil } func (storage *MongoStorage) Insert(ctx context.Context, messages ...*message.Message) (insertedCount int64, modifiedCount int64, err error) { diff --git a/internal/queue/storage/mongo_storage_test.go b/internal/queue/storage/mongo_storage_test.go index 977b0b6..673434b 100644 --- a/internal/queue/storage/mongo_storage_test.go +++ b/internal/queue/storage/mongo_storage_test.go @@ -12,8 +12,49 @@ import ( "github.com/takenet/deckard/internal/queue/message" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" ) +type MockCollection struct { + deleteManyArgs []interface{} + deleteManyCalls int +} + +func newMockCollection() *MockCollection { + return &MockCollection{ + deleteManyArgs: []interface{}{}, + deleteManyCalls: 0, + } +} + +func (this *MockCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) { + return nil, nil +} +func (this *MockCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel, + opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) { + return nil, nil +} +func (this *MockCollection) Distinct(ctx context.Context, fieldName string, filter interface{}, + opts ...*options.DistinctOptions) ([]interface{}, error) { + return nil, nil +} +func (this *MockCollection) DeleteMany(ctx context.Context, filter interface{}, + opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) { + this.deleteManyArgs = append(this.deleteManyArgs, filter) + this.deleteManyCalls++ + return &mongo.DeleteResult{ + DeletedCount: 1, + }, nil +} +func (this *MockCollection) CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error) { + return 0, nil +} +func (this *MockCollection) Find(ctx context.Context, filter interface{}, + opts ...*options.FindOptions) (cur *mongo.Cursor, err error) { + return nil, nil +} + func TestMongoStorageIntegration(t *testing.T) { if testing.Short() { t.Skip() @@ -233,3 +274,29 @@ func TestGetMongoMessageWithManyIds(t *testing.T) { message, ) } + +func TestRemove(t *testing.T) { + deleteChunkSize = 1 + colMock := newMockCollection() + storage := &MongoStorage{ + messagesCollection: colMock, + } + + queue := "test_queue" + count, err := storage.Remove(context.Background(), queue, "1", "2") + require.NoError(t, err) + require.Equal(t, int64(2), count) + require.Equal(t, 2, colMock.deleteManyCalls) + require.Equal(t, []interface{}{bson.M{ + "queue": queue, + "id": bson.M{ + "$in": []string{"1"}, + }, + }, + bson.M{ + "queue": queue, + "id": bson.M{ + "$in": []string{"2"}, + }, + }}, colMock.deleteManyArgs) +}