From 1abf73c99e5b4cfa1789d480e5ea893e0081db3e Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 7 Jun 2019 17:22:09 +0100 Subject: [PATCH] Actually map batches --- src/Propulsion.Cosmos/CosmosSink.fs | 1 + src/Propulsion.EventStore/EventStoreSink.fs | 1 + src/Propulsion.EventStore/EventStoreSource.fs | 1 - src/Propulsion/Streams.fs | 1 + 4 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Propulsion.Cosmos/CosmosSink.fs b/src/Propulsion.Cosmos/CosmosSink.fs index de5d61f6..7fcf8840 100644 --- a/src/Propulsion.Cosmos/CosmosSink.fs +++ b/src/Propulsion.Cosmos/CosmosSink.fs @@ -173,6 +173,7 @@ type CosmosSink = let dumpStreams (s : Scheduling.StreamStates<_>) l = s.Dump(l, Propulsion.Streams.Buffering.StreamState.eventsSize, categorize) let streamScheduler = Internal.CosmosSchedulingEngine.Create(log, cosmosContexts, dispatcher, projectionStats, dumpStreams) let mapBatch onCompletion (x : Submission.SubmissionBatch>) : Scheduling.StreamsBatch<_> = + let onCompletion () = x.onCompletion(); onCompletion() Scheduling.StreamsBatch.Create(onCompletion, x.messages) |> fst let submitBatch (x : Scheduling.StreamsBatch<_>) : int = streamScheduler.Submit x diff --git a/src/Propulsion.EventStore/EventStoreSink.fs b/src/Propulsion.EventStore/EventStoreSink.fs index af48fc54..6ff33939 100644 --- a/src/Propulsion.EventStore/EventStoreSink.fs +++ b/src/Propulsion.EventStore/EventStoreSink.fs @@ -151,6 +151,7 @@ type EventStoreSink = let dumpStats (s : Scheduling.StreamStates<_>) l = s.Dump(l, Propulsion.Streams.Buffering.StreamState.eventsSize, categorize) let streamScheduler = Internal.EventStoreSchedulingEngine.Create(log, storeLog, conns, dispatcher, projectionStats, dumpStats) let mapBatch onCompletion (x : Submission.SubmissionBatch>) : Scheduling.StreamsBatch<_> = + let onCompletion () = x.onCompletion(); onCompletion() Scheduling.StreamsBatch.Create(onCompletion, x.messages) |> fst let submitBatch (x : Scheduling.StreamsBatch<_>) : int = streamScheduler.Submit x diff --git a/src/Propulsion.EventStore/EventStoreSource.fs b/src/Propulsion.EventStore/EventStoreSource.fs index 6188a6b3..7ded5d6a 100644 --- a/src/Propulsion.EventStore/EventStoreSource.fs +++ b/src/Propulsion.EventStore/EventStoreSource.fs @@ -95,6 +95,5 @@ type EventStoreSource = let cp = pos.CommitPosition striper.Submit <| Message.Batch(seriesId, cp, checkpoints.Commit cp, xs) let reader = Reader.EventStoreReader(conns, spec.batchSize, spec.minBatchSize, categorize, tryMapEvent, post, spec.tailInterval, dop) - log.Warning("Pumping reader") do! reader.Pump(initialSeriesId, startPos, maxPos) log.Warning("Finished Pumping reader") } \ No newline at end of file diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index 34438424..055da720 100644 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -602,6 +602,7 @@ module Projector = static member Start(log : Serilog.ILogger, pumpDispatcher, pumpScheduler, maxReadAhead, submitStreamsBatch, statsInterval, ?ingesterStatsInterval, ?maxSubmissionsPerPartition) = let maxSubmissionsPerPartition = defaultArg maxSubmissionsPerPartition 5 let mapBatch onCompletion (x : Submission.SubmissionBatch>) : Scheduling.StreamsBatch<_> = + let onCompletion () = x.onCompletion(); onCompletion() Scheduling.StreamsBatch.Create(onCompletion, x.messages) |> fst let submitBatch (x : Scheduling.StreamsBatch<_>) : int = submitStreamsBatch x