Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: deletes queue messages by chunks #58

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 41 additions & 13 deletions internal/queue/storage/mongo_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,30 @@ import (
"go.opentelemetry.io/otel/metric"
)

type MongoCollectionInterface interface {
UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
BulkWrite(ctx context.Context, models []mongo.WriteModel,
opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error)
Distinct(ctx context.Context, fieldName string, filter interface{},
opts ...*options.DistinctOptions) ([]interface{}, error)
DeleteMany(ctx context.Context, filter interface{},
opts ...*options.DeleteOptions) (*mongo.DeleteResult, error)
CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error)
Find(ctx context.Context, filter interface{},
opts ...*options.FindOptions) (cur *mongo.Cursor, err error)
}

// MongoStorage is an implementation of the Storage Interface using MongoDB.
type MongoStorage struct {
client *mongo.Client
clientPrimaryPreference *mongo.Client
messagesCollection *mongo.Collection
messagesCollection MongoCollectionInterface
messagesCollectionPrimaryRead *mongo.Collection
queueConfigurationCollection *mongo.Collection
}

var _ Storage = &MongoStorage{}
var deleteChunkSize = 100

func NewMongoStorage(ctx context.Context) (*MongoStorage, error) {
mongoSecondaryOpts := createOptions()
Expand Down Expand Up @@ -394,27 +408,41 @@ func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...st
return 0, nil
}

now := dtime.Now()
defer func() {
metrics.StorageLatency.Record(ctx, dtime.ElapsedTime(now), metric.WithAttributes(attribute.String("op", "remove")))
}()

for i := 0; i < len(ids); i += deleteChunkSize {
res, err := attemptChunkDeletion(ctx, i, ids, queue, storage)
if err != nil {
return deleted, fmt.Errorf("error deleting storage elements: %w, ids = %v", err, ids)
}

deleted += res.DeletedCount
}

return deleted, nil
}

func attemptChunkDeletion(ctx context.Context, i int, ids []string, queue string, storage *MongoStorage) (*mongo.DeleteResult, error) {
end := i + deleteChunkSize
if end > len(ids) {
end = len(ids)
}
chunk := ids[i:end]

filter := bson.M{
"queue": queue,

"id": bson.M{
"$in": ids,
"$in": chunk,
},
}

logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter)

now := dtime.Now()
defer func() {
metrics.StorageLatency.Record(ctx, dtime.ElapsedTime(now), metric.WithAttributes(attribute.String("op", "remove")))
}()

res, err := storage.messagesCollection.DeleteMany(context.Background(), filter)
if err != nil {
return 0, fmt.Errorf("rrror deleting storage elements: %w", err)
}

return res.DeletedCount, nil
return res, err
}

func (storage *MongoStorage) Insert(ctx context.Context, messages ...*message.Message) (insertedCount int64, modifiedCount int64, err error) {
Expand Down
96 changes: 96 additions & 0 deletions internal/queue/storage/mongo_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storage

import (
"context"
"fmt"
"os"
"testing"

Expand All @@ -12,8 +13,54 @@ import (
"github.com/takenet/deckard/internal/queue/message"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type MockCollection struct {
deleteManyArgs []interface{}
deleteManyCalls int
errorDeleteMany error
}

func newMockCollection() *MockCollection {
return newMockCollectionErr(nil)
}
func newMockCollectionErr(err error) *MockCollection {
return &MockCollection{
deleteManyArgs: []interface{}{},
deleteManyCalls: 0,
errorDeleteMany: err,
}
}

func (this *MockCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
return nil, nil
}
func (this *MockCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel,
opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) {
return nil, nil
}
func (this *MockCollection) Distinct(ctx context.Context, fieldName string, filter interface{},
opts ...*options.DistinctOptions) ([]interface{}, error) {
return nil, nil
}
func (this *MockCollection) DeleteMany(ctx context.Context, filter interface{},
opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) {
this.deleteManyArgs = append(this.deleteManyArgs, filter)
this.deleteManyCalls++
return &mongo.DeleteResult{
DeletedCount: 1,
}, this.errorDeleteMany
}
func (this *MockCollection) CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error) {
return 0, nil
}
func (this *MockCollection) Find(ctx context.Context, filter interface{},
opts ...*options.FindOptions) (cur *mongo.Cursor, err error) {
return nil, nil
}

func TestMongoStorageIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down Expand Up @@ -233,3 +280,52 @@ func TestGetMongoMessageWithManyIds(t *testing.T) {
message,
)
}

func TestRemove(t *testing.T) {
deleteChunkSize = 1
defer func() {
deleteChunkSize = 100
}()

colMock := newMockCollection()
storage := &MongoStorage{
messagesCollection: colMock,
}

queue := "test_queue"
count, err := storage.Remove(context.Background(), queue, "1", "2")
require.NoError(t, err)
require.Equal(t, int64(2), count)
require.Equal(t, 2, colMock.deleteManyCalls)
require.Equal(t, []interface{}{bson.M{
"queue": queue,
"id": bson.M{
"$in": []string{"1"},
},
},
bson.M{
"queue": queue,
"id": bson.M{
"$in": []string{"2"},
},
}}, colMock.deleteManyArgs)
}

func TestRemoveErrors(t *testing.T) {
colMock := newMockCollectionErr(fmt.Errorf("Mocked error"))
storage := &MongoStorage{
messagesCollection: colMock,
}

queue := "test_queue"
count, err := storage.Remove(context.Background(), queue, "1", "2")
require.ErrorContains(t, err, "Mocked error")
require.Equal(t, int64(0), count)
require.Equal(t, 1, colMock.deleteManyCalls)
require.Equal(t, []interface{}{bson.M{
"queue": queue,
"id": bson.M{
"$in": []string{"1", "2"},
},
}}, colMock.deleteManyArgs)
}
Loading