Skip to content

Commit

Permalink
Actually map batches
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 7, 2019
1 parent 621516d commit 1abf73c
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/Propulsion.Cosmos/CosmosSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ type CosmosSink =
let dumpStreams (s : Scheduling.StreamStates<_>) l = s.Dump(l, Propulsion.Streams.Buffering.StreamState.eventsSize, categorize)
let streamScheduler = Internal.CosmosSchedulingEngine.Create(log, cosmosContexts, dispatcher, projectionStats, dumpStreams)
let mapBatch onCompletion (x : Submission.SubmissionBatch<StreamEvent<_>>) : Scheduling.StreamsBatch<_> =
let onCompletion () = x.onCompletion(); onCompletion()
Scheduling.StreamsBatch.Create(onCompletion, x.messages) |> fst
let submitBatch (x : Scheduling.StreamsBatch<_>) : int =
streamScheduler.Submit x
Expand Down
1 change: 1 addition & 0 deletions src/Propulsion.EventStore/EventStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type EventStoreSink =
let dumpStats (s : Scheduling.StreamStates<_>) l = s.Dump(l, Propulsion.Streams.Buffering.StreamState.eventsSize, categorize)
let streamScheduler = Internal.EventStoreSchedulingEngine.Create(log, storeLog, conns, dispatcher, projectionStats, dumpStats)
let mapBatch onCompletion (x : Submission.SubmissionBatch<StreamEvent<_>>) : Scheduling.StreamsBatch<_> =
let onCompletion () = x.onCompletion(); onCompletion()
Scheduling.StreamsBatch.Create(onCompletion, x.messages) |> fst
let submitBatch (x : Scheduling.StreamsBatch<_>) : int =
streamScheduler.Submit x
Expand Down
1 change: 0 additions & 1 deletion src/Propulsion.EventStore/EventStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,5 @@ type EventStoreSource =
let cp = pos.CommitPosition
striper.Submit <| Message.Batch(seriesId, cp, checkpoints.Commit cp, xs)
let reader = Reader.EventStoreReader(conns, spec.batchSize, spec.minBatchSize, categorize, tryMapEvent, post, spec.tailInterval, dop)
log.Warning("Pumping reader")
do! reader.Pump(initialSeriesId, startPos, maxPos)
log.Warning("Finished Pumping reader") }
1 change: 1 addition & 0 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ module Projector =
static member Start(log : Serilog.ILogger, pumpDispatcher, pumpScheduler, maxReadAhead, submitStreamsBatch, statsInterval, ?ingesterStatsInterval, ?maxSubmissionsPerPartition) =
let maxSubmissionsPerPartition = defaultArg maxSubmissionsPerPartition 5
let mapBatch onCompletion (x : Submission.SubmissionBatch<StreamEvent<_>>) : Scheduling.StreamsBatch<_> =
let onCompletion () = x.onCompletion(); onCompletion()
Scheduling.StreamsBatch.Create(onCompletion, x.messages) |> fst
let submitBatch (x : Scheduling.StreamsBatch<_>) : int =
submitStreamsBatch x
Expand Down

0 comments on commit 1abf73c

Please sign in to comment.