diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c67b5ff..ef056495 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas ## [Unreleased] ### Added + +- `Propulsion.EventStore.EventStoreSource` (productized from `Equinox.Templates`'s `eqxsync`) + ### Changed - Targets `Microsoft.Azure.DocumentDB.ChangeFeedProcessor` v `2.2.7`, which includes critical lease management improvements diff --git a/README.md b/README.md index bd1293ba..eb348807 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Propulsion [![Build Status](https://dev.azure.com/jet-opensource/opensource/_apis/build/status/jet.Propulsion)](https://dev.azure.com/jet-opensource/opensource/_build/latest?definitionId=7) [![release](https://img.shields.io/github/release-pre/jet/propulsion.svg)](https://github.com/jet/propulsion/releases) [![NuGet](https://img.shields.io/nuget/vpre/Propulsion.svg?logo=nuget)](https://www.nuget.org/packages/Propulsion/) [![license](https://img.shields.io/github/license/jet/propulsion.svg)](LICENSE) ![code size](https://img.shields.io/github/languages/code-size/jet/propulsion.svg) -This is pre-production code; unfortunately it's also pre-documentation atm (will be adding more over the coming weeks) +This is pre-production code; unfortunately it's also pre-documentation atm (will add some info over the coming weeks) ## QuickStart diff --git a/src/Propulsion.EventStore/Checkpoint.fs b/src/Propulsion.EventStore/Checkpoint.fs new file mode 100644 index 00000000..df937b34 --- /dev/null +++ b/src/Propulsion.EventStore/Checkpoint.fs @@ -0,0 +1,110 @@ +module Propulsion.EventStore.Checkpoint + +open FSharp.UMX +open System // must shadow UMX to use DateTimeOffSet + +type CheckpointSeriesId = string +and [] checkpointSeriesId +module CheckpointSeriesId = let ofGroupName (groupName : string) = UMX.tag groupName + +// NB - these schemas reflect the actual storage formats and hence need to be versioned with care +module Events = + type Checkpoint = { at: DateTimeOffset; nextCheckpointDue: DateTimeOffset; pos: int64 } + type Config = { checkpointFreqS: int } + type Started = { config: Config; origin: Checkpoint } + type Checkpointed = { config: Config; pos: Checkpoint } + type Unfolded = { config: Config; state: Checkpoint } + type Event = + | Started of Started + | Checkpointed of Checkpointed + | Overrode of Checkpointed + | [] + Unfolded of Unfolded + interface TypeShape.UnionContract.IUnionContract + +module Folds = + type State = NotStarted | Running of Events.Unfolded + + let initial : State = NotStarted + let private evolve _ignoreState = function + | Events.Started { config = cfg; origin=originState } -> Running { config = cfg; state = originState } + | Events.Checkpointed e | Events.Overrode e -> Running { config = e.config; state = e.pos } + | Events.Unfolded runningState -> Running runningState + let fold (state: State) = Seq.fold evolve state + let isOrigin _state = true // we can build a state from any of the events and/or an unfold + let unfold state = + match state with + | NotStarted -> failwith "should never produce a NotStarted state" + | Running state -> Events.Unfolded {config = state.config; state=state.state} + + /// We only want to generate a first class event every N minutes, while efficiently writing contingent on the current etag value + //let postProcess events state = + // let checkpointEventIsRedundant (e: Events.Checkpointed) (s: Events.Unfolded) = + // s.state.nextCheckpointDue = 
e.pos.nextCheckpointDue + // && s.state.pos <> e.pos.pos + // match events, state with + // | [Events.Checkpointed e], (Running state as s) when checkpointEventIsRedundant e state -> + // [],unfold s + // | xs, state -> + // xs,unfold state + +type Command = + | Start of at: DateTimeOffset * checkpointFreq: TimeSpan * pos: int64 + | Override of at: DateTimeOffset * checkpointFreq: TimeSpan * pos: int64 + | Update of at: DateTimeOffset * pos: int64 + +module Commands = + let interpret command (state : Folds.State) = + let mkCheckpoint at next pos = { at=at; nextCheckpointDue = next; pos = pos } : Events.Checkpoint + let mk (at : DateTimeOffset) (interval: TimeSpan) pos : Events.Config * Events.Checkpoint= + let freq = int interval.TotalSeconds + let next = at.AddSeconds(float freq) + { checkpointFreqS = freq }, mkCheckpoint at next pos + match command, state with + | Start (at, freq, pos), Folds.NotStarted -> + let config, checkpoint = mk at freq pos + [Events.Started { config = config; origin = checkpoint}] + | Override (at, freq, pos), Folds.Running _ -> + let config, checkpoint = mk at freq pos + [Events.Overrode { config = config; pos = checkpoint}] + | Update (at,pos), Folds.Running state -> + // Force a write every N seconds regardless of whether the position has actually changed + if state.state.pos = pos && at < state.state.nextCheckpointDue then [] else + let freq = TimeSpan.FromSeconds <| float state.config.checkpointFreqS + let config, checkpoint = mk at freq pos + [Events.Checkpointed { config = config; pos = checkpoint}] + | c, s -> failwithf "Command %A invalid when %A" c s + +type Service(log, resolveStream, ?maxAttempts) = + let (|AggregateId|) (id : CheckpointSeriesId) = Equinox.AggregateId ("Sync", % id) + let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolveStream id, defaultArg maxAttempts 3) + let execute (Stream stream) cmd = stream.Transact(Commands.interpret cmd) + + /// Determines the present state of the CheckpointSequence + member __.Read(Stream stream) = + stream.Query id + + /// Start a checkpointing series with the supplied parameters + /// NB will fail if already existing; caller should select to `Start` or `Override` based on whether Read indicates state is Running Or NotStarted + member __.Start(id, freq: TimeSpan, pos: int64) = + execute id <| Command.Start(DateTimeOffset.UtcNow, freq, pos) + + /// Override a checkpointing series with the supplied parameters + /// NB fails if not already initialized; caller should select to `Start` or `Override` based on whether Read indicates state is Running Or NotStarted + member __.Override(id, freq: TimeSpan, pos: int64) = + execute id <| Command.Override(DateTimeOffset.UtcNow, freq, pos) + + /// Ingest a position update + /// NB fails if not already initialized; caller should ensure correct initialization has taken place via Read -> Start + member __.Commit(id, pos: int64) = + execute id <| Command.Update(DateTimeOffset.UtcNow, pos) + +// General pattern is that an Equinox Service is a singleton and calls pass an inentifier for a stream per call +// This light wrapper means we can adhere to that general pattern yet still end up with lef=gible code while we in practice only maintain a single checkpoint series per running app +type CheckpointSeries(name, log, resolveStream) = + let seriesId = CheckpointSeriesId.ofGroupName name + let inner = Service(log, resolveStream) + member __.Read = inner.Read seriesId + member __.Start(freq, pos) = inner.Start(seriesId, freq, pos) + member __.Override(freq, pos) = 
inner.Override(seriesId, freq, pos) + member __.Commit(pos) = inner.Commit(seriesId, pos) \ No newline at end of file diff --git a/src/Propulsion.EventStore/EventStoreReader.fs b/src/Propulsion.EventStore/EventStoreReader.fs new file mode 100644 index 00000000..f3897271 --- /dev/null +++ b/src/Propulsion.EventStore/EventStoreReader.fs @@ -0,0 +1,312 @@ +module Propulsion.EventStore.Reader + +open Equinox.Store // AwaitTaskCorrect +open Propulsion.Internal // Sem +open Propulsion.Streams +open EventStore.ClientAPI +open Serilog // NB Needs to shadow ILogger +open System +open System.Collections.Generic +open System.Diagnostics +open System.Threading + +let inline arrayBytes (x:byte[]) = if x = null then 0 else x.Length +let inline recPayloadBytes (x: EventStore.ClientAPI.RecordedEvent) = arrayBytes x.Data + arrayBytes x.Metadata +let inline payloadBytes (x: EventStore.ClientAPI.ResolvedEvent) = recPayloadBytes x.Event + x.OriginalStreamId.Length * 2 +let inline mb x = float x / 1024. / 1024. + +/// Maintains ingestion stats (thread safe via lock free data structures so it can be used across multiple overlapping readers) +type OverallStats(?statsInterval) = + let intervalMs = let t = defaultArg statsInterval (TimeSpan.FromMinutes 5.) in t.TotalMilliseconds |> int64 + let overallStart, progressStart = Stopwatch.StartNew(), Stopwatch.StartNew() + let mutable totalEvents, totalBytes = 0L, 0L + member __.Ingest(batchEvents, batchBytes) = + Interlocked.Add(&totalEvents,batchEvents) |> ignore + Interlocked.Add(&totalBytes,batchBytes) |> ignore + member __.Bytes = totalBytes + member __.Events = totalEvents + member __.DumpIfIntervalExpired(?force) = + if progressStart.ElapsedMilliseconds > intervalMs || force = Some true then + let totalMb = mb totalBytes + if totalEvents <> 0L then + Log.Information("Reader Throughput {events} events {gb:n1}GB {mb:n2}MB/s", + totalEvents, totalMb/1024., totalMb*1000./float overallStart.ElapsedMilliseconds) + progressStart.Restart() + +/// Maintains stats for traversals of $all; Threadsafe [via naive locks] so can be used by multiple stripes reading concurrently +type SliceStatsBuffer(categorize, ?interval) = + let intervalMs = let t = defaultArg interval (TimeSpan.FromMinutes 5.) 
in t.TotalMilliseconds |> int64 + let recentCats, accStart = Dictionary(), Stopwatch.StartNew() + member __.Ingest(slice: AllEventsSlice) = + lock recentCats <| fun () -> + let mutable batchBytes = 0 + for x in slice.Events do + let cat = categorize x.OriginalStreamId + let eventBytes = payloadBytes x + match recentCats.TryGetValue cat with + | true, (currCount, currSize) -> recentCats.[cat] <- (currCount + 1, currSize+eventBytes) + | false, _ -> recentCats.[cat] <- (1, eventBytes) + batchBytes <- batchBytes + eventBytes + __.DumpIfIntervalExpired() + slice.Events.Length, int64 batchBytes + member __.DumpIfIntervalExpired(?force) = + if accStart.ElapsedMilliseconds > intervalMs || defaultArg force false then + lock recentCats <| fun () -> + let log kind limit xs = + let cats = + [| for KeyValue (s,(c,b)) in xs |> Seq.sortBy (fun (KeyValue (_,(_,b))) -> -b) -> + mb (int64 b) |> round, s, c |] + if (not << Array.isEmpty) cats then + let mb, events, top = Array.sumBy (fun (mb, _, _) -> mb) cats, Array.sumBy (fun (_, _, c) -> c) cats, Seq.truncate limit cats + Log.Information("Reader {kind} {mb:n0}MB {events:n0} events categories: {@cats} (MB/cat/count)", kind, mb, events, top) + recentCats |> log "Total" 3 + recentCats |> Seq.where (fun x -> x.Key.StartsWith "$" |> not) |> log "payload" 100 + recentCats |> Seq.where (fun x -> x.Key.StartsWith "$") |> log "meta" 100 + recentCats.Clear() + accStart.Restart() + +/// Defines a tranche of a traversal of a stream (or the store as a whole) +type Range(start, sliceEnd : Position option, ?max : Position) = + member val Current = start with get, set + member __.TryNext(pos: Position) = + __.Current <- pos + __.IsCompleted + member __.IsCompleted = + match sliceEnd with + | Some send when __.Current.CommitPosition >= send.CommitPosition -> false + | _ -> true + member __.PositionAsRangePercentage = + match max with + | None -> Double.NaN + | Some max -> + match __.Current.CommitPosition, max.CommitPosition with + | p,m when p > m -> Double.NaN + | p,m -> float p / float m + +(* Logic for computation of chunk offsets; ES writes chunks whose index starts at a multiple of 256MB + to be able to address an arbitrary position as a percentage, we need to consider this aspect as only a valid Position can be supplied to the read call *) + +// @scarvel8: event_global_position = 256 x 1024 x 1024 x chunk_number + chunk_header_size (128) + event_position_offset_in_chunk +let chunk (pos: Position) = uint64 pos.CommitPosition >>> 28 +let posFromChunk (chunk: int) = + let chunkBase = int64 chunk * 1024L * 1024L * 256L + Position(chunkBase,0L) +let posFromChunkAfter (pos: EventStore.ClientAPI.Position) = + let nextChunk = 1 + int (chunk pos) + posFromChunk nextChunk +let posFromPercentage (pct,max : Position) = + let rawPos = Position(float max.CommitPosition * pct / 100. |> int64, 0L) + let chunk = int (chunk rawPos) in posFromChunk chunk // &&& 0xFFFFFFFFE0000000L // rawPos / 256L / 1024L / 1024L * 1024L * 1024L * 256L + +/// Read the current tail position; used to be able to compute and log progress of ingestion +let fetchMax (conn : IEventStoreConnection) = async { + let! lastItemBatch = conn.ReadAllEventsBackwardAsync(Position.End, 1, resolveLinkTos = false) |> Async.AwaitTaskCorrect + let max = lastItemBatch.FromPosition + Log.Information("EventStore Tail Position: @ {pos} ({chunks} chunks, ~{gb:n1}GB)", max.CommitPosition, chunk max, mb max.CommitPosition/1024.) 
+ return max } + +/// `fetchMax` wrapped in a retry loop; Sync process is entirely reliant on establishing the max so we have a crude retry loop +let establishMax (conn : IEventStoreConnection) = async { + let mutable max = None + while Option.isNone max do + try let! currentMax = fetchMax conn + max <- Some currentMax + with e -> + Log.Warning(e,"Could not establish max position") + do! Async.Sleep 5000 + return Option.get max } + +/// Walks a stream within the specified constraints; used to grab data when writing to a stream for which a prefix is missing +/// Can throw (in which case the caller is in charge of retrying, possibly with a smaller batch size) +let pullStream (conn : IEventStoreConnection, batchSize) (stream,pos,limit : int option) mapEvent (postBatch : string*StreamSpan<_> -> Async) = + let rec fetchFrom pos limit = async { + let reqLen = match limit with Some limit -> min limit batchSize | None -> batchSize + let! currentSlice = conn.ReadStreamEventsForwardAsync(stream, pos, reqLen, resolveLinkTos=true) |> Async.AwaitTaskCorrect + let events = currentSlice.Events |> Array.map (fun x -> mapEvent x.Event) + do! postBatch (stream,{ index = currentSlice.FromEventNumber; events = events }) + match limit with + | None when currentSlice.IsEndOfStream -> return () + | None -> return! fetchFrom currentSlice.NextEventNumber None + | Some limit when events.Length >= limit -> return () + | Some limit -> return! fetchFrom currentSlice.NextEventNumber (Some (limit - events.Length)) } + fetchFrom pos limit + +/// Walks the $all stream, yielding batches together with the associated Position info for the purposes of checkpointing +/// Can throw (in which case the caller is in charge of retrying, possibly with a smaller batch size) +type [] PullResult = Exn of exn: exn | Eof of Position | EndOfTranche +let pullAll (slicesStats : SliceStatsBuffer, overallStats : OverallStats) (conn : IEventStoreConnection, batchSize) + (range:Range, once) (tryMapEvent : ResolvedEvent -> StreamEvent<_> option) (postBatch : Position -> StreamEvent<_>[] -> Async) = + let sw = Stopwatch.StartNew() // we'll report the warmup/connect time on the first batch + let rec aux () = async { + let! currentSlice = conn.ReadAllEventsForwardAsync(range.Current, batchSize, resolveLinkTos = false) |> Async.AwaitTaskCorrect + sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us + let postSw = Stopwatch.StartNew() + let batchEvents, batchBytes = slicesStats.Ingest currentSlice in overallStats.Ingest(int64 batchEvents, batchBytes) + let batches = currentSlice.Events |> Seq.choose tryMapEvent |> Array.ofSeq + let streams = batches |> Seq.groupBy (fun b -> b.stream) |> Array.ofSeq + let usedStreams, usedCats = streams.Length, streams |> Seq.map fst |> Seq.distinct |> Seq.length + let! (cur,max) = postBatch currentSlice.NextPosition batches + Log.Information("Read {pos,10} {pct:p1} {ft:n3}s {mb:n1}MB {count,4} {categories,4}c {streams,4}s {events,4}e Post {pt:n3}s {cur}/{max}", + range.Current.CommitPosition, range.PositionAsRangePercentage, (let e = sw.Elapsed in e.TotalSeconds), mb batchBytes, + batchEvents, usedCats, usedStreams, batches.Length, (let e = postSw.Elapsed in e.TotalSeconds), cur, max) + if not (range.TryNext currentSlice.NextPosition && not once && not currentSlice.IsEndOfStream) then + if currentSlice.IsEndOfStream then return Eof currentSlice.NextPosition + else return EndOfTranche + else + sw.Restart() // restart the clock as we hand off back to the Reader + return! 
aux () } + async { + try return! aux () + with e -> return Exn e } + +/// Specification for work to be performed by a reader thread +[] +type Req = + /// Tail from a given start position, at intervals of the specified timespan (no waiting if catching up) + | Tail of seriesId: int * startPos: Position * max : Position * interval: TimeSpan * batchSize : int + /// Read a given segment of a stream (used when a stream needs to be rolled forward to lay down an event for which the preceding events are missing) + //| StreamPrefix of name: string * pos: int64 * len: int * batchSize: int + /// Read the entirity of a stream in blocks of the specified batchSize (TODO wire to commandline request) + //| Stream of name: string * batchSize: int + /// Read a specific chunk (min-max range), posting batches tagged with that chunk number + | Chunk of seriesId: int * range: Range * batchSize : int + +/// Data with context resulting from a reader thread +[] +type Res = + /// A batch read from a Chunk + | Batch of seriesId: int * pos: Position * items: StreamEvent seq + /// Ingestion buffer requires an explicit end of chunk message before next chunk can commence processing + | EndOfChunk of seriesId: int + /// A Batch read from a Stream or StreamPrefix + //| StreamSpan of span: State.StreamSpan + +/// Holds work queue, together with stats relating to the amount and/or categories of data being traversed +/// Processing is driven by external callers running multiple concurrent invocations of `Process` +type EventStoreReader(conns : _ [], defaultBatchSize, minBatchSize, categorize, tryMapEvent, post : Res -> Async, tailInterval, dop, ?statsInterval) = + let work = System.Collections.Concurrent.ConcurrentQueue() + let sleepIntervalMs = 100 + let overallStats = OverallStats(?statsInterval=statsInterval) + let slicesStats = SliceStatsBuffer(categorize) + let mutable eofSpottedInChunk = 0 + + /// Invoked by pump to process a tranche of work; can have parallel invocations + let exec conn req = async { + let adjust batchSize = if batchSize > minBatchSize then batchSize - 128 else batchSize + //let postSpan = ReadResult.StreamSpan >> post >> Async.Ignore + match req with + //| StreamPrefix (name,pos,len,batchSize) -> + // use _ = Serilog.Context.LogContext.PushProperty("Tranche",name) + // Log.Warning("Reading stream prefix; pos {pos} len {len} batch size {bs}", pos, len, batchSize) + // try let! t,() = pullStream (conn, batchSize) (name, pos, Some len) postSpan |> Stopwatch.Time + // Log.Information("completed stream prefix in {ms:n3}s", let e = t.Elapsed in e.TotalSeconds) + // with e -> + // let bs = adjust batchSize + // Log.Warning(e,"Could not read stream, retrying with batch size {bs}", bs) + // __.AddStreamPrefix(name, pos, len, bs) + // return false + //| Stream (name,batchSize) -> + // use _ = Serilog.Context.LogContext.PushProperty("Tranche",name) + // Log.Warning("Reading stream; batch size {bs}", batchSize) + // try let! t,() = pullStream (conn, batchSize) (name,0L,None) postSpan |> Stopwatch.Time + // Log.Information("completed stream in {ms:n3}s", let e = t.Elapsed in e.TotalSeconds) + // with e -> + // let bs = adjust batchSize + // Log.Warning(e,"Could not read stream, retrying with batch size {bs}", bs) + // __.AddStream(name, bs) + // return false + | Chunk (series, range, batchSize) -> + let postBatch pos items = post (Res.Batch (series, pos, items)) + use _ = Serilog.Context.LogContext.PushProperty("Tranche", series) + Log.Information("Commencing tranche, batch size {bs}", batchSize) + let! 
t, res = pullAll (slicesStats, overallStats) (conn, batchSize) (range, false) tryMapEvent postBatch |> Stopwatch.Time + match res with + | PullResult.Eof pos -> + Log.Warning("completed tranche AND REACHED THE END in {ms:n3}m", let e = t.Elapsed in e.TotalMinutes) + overallStats.DumpIfIntervalExpired(true) + let! _ = post (Res.EndOfChunk series) in () + if 1 = Interlocked.Increment &eofSpottedInChunk then work.Enqueue <| Req.Tail (series+1, pos, pos, tailInterval, defaultBatchSize) + | PullResult.EndOfTranche -> + Log.Information("completed tranche in {ms:n1}m", let e = t.Elapsed in e.TotalMinutes) + let! _ = post (Res.EndOfChunk series) in () + | PullResult.Exn e -> + let abs = adjust batchSize + Log.Warning(e, "Could not read All, retrying with batch size {bs}", abs) + work.Enqueue <| Req.Chunk (series, range, abs) + | Tail (series, pos, max, interval, batchSize) -> + let postBatch pos items = post (Res.Batch (series, pos, items)) + use _ = Serilog.Context.LogContext.PushProperty("Tranche", "Tail") + let mutable count, batchSize, range = 0, batchSize, Range(pos, None, max) + let statsInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.) + let progressIntervalMs, tailIntervalMs = int64 statsInterval.TotalMilliseconds, int64 interval.TotalMilliseconds + let tailSw = Stopwatch.StartNew() + let awaitInterval = async { + match tailIntervalMs - tailSw.ElapsedMilliseconds with + | waitTimeMs when waitTimeMs > 0L -> do! Async.Sleep (int waitTimeMs) + | _ -> () + tailSw.Restart() } + let slicesStats, stats = SliceStatsBuffer(categorize), OverallStats() + let progressSw = Stopwatch.StartNew() + while true do + let currentPos = range.Current + if progressSw.ElapsedMilliseconds > progressIntervalMs then + Log.Information("Tailed {count} times @ {pos} (chunk {chunk})", + count, currentPos.CommitPosition, chunk currentPos) + progressSw.Restart() + count <- count + 1 + let! res = pullAll (slicesStats,stats) (conn,batchSize) (range,true) tryMapEvent postBatch + do! awaitInterval + match res with + | PullResult.EndOfTranche | PullResult.Eof _ -> () + | PullResult.Exn e -> + batchSize <- adjust batchSize + Log.Warning(e, "Tail $all failed, adjusting batch size to {bs}", batchSize) } + + member __.Pump(initialSeriesId, initialPos, max) = async { + let mutable robin = 0 + let selectConn () = + let connIndex = Interlocked.Increment(&robin) % conns.Length + conns.[connIndex] + let dop = Sem dop + let forkRunRelease = + let r = new Random() + fun req -> async { // this is not called in parallel hence no need to lock `r` + let capacity = let used, max = dop.State in max-used + // Jitter is most relevant when processing commences - any commencement of a chunk can trigger significant page faults on server + // which we want to attempt to limit the effects of + let jitterMs = match capacity with 0 -> 200 | x -> r.Next(1000, 2000) + Log.Information("Waiting {jitter}ms to jitter reader stripes, {currentCount} further reader stripes awaiting start", jitterMs, capacity) + do! Async.Sleep jitterMs + let! _ = Async.StartChild <| async { + try let conn = selectConn () + do! exec conn req + finally dop.Release() } in () } + let mutable seriesId = initialSeriesId + let mutable remainder = + if conns.Length > 1 then + let nextPos = posFromChunkAfter initialPos + work.Enqueue <| Req.Chunk (initialSeriesId, new Range(initialPos, Some nextPos, max), defaultBatchSize) + Some nextPos + else + work.Enqueue <| Req.Tail (initialSeriesId, initialPos, max, tailInterval, defaultBatchSize) + None + let! 
ct = Async.CancellationToken + while not ct.IsCancellationRequested do + overallStats.DumpIfIntervalExpired() + let! _ = dop.Await ct + match work.TryDequeue(), remainder with + | (true, task), _ -> + do! forkRunRelease task + | (false, _), Some nextChunk when eofSpottedInChunk = 0 -> + seriesId <- seriesId + 1 + let nextPos = posFromChunkAfter nextChunk + remainder <- Some nextPos + do! forkRunRelease <| Req.Chunk (seriesId, Range(nextChunk, Some nextPos, max), defaultBatchSize) + | (false, _), Some _ -> + dop.Release() |> ignore + Log.Warning("No further ingestion work to commence, transitioning to tailing...") + // TODO release connections, reduce DOP, implement stream readers + remainder <- None + | (false, _), None -> + dop.Release() |> ignore + do! Async.Sleep sleepIntervalMs } \ No newline at end of file diff --git a/src/Propulsion.EventStore/EventStoreSource.fs b/src/Propulsion.EventStore/EventStoreSource.fs new file mode 100644 index 00000000..b696b1bd --- /dev/null +++ b/src/Propulsion.EventStore/EventStoreSource.fs @@ -0,0 +1,99 @@ +namespace Propulsion.EventStore + +open System + +type StartPos = Absolute of int64 | Chunk of int | Percentage of float | TailOrCheckpoint | StartOrCheckpoint + +type ReaderSpec = + { /// Identifier for this projection and it's state + groupName: string + /// Indicates user has specified that they wish to restart from the indicated position as opposed to resuming from the checkpoint position + forceRestart: bool + /// Start position from which forward reading is to commence // Assuming no stored position + start: StartPos + /// Frequency at which to update the Checkpoint store + checkpointInterval: TimeSpan + /// Delay when reading yields an empty batch + tailInterval: TimeSpan + /// Specify initial phase where interleaved reading stripes a 256MB chunk apart attain a balance between good reading speed and not killing the server + gorge: int option + /// Maximum number of striped readers to permit when tailing; this dictates how many stream readers will be used to perform catchup work on streams + /// that are missing a prefix (e.g. due to not starting from the start of the $all stream, and/or deleting data from the destination store) + streamReaders: int // TODO + /// Initial batch size to use when commencing reading + batchSize: int + /// Smallest batch size to degrade to in the presence of failures + minBatchSize: int } + +type StartMode = Starting | Resuming | Overridding + +[] +module Mapping = + open EventStore.ClientAPI + + type RecordedEvent with + member __.Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(__.CreatedEpoch) + + let (|PropulsionEvent|) (x : RecordedEvent) = + { new Propulsion.Streams.IEvent<_> with + member __.EventType = x.EventType + member __.Data = if x.Data <> null && x.Data.Length = 0 then null else x.Data + member __.Meta = if x.Metadata <> null && x.Metadata.Length = 0 then null else x.Metadata + member __.Timestamp = x.Timestamp } + let (|PropulsionStreamEvent|) (x: RecordedEvent) : Propulsion.Streams.StreamEvent<_> = + { stream = x.EventStreamId; index = x.EventNumber; event = (|PropulsionEvent|) x } + +type EventStoreSource = + static member Run + ( log : Serilog.ILogger, sink : Propulsion.ProjectorPipeline<_>, checkpoints : Checkpoint.CheckpointSeries, + connect, spec, categorize, tryMapEvent, + maxReadAhead, statsInterval) = async { + let conn = connect () + let! maxInParallel = Async.StartChild <| Reader.establishMax conn + let! initialCheckpointState = checkpoints.Read + let! maxPos = maxInParallel + let! 
startPos = async { + let mkPos x = EventStore.ClientAPI.Position(x, 0L) + let requestedStartPos = + match spec.start with + | Absolute p -> mkPos p + | Chunk c -> Reader.posFromChunk c + | Percentage pct -> Reader.posFromPercentage (pct, maxPos) + | TailOrCheckpoint -> maxPos + | StartOrCheckpoint -> EventStore.ClientAPI.Position.Start + let! startMode, startPos, checkpointFreq = async { + match initialCheckpointState, requestedStartPos with + | Checkpoint.Folds.NotStarted, r -> + if spec.forceRestart then invalidOp "Cannot specify --forceRestart when no progress yet committed" + do! checkpoints.Start(spec.checkpointInterval, r.CommitPosition) + return Starting, r, spec.checkpointInterval + | Checkpoint.Folds.Running s, _ when not spec.forceRestart -> + return Resuming, mkPos s.state.pos, TimeSpan.FromSeconds(float s.config.checkpointFreqS) + | Checkpoint.Folds.Running _, r -> + do! checkpoints.Override(spec.checkpointInterval, r.CommitPosition) + return Overridding, r, spec.checkpointInterval } + log.Information("Sync {mode} {groupName} @ {pos} (chunk {chunk}, {pct:p1}) checkpointing every {checkpointFreq:n1}m", + startMode, spec.groupName, startPos.CommitPosition, Reader.chunk startPos, float startPos.CommitPosition/float maxPos.CommitPosition, + checkpointFreq.TotalMinutes) + return startPos } + let cosmosIngester = sink.StartIngester(log.ForContext("Tranche","Sync"), 0) + let initialSeriesId, conns, dop = + log.Information("Tailing every {intervalS:n1}s TODO with {streamReaders} stream catchup-readers", spec.tailInterval.TotalSeconds, spec.streamReaders) + match spec.gorge with + | Some factor -> + log.Information("Commencing Gorging with {stripes} $all reader stripes covering a 256MB chunk each", factor) + let extraConns = Seq.init (factor-1) (ignore >> connect) + let conns = [| yield conn; yield! extraConns |] + Reader.chunk startPos |> int, conns, (max (conns.Length) (spec.streamReaders+1)) + | None -> + 0, [|conn|], spec.streamReaders+1 + let striper = StripedIngester(log.ForContext("Tranche","EventStore"), cosmosIngester, maxReadAhead, initialSeriesId, statsInterval) + let! _pumpStripes = Async.StartChild striper.Pump // will die with us, which is only after a keyboard interrupt :point_down: + let post = function + | Reader.Res.EndOfChunk seriesId -> striper.Submit <| Message.CloseSeries seriesId + | Reader.Res.Batch (seriesId, pos, xs) -> + let cp = pos.CommitPosition + striper.Submit <| Message.Batch(seriesId, cp, checkpoints.Commit cp, xs) + let reader = Reader.EventStoreReader(conns, spec.batchSize, spec.minBatchSize, categorize, tryMapEvent, post, spec.tailInterval, dop) + let! _pumpReader = reader.Pump(initialSeriesId, startPos, maxPos) + do! 
Async.AwaitKeyboardInterrupt() } \ No newline at end of file diff --git a/src/Propulsion.EventStore/Infrastructure.fs b/src/Propulsion.EventStore/Infrastructure.fs new file mode 100644 index 00000000..5056b53e --- /dev/null +++ b/src/Propulsion.EventStore/Infrastructure.fs @@ -0,0 +1,20 @@ +namespace Propulsion.EventStore + +open System +open System.Threading +open System.Threading.Tasks + +#nowarn "21" // re AwaitKeyboardInterrupt +#nowarn "40" // re AwaitKeyboardInterrupt + +[] +module private AsyncHelpers = + type Async with + static member Sleep(t : TimeSpan) : Async = Async.Sleep(int t.TotalMilliseconds) + /// Asynchronously awaits the next keyboard interrupt event + static member AwaitKeyboardInterrupt () : Async = + Async.FromContinuations(fun (sc,_,_) -> + let isDisposed = ref 0 + let rec callback _ = Task.Run(fun () -> if Interlocked.Increment isDisposed = 1 then d.Dispose() ; sc ()) |> ignore + and d : IDisposable = Console.CancelKeyPress.Subscribe callback + in ()) \ No newline at end of file diff --git a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj index 8159687f..157d8042 100644 --- a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj +++ b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj @@ -11,6 +11,11 @@ + + + + + @@ -21,6 +26,8 @@ + + diff --git a/src/Propulsion.EventStore/StripedIngester.fs b/src/Propulsion.EventStore/StripedIngester.fs new file mode 100644 index 00000000..2d3d2cf4 --- /dev/null +++ b/src/Propulsion.EventStore/StripedIngester.fs @@ -0,0 +1,127 @@ +namespace Propulsion.EventStore + +open Propulsion.Streams +open Serilog +open System +open System.Collections.Generic +open System.Collections.Concurrent +open System.Threading +open Propulsion.Internal + +type [] Message = + | Batch of seriesIndex: int * epoch: int64 * checkpoint: Async * items: StreamEvent seq + | CloseSeries of seriesIndex: int + +module StripedIngesterImpl = + + type Stats(log : ILogger, statsInterval) = + let statsDue = intervalCheck statsInterval + let mutable cycles, ingested = 0, 0 + let dumpStats (currentBuffer,maxBuffer) (readingAhead,ready) = + let mutable buffered = 0 + let count (xs : IDictionary>) = seq { for x in xs do buffered <- buffered + x.Value.Count; yield x.Key, x.Value.Count } |> Seq.sortBy fst |> Seq.toArray + let ahead, ready = count readingAhead, count ready + log.Information("Read {ingested} Cycles {cycles} Holding {buffered} Reading {@reading} Ready {@ready} Buffering {currentBuffer}/{maxBuffer}", + ingested, cycles, buffered, ahead, ready, currentBuffer,maxBuffer) + ingested <- 0; cycles <- 0 + member __.Handle : InternalMessage -> unit = function + | Batch _ -> ingested <- ingested + 1 + | ActivateSeries _ | CloseSeries _-> () // stats are managed via Added internal message in same cycle + member __.TryDump(readState, readingAhead, ready) = + cycles <- cycles + 1 + if statsDue () then + dumpStats readState (readingAhead,ready) + + and [] InternalMessage = + | Batch of seriesIndex: int * epoch: int64 * checkpoint: Async * items: StreamEvent seq + | CloseSeries of seriesIndex: int + | ActivateSeries of seriesIndex: int + + let tryTake key (dict: Dictionary<_,_>) = + match dict.TryGetValue key with + | true, value -> + dict.Remove key |> ignore + Some value + | false, _ -> None + +open StripedIngesterImpl + +/// Holds batches away from Core processing to limit in-flight processing +type StripedIngester + ( log : ILogger, inner : 
Propulsion.Ingestion.Ingester>,Propulsion.Submission.SubmissionBatch>>, + maxRead, initialSeriesIndex, statsInterval : TimeSpan, ?pumpInterval) = + let cts = new CancellationTokenSource() + let pumpInterval = defaultArg pumpInterval (TimeSpan.FromMilliseconds 5.) + let work = ConcurrentQueue() // Queue as need ordering semantically + let readMax = Sem maxRead + let stats = Stats(log, statsInterval) + let pending = Queue<_>() + let readingAhead, ready = Dictionary>(), Dictionary>() + let mutable activeSeries = initialSeriesIndex + + let handle = function + | Batch (seriesId, epoch, checkpoint, items) -> + let batchInfo = + let items = Array.ofSeq items + let markCompleted = async { + readMax.Release() // NB release needs to be before checkpoint, as that can fail, and will not be retried + do! checkpoint } + epoch,markCompleted,items + if activeSeries = seriesId then pending.Enqueue batchInfo + else + match readingAhead.TryGetValue seriesId with + | false, _ -> readingAhead.[seriesId] <- ResizeArray[|batchInfo|] + | true,current -> current.Add(batchInfo) + | CloseSeries seriesIndex -> + if activeSeries = seriesIndex then + log.Information("Completed reading active series {activeSeries}; moving to next", activeSeries) + work.Enqueue <| ActivateSeries (activeSeries + 1) + else + match readingAhead |> tryTake seriesIndex with + | Some batchesRead -> + ready.[seriesIndex] <- batchesRead + log.Information("Completed reading {series}, marking {buffered} buffered items ready", seriesIndex, batchesRead.Count) + | None -> + ready.[seriesIndex] <- ResizeArray() + log.Information("Completed reading {series}, leaving empty batch list", seriesIndex) + | ActivateSeries newActiveSeries -> + activeSeries <- newActiveSeries + let buffered = + match ready |> tryTake newActiveSeries with + | Some completedChunkBatches -> + completedChunkBatches |> Seq.iter pending.Enqueue + work.Enqueue <| ActivateSeries (newActiveSeries + 1) + completedChunkBatches.Count + | None -> + match readingAhead |> tryTake newActiveSeries with + | Some batchesReadToDate -> batchesReadToDate |> Seq.iter pending.Enqueue; batchesReadToDate.Count + | None -> 0 + log.Information("Moving to series {activeChunk}, releasing {buffered} buffered batches, {ready} others ready, {ahead} reading ahead", + newActiveSeries, buffered, ready.Count, readingAhead.Count) + + member __.Pump = async { + while not cts.IsCancellationRequested do + let mutable itemLimit = 4096 + while itemLimit > 0 do + match work.TryDequeue() with + | true, x -> handle x; stats.Handle x; itemLimit <- itemLimit - 1 + | false, _ -> itemLimit <- 0 + while pending.Count <> 0 do + let! _,_ = inner.Submit(pending.Dequeue()) in () + stats.TryDump(readMax.State,readingAhead,ready) + do! Async.Sleep pumpInterval } + + /// Awaits space in `read` to limit reading ahead - yields (used,maximum) counts from Read Semaphore for logging purposes + member __.Submit(content : Message) = async { + do! readMax.Await(cts.Token) + match content with + | Message.Batch (seriesId, epoch, checkpoint, events) -> + work.Enqueue <| Batch (seriesId, epoch, checkpoint, events) + // NB readMax.Release() is effected in the Batch handler's MarkCompleted() + | Message.CloseSeries seriesId -> + work.Enqueue <| CloseSeries seriesId + readMax.Release() + return readMax.State } + + /// As range assignments get revoked, a user is expected to `Stop `the active processing thread for the Ingester before releasing references to it + member __.Stop() = cts.Cancel() \ No newline at end of file
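
For orientation, a minimal standalone sketch of the chunk arithmetic `EventStoreReader` relies on when translating a `StartPos` (chunk number or percentage) into an `$all` `Position`; it restates the `chunk`/`posFromChunk` logic above with a worked value, assuming only `EventStore.ClientAPI`'s `Position` type:

```fsharp
open EventStore.ClientAPI

// $all is written in 256MB chunks, so a Position's chunk number is CommitPosition / 256MB,
// computed as a 28-bit right shift (2^28 bytes = 256MB)
let chunk (pos : Position) = uint64 pos.CommitPosition >>> 28

// the first addressable Position of chunk N is at N * 256MB (prepare position 0)
let posFromChunk (chunk : int) = Position(int64 chunk * 256L * 1024L * 1024L, 0L)

// worked example: an event written ~600MB into $all sits in chunk 2, which begins at 512MB
let examplePos = Position(600L * 1024L * 1024L, 0L)
let exampleChunk = chunk examplePos                      // 2UL
let exampleChunkStart = posFromChunk (int exampleChunk)  // CommitPosition = 536870912L
```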
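
The doc comments on `Checkpoint.Service` note that the caller must choose `Start` or `Override` according to what `Read` reports. A hedged sketch of that selection via the `CheckpointSeries` wrapper (the `checkpoints` instance, and hence its `log`/`resolveStream` arguments, are assumed to come from the host's Equinox wiring):

```fsharp
open System
open Propulsion.EventStore

/// Issues the only legal command for the current state of the checkpoint series
let initializeCheckpoints (checkpoints : Checkpoint.CheckpointSeries) (freq : TimeSpan) (startPos : int64) = async {
    let! state = checkpoints.Read
    match state with
    | Checkpoint.Folds.NotStarted ->
        do! checkpoints.Start(freq, startPos)    // nothing committed yet for this series
    | Checkpoint.Folds.Running _ ->
        // forces a restart from startPos; EventStoreSource.Run instead resumes from the
        // stored position unless forceRestart is specified
        do! checkpoints.Override(freq, startPos) }
```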
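
Pulling the pieces together, a hedged sketch of driving `EventStoreSource.Run`; the `sink`, `connect`, `categorize` and `tryMapEvent` arguments are assumed to be supplied by the host application (as the `eqxsync` template does), and the literal settings are illustrative only:

```fsharp
open System
open Propulsion.EventStore

// illustrative settings only; tune batch sizes and intervals for the workload at hand
let spec : ReaderSpec =
    {   groupName = "MySyncGroup"               // checkpoint series name
        forceRestart = false                    // resume from the checkpoint if one exists
        start = StartPos.StartOrCheckpoint      // otherwise begin from the start of $all
        checkpointInterval = TimeSpan.FromMinutes 1.
        tailInterval = TimeSpan.FromSeconds 1.  // delay between empty tail reads
        gorge = None                            // no initial striped chunk-reading phase
        streamReaders = 1                       // per the TODO note on the field above
        batchSize = 4096
        minBatchSize = 512 }

let start (log : Serilog.ILogger) sink (checkpoints : Checkpoint.CheckpointSeries) connect categorize tryMapEvent =
    EventStoreSource.Run(
        log, sink, checkpoints, connect, spec, categorize, tryMapEvent,
        maxReadAhead = 64, statsInterval = TimeSpan.FromMinutes 5.)
```

`Run` establishes the tail position, reconciles `spec.start` against the checkpoint series state, then wires the `EventStoreReader`, `StripedIngester` and the sink's ingester together before blocking until a keyboard interrupt.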