diff --git a/internal/queue/storage/mongo_storage.go b/internal/queue/storage/mongo_storage.go index 10e0d28..44549e8 100644 --- a/internal/queue/storage/mongo_storage.go +++ b/internal/queue/storage/mongo_storage.go @@ -414,24 +414,9 @@ func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...st }() for i := 0; i < len(ids); i += deleteChunkSize { - end := i + deleteChunkSize - if end > len(ids) { - end = len(ids) - } - chunk := ids[i:end] - - filter := bson.M{ - "queue": queue, - "id": bson.M{ - "$in": chunk, - }, - } - - logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter) - - res, err := storage.messagesCollection.DeleteMany(context.Background(), filter) + res, err := attemptChunkDeletion(ctx, i, ids, queue, storage) if err != nil { - return deleted, fmt.Errorf("error deleting storage elements: %w", err) + return deleted, fmt.Errorf("error deleting storage elements: %w, ids = %v", err, ids) } deleted += res.DeletedCount @@ -440,6 +425,26 @@ func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...st return deleted, nil } +func attemptChunkDeletion(ctx context.Context, i int, ids []string, queue string, storage *MongoStorage) (*mongo.DeleteResult, error) { + end := i + deleteChunkSize + if end > len(ids) { + end = len(ids) + } + chunk := ids[i:end] + + filter := bson.M{ + "queue": queue, + "id": bson.M{ + "$in": chunk, + }, + } + + logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter) + + res, err := storage.messagesCollection.DeleteMany(context.Background(), filter) + return res, err +} + func (storage *MongoStorage) Insert(ctx context.Context, messages ...*message.Message) (insertedCount int64, modifiedCount int64, err error) { updates := make([]mongo.WriteModel, 0, len(messages))