Skip to content

Commit

Permalink
fix: deletes queues by chunks
Browse files Browse the repository at this point in the history
* errors were observed while the housekeeper attempt to clear queues, the biggest doc didnt have the mongo max of 16MB, so the whole query result was the problem.

Signed-off-by: Cézar <[email protected]>
  • Loading branch information
cezar-tech committed Dec 12, 2024
1 parent 0ff0898 commit c5b5203
Showing 1 changed file with 22 additions and 17 deletions.
39 changes: 22 additions & 17 deletions internal/queue/storage/mongo_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,24 +414,9 @@ func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...st
}()

for i := 0; i < len(ids); i += deleteChunkSize {
end := i + deleteChunkSize
if end > len(ids) {
end = len(ids)
}
chunk := ids[i:end]

filter := bson.M{
"queue": queue,
"id": bson.M{
"$in": chunk,
},
}

logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter)

res, err := storage.messagesCollection.DeleteMany(context.Background(), filter)
res, err := attemptChunkDeletion(ctx, i, ids, queue, storage)
if err != nil {
return deleted, fmt.Errorf("error deleting storage elements: %w", err)
return deleted, fmt.Errorf("error deleting storage elements: %w, ids = %v", err, ids)
}

deleted += res.DeletedCount
Expand All @@ -440,6 +425,26 @@ func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...st
return deleted, nil
}

func attemptChunkDeletion(ctx context.Context, i int, ids []string, queue string, storage *MongoStorage) (*mongo.DeleteResult, error) {
end := i + deleteChunkSize
if end > len(ids) {
end = len(ids)
}
chunk := ids[i:end]

filter := bson.M{
"queue": queue,
"id": bson.M{
"$in": chunk,
},
}

logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter)

res, err := storage.messagesCollection.DeleteMany(context.Background(), filter)
return res, err
}

func (storage *MongoStorage) Insert(ctx context.Context, messages ...*message.Message) (insertedCount int64, modifiedCount int64, err error) {
updates := make([]mongo.WriteModel, 0, len(messages))

Expand Down

0 comments on commit c5b5203

Please sign in to comment.