Skip to content

Commit

Permalink
fix: deletes queues by chunks
Browse files Browse the repository at this point in the history
* errors were observed while the housekeeper attempt to clear queues, the biggest doc didnt have the mongo max of 16MB, so the whole query result was the problem.

Signed-off-by: Cézar <[email protected]>
  • Loading branch information
cezar-tech committed Dec 11, 2024
1 parent a738047 commit 2d4957e
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 15 deletions.
53 changes: 38 additions & 15 deletions internal/queue/storage/mongo_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,30 @@ import (
"go.opentelemetry.io/otel/metric"
)

type MongoCollectionInterface interface {
UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
BulkWrite(ctx context.Context, models []mongo.WriteModel,
opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error)
Distinct(ctx context.Context, fieldName string, filter interface{},
opts ...*options.DistinctOptions) ([]interface{}, error)
DeleteMany(ctx context.Context, filter interface{},
opts ...*options.DeleteOptions) (*mongo.DeleteResult, error)
CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error)
Find(ctx context.Context, filter interface{},
opts ...*options.FindOptions) (cur *mongo.Cursor, err error)
}

// MongoStorage is an implementation of the Storage Interface using MongoDB.
type MongoStorage struct {
client *mongo.Client
clientPrimaryPreference *mongo.Client
messagesCollection *mongo.Collection
messagesCollection MongoCollectionInterface
messagesCollectionPrimaryRead *mongo.Collection
queueConfigurationCollection *mongo.Collection
}

var _ Storage = &MongoStorage{}
var deleteChunkSize = 100

func NewMongoStorage(ctx context.Context) (*MongoStorage, error) {
mongoSecondaryOpts := createOptions()
Expand Down Expand Up @@ -394,27 +408,36 @@ func (storage *MongoStorage) Remove(ctx context.Context, queue string, ids ...st
return 0, nil
}

filter := bson.M{
"queue": queue,

"id": bson.M{
"$in": ids,
},
}

logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter)

now := dtime.Now()
defer func() {
metrics.StorageLatency.Record(ctx, dtime.ElapsedTime(now), metric.WithAttributes(attribute.String("op", "remove")))
}()

res, err := storage.messagesCollection.DeleteMany(context.Background(), filter)
if err != nil {
return 0, fmt.Errorf("rrror deleting storage elements: %w", err)
for i := 0; i < len(ids); i += deleteChunkSize {
end := i + deleteChunkSize
if end > len(ids) {
end = len(ids)
}
chunk := ids[i:end]

filter := bson.M{
"queue": queue,
"id": bson.M{
"$in": chunk,
},
}

logger.S(ctx).Debugw("Storage operation: delete many operation.", "filter", filter)

res, err := storage.messagesCollection.DeleteMany(context.Background(), filter)
if err != nil {
return deleted, fmt.Errorf("error deleting storage elements: %w", err)
}

Check warning on line 435 in internal/queue/storage/mongo_storage.go

View check run for this annotation

Codecov / codecov/patch

internal/queue/storage/mongo_storage.go#L434-L435

Added lines #L434 - L435 were not covered by tests

deleted += res.DeletedCount
}

return res.DeletedCount, nil
return deleted, nil
}

func (storage *MongoStorage) Insert(ctx context.Context, messages ...*message.Message) (insertedCount int64, modifiedCount int64, err error) {
Expand Down
67 changes: 67 additions & 0 deletions internal/queue/storage/mongo_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,49 @@ import (
"github.com/takenet/deckard/internal/queue/message"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type MockCollection struct {
deleteManyArgs []interface{}
deleteManyCalls int
}

func newMockCollection() *MockCollection {
return &MockCollection{
deleteManyArgs: []interface{}{},
deleteManyCalls: 0,
}
}

func (this *MockCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
return nil, nil
}
func (this *MockCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel,
opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) {
return nil, nil
}
func (this *MockCollection) Distinct(ctx context.Context, fieldName string, filter interface{},
opts ...*options.DistinctOptions) ([]interface{}, error) {
return nil, nil
}
func (this *MockCollection) DeleteMany(ctx context.Context, filter interface{},
opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) {
this.deleteManyArgs = append(this.deleteManyArgs, filter)
this.deleteManyCalls++
return &mongo.DeleteResult{
DeletedCount: 1,
}, nil
}
func (this *MockCollection) CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error) {
return 0, nil
}
func (this *MockCollection) Find(ctx context.Context, filter interface{},
opts ...*options.FindOptions) (cur *mongo.Cursor, err error) {
return nil, nil
}

func TestMongoStorageIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down Expand Up @@ -233,3 +274,29 @@ func TestGetMongoMessageWithManyIds(t *testing.T) {
message,
)
}

func TestRemove(t *testing.T) {
deleteChunkSize = 1
colMock := newMockCollection()
storage := &MongoStorage{
messagesCollection: colMock,
}

queue := "test_queue"
count, err := storage.Remove(context.Background(), queue, "1", "2")
require.NoError(t, err)
require.Equal(t, int64(2), count)
require.Equal(t, 2, colMock.deleteManyCalls)
require.Equal(t, []interface{}{bson.M{
"queue": queue,
"id": bson.M{
"$in": []string{"1"},
},
},
bson.M{
"queue": queue,
"id": bson.M{
"$in": []string{"2"},
},
}}, colMock.deleteManyArgs)
}

0 comments on commit 2d4957e

Please sign in to comment.