Skip to content

Commit

Permalink
Add EventSorter to warrant event order
Browse files Browse the repository at this point in the history
Add order to mongodb_v2 eventstore
  • Loading branch information
totemcaf committed Dec 11, 2023
1 parent eaed382 commit abedad9
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 3 deletions.
1 change: 1 addition & 0 deletions eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type EventStore interface {
Load(context.Context, uuid.UUID) ([]Event, error)

// LoadFrom loads all events from version for the aggregate id from the store.
// Event store should provide events in version order
LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]Event, error)

// Close closes the EventStore.
Expand Down
61 changes: 61 additions & 0 deletions eventstore/eventsorter/event_sorter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package eventsorter

import (
"context"
eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/uuid"
"sort"
)

// EventSorter is an event store wrapper that warrants events are provided in version order.
// Version order is required for event sourcing to work correctly.
// Use it with an event store that does not warrant version order.
type EventSorter struct {
inner eh.EventStore
}

var _ eh.EventStore = (*EventSorter)(nil)

// NewEventSorter creates a new EventSorter wrapping the provided event store
func NewEventSorter(inner eh.EventStore) *EventSorter {
return &EventSorter{inner: inner}
}

func (e EventSorter) Save(ctx context.Context, events []eh.Event, originalVersion int) error {
return e.inner.Save(ctx, events, originalVersion)
}

func (e EventSorter) Load(ctx context.Context, uuid uuid.UUID) ([]eh.Event, error) {
events, err := e.inner.Load(ctx, uuid)

if err != nil {
return nil, err
}

return e.SortEvents(events), nil
}

func (e EventSorter) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) {
events, err := e.inner.LoadFrom(ctx, id, version)

if err != nil {
return nil, err
}

return e.SortEvents(events), nil
}

func (e EventSorter) Close() error {
return e.inner.Close()
}

func (e EventSorter) SortEvents(events []eh.Event) []eh.Event {
byVersion := func(i, j int) bool {
return events[i].Version() < events[j].Version()
}

// It is ok to sort in place, events slice is already the inner store response
sort.Slice(events, byVersion)

return events
}
103 changes: 103 additions & 0 deletions eventstore/eventsorter/event_sorter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package eventsorter

import (
"context"
"github.com/AltScore/lcib-api/pkg/xeh/ehmocks"
eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"testing"
"time"
)

type EventSorterTestSuite struct {
suite.Suite

innerStore *ehmocks.EventStoreMock
eventSorter *EventSorter

unsortedEventList []eh.Event
}

// In order for 'go test' to run this suite, we need to create
// a normal test function and pass our suite to suite.Run
func TestEventSorterTestSuite(t *testing.T) {
suite.Run(t, &EventSorterTestSuite{})
}

// before each test
func (s *EventSorterTestSuite) SetupTest() {
s.innerStore = &ehmocks.EventStoreMock{}

s.eventSorter = NewEventSorter(s.innerStore)

s.unsortedEventList = []eh.Event{
eh.NewEvent("test", nil, time.Now(), eh.ForAggregate("test", uuid.New(), 3)),
eh.NewEvent("test", nil, time.Now(), eh.ForAggregate("test", uuid.New(), 2)),
eh.NewEvent("test", nil, time.Now(), eh.ForAggregate("test", uuid.New(), 1)),
}
}

func (s *EventSorterTestSuite) Test_can_sort_empty_event_list_on_Load() {
// Given a event store with no events
s.innerStore.On("Load", mock.Anything, mock.Anything).Return([]eh.Event{}, nil)

// When we load the events
events, err := s.eventSorter.Load(context.TODO(), uuid.New())

// Then no error is returned
s.NoError(err)

// And empty event list is returned
s.Len(events, 0)
}

func (s *EventSorterTestSuite) Test_can_sort_empty_event_list_on_LoafFrom() {
// Given a event store with no events
s.innerStore.On("LoadFrom", mock.Anything, mock.Anything, mock.Anything).Return([]eh.Event{}, nil)

// When we load the events
events, err := s.eventSorter.LoadFrom(context.TODO(), uuid.New(), 8)

// Then no error is returned
s.NoError(err)

// And empty event list is returned
s.Len(events, 0)
}

func (s *EventSorterTestSuite) Test_can_sort_event_list_on_Load() {
// Given a event store with no events
s.innerStore.On("Load", mock.Anything, mock.Anything).Return(s.unsortedEventList, nil)

// When we load the events
events, err := s.eventSorter.Load(context.TODO(), uuid.New())

// Then no error is returned
s.NoError(err)

// And the events are returned in version order
s.Len(events, 3)

s.Equal(1, events[0].Version())
s.Equal(2, events[1].Version())
s.Equal(3, events[2].Version())
}

func (s *EventSorterTestSuite) Test_can_sort_event_list_on_LoadFrom() {
// Given a event store with no events
s.innerStore.On("LoadFrom", mock.Anything, mock.Anything, 2).Return(s.unsortedEventList, nil)

// When we load the events
events, err := s.eventSorter.LoadFrom(context.TODO(), uuid.New(), 2)

// Then no error is returned
s.NoError(err)

// And the events are returned in version order
s.Len(events, 2)

s.Equal(2, events[0].Version())
s.Equal(3, events[1].Version())
}
3 changes: 2 additions & 1 deletion eventstore/mongodb/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mongodb

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -332,7 +333,7 @@ func (s *EventStore) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([
var aggregate aggregateRecord
if err := s.aggregates.FindOne(ctx, bson.M{"_id": id}).Decode(&aggregate); err != nil {
// Translate to our own not found error.
if err == mongo.ErrNoDocuments {
if errors.Is(err, mongo.ErrNoDocuments) {
err = eh.ErrAggregateNotFound
}

Expand Down
7 changes: 5 additions & 2 deletions eventstore/mongodb_v2/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
// EventStore is an eventhorizon.EventStore for MongoDB, using one collection
// for all events and another to keep track of all aggregates/streams. It also
// keeps track of the global position of events, stored as metadata.
// This implementation warrants event order by Version on Load and LoadFrom methods.
type EventStore struct {
client *mongo.Client
clientOwnership clientOwnership
Expand Down Expand Up @@ -430,7 +431,8 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio

// Load implements the Load method of the eventhorizon.EventStore interface.
func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) {
cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id})
opts := options.Find().SetSort(bson.M{"version": 1})
cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id}, opts)
if err != nil {
return nil, &eh.EventStoreError{
Err: fmt.Errorf("could not find event: %w", err),
Expand All @@ -444,7 +446,8 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error)

// LoadFrom implements LoadFrom method of the eventhorizon.SnapshotStore interface.
func (s *EventStore) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) {
cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id, "version": bson.M{"$gte": version}})
opts := options.Find().SetSort(bson.M{"version": 1})
cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id, "version": bson.M{"$gte": version}}, opts)
if err != nil {
return nil, &eh.EventStoreError{
Err: fmt.Errorf("could not find event: %w", err),
Expand Down

0 comments on commit abedad9

Please sign in to comment.