Skip to content

Commit

Permalink
Let configure sort option for mongodb event store
Browse files Browse the repository at this point in the history
  • Loading branch information
totemcaf committed Dec 14, 2023
1 parent dd4717c commit e0dce60
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions eventstore/mongodb_v2/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
// EventStore is an eventhorizon.EventStore for MongoDB, using one collection
// for all events and another to keep track of all aggregates/streams. It also
// keeps track of the global position of events, stored as metadata.
// This implementation warrants event order by Version on Load and LoadFrom methods.
// This implementation warrants event order by Version on Load and LoadFrom methods (configurable, see WithSortEventsOnDB).
type EventStore struct {
client *mongo.Client
clientOwnership clientOwnership
Expand All @@ -53,6 +53,7 @@ type EventStore struct {
eventHandlerAfterSave eh.EventHandler
eventHandlerInTX eh.EventHandler
skipNonRegisteredEvents bool
sortEventsOnDb bool // if true, events will be sorted on DB side. Default is false for backward compatibility.
}

type clientOwnership int
Expand Down Expand Up @@ -224,6 +225,16 @@ func WithSnapshotCollectionName(snapshotColl string) Option {
}
}

// WithSortEventsOnDB enables sorting events on DB.
// Without this option, events order should be warranted by DB default ordering. This is not the case for MongoDB.
func WithSortEventsOnDB() Option {
return func(s *EventStore) error {
s.sortEventsOnDb = true

return nil
}
}

// Save implements the Save method of the eventhorizon.EventStore interface.
func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersion int) error {
if len(events) == 0 {
Expand Down Expand Up @@ -431,8 +442,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio

// Load implements the Load method of the eventhorizon.EventStore interface.
func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) {
opts := options.Find().SetSort(bson.M{"version": 1})
cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id}, opts)
cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id}, s.makeFindOptions())
if err != nil {
return nil, &eh.EventStoreError{
Err: fmt.Errorf("could not find event: %w", err),
Expand All @@ -446,8 +456,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error)

// LoadFrom implements LoadFrom method of the eventhorizon.SnapshotStore interface.
func (s *EventStore) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) {
opts := options.Find().SetSort(bson.M{"version": 1})
cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id, "version": bson.M{"$gte": version}}, opts)
cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id, "version": bson.M{"$gte": version}}, s.makeFindOptions())
if err != nil {
return nil, &eh.EventStoreError{
Err: fmt.Errorf("could not find event: %w", err),
Expand Down Expand Up @@ -527,7 +536,7 @@ func (s *EventStore) loadFromCursor(ctx context.Context, id uuid.UUID, cursor *m
}

func (s *EventStore) LoadSnapshot(ctx context.Context, id uuid.UUID) (*eh.Snapshot, error) {
result := s.snapshots.FindOne(ctx, bson.M{"aggregate_id": id}, options.FindOne().SetSort(bson.M{"version": -1}))
result := s.snapshots.FindOne(ctx, bson.M{"aggregate_id": id})
if err := result.Err(); err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, nil
Expand Down Expand Up @@ -577,6 +586,13 @@ func (s *EventStore) LoadSnapshot(ctx context.Context, id uuid.UUID) (*eh.Snapsh
return snapshot, nil
}

func (s *EventStore) makeFindOptions() *mongoOptions.FindOptions {
if s.sortEventsOnDb {
return options.Find().SetSort(bson.M{"version": -1})
}
return options.Find()
}

func (s *EventStore) SaveSnapshot(ctx context.Context, id uuid.UUID, snapshot eh.Snapshot) (err error) {
if snapshot.AggregateType == "" {
return &eh.EventStoreError{
Expand Down

0 comments on commit e0dce60

Please sign in to comment.