Skip to content

Commit

Permalink
switch to manual projection scanning instead of automatic (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
runeanielsen authored Feb 11, 2023
1 parent c97cc50 commit abc6180
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,34 @@ public class DependencyInjectionTests
private IServiceProvider _serviceProvider;
private IEventStore _eventStore;

public DependencyInjectionTests(IServiceProvider serviceProvider, IEventStore eventStore)
public DependencyInjectionTests(
IServiceProvider serviceProvider,
IEventStore eventStore)
{
_serviceProvider = serviceProvider;

_eventStore = eventStore;
}

[Fact]
public void TestThatProjectionAreAutomaticallyPickedUpInIoCByEventStore()
public void TestRegistrationOfProjections()
{
_eventStore.ScanForProjections();

// Setup
var poopProjection = _serviceProvider.GetServices<IProjection>().First(p => p is PoopProjection) as PoopProjection;
var poopProjection = _serviceProvider
.GetServices<IProjection>()
.First(p => p is PoopProjection) as PoopProjection;

// Act
var snoopy = new DogAggregate(Guid.NewGuid(), "Snoopy");
snoopy.Poop(200);
_eventStore.Aggregates.Store(snoopy);

// Assert
poopProjection.PoopReport.Find(d => d.DogName == "Snoopy").PoopTotal.Should().Be(200);
poopProjection.PoopReport
.Find(d => d.DogName == "Snoopy")
.PoopTotal
.Should().Be(200);
}

[Fact]
Expand Down
1 change: 1 addition & 0 deletions OpenFTTH.EventSourcing/IEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ public interface IEventStore
Task<long> CatchUpAsync(CancellationToken cancellationToken = default);
long? CurrentStreamVersion(Guid streamId);
Task<long?> CurrentStreamVersionAsync(Guid streamId);
void ScanForProjections();
}
}
1 change: 1 addition & 0 deletions OpenFTTH.EventSourcing/IProjectionRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ public interface IProjectionRepository
{
void Add(IProjection projection);
T Get<T>();
void ScanServiceProviderForProjections();
}
}
5 changes: 5 additions & 0 deletions OpenFTTH.EventSourcing/InMem/InMemEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,10 @@ public async Task<long> CatchUpAsync(CancellationToken cancellationToken = defau
// We -1 because we start at version 0.
return Task.FromResult((long?)_events[streamId].Count() - 1);
}

public void ScanForProjections()
{
_projectionRepository.ScanServiceProviderForProjections();
}
}
}
7 changes: 6 additions & 1 deletion OpenFTTH.EventSourcing/Postgres/PostgresEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ private static List<string> GetMartenDotNetTypeFormat(List<IProjection> projecti

private long? GetNewestSequenceNumber()
{

string sql = $"SELECT MAX(seq_id) FROM {_store.Options.DatabaseSchemaName}.mt_events";
using var conn = new NpgsqlConnection(_connectionString);
using var cmd = new NpgsqlCommand(sql, conn);
Expand Down Expand Up @@ -395,5 +395,10 @@ await _projectionRepository

return (long?)result;
}

public void ScanForProjections()
{
_projectionRepository.ScanServiceProviderForProjections();
}
}
}
39 changes: 17 additions & 22 deletions OpenFTTH.EventSourcing/ProjectionRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ public class ProjectionRepository : IProjectionRepository
{
private readonly IServiceProvider _serviceProvider;
private readonly ConcurrentBag<IProjection> _projections = new ConcurrentBag<IProjection>();
private bool _projectionsHasBeScanned = false;

public ProjectionRepository(IServiceProvider serviceProvider)
{
Expand All @@ -28,8 +27,6 @@ internal List<IProjection> GetAll()

internal void ApplyEvents(IReadOnlyList<IEventEnvelope> events)
{
ScanServiceProviderForProjections();

foreach (var projection in _projections)
{
projection.Apply(events);
Expand All @@ -38,8 +35,6 @@ internal void ApplyEvents(IReadOnlyList<IEventEnvelope> events)

internal async Task ApplyEventsAsync(IReadOnlyList<IEventEnvelope> events)
{
ScanServiceProviderForProjections();

foreach (var projection in _projections)
{
await projection.ApplyAsync(events).ConfigureAwait(false);
Expand All @@ -48,8 +43,6 @@ internal async Task ApplyEventsAsync(IReadOnlyList<IEventEnvelope> events)

internal void ApplyEvent(IEventEnvelope @event)
{
ScanServiceProviderForProjections();

foreach (var projection in _projections)
{
projection.Apply(@event);
Expand All @@ -58,37 +51,39 @@ internal void ApplyEvent(IEventEnvelope @event)

internal async Task ApplyEventAsync(IEventEnvelope @event)
{
ScanServiceProviderForProjections();

foreach (var projection in _projections)
{
await projection.ApplyAsync(@event).ConfigureAwait(false);
}
}

private void ScanServiceProviderForProjections()
public void ScanServiceProviderForProjections()
{
if (!_projectionsHasBeScanned && _serviceProvider != null)
if (_serviceProvider is null)
{
throw new InvalidOperationException(
"Service provider is required to scan for projections.");
}

var projections = _serviceProvider.GetServices<IProjection>() ??
throw new InvalidOperationException(
$"Could not resolve any services implementing '{nameof(IProjection)}'.");

foreach (var projection in projections)
{
var projections = _serviceProvider.GetServices<IProjection>();
var containsProjection = _projections
.Any(existingProjection =>
existingProjection.GetType() == projection.GetType());

if (projections != null)
if (!containsProjection)
{
foreach (var projection in projections)
{
if (!_projections.Any(existingProjection => existingProjection.GetType() == projection.GetType()))
_projections.Add(projection);
}
_projections.Add(projection);
}
}

_projectionsHasBeScanned = true;
}

public T Get<T>()
{
ScanServiceProviderForProjections();

foreach (var projection in _projections)
{
if (projection is T)
Expand Down

0 comments on commit abc6180

Please sign in to comment.