From c5c781c32de7ed3db9e3fd86893f24d878ec061e Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 8 Mar 2019 10:09:34 +0000 Subject: [PATCH] Add eqxprojector template (#11) --- README.md | 4 + .../.template.config/template.json | 48 ++++ equinox-projector/Consumer/Consumer.fsproj | 23 ++ equinox-projector/Consumer/Infrastructure.fs | 47 ++++ equinox-projector/Consumer/Program.fs | 169 ++++++++++++++ equinox-projector/Projector/Infrastructure.fs | 19 ++ equinox-projector/Projector/Program.fs | 212 ++++++++++++++++++ equinox-projector/Projector/Projector.fsproj | 25 +++ equinox-projector/README.md | 79 +++++++ .../equinox-projector-consumer.sln | 36 +++ equinox-projector/equinox-projector.sln | 30 +++ .../Equinox.Templates.fsproj | 4 +- 12 files changed, 694 insertions(+), 2 deletions(-) create mode 100644 equinox-projector/.template.config/template.json create mode 100644 equinox-projector/Consumer/Consumer.fsproj create mode 100644 equinox-projector/Consumer/Infrastructure.fs create mode 100644 equinox-projector/Consumer/Program.fs create mode 100644 equinox-projector/Projector/Infrastructure.fs create mode 100644 equinox-projector/Projector/Program.fs create mode 100644 equinox-projector/Projector/Projector.fsproj create mode 100644 equinox-projector/README.md create mode 100644 equinox-projector/equinox-projector-consumer.sln create mode 100644 equinox-projector/equinox-projector.sln diff --git a/README.md b/README.md index aaa30b735..c4e4691e0 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ This repo hosts the source for Jet's [`dotnet new`](https://docs.microsoft.com/e - [`eqxweb`](equinox-web/README.md) - Boilerplate for an ASP .NET Core Web App, with an associated storage-independent Domain project. - [`eqxwebcs`](equinox-web-csharp/README.md) - Boilerplate for an ASP .NET Core Web App, with an associated storage-independent Domain project _ported to C#_. +- [`eqxprojector`](equinox-projector-cosmos/README.md) - Boilerplate for a CosmosDb ChangeFeedProcessor, with optional projection to Apache Kafka and associated consumer logic. ## How to use @@ -29,6 +30,9 @@ To use from the command line, the outline is: # see readme.md in the generated code for further instructions regarding the TodoBackend the above -t switch above triggers the inclusion of start readme.md + # to add a Projector and a Consumer (-k emits to Kafka and hence implies having a Consumer) + dotnet new eqxprojector -k + ## CONTRIBUTING Please don't hesitate to [create a GitHub issue](https://github.com/jet/dotnet-templates/issues/new) for any questions so others can benefit from the discussion. 
For any significant planned changes or additions, please err on the side of [reaching out early](https://github.com/jet/dotnet-templates/issues/new) so we can align expectationss - there's nothing more frustrating than having your hard work not yielding a mutually agreeable result ;) diff --git a/equinox-projector/.template.config/template.json b/equinox-projector/.template.config/template.json new file mode 100644 index 000000000..3eb827c4d --- /dev/null +++ b/equinox-projector/.template.config/template.json @@ -0,0 +1,48 @@ +{ + "$schema": "http://json.schemastore.org/template", + "author": "@jet @bartelink", + "classifications": [ + "Equinox", + "Event Sourcing", + "CosmosDb", + "ChangeFeed", + "Kafka" + ], + "tags": { + "language": "F#" + }, + "identity": "Equinox.Template.Projector.Cosmos", + "name": "Equinox Projector (with optional consumer)", + "shortName": "eqxprojector", + "sourceName": "ProjectorTemplate", + "preferNameDirectory": true, + + "symbols": { + "kafka": { + "type": "parameter", + "datatype": "bool", + "isRequired": false, + "defaultValue": "false", + "description": "Include a code projecting to, and consuming from Kafka." + } + }, + "sources": [ + { + "modifiers": [ + { + "condition": "(!kafka)", + "exclude": [ + "**/Consumer/**/*", + "equinox-projector-consumer-cosmos.sln" + ] + }, + { + "condition": "(kafka)", + "exclude": [ + "equinox-projector-cosmos.sln" + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/equinox-projector/Consumer/Consumer.fsproj b/equinox-projector/Consumer/Consumer.fsproj new file mode 100644 index 000000000..bae441780 --- /dev/null +++ b/equinox-projector/Consumer/Consumer.fsproj @@ -0,0 +1,23 @@ + + + + Exe + netcoreapp2.1 + 5 + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/equinox-projector/Consumer/Infrastructure.fs b/equinox-projector/Consumer/Infrastructure.fs new file mode 100644 index 000000000..7ef7ded63 --- /dev/null +++ b/equinox-projector/Consumer/Infrastructure.fs @@ -0,0 +1,47 @@ +[] +module private ProjectorTemplate.Consumer.Infrastructure + +open System +open System.Threading +open System.Threading.Tasks + +type Async with + static member AwaitTaskCorrect (task : Task<'T>) : Async<'T> = + Async.FromContinuations <| fun (k,ek,_) -> + task.ContinueWith (fun (t:Task<'T>) -> + if t.IsFaulted then + let e = t.Exception + if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0] + else ek e + elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled.")) + elif t.IsCompleted then k t.Result + else ek(Exception "invalid Task state!")) + |> ignore + + static member AwaitTaskCorrect (task : Task) : Async = + Async.FromContinuations <| fun (k,ek,_) -> + task.ContinueWith (fun (t:Task) -> + if t.IsFaulted then + let e = t.Exception + if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0] + else ek e + elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled.")) + elif t.IsCompleted then k () + else ek(Exception "invalid Task state!")) + |> ignore + +type SemaphoreSlim with + /// F# friendly semaphore await function + member semaphore.Await(?timeout : TimeSpan) = async { + let! ct = Async.CancellationToken + let timeout = defaultArg timeout Timeout.InfiniteTimeSpan + let task = semaphore.WaitAsync(timeout, ct) + return! 
Async.AwaitTaskCorrect task + } + + /// Throttling wrapper which waits asynchronously until the semaphore has available capacity + member semaphore.Throttle(workflow : Async<'T>) : Async<'T> = async { + let! _ = semaphore.Await() + try return! workflow + finally semaphore.Release() |> ignore + } diff --git a/equinox-projector/Consumer/Program.fs b/equinox-projector/Consumer/Program.fs new file mode 100644 index 000000000..fe687fc66 --- /dev/null +++ b/equinox-projector/Consumer/Program.fs @@ -0,0 +1,169 @@ +module ProjectorTemplate.Consumer.Program + +open Jet.ConfluentKafka.FSharp +open Serilog +open System +open System.Threading + +module EventParser = + open Equinox.Projection.Codec + open Newtonsoft.Json + + type SkuId = string + + let settings = JsonSerializerSettings() + + // NB - these schemas reflect the actual storage formats and hence need to be versioned with care + module SavedForLater = + type Item = { skuId : SkuId; dateSaved : DateTimeOffset } + + type Added = { skus : SkuId []; dateSaved : DateTimeOffset } + type Removed = { skus : SkuId [] } + type Merged = { items : Item [] } + + type Event = + /// Inclusion of another set of state in this one + | Merged of Merged + /// Removal of a set of skus + | Removed of Removed + /// Addition of a collection of skus to the list + | Added of Added + interface TypeShape.UnionContract.IUnionContract + let codec = Equinox.Codec.JsonNet.JsonUtf8.Create(settings) + + // NB - these schemas reflect the actual storage formats and hence need to be versioned with care + module Favorites = + type Favorited = { date: DateTimeOffset; skuId: SkuId } + type Unfavorited = { skuId: SkuId } + + type Event = + | Favorited of Favorited + | Unfavorited of Unfavorited + interface TypeShape.UnionContract.IUnionContract + let codec = Equinox.Codec.JsonNet.JsonUtf8.Create(settings) + + let tryExtractCategory (x : RenderedEvent) = + x.s.Split([|'-'|], 2, StringSplitOptions.RemoveEmptyEntries) + |> Array.tryHead + let tryDecode (log : ILogger) (codec : Equinox.Codec.IUnionEncoder<_,_>) (x : RenderedEvent) = + match codec.TryDecode (Equinox.Codec.Core.EventData.Create(x.c, x.d)) with + | None -> + if log.IsEnabled Serilog.Events.LogEventLevel.Debug then + log.ForContext("event", System.Text.Encoding.UTF8.GetString(x.d), true) + .Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, x.c, x.s); + None + | Some e -> Some e + + // Example of filtering our relevant Events from the Kafka stream + // NB if the percentage of relevant events is low, one may wish to adjust the projector to project only a subset + type Interpreter() = + let log = Log.ForContext() + + /// Handles various category / eventType / payload types as produced by Equinox.Tool + member __.TryDecode(x : Confluent.Kafka.ConsumeResult<_,_>) = + let ke = JsonConvert.DeserializeObject(x.Value) + match tryExtractCategory ke with + | Some "Favorites" -> tryDecode log Favorites.codec ke |> Option.map Choice1Of2 + | Some "SavedForLater" -> tryDecode log SavedForLater.codec ke |> Option.map Choice2Of2 + | x -> + if log.IsEnabled Serilog.Events.LogEventLevel.Debug then + log.ForContext("event", ke).Debug("Event could not be interpreted due to unknown category {category}", x) + None + +module Consumer = + open EventParser + + /// Starts a consumer which will will be driven based on batches emanating from the supplied `cfg` + let start (cfg: KafkaConsumerConfig) (degreeOfParallelism: int) = + let log = Log.ForContext() + let dop = new SemaphoreSlim(degreeOfParallelism) + let 
decoder = Interpreter() + let consume msgs = async { + // TODO filter relevant events, fan our processing as appropriate + let mutable favorited, unfavorited, saved, cleared = 0, 0, 0, 0 + let handleFave = function + | Favorites.Favorited _ -> Interlocked.Increment &favorited |> ignore + | Favorites.Unfavorited _ -> Interlocked.Increment &unfavorited |> ignore + let handleSave = function + | SavedForLater.Added e -> Interlocked.Add(&saved,e.skus.Length) |> ignore + | SavedForLater.Removed e -> Interlocked.Add(&cleared,e.skus.Length) |> ignore + | SavedForLater.Merged e -> Interlocked.Add(&saved,e.items.Length) |> ignore + // While this does not need to be async in this toy case, we illustrate here as any real example will need to deal with it + let handle e = async { + match e with + | Choice1Of2 fe -> handleFave fe + | Choice2Of2 se -> handleSave se + } + let! _ = + msgs + |> Seq.choose decoder.TryDecode + |> Seq.map (handle >> dop.Throttle) + |> Async.Parallel + log.Information("Consumed {b} Favorited {f} Unfavorited {u} Saved {s} Cleared {c}", + Array.length msgs, favorited, unfavorited, saved, cleared) + } + KafkaConsumer.Start log cfg consume + +module CmdParser = + open Argu + + exception MissingArg of string + let envBackstop msg key = + match Environment.GetEnvironmentVariable key with + | null -> raise <| MissingArg (sprintf "Please provide a %s, either as an argment or via the %s environment variable" msg key) + | x -> x + + [] + type Arguments = + | [] Broker of string + | [] Topic of string + | [] Group of string + | [] Parallelism of int + | [] Verbose + + interface IArgParserTemplate with + member a.Usage = a |> function + | Broker _ -> "specify Kafka Broker, in host:port format. (optional if environment variable EQUINOX_KAFKA_BROKER specified)" + | Topic _ -> "specify Kafka Topic name. (optional if environment variable EQUINOX_KAFKA_TOPIC specified)" + | Group _ -> "specify Kafka Consumer Group Id. (optional if environment variable EQUINOX_KAFKA_GROUP specified)" + | Parallelism _ -> "parallelism constraint when handling batches being consumed (default: 2 * Environment.ProcessorCount)" + | Verbose _ -> "request verbose logging." 
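        // Example invocation, matching the switches and EQUINOX_KAFKA_* fallbacks described in the generated README
        // (values here are illustrative only; Broker/Topic/Group may equally be supplied via the
        // EQUINOX_KAFKA_BROKER / EQUINOX_KAFKA_TOPIC / EQUINOX_KAFKA_GROUP environment variables):
        //   dotnet run -p Consumer -- -b instance.kafka.mysite.com:9092 -t topic0 -g group0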
+ + /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args + let parse argv : ParseResults = + let programName = Reflection.Assembly.GetEntryAssembly().GetName().Name + let parser = ArgumentParser.Create(programName = programName) + parser.ParseCommandLine argv + + type Parameters(args : ParseResults) = + member __.Broker = Uri(match args.TryGetResult Broker with Some x -> x | None -> envBackstop "Broker" "EQUINOX_KAFKA_BROKER") + member __.Topic = match args.TryGetResult Topic with Some x -> x | None -> envBackstop "Topic" "EQUINOX_KAFKA_TOPIC" + member __.Group = match args.TryGetResult Group with Some x -> x | None -> envBackstop "Group" "EQUINOX_KAFKA_GROUP" + member __.Parallelism = match args.TryGetResult Parallelism with Some x -> x | None -> 2 * Environment.ProcessorCount + member __.Verbose = args.Contains Verbose + +module Logging = + let initialize verbose = + Log.Logger <- + LoggerConfiguration() + .Destructure.FSharpTypes() + .Enrich.FromLogContext() + |> fun c -> if verbose then c.MinimumLevel.Debug() else c + |> fun c -> let theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code + if not verbose then c.WriteTo.Console(theme=theme) + else c.WriteTo.Console(theme=theme, outputTemplate="[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}|{Properties}{NewLine}{Exception}") + |> fun c -> c.CreateLogger() + +[] +let main argv = + try let parsed = CmdParser.parse argv + let args = CmdParser.Parameters(parsed) + Logging.initialize args.Verbose + let cfg = KafkaConsumerConfig.Create("ProjectorTemplate", args.Broker, [args.Topic], args.Group) + + use c = Consumer.start cfg args.Parallelism + c.AwaitCompletion() |> Async.RunSynchronously + 0 + with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 + | CmdParser.MissingArg msg -> eprintfn "%s" msg; 1 + | e -> Log.Fatal(e, "Exiting"); 1 \ No newline at end of file diff --git a/equinox-projector/Projector/Infrastructure.fs b/equinox-projector/Projector/Infrastructure.fs new file mode 100644 index 000000000..7f90ddfae --- /dev/null +++ b/equinox-projector/Projector/Infrastructure.fs @@ -0,0 +1,19 @@ +[] +module private ProjectorTemplate.Projector.Infrastructure + +open System +open System.Threading +open System.Threading.Tasks + +#nowarn "21" // re AwaitKeyboardInterrupt +#nowarn "40" // re AwaitKeyboardInterrupt + +type Async with + static member Sleep(t : TimeSpan) : Async = Async.Sleep(int t.TotalMilliseconds) + /// Asynchronously awaits the next keyboard interrupt event + static member AwaitKeyboardInterrupt () : Async = + Async.FromContinuations(fun (sc,_,_) -> + let isDisposed = ref 0 + let rec callback _ = Task.Run(fun () -> if Interlocked.Increment isDisposed = 1 then d.Dispose() ; sc ()) |> ignore + and d : IDisposable = Console.CancelKeyPress.Subscribe callback + in ()) diff --git a/equinox-projector/Projector/Program.fs b/equinox-projector/Projector/Program.fs new file mode 100644 index 000000000..3b8bb2018 --- /dev/null +++ b/equinox-projector/Projector/Program.fs @@ -0,0 +1,212 @@ +module ProjectorTemplate.Projector.Program + +//#if kafka +open Confluent.Kafka +//#endif +open Equinox.Cosmos +open Equinox.Cosmos.Projection +//#if kafka +open Equinox.Projection.Codec +open Jet.ConfluentKafka.FSharp +//#endif +open Equinox.Store +open Microsoft.Azure.Documents.ChangeFeedProcessor +open Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing +//#if kafka +open Newtonsoft.Json +//#endif +open Serilog +open System +open System.Collections.Generic +open 
System.Diagnostics + +module CmdParser = + open Argu + + exception MissingArg of string + let envBackstop msg key = + match Environment.GetEnvironmentVariable key with + | null -> raise <| MissingArg (sprintf "Please provide a %s, either as an argment or via the %s environment variable" msg key) + | x -> x + + module Cosmos = + type [] Arguments = + | [] ConnectionMode of Equinox.Cosmos.ConnectionMode + | [] Timeout of float + | [] Retries of int + | [] RetriesWaitTime of int + | [] Connection of string + | [] Database of string + | [] Collection of string + interface IArgParserTemplate with + member a.Usage = + match a with + | Timeout _ -> "specify operation timeout in seconds (default: 5)." + | Retries _ -> "specify operation retries (default: 1)." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" + | Connection _ -> "specify a connection string for a Cosmos account (defaults: envvar:EQUINOX_COSMOS_CONNECTION, Cosmos Emulator)." + | ConnectionMode _ -> "override the connection mode (default: DirectTcp)." + | Database _ -> "specify a database name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_DATABASE, test)." + | Collection _ -> "specify a collection name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_COLLECTION, test)." + type Info(args : ParseResults) = + member __.Connection = match args.TryGetResult Connection with Some x -> x | None -> envBackstop "Connection" "EQUINOX_COSMOS_CONNECTION" + member __.Database = match args.TryGetResult Database with Some x -> x | None -> envBackstop "Database" "EQUINOX_COSMOS_DATABASE" + member __.Collection = match args.TryGetResult Collection with Some x -> x | None -> envBackstop "Collection" "EQUINOX_COSMOS_COLLECTION" + + member __.Timeout = args.GetResult(Timeout,5.) |> TimeSpan.FromSeconds + member __.Mode = args.GetResult(ConnectionMode,Equinox.Cosmos.ConnectionMode.DirectTcp) + member __.Retries = args.GetResult(Retries, 1) + member __.MaxRetryWaitTime = args.GetResult(RetriesWaitTime, 5) + + member x.BuildConnectionDetails() = + let (Discovery.UriAndKey (endpointUri,masterKey)) = Discovery.FromConnectionString x.Connection + Log.Information("CosmosDb {mode} {endpointUri} Database {database} Collection {collection}.", + x.Mode, endpointUri, x.Database, x.Collection) + Log.Information("CosmosDb timeout: {timeout}s, {retries} retries; Throttling maxRetryWaitTime {maxRetryWaitTime}", + (let t = x.Timeout in t.TotalSeconds), x.Retries, x.MaxRetryWaitTime) + let c = + CosmosConnector(log=Log.Logger, mode=x.Mode, requestTimeout=x.Timeout, + maxRetryAttemptsOnThrottledRequests=x.Retries, maxRetryWaitTimeInSeconds=x.MaxRetryWaitTime) + (endpointUri,masterKey), c.ConnectionPolicy, { database = x.Database; collection = x.Collection } + + [] + type Arguments = + (* ChangeFeed Args*) + | [] ConsumerGroupName of string + | [] LeaseCollectionSuffix of string + | [] ForceStartFromHere + | [] BatchSize of int + | [] LagFreqS of float + | [] Verbose + | [] ChangeFeedVerbose +//#if kafka + (* Kafka Args *) + | [] Broker of string + | [] Topic of string +//#endif + (* ChangeFeed Args *) + | [] Cosmos of ParseResults + interface IArgParserTemplate with + member a.Usage = + match a with + | ConsumerGroupName _ -> "Projector consumer group name." + | LeaseCollectionSuffix _ -> "specify Collection Name suffix for Leases collection (default: `-aux`)." + | ForceStartFromHere _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." 
+ | BatchSize _ -> "maximum item count to request from feed. Default: 1000" + | LagFreqS _ -> "specify frequency to dump lag stats. Default: off" + | Verbose -> "request Verbose Logging. Default: off" + | ChangeFeedVerbose -> "request Verbose Logging from ChangeFeedProcessor. Default: off" +//#if kafka + | Broker _ -> "specify Kafka Broker, in host:port format. (default: use environment variable EQUINOX_KAFKA_BROKER, if specified)" + | Topic _ -> "specify Kafka Topic Id. (default: use environment variable EQUINOX_KAFKA_TOPIC, if specified)" +//#endif + | Cosmos _ -> "specify CosmosDb input parameters" + and Parameters(args : ParseResults) = + member val Cosmos = Cosmos.Info(args.GetResult Cosmos) +//#if kafka + member val Target = TargetInfo args +//#endif + member __.LeaseId = args.GetResult ConsumerGroupName + member __.Suffix = args.GetResult(LeaseCollectionSuffix,"-aux") + member __.Verbose = args.Contains Verbose + member __.ChangeFeedVerbose = args.Contains ChangeFeedVerbose + member __.BatchSize = args.GetResult(BatchSize,1000) + member __.LagFrequency = args.TryGetResult LagFreqS |> Option.map TimeSpan.FromSeconds + member __.AuxCollectionName = __.Cosmos.Collection + __.Suffix + member __.StartFromHere = args.Contains ForceStartFromHere + member x.BuildChangeFeedParams() = + Log.Information("Processing {leaseId} in {auxCollName} in batches of {batchSize}", x.LeaseId, x.AuxCollectionName, x.BatchSize) + if x.StartFromHere then Log.Warning("(If new projector group) Skipping projection of all existing events.") + x.LagFrequency |> Option.iter (fun s -> Log.Information("Dumping lag stats at {lagS:n0}s intervals", s.TotalSeconds)) + { database = x.Cosmos.Database; collection = x.AuxCollectionName}, x.LeaseId, x.StartFromHere, x.BatchSize, x.LagFrequency +//#if kafka + and TargetInfo(args : ParseResults) = + member __.Broker = Uri(match args.TryGetResult Broker with Some x -> x | None -> envBackstop "Broker" "EQUINOX_KAFKA_BROKER") + member __.Topic = match args.TryGetResult Topic with Some x -> x | None -> envBackstop "Topic" "EQUINOX_KAFKA_TOPIC" + member x.BuildTargetParams() = x.Broker, x.Topic +//#endif + + /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args + let parse argv : Parameters = + let programName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name + let parser = ArgumentParser.Create(programName = programName) + parser.ParseCommandLine argv |> Parameters + +// Illustrates how to emit direct to the Console using Serilog +// Other topographies can be achieved by using various adapters and bridges, e.g., SerilogTarget or Serilog.Sinks.NLog +module Logging = + let initialize verbose changeLogVerbose = + Log.Logger <- + LoggerConfiguration().Destructure.FSharpTypes().Enrich.FromLogContext() + |> fun c -> if verbose then c.MinimumLevel.Debug() else c + // LibLog writes to the global logger, so we need to control the emission if we don't want to pass loggers everywhere + |> fun c -> let cfpl = if changeLogVerbose then Serilog.Events.LogEventLevel.Debug else Serilog.Events.LogEventLevel.Warning + c.MinimumLevel.Override("Microsoft.Azure.Documents.ChangeFeedProcessor", cfpl) + |> fun c -> c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code) + .CreateLogger() + +let run (endpointUri, masterKey) connectionPolicy source + (aux, leaseId, forceSkip, batchSize, lagReportFreq : TimeSpan option) + createRangeProjector = async { + let logLag (interval : TimeSpan) (remainingWork : (int*int64) seq) = 
async { + Log.Information("Lags by Range {@rangeLags}", remainingWork) + return! Async.Sleep interval } + let maybeLogLag = lagReportFreq |> Option.map logLag + let! _feedEventHost = + ChangeFeedProcessor.Start + ( Log.Logger, endpointUri, masterKey, connectionPolicy, source, aux, leasePrefix = leaseId, forceSkipExistingEvents = forceSkip, + cfBatchSize = batchSize, createObserver = createRangeProjector, ?reportLagAndAwaitNextEstimation = maybeLogLag) + do! Async.AwaitKeyboardInterrupt() } + +//#if kafka +let mkRangeProjector (broker, topic) = + let sw = Stopwatch.StartNew() // we'll report the warmup/connect time on the first batch + let cfg = KafkaProducerConfig.Create("ProjectorTemplate", broker, Acks.Leader, compression = LZ4) + let producer = KafkaProducer.Create(Log.Logger, cfg, topic) + let disposeProducer = (producer :> IDisposable).Dispose + let projectBatch (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList) = async { + sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us + let toKafkaEvent (e: DocumentParser.IEvent) : RenderedEvent = { s = e.Stream; i = e.Index; c = e.EventType; t = e.TimeStamp; d = e.Data; m = e.Meta } + let pt,events = (fun () -> docs |> Seq.collect DocumentParser.enumEvents |> Seq.map toKafkaEvent |> Array.ofSeq) |> Stopwatch.Time + let es = [| for e in events -> e.s, JsonConvert.SerializeObject e |] + let! et,_ = producer.ProduceBatch es |> Stopwatch.Time + let r = ctx.FeedResponse + Log.Information("{range} Fetch: {token} {requestCharge:n0}RU {count} docs {l:n1}s; Parse: {events} events {p:n3}s; Emit: {e:n1}s", + ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], r.RequestCharge, docs.Count, float sw.ElapsedMilliseconds / 1000., + events.Length, (let e = pt.Elapsed in e.TotalSeconds), (let e = et.Elapsed in e.TotalSeconds)) + sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor + } + ChangeFeedObserver.Create(Log.Logger, projectBatch, disposeProducer) +//#else +let createRangeHandler () = + let sw = Stopwatch.StartNew() // we'll end up reporting the warmup/connect time on the first batch, but that's ok + let processBatch (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList) = async { + sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us + let pt,events = (fun () -> docs |> Seq.collect DocumentParser.enumEvents |> Seq.length) |> Stopwatch.Time + let r = ctx.FeedResponse + Log.Information("{range} Fetch: {token} {requestCharge:n0}RU {count} docs {l:n1}s; Parse: {events} events {p:n3}s", + ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], r.RequestCharge, docs.Count, float sw.ElapsedMilliseconds / 1000., + events, (let e = pt.Elapsed in e.TotalSeconds)) + sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor + } + ChangeFeedObserver.Create(Log.Logger, processBatch) + //#endif + +[] +let main argv = + try let args = CmdParser.parse argv + Logging.initialize args.Verbose args.ChangeFeedVerbose + let (endpointUri, masterKey), connectionPolicy, source = args.Cosmos.BuildConnectionDetails() + let aux, leaseId, startFromHere, batchSize, lagFrequency = args.BuildChangeFeedParams() +//#if kafka + let targetParams = args.Target.BuildTargetParams() + let createRangeHandler () = mkRangeProjector targetParams +//#endif + run (endpointUri, masterKey) connectionPolicy source + (aux, leaseId, startFromHere, batchSize, lagFrequency) + createRangeHandler + |> Async.RunSynchronously + 0 + with :? 
Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 + | CmdParser.MissingArg msg -> eprintfn "%s" msg; 1 + | e -> eprintfn "%s" e.Message; 1 \ No newline at end of file diff --git a/equinox-projector/Projector/Projector.fsproj b/equinox-projector/Projector/Projector.fsproj new file mode 100644 index 000000000..eeb01a981 --- /dev/null +++ b/equinox-projector/Projector/Projector.fsproj @@ -0,0 +1,25 @@ + + + + Exe + netcoreapp2.1 + 5 + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/equinox-projector/README.md b/equinox-projector/README.md new file mode 100644 index 000000000..e91b254f8 --- /dev/null +++ b/equinox-projector/README.md @@ -0,0 +1,79 @@ +//#if kafka +# Equinox Projector + Consumer +//#else +# Equinox Projector (without consumer) +//#endif + +This project was generated using: +//#if kafka + + dotnet new -i Equinox.Templates # just once, to install/update in the local templates store + dotnet new eqxprojector -k # -k => include projection logic and consumer +//#else + + dotnet new -i Equinox.Templates # just once, to install/update in the local templates store + # add -k to add Kafka Projection logic and consumer + dotnet new eqxprojector # use --help to see options +//#endif + +## Usage instructions + +0. establish connection strings etc. per https://github.com/jet/equinox README + + $env:EQUINOX_COSMOS_CONNECTION="AccountEndpoint=https://....;AccountKey=....=;" # or use -s + $env:EQUINOX_COSMOS_DATABASE="equinox-test" # or use -d + $env:EQUINOX_COSMOS_COLLECTION="equinox-test" # or use - c + +1. Use the `eqx` tool to initialize and then run some transactions in a CosmosDb collection + + dotnet tool install -g Equinox.Tool # only needed once + + # (either add environment variables as per step 0 or use -s/-d/-c to specify them) + + # generate a cosmos collection to store events in + eqx init -ru 1000 cosmos + + # (either add environment variables as per step 0 or use -s/-d/-c to specify them) + # `-t saveforlater` SaveForLater test produces uniform size events to project + # `-C -f 200` constrains current writers to 100 and applies caching so RU consumption is constrained such that an allocation of 1000 is sufficient + eqx run -t saveforlater -C -f 100 cosmos +//#if kafka + +2. To run an instance of the Projector: + + # (either add environment variables as per step 0 or use -s/-d/-c to specify them) + + $env:EQUINOX_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b + + # `default` defines the Projector Group identity - each id has separated state in the aux collection (aka LeaseId) + # `-m 1000` sets the max batch size to 1000 + # `-t topic0` identifies the Kafka topic to which the Projector should write + # cosmos specifies the source (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed) + dotnet run -p Projector -- default -m 1000 -t topic0 cosmos + + # (assuming you've scaled up enough to have >1 range, you can run a second instance in a second console with the same arguments) + +3. 
To run an instance of the Consumer: + + $env:EQUINOX_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b + $env:EQUINOX_KAFKA_TOPIC="topic0" # or use -t + $env:EQUINOX_KAFKA_GROUP="group0" # or use -g + + # `-t topic0` identifies the Kafka topic from which the consumers should read + # `-g group0` identifies the Kafka consumer group among which the consumption is to be spread + dotnet run -p Consumer -- -t topic0 -g group0 + + # (you can run as many instances as there are partitions configured for the topic on the broker) +//#else + +2. To run an instance of the Projector: + + # (either add environment variables as per step 0 or use -s/-d/-c to specify them) + + # `default` defines the Projector Group identity - each id has separated state in the aux collection (aka LeaseId) + # `-m 1000` sets the max batch size to 1000 + # cosmos specifies the source (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed) + dotnet run -p Projector -- default -m 1000 cosmos + + # NB (assuming you've scaled up enough to have >1 range, you can run a second instance in a second console with the same arguments) +//#endif \ No newline at end of file diff --git a/equinox-projector/equinox-projector-consumer.sln b/equinox-projector/equinox-projector-consumer.sln new file mode 100644 index 000000000..aa427035b --- /dev/null +++ b/equinox-projector/equinox-projector-consumer.sln @@ -0,0 +1,36 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.26124.0 +MinimumVisualStudioVersion = 15.0.26124.0 +Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Projector", "Projector\Projector.fsproj", "{6C72C937-ECFC-4DD4-9BA0-7355B237F974}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{518EE7E2-76AF-4DE9-A127-C2DFF709A468}" + ProjectSection(SolutionItems) = preProject + README.md = README.md + EndProjectSection +EndProject +Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Consumer", "Consumer\Consumer.fsproj", "{7ED94D2B-1744-48A0-9B20-94E4777617E9}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Release|Any CPU.Build.0 = Release|Any CPU + {7ED94D2B-1744-48A0-9B20-94E4777617E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7ED94D2B-1744-48A0-9B20-94E4777617E9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7ED94D2B-1744-48A0-9B20-94E4777617E9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7ED94D2B-1744-48A0-9B20-94E4777617E9}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {2232D6B6-0CA3-4E51-BFD4-DB0A42EF34BF} + EndGlobalSection +EndGlobal diff --git a/equinox-projector/equinox-projector.sln b/equinox-projector/equinox-projector.sln new file mode 100644 index 000000000..2fffa69eb --- /dev/null +++ b/equinox-projector/equinox-projector.sln @@ -0,0 +1,30 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 
+VisualStudioVersion = 15.0.26124.0 +MinimumVisualStudioVersion = 15.0.26124.0 +Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Projector", "Projector\Projector.fsproj", "{6C72C937-ECFC-4DD4-9BA0-7355B237F974}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{518EE7E2-76AF-4DE9-A127-C2DFF709A468}" + ProjectSection(SolutionItems) = preProject + README.md = README.md + EndProjectSection +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {2232D6B6-0CA3-4E51-BFD4-DB0A42EF34BF} + EndGlobalSection +EndGlobal diff --git a/src/Equinox.Templates/Equinox.Templates.fsproj b/src/Equinox.Templates/Equinox.Templates.fsproj index 722b5ae18..3eae2c8fb 100644 --- a/src/Equinox.Templates/Equinox.Templates.fsproj +++ b/src/Equinox.Templates/Equinox.Templates.fsproj @@ -3,9 +3,9 @@ @jet @bartelink contributors Jet.com - dotnet new eqxweb, eqxwebcs: Equinox Web, Domain templates + dotnet new eqxweb, eqxwebcs, eqxprojector: Equinox Web, Domain, Projector, Consumer templates https://github.com/jet/dotnet-templates - equinox fsharp eventsourcing cosmosdb eventstore + equinox fsharp eventsourcing cosmosdb eventstore changefeedprocessor kafka Apache-2.0 Copyright © 2018-9
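For orientation, a minimal sketch of the consumer wiring that `Consumer/Program.fs` above builds up, reduced to its essentials. Only APIs already used in that file are assumed (`KafkaConsumerConfig.Create`, `KafkaConsumer.Start`, `AwaitCompletion`); the broker/topic/group literals and the no-op `consume` body are placeholders, not part of the template:

    module MinimalConsumer.Program

    open Jet.ConfluentKafka.FSharp
    open Serilog
    open System

    [<EntryPoint>]
    let main _ =
        // console-only Serilog logger; the template's Logging module adds themes and verbosity switches
        Log.Logger <- LoggerConfiguration().WriteTo.Console().CreateLogger()
        // placeholder connection details; the template resolves these from Argu switches or EQUINOX_KAFKA_* variables
        let cfg = KafkaConsumerConfig.Create("ProjectorTemplate", Uri "instance.kafka.mysite.com:9092", ["topic0"], "group0")
        // the handler receives batches of raw Kafka messages; the template deserializes RenderedEvent JSON,
        // decodes per-category events via Equinox codecs and fans out handling under a SemaphoreSlim throttle
        let consume msgs = async {
            Log.Information("Consumed batch of {count} messages", Array.length msgs) }
        use consumer = KafkaConsumer.Start Log.Logger cfg consume
        consumer.AwaitCompletion() |> Async.RunSynchronously
        0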