From c5b520305ec536ef43355abd1bd1a93a69b5ae30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9zar?= Date: Wed, 11 Dec 2024 15:49:33 -0300 Subject: [PATCH] fix: deletes queues by chunks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * errors were observed while the housekeeper attempt to clear queues, the biggest doc didnt have the mongo max of 16MB, so the whole query result was the problem. Signed-off-by: Cézar --- internal/queue/storage/mongo_storage.go | 39 ++++++++++++++----------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/internal/queue/storage/mongo_storage.go b/internal/queue/storage/mongo_storage.go index 10e0d28..44549e8 100644 --- a/internal/queue/storage/mongo_storage.go +++ b/internal/queue/storage/mongo_storage.go @@ -414,24 +414,9 @@ func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...st }() for i := 0; i < len(ids); i += deleteChunkSize { - end := i + deleteChunkSize - if end > len(ids) { - end = len(ids) - } - chunk := ids[i:end] - - filter := bson.M{ - "queue": queue, - "id": bson.M{ - "$in": chunk, - }, - } - - logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter) - - res, err := storage.messagesCollection.DeleteMany(context.Background(), filter) + res, err := attemptChunkDeletion(ctx, i, ids, queue, storage) if err != nil { - return deleted, fmt.Errorf("error deleting storage elements: %w", err) + return deleted, fmt.Errorf("error deleting storage elements: %w, ids = %v", err, ids) } deleted += res.DeletedCount @@ -440,6 +425,26 @@ func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...st return deleted, nil } +func attemptChunkDeletion(ctx context.Context, i int, ids []string, queue string, storage *MongoStorage) (*mongo.DeleteResult, error) { + end := i + deleteChunkSize + if end > len(ids) { + end = len(ids) + } + chunk := ids[i:end] + + filter := bson.M{ + "queue": queue, + "id": bson.M{ + "$in": chunk, + }, + } + + logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter) + + res, err := storage.messagesCollection.DeleteMany(context.Background(), filter) + return res, err +} + func (storage *MongoStorage) Insert(ctx context.Context, messages ...*message.Message) (insertedCount int64, modifiedCount int64, err error) { updates := make([]mongo.WriteModel, 0, len(messages))