From abc618022904ecd3eb4c697d7f594b6e96a02fbb Mon Sep 17 00:00:00 2001 From: Rune Nielsen Date: Sat, 11 Feb 2023 15:43:40 +0100 Subject: [PATCH] switch to manual projection scanning instead of automatic (#35) --- .../DependencyInjectionTests.cs | 17 +++++--- OpenFTTH.EventSourcing/IEventStore.cs | 1 + .../IProjectionRepository.cs | 1 + .../InMem/InMemEventStore.cs | 5 +++ .../Postgres/PostgresEventStore.cs | 7 +++- .../ProjectionRepository.cs | 39 ++++++++----------- 6 files changed, 42 insertions(+), 28 deletions(-) diff --git a/OpenFTTH.EventSourcing.Tests/DependencyInjection/DependencyInjectionTests.cs b/OpenFTTH.EventSourcing.Tests/DependencyInjection/DependencyInjectionTests.cs index 6a0eb72..89748f5 100644 --- a/OpenFTTH.EventSourcing.Tests/DependencyInjection/DependencyInjectionTests.cs +++ b/OpenFTTH.EventSourcing.Tests/DependencyInjection/DependencyInjectionTests.cs @@ -12,19 +12,23 @@ public class DependencyInjectionTests private IServiceProvider _serviceProvider; private IEventStore _eventStore; - public DependencyInjectionTests(IServiceProvider serviceProvider, IEventStore eventStore) + public DependencyInjectionTests( + IServiceProvider serviceProvider, + IEventStore eventStore) { _serviceProvider = serviceProvider; - _eventStore = eventStore; } [Fact] - public void TestThatProjectionAreAutomaticallyPickedUpInIoCByEventStore() + public void TestRegistrationOfProjections() { + _eventStore.ScanForProjections(); // Setup - var poopProjection = _serviceProvider.GetServices().First(p => p is PoopProjection) as PoopProjection; + var poopProjection = _serviceProvider + .GetServices() + .First(p => p is PoopProjection) as PoopProjection; // Act var snoopy = new DogAggregate(Guid.NewGuid(), "Snoopy"); @@ -32,7 +36,10 @@ public void TestThatProjectionAreAutomaticallyPickedUpInIoCByEventStore() _eventStore.Aggregates.Store(snoopy); // Assert - poopProjection.PoopReport.Find(d => d.DogName == "Snoopy").PoopTotal.Should().Be(200); + poopProjection.PoopReport + .Find(d => d.DogName == "Snoopy") + .PoopTotal + .Should().Be(200); } [Fact] diff --git a/OpenFTTH.EventSourcing/IEventStore.cs b/OpenFTTH.EventSourcing/IEventStore.cs index 6eba3cd..6ba975c 100644 --- a/OpenFTTH.EventSourcing/IEventStore.cs +++ b/OpenFTTH.EventSourcing/IEventStore.cs @@ -22,5 +22,6 @@ public interface IEventStore Task CatchUpAsync(CancellationToken cancellationToken = default); long? CurrentStreamVersion(Guid streamId); Task CurrentStreamVersionAsync(Guid streamId); + void ScanForProjections(); } } diff --git a/OpenFTTH.EventSourcing/IProjectionRepository.cs b/OpenFTTH.EventSourcing/IProjectionRepository.cs index 397980f..f22a98e 100644 --- a/OpenFTTH.EventSourcing/IProjectionRepository.cs +++ b/OpenFTTH.EventSourcing/IProjectionRepository.cs @@ -4,5 +4,6 @@ public interface IProjectionRepository { void Add(IProjection projection); T Get(); + void ScanServiceProviderForProjections(); } } diff --git a/OpenFTTH.EventSourcing/InMem/InMemEventStore.cs b/OpenFTTH.EventSourcing/InMem/InMemEventStore.cs index bc854a5..8be5732 100644 --- a/OpenFTTH.EventSourcing/InMem/InMemEventStore.cs +++ b/OpenFTTH.EventSourcing/InMem/InMemEventStore.cs @@ -142,5 +142,10 @@ public async Task CatchUpAsync(CancellationToken cancellationToken = defau // We -1 because we start at version 0. return Task.FromResult((long?)_events[streamId].Count() - 1); } + + public void ScanForProjections() + { + _projectionRepository.ScanServiceProviderForProjections(); + } } } diff --git a/OpenFTTH.EventSourcing/Postgres/PostgresEventStore.cs b/OpenFTTH.EventSourcing/Postgres/PostgresEventStore.cs index 5a58ea3..f428581 100644 --- a/OpenFTTH.EventSourcing/Postgres/PostgresEventStore.cs +++ b/OpenFTTH.EventSourcing/Postgres/PostgresEventStore.cs @@ -315,7 +315,7 @@ private static List GetMartenDotNetTypeFormat(List projecti private long? GetNewestSequenceNumber() { - + string sql = $"SELECT MAX(seq_id) FROM {_store.Options.DatabaseSchemaName}.mt_events"; using var conn = new NpgsqlConnection(_connectionString); using var cmd = new NpgsqlCommand(sql, conn); @@ -395,5 +395,10 @@ await _projectionRepository return (long?)result; } + + public void ScanForProjections() + { + _projectionRepository.ScanServiceProviderForProjections(); + } } } diff --git a/OpenFTTH.EventSourcing/ProjectionRepository.cs b/OpenFTTH.EventSourcing/ProjectionRepository.cs index 6169ad7..fcb61c7 100644 --- a/OpenFTTH.EventSourcing/ProjectionRepository.cs +++ b/OpenFTTH.EventSourcing/ProjectionRepository.cs @@ -11,7 +11,6 @@ public class ProjectionRepository : IProjectionRepository { private readonly IServiceProvider _serviceProvider; private readonly ConcurrentBag _projections = new ConcurrentBag(); - private bool _projectionsHasBeScanned = false; public ProjectionRepository(IServiceProvider serviceProvider) { @@ -28,8 +27,6 @@ internal List GetAll() internal void ApplyEvents(IReadOnlyList events) { - ScanServiceProviderForProjections(); - foreach (var projection in _projections) { projection.Apply(events); @@ -38,8 +35,6 @@ internal void ApplyEvents(IReadOnlyList events) internal async Task ApplyEventsAsync(IReadOnlyList events) { - ScanServiceProviderForProjections(); - foreach (var projection in _projections) { await projection.ApplyAsync(events).ConfigureAwait(false); @@ -48,8 +43,6 @@ internal async Task ApplyEventsAsync(IReadOnlyList events) internal void ApplyEvent(IEventEnvelope @event) { - ScanServiceProviderForProjections(); - foreach (var projection in _projections) { projection.Apply(@event); @@ -58,37 +51,39 @@ internal void ApplyEvent(IEventEnvelope @event) internal async Task ApplyEventAsync(IEventEnvelope @event) { - ScanServiceProviderForProjections(); - foreach (var projection in _projections) { await projection.ApplyAsync(@event).ConfigureAwait(false); } } - private void ScanServiceProviderForProjections() + public void ScanServiceProviderForProjections() { - if (!_projectionsHasBeScanned && _serviceProvider != null) + if (_serviceProvider is null) + { + throw new InvalidOperationException( + "Service provider is required to scan for projections."); + } + + var projections = _serviceProvider.GetServices() ?? + throw new InvalidOperationException( + $"Could not resolve any services implementing '{nameof(IProjection)}'."); + + foreach (var projection in projections) { - var projections = _serviceProvider.GetServices(); + var containsProjection = _projections + .Any(existingProjection => + existingProjection.GetType() == projection.GetType()); - if (projections != null) + if (!containsProjection) { - foreach (var projection in projections) - { - if (!_projections.Any(existingProjection => existingProjection.GetType() == projection.GetType())) - _projections.Add(projection); - } + _projections.Add(projection); } } - - _projectionsHasBeScanned = true; } public T Get() { - ScanServiceProviderForProjections(); - foreach (var projection in _projections) { if (projection is T)