diff --git a/README.md b/README.md
index aaa30b735..c4e4691e0 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,7 @@ This repo hosts the source for Jet's [`dotnet new`](https://docs.microsoft.com/e
- [`eqxweb`](equinox-web/README.md) - Boilerplate for an ASP .NET Core Web App, with an associated storage-independent Domain project.
- [`eqxwebcs`](equinox-web-csharp/README.md) - Boilerplate for an ASP .NET Core Web App, with an associated storage-independent Domain project _ported to C#_.
+- [`eqxprojector`](equinox-projector-cosmos/README.md) - Boilerplate for a CosmosDb ChangeFeedProcessor, with optional projection to Apache Kafka and associated consumer logic.
## How to use
@@ -29,6 +30,9 @@ To use from the command line, the outline is:
# see readme.md in the generated code for further instructions regarding the TodoBackend the above -t switch above triggers the inclusion of
start readme.md
+ # to add a Projector and a Consumer (-k emits to Kafka and hence implies having a Consumer)
+ dotnet new eqxprojector -k
+
## CONTRIBUTING
Please don't hesitate to [create a GitHub issue](https://github.com/jet/dotnet-templates/issues/new) for any questions so others can benefit from the discussion. For any significant planned changes or additions, please err on the side of [reaching out early](https://github.com/jet/dotnet-templates/issues/new) so we can align expectationss - there's nothing more frustrating than having your hard work not yielding a mutually agreeable result ;)
diff --git a/equinox-projector/.template.config/template.json b/equinox-projector/.template.config/template.json
new file mode 100644
index 000000000..3eb827c4d
--- /dev/null
+++ b/equinox-projector/.template.config/template.json
@@ -0,0 +1,48 @@
+{
+ "$schema": "http://json.schemastore.org/template",
+ "author": "@jet @bartelink",
+ "classifications": [
+ "Equinox",
+ "Event Sourcing",
+ "CosmosDb",
+ "ChangeFeed",
+ "Kafka"
+ ],
+ "tags": {
+ "language": "F#"
+ },
+ "identity": "Equinox.Template.Projector.Cosmos",
+ "name": "Equinox Projector (with optional consumer)",
+ "shortName": "eqxprojector",
+ "sourceName": "ProjectorTemplate",
+ "preferNameDirectory": true,
+
+ "symbols": {
+ "kafka": {
+ "type": "parameter",
+ "datatype": "bool",
+ "isRequired": false,
+ "defaultValue": "false",
+ "description": "Include a code projecting to, and consuming from Kafka."
+ }
+ },
+ "sources": [
+ {
+ "modifiers": [
+ {
+ "condition": "(!kafka)",
+ "exclude": [
+ "**/Consumer/**/*",
+ "equinox-projector-consumer-cosmos.sln"
+ ]
+ },
+ {
+ "condition": "(kafka)",
+ "exclude": [
+ "equinox-projector-cosmos.sln"
+ ]
+ }
+ ]
+ }
+ ]
+}
\ No newline at end of file
diff --git a/equinox-projector/Consumer/Consumer.fsproj b/equinox-projector/Consumer/Consumer.fsproj
new file mode 100644
index 000000000..bae441780
--- /dev/null
+++ b/equinox-projector/Consumer/Consumer.fsproj
@@ -0,0 +1,23 @@
+
+
+
+ Exe
+ netcoreapp2.1
+ 5
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/equinox-projector/Consumer/Infrastructure.fs b/equinox-projector/Consumer/Infrastructure.fs
new file mode 100644
index 000000000..7ef7ded63
--- /dev/null
+++ b/equinox-projector/Consumer/Infrastructure.fs
@@ -0,0 +1,47 @@
+[]
+module private ProjectorTemplate.Consumer.Infrastructure
+
+open System
+open System.Threading
+open System.Threading.Tasks
+
+type Async with
+ static member AwaitTaskCorrect (task : Task<'T>) : Async<'T> =
+ Async.FromContinuations <| fun (k,ek,_) ->
+ task.ContinueWith (fun (t:Task<'T>) ->
+ if t.IsFaulted then
+ let e = t.Exception
+ if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0]
+ else ek e
+ elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled."))
+ elif t.IsCompleted then k t.Result
+ else ek(Exception "invalid Task state!"))
+ |> ignore
+
+ static member AwaitTaskCorrect (task : Task) : Async =
+ Async.FromContinuations <| fun (k,ek,_) ->
+ task.ContinueWith (fun (t:Task) ->
+ if t.IsFaulted then
+ let e = t.Exception
+ if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0]
+ else ek e
+ elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled."))
+ elif t.IsCompleted then k ()
+ else ek(Exception "invalid Task state!"))
+ |> ignore
+
+type SemaphoreSlim with
+ /// F# friendly semaphore await function
+ member semaphore.Await(?timeout : TimeSpan) = async {
+ let! ct = Async.CancellationToken
+ let timeout = defaultArg timeout Timeout.InfiniteTimeSpan
+ let task = semaphore.WaitAsync(timeout, ct)
+ return! Async.AwaitTaskCorrect task
+ }
+
+ /// Throttling wrapper which waits asynchronously until the semaphore has available capacity
+ member semaphore.Throttle(workflow : Async<'T>) : Async<'T> = async {
+ let! _ = semaphore.Await()
+ try return! workflow
+ finally semaphore.Release() |> ignore
+ }
diff --git a/equinox-projector/Consumer/Program.fs b/equinox-projector/Consumer/Program.fs
new file mode 100644
index 000000000..fe687fc66
--- /dev/null
+++ b/equinox-projector/Consumer/Program.fs
@@ -0,0 +1,169 @@
+module ProjectorTemplate.Consumer.Program
+
+open Jet.ConfluentKafka.FSharp
+open Serilog
+open System
+open System.Threading
+
+module EventParser =
+ open Equinox.Projection.Codec
+ open Newtonsoft.Json
+
+ type SkuId = string
+
+ let settings = JsonSerializerSettings()
+
+ // NB - these schemas reflect the actual storage formats and hence need to be versioned with care
+ module SavedForLater =
+ type Item = { skuId : SkuId; dateSaved : DateTimeOffset }
+
+ type Added = { skus : SkuId []; dateSaved : DateTimeOffset }
+ type Removed = { skus : SkuId [] }
+ type Merged = { items : Item [] }
+
+ type Event =
+ /// Inclusion of another set of state in this one
+ | Merged of Merged
+ /// Removal of a set of skus
+ | Removed of Removed
+ /// Addition of a collection of skus to the list
+ | Added of Added
+ interface TypeShape.UnionContract.IUnionContract
+ let codec = Equinox.Codec.JsonNet.JsonUtf8.Create(settings)
+
+ // NB - these schemas reflect the actual storage formats and hence need to be versioned with care
+ module Favorites =
+ type Favorited = { date: DateTimeOffset; skuId: SkuId }
+ type Unfavorited = { skuId: SkuId }
+
+ type Event =
+ | Favorited of Favorited
+ | Unfavorited of Unfavorited
+ interface TypeShape.UnionContract.IUnionContract
+ let codec = Equinox.Codec.JsonNet.JsonUtf8.Create(settings)
+
+ let tryExtractCategory (x : RenderedEvent) =
+ x.s.Split([|'-'|], 2, StringSplitOptions.RemoveEmptyEntries)
+ |> Array.tryHead
+ let tryDecode (log : ILogger) (codec : Equinox.Codec.IUnionEncoder<_,_>) (x : RenderedEvent) =
+ match codec.TryDecode (Equinox.Codec.Core.EventData.Create(x.c, x.d)) with
+ | None ->
+ if log.IsEnabled Serilog.Events.LogEventLevel.Debug then
+ log.ForContext("event", System.Text.Encoding.UTF8.GetString(x.d), true)
+ .Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, x.c, x.s);
+ None
+ | Some e -> Some e
+
+ // Example of filtering our relevant Events from the Kafka stream
+ // NB if the percentage of relevant events is low, one may wish to adjust the projector to project only a subset
+ type Interpreter() =
+ let log = Log.ForContext()
+
+ /// Handles various category / eventType / payload types as produced by Equinox.Tool
+ member __.TryDecode(x : Confluent.Kafka.ConsumeResult<_,_>) =
+ let ke = JsonConvert.DeserializeObject(x.Value)
+ match tryExtractCategory ke with
+ | Some "Favorites" -> tryDecode log Favorites.codec ke |> Option.map Choice1Of2
+ | Some "SavedForLater" -> tryDecode log SavedForLater.codec ke |> Option.map Choice2Of2
+ | x ->
+ if log.IsEnabled Serilog.Events.LogEventLevel.Debug then
+ log.ForContext("event", ke).Debug("Event could not be interpreted due to unknown category {category}", x)
+ None
+
+module Consumer =
+ open EventParser
+
+ /// Starts a consumer which will will be driven based on batches emanating from the supplied `cfg`
+ let start (cfg: KafkaConsumerConfig) (degreeOfParallelism: int) =
+ let log = Log.ForContext()
+ let dop = new SemaphoreSlim(degreeOfParallelism)
+ let decoder = Interpreter()
+ let consume msgs = async {
+ // TODO filter relevant events, fan our processing as appropriate
+ let mutable favorited, unfavorited, saved, cleared = 0, 0, 0, 0
+ let handleFave = function
+ | Favorites.Favorited _ -> Interlocked.Increment &favorited |> ignore
+ | Favorites.Unfavorited _ -> Interlocked.Increment &unfavorited |> ignore
+ let handleSave = function
+ | SavedForLater.Added e -> Interlocked.Add(&saved,e.skus.Length) |> ignore
+ | SavedForLater.Removed e -> Interlocked.Add(&cleared,e.skus.Length) |> ignore
+ | SavedForLater.Merged e -> Interlocked.Add(&saved,e.items.Length) |> ignore
+ // While this does not need to be async in this toy case, we illustrate here as any real example will need to deal with it
+ let handle e = async {
+ match e with
+ | Choice1Of2 fe -> handleFave fe
+ | Choice2Of2 se -> handleSave se
+ }
+ let! _ =
+ msgs
+ |> Seq.choose decoder.TryDecode
+ |> Seq.map (handle >> dop.Throttle)
+ |> Async.Parallel
+ log.Information("Consumed {b} Favorited {f} Unfavorited {u} Saved {s} Cleared {c}",
+ Array.length msgs, favorited, unfavorited, saved, cleared)
+ }
+ KafkaConsumer.Start log cfg consume
+
+module CmdParser =
+ open Argu
+
+ exception MissingArg of string
+ let envBackstop msg key =
+ match Environment.GetEnvironmentVariable key with
+ | null -> raise <| MissingArg (sprintf "Please provide a %s, either as an argment or via the %s environment variable" msg key)
+ | x -> x
+
+ []
+ type Arguments =
+ | [] Broker of string
+ | [] Topic of string
+ | [] Group of string
+ | [] Parallelism of int
+ | [] Verbose
+
+ interface IArgParserTemplate with
+ member a.Usage = a |> function
+ | Broker _ -> "specify Kafka Broker, in host:port format. (optional if environment variable EQUINOX_KAFKA_BROKER specified)"
+ | Topic _ -> "specify Kafka Topic name. (optional if environment variable EQUINOX_KAFKA_TOPIC specified)"
+ | Group _ -> "specify Kafka Consumer Group Id. (optional if environment variable EQUINOX_KAFKA_GROUP specified)"
+ | Parallelism _ -> "parallelism constraint when handling batches being consumed (default: 2 * Environment.ProcessorCount)"
+ | Verbose _ -> "request verbose logging."
+
+ /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
+ let parse argv : ParseResults =
+ let programName = Reflection.Assembly.GetEntryAssembly().GetName().Name
+ let parser = ArgumentParser.Create(programName = programName)
+ parser.ParseCommandLine argv
+
+ type Parameters(args : ParseResults) =
+ member __.Broker = Uri(match args.TryGetResult Broker with Some x -> x | None -> envBackstop "Broker" "EQUINOX_KAFKA_BROKER")
+ member __.Topic = match args.TryGetResult Topic with Some x -> x | None -> envBackstop "Topic" "EQUINOX_KAFKA_TOPIC"
+ member __.Group = match args.TryGetResult Group with Some x -> x | None -> envBackstop "Group" "EQUINOX_KAFKA_GROUP"
+ member __.Parallelism = match args.TryGetResult Parallelism with Some x -> x | None -> 2 * Environment.ProcessorCount
+ member __.Verbose = args.Contains Verbose
+
+module Logging =
+ let initialize verbose =
+ Log.Logger <-
+ LoggerConfiguration()
+ .Destructure.FSharpTypes()
+ .Enrich.FromLogContext()
+ |> fun c -> if verbose then c.MinimumLevel.Debug() else c
+ |> fun c -> let theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code
+ if not verbose then c.WriteTo.Console(theme=theme)
+ else c.WriteTo.Console(theme=theme, outputTemplate="[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}|{Properties}{NewLine}{Exception}")
+ |> fun c -> c.CreateLogger()
+
+[]
+let main argv =
+ try let parsed = CmdParser.parse argv
+ let args = CmdParser.Parameters(parsed)
+ Logging.initialize args.Verbose
+ let cfg = KafkaConsumerConfig.Create("ProjectorTemplate", args.Broker, [args.Topic], args.Group)
+
+ use c = Consumer.start cfg args.Parallelism
+ c.AwaitCompletion() |> Async.RunSynchronously
+ 0
+ with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
+ | CmdParser.MissingArg msg -> eprintfn "%s" msg; 1
+ | e -> Log.Fatal(e, "Exiting"); 1
\ No newline at end of file
diff --git a/equinox-projector/Projector/Infrastructure.fs b/equinox-projector/Projector/Infrastructure.fs
new file mode 100644
index 000000000..7f90ddfae
--- /dev/null
+++ b/equinox-projector/Projector/Infrastructure.fs
@@ -0,0 +1,19 @@
+[]
+module private ProjectorTemplate.Projector.Infrastructure
+
+open System
+open System.Threading
+open System.Threading.Tasks
+
+#nowarn "21" // re AwaitKeyboardInterrupt
+#nowarn "40" // re AwaitKeyboardInterrupt
+
+type Async with
+ static member Sleep(t : TimeSpan) : Async = Async.Sleep(int t.TotalMilliseconds)
+ /// Asynchronously awaits the next keyboard interrupt event
+ static member AwaitKeyboardInterrupt () : Async =
+ Async.FromContinuations(fun (sc,_,_) ->
+ let isDisposed = ref 0
+ let rec callback _ = Task.Run(fun () -> if Interlocked.Increment isDisposed = 1 then d.Dispose() ; sc ()) |> ignore
+ and d : IDisposable = Console.CancelKeyPress.Subscribe callback
+ in ())
diff --git a/equinox-projector/Projector/Program.fs b/equinox-projector/Projector/Program.fs
new file mode 100644
index 000000000..3b8bb2018
--- /dev/null
+++ b/equinox-projector/Projector/Program.fs
@@ -0,0 +1,212 @@
+module ProjectorTemplate.Projector.Program
+
+//#if kafka
+open Confluent.Kafka
+//#endif
+open Equinox.Cosmos
+open Equinox.Cosmos.Projection
+//#if kafka
+open Equinox.Projection.Codec
+open Jet.ConfluentKafka.FSharp
+//#endif
+open Equinox.Store
+open Microsoft.Azure.Documents.ChangeFeedProcessor
+open Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing
+//#if kafka
+open Newtonsoft.Json
+//#endif
+open Serilog
+open System
+open System.Collections.Generic
+open System.Diagnostics
+
+module CmdParser =
+ open Argu
+
+ exception MissingArg of string
+ let envBackstop msg key =
+ match Environment.GetEnvironmentVariable key with
+ | null -> raise <| MissingArg (sprintf "Please provide a %s, either as an argment or via the %s environment variable" msg key)
+ | x -> x
+
+ module Cosmos =
+ type [] Arguments =
+ | [] ConnectionMode of Equinox.Cosmos.ConnectionMode
+ | [] Timeout of float
+ | [] Retries of int
+ | [] RetriesWaitTime of int
+ | [] Connection of string
+ | [] Database of string
+ | [] Collection of string
+ interface IArgParserTemplate with
+ member a.Usage =
+ match a with
+ | Timeout _ -> "specify operation timeout in seconds (default: 5)."
+ | Retries _ -> "specify operation retries (default: 1)."
+ | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)"
+ | Connection _ -> "specify a connection string for a Cosmos account (defaults: envvar:EQUINOX_COSMOS_CONNECTION, Cosmos Emulator)."
+ | ConnectionMode _ -> "override the connection mode (default: DirectTcp)."
+ | Database _ -> "specify a database name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_DATABASE, test)."
+ | Collection _ -> "specify a collection name for Cosmos account (defaults: envvar:EQUINOX_COSMOS_COLLECTION, test)."
+ type Info(args : ParseResults) =
+ member __.Connection = match args.TryGetResult Connection with Some x -> x | None -> envBackstop "Connection" "EQUINOX_COSMOS_CONNECTION"
+ member __.Database = match args.TryGetResult Database with Some x -> x | None -> envBackstop "Database" "EQUINOX_COSMOS_DATABASE"
+ member __.Collection = match args.TryGetResult Collection with Some x -> x | None -> envBackstop "Collection" "EQUINOX_COSMOS_COLLECTION"
+
+ member __.Timeout = args.GetResult(Timeout,5.) |> TimeSpan.FromSeconds
+ member __.Mode = args.GetResult(ConnectionMode,Equinox.Cosmos.ConnectionMode.DirectTcp)
+ member __.Retries = args.GetResult(Retries, 1)
+ member __.MaxRetryWaitTime = args.GetResult(RetriesWaitTime, 5)
+
+ member x.BuildConnectionDetails() =
+ let (Discovery.UriAndKey (endpointUri,masterKey)) = Discovery.FromConnectionString x.Connection
+ Log.Information("CosmosDb {mode} {endpointUri} Database {database} Collection {collection}.",
+ x.Mode, endpointUri, x.Database, x.Collection)
+ Log.Information("CosmosDb timeout: {timeout}s, {retries} retries; Throttling maxRetryWaitTime {maxRetryWaitTime}",
+ (let t = x.Timeout in t.TotalSeconds), x.Retries, x.MaxRetryWaitTime)
+ let c =
+ CosmosConnector(log=Log.Logger, mode=x.Mode, requestTimeout=x.Timeout,
+ maxRetryAttemptsOnThrottledRequests=x.Retries, maxRetryWaitTimeInSeconds=x.MaxRetryWaitTime)
+ (endpointUri,masterKey), c.ConnectionPolicy, { database = x.Database; collection = x.Collection }
+
+ []
+ type Arguments =
+ (* ChangeFeed Args*)
+ | [] ConsumerGroupName of string
+ | [] LeaseCollectionSuffix of string
+ | [] ForceStartFromHere
+ | [] BatchSize of int
+ | [] LagFreqS of float
+ | [] Verbose
+ | [] ChangeFeedVerbose
+//#if kafka
+ (* Kafka Args *)
+ | [] Broker of string
+ | [] Topic of string
+//#endif
+ (* ChangeFeed Args *)
+ | [] Cosmos of ParseResults
+ interface IArgParserTemplate with
+ member a.Usage =
+ match a with
+ | ConsumerGroupName _ -> "Projector consumer group name."
+ | LeaseCollectionSuffix _ -> "specify Collection Name suffix for Leases collection (default: `-aux`)."
+ | ForceStartFromHere _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
+ | BatchSize _ -> "maximum item count to request from feed. Default: 1000"
+ | LagFreqS _ -> "specify frequency to dump lag stats. Default: off"
+ | Verbose -> "request Verbose Logging. Default: off"
+ | ChangeFeedVerbose -> "request Verbose Logging from ChangeFeedProcessor. Default: off"
+//#if kafka
+ | Broker _ -> "specify Kafka Broker, in host:port format. (default: use environment variable EQUINOX_KAFKA_BROKER, if specified)"
+ | Topic _ -> "specify Kafka Topic Id. (default: use environment variable EQUINOX_KAFKA_TOPIC, if specified)"
+//#endif
+ | Cosmos _ -> "specify CosmosDb input parameters"
+ and Parameters(args : ParseResults) =
+ member val Cosmos = Cosmos.Info(args.GetResult Cosmos)
+//#if kafka
+ member val Target = TargetInfo args
+//#endif
+ member __.LeaseId = args.GetResult ConsumerGroupName
+ member __.Suffix = args.GetResult(LeaseCollectionSuffix,"-aux")
+ member __.Verbose = args.Contains Verbose
+ member __.ChangeFeedVerbose = args.Contains ChangeFeedVerbose
+ member __.BatchSize = args.GetResult(BatchSize,1000)
+ member __.LagFrequency = args.TryGetResult LagFreqS |> Option.map TimeSpan.FromSeconds
+ member __.AuxCollectionName = __.Cosmos.Collection + __.Suffix
+ member __.StartFromHere = args.Contains ForceStartFromHere
+ member x.BuildChangeFeedParams() =
+ Log.Information("Processing {leaseId} in {auxCollName} in batches of {batchSize}", x.LeaseId, x.AuxCollectionName, x.BatchSize)
+ if x.StartFromHere then Log.Warning("(If new projector group) Skipping projection of all existing events.")
+ x.LagFrequency |> Option.iter (fun s -> Log.Information("Dumping lag stats at {lagS:n0}s intervals", s.TotalSeconds))
+ { database = x.Cosmos.Database; collection = x.AuxCollectionName}, x.LeaseId, x.StartFromHere, x.BatchSize, x.LagFrequency
+//#if kafka
+ and TargetInfo(args : ParseResults) =
+ member __.Broker = Uri(match args.TryGetResult Broker with Some x -> x | None -> envBackstop "Broker" "EQUINOX_KAFKA_BROKER")
+ member __.Topic = match args.TryGetResult Topic with Some x -> x | None -> envBackstop "Topic" "EQUINOX_KAFKA_TOPIC"
+ member x.BuildTargetParams() = x.Broker, x.Topic
+//#endif
+
+ /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
+ let parse argv : Parameters =
+ let programName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name
+ let parser = ArgumentParser.Create(programName = programName)
+ parser.ParseCommandLine argv |> Parameters
+
+// Illustrates how to emit direct to the Console using Serilog
+// Other topographies can be achieved by using various adapters and bridges, e.g., SerilogTarget or Serilog.Sinks.NLog
+module Logging =
+ let initialize verbose changeLogVerbose =
+ Log.Logger <-
+ LoggerConfiguration().Destructure.FSharpTypes().Enrich.FromLogContext()
+ |> fun c -> if verbose then c.MinimumLevel.Debug() else c
+ // LibLog writes to the global logger, so we need to control the emission if we don't want to pass loggers everywhere
+ |> fun c -> let cfpl = if changeLogVerbose then Serilog.Events.LogEventLevel.Debug else Serilog.Events.LogEventLevel.Warning
+ c.MinimumLevel.Override("Microsoft.Azure.Documents.ChangeFeedProcessor", cfpl)
+ |> fun c -> c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code)
+ .CreateLogger()
+
+let run (endpointUri, masterKey) connectionPolicy source
+ (aux, leaseId, forceSkip, batchSize, lagReportFreq : TimeSpan option)
+ createRangeProjector = async {
+ let logLag (interval : TimeSpan) (remainingWork : (int*int64) seq) = async {
+ Log.Information("Lags by Range {@rangeLags}", remainingWork)
+ return! Async.Sleep interval }
+ let maybeLogLag = lagReportFreq |> Option.map logLag
+ let! _feedEventHost =
+ ChangeFeedProcessor.Start
+ ( Log.Logger, endpointUri, masterKey, connectionPolicy, source, aux, leasePrefix = leaseId, forceSkipExistingEvents = forceSkip,
+ cfBatchSize = batchSize, createObserver = createRangeProjector, ?reportLagAndAwaitNextEstimation = maybeLogLag)
+ do! Async.AwaitKeyboardInterrupt() }
+
+//#if kafka
+let mkRangeProjector (broker, topic) =
+ let sw = Stopwatch.StartNew() // we'll report the warmup/connect time on the first batch
+ let cfg = KafkaProducerConfig.Create("ProjectorTemplate", broker, Acks.Leader, compression = LZ4)
+ let producer = KafkaProducer.Create(Log.Logger, cfg, topic)
+ let disposeProducer = (producer :> IDisposable).Dispose
+ let projectBatch (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList) = async {
+ sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us
+ let toKafkaEvent (e: DocumentParser.IEvent) : RenderedEvent = { s = e.Stream; i = e.Index; c = e.EventType; t = e.TimeStamp; d = e.Data; m = e.Meta }
+ let pt,events = (fun () -> docs |> Seq.collect DocumentParser.enumEvents |> Seq.map toKafkaEvent |> Array.ofSeq) |> Stopwatch.Time
+ let es = [| for e in events -> e.s, JsonConvert.SerializeObject e |]
+ let! et,_ = producer.ProduceBatch es |> Stopwatch.Time
+ let r = ctx.FeedResponse
+ Log.Information("{range} Fetch: {token} {requestCharge:n0}RU {count} docs {l:n1}s; Parse: {events} events {p:n3}s; Emit: {e:n1}s",
+ ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], r.RequestCharge, docs.Count, float sw.ElapsedMilliseconds / 1000.,
+ events.Length, (let e = pt.Elapsed in e.TotalSeconds), (let e = et.Elapsed in e.TotalSeconds))
+ sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor
+ }
+ ChangeFeedObserver.Create(Log.Logger, projectBatch, disposeProducer)
+//#else
+let createRangeHandler () =
+ let sw = Stopwatch.StartNew() // we'll end up reporting the warmup/connect time on the first batch, but that's ok
+ let processBatch (ctx : IChangeFeedObserverContext) (docs : IReadOnlyList) = async {
+ sw.Stop() // Stop the clock after ChangeFeedProcessor hands off to us
+ let pt,events = (fun () -> docs |> Seq.collect DocumentParser.enumEvents |> Seq.length) |> Stopwatch.Time
+ let r = ctx.FeedResponse
+ Log.Information("{range} Fetch: {token} {requestCharge:n0}RU {count} docs {l:n1}s; Parse: {events} events {p:n3}s",
+ ctx.PartitionKeyRangeId, r.ResponseContinuation.Trim[|'"'|], r.RequestCharge, docs.Count, float sw.ElapsedMilliseconds / 1000.,
+ events, (let e = pt.Elapsed in e.TotalSeconds))
+ sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor
+ }
+ ChangeFeedObserver.Create(Log.Logger, processBatch)
+ //#endif
+
+[]
+let main argv =
+ try let args = CmdParser.parse argv
+ Logging.initialize args.Verbose args.ChangeFeedVerbose
+ let (endpointUri, masterKey), connectionPolicy, source = args.Cosmos.BuildConnectionDetails()
+ let aux, leaseId, startFromHere, batchSize, lagFrequency = args.BuildChangeFeedParams()
+//#if kafka
+ let targetParams = args.Target.BuildTargetParams()
+ let createRangeHandler () = mkRangeProjector targetParams
+//#endif
+ run (endpointUri, masterKey) connectionPolicy source
+ (aux, leaseId, startFromHere, batchSize, lagFrequency)
+ createRangeHandler
+ |> Async.RunSynchronously
+ 0
+ with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
+ | CmdParser.MissingArg msg -> eprintfn "%s" msg; 1
+ | e -> eprintfn "%s" e.Message; 1
\ No newline at end of file
diff --git a/equinox-projector/Projector/Projector.fsproj b/equinox-projector/Projector/Projector.fsproj
new file mode 100644
index 000000000..eeb01a981
--- /dev/null
+++ b/equinox-projector/Projector/Projector.fsproj
@@ -0,0 +1,25 @@
+
+
+
+ Exe
+ netcoreapp2.1
+ 5
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/equinox-projector/README.md b/equinox-projector/README.md
new file mode 100644
index 000000000..e91b254f8
--- /dev/null
+++ b/equinox-projector/README.md
@@ -0,0 +1,79 @@
+//#if kafka
+# Equinox Projector + Consumer
+//#else
+# Equinox Projector (without consumer)
+//#endif
+
+This project was generated using:
+//#if kafka
+
+ dotnet new -i Equinox.Templates # just once, to install/update in the local templates store
+ dotnet new eqxprojector -k # -k => include projection logic and consumer
+//#else
+
+ dotnet new -i Equinox.Templates # just once, to install/update in the local templates store
+ # add -k to add Kafka Projection logic and consumer
+ dotnet new eqxprojector # use --help to see options
+//#endif
+
+## Usage instructions
+
+0. establish connection strings etc. per https://github.com/jet/equinox README
+
+ $env:EQUINOX_COSMOS_CONNECTION="AccountEndpoint=https://....;AccountKey=....=;" # or use -s
+ $env:EQUINOX_COSMOS_DATABASE="equinox-test" # or use -d
+ $env:EQUINOX_COSMOS_COLLECTION="equinox-test" # or use - c
+
+1. Use the `eqx` tool to initialize and then run some transactions in a CosmosDb collection
+
+ dotnet tool install -g Equinox.Tool # only needed once
+
+ # (either add environment variables as per step 0 or use -s/-d/-c to specify them)
+
+ # generate a cosmos collection to store events in
+ eqx init -ru 1000 cosmos
+
+ # (either add environment variables as per step 0 or use -s/-d/-c to specify them)
+ # `-t saveforlater` SaveForLater test produces uniform size events to project
+ # `-C -f 200` constrains current writers to 100 and applies caching so RU consumption is constrained such that an allocation of 1000 is sufficient
+ eqx run -t saveforlater -C -f 100 cosmos
+//#if kafka
+
+2. To run an instance of the Projector:
+
+ # (either add environment variables as per step 0 or use -s/-d/-c to specify them)
+
+ $env:EQUINOX_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b
+
+ # `default` defines the Projector Group identity - each id has separated state in the aux collection (aka LeaseId)
+ # `-m 1000` sets the max batch size to 1000
+ # `-t topic0` identifies the Kafka topic to which the Projector should write
+ # cosmos specifies the source (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed)
+ dotnet run -p Projector -- default -m 1000 -t topic0 cosmos
+
+ # (assuming you've scaled up enough to have >1 range, you can run a second instance in a second console with the same arguments)
+
+3. To run an instance of the Consumer:
+
+ $env:EQUINOX_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b
+ $env:EQUINOX_KAFKA_TOPIC="topic0" # or use -t
+ $env:EQUINOX_KAFKA_GROUP="group0" # or use -g
+
+ # `-t topic0` identifies the Kafka topic from which the consumers should read
+ # `-g group0` identifies the Kafka consumer group among which the consumption is to be spread
+ dotnet run -p Consumer -- -t topic0 -g group0
+
+ # (you can run as many instances as there are partitions configured for the topic on the broker)
+//#else
+
+2. To run an instance of the Projector:
+
+ # (either add environment variables as per step 0 or use -s/-d/-c to specify them)
+
+ # `default` defines the Projector Group identity - each id has separated state in the aux collection (aka LeaseId)
+ # `-m 1000` sets the max batch size to 1000
+ # cosmos specifies the source (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed)
+ dotnet run -p Projector -- default -m 1000 cosmos
+
+ # NB (assuming you've scaled up enough to have >1 range, you can run a second instance in a second console with the same arguments)
+//#endif
\ No newline at end of file
diff --git a/equinox-projector/equinox-projector-consumer.sln b/equinox-projector/equinox-projector-consumer.sln
new file mode 100644
index 000000000..aa427035b
--- /dev/null
+++ b/equinox-projector/equinox-projector-consumer.sln
@@ -0,0 +1,36 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio 15
+VisualStudioVersion = 15.0.26124.0
+MinimumVisualStudioVersion = 15.0.26124.0
+Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Projector", "Projector\Projector.fsproj", "{6C72C937-ECFC-4DD4-9BA0-7355B237F974}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{518EE7E2-76AF-4DE9-A127-C2DFF709A468}"
+ ProjectSection(SolutionItems) = preProject
+ README.md = README.md
+ EndProjectSection
+EndProject
+Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Consumer", "Consumer\Consumer.fsproj", "{7ED94D2B-1744-48A0-9B20-94E4777617E9}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Release|Any CPU.Build.0 = Release|Any CPU
+ {7ED94D2B-1744-48A0-9B20-94E4777617E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {7ED94D2B-1744-48A0-9B20-94E4777617E9}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {7ED94D2B-1744-48A0-9B20-94E4777617E9}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {7ED94D2B-1744-48A0-9B20-94E4777617E9}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {2232D6B6-0CA3-4E51-BFD4-DB0A42EF34BF}
+ EndGlobalSection
+EndGlobal
diff --git a/equinox-projector/equinox-projector.sln b/equinox-projector/equinox-projector.sln
new file mode 100644
index 000000000..2fffa69eb
--- /dev/null
+++ b/equinox-projector/equinox-projector.sln
@@ -0,0 +1,30 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio 15
+VisualStudioVersion = 15.0.26124.0
+MinimumVisualStudioVersion = 15.0.26124.0
+Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Projector", "Projector\Projector.fsproj", "{6C72C937-ECFC-4DD4-9BA0-7355B237F974}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{518EE7E2-76AF-4DE9-A127-C2DFF709A468}"
+ ProjectSection(SolutionItems) = preProject
+ README.md = README.md
+ EndProjectSection
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {6C72C937-ECFC-4DD4-9BA0-7355B237F974}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {2232D6B6-0CA3-4E51-BFD4-DB0A42EF34BF}
+ EndGlobalSection
+EndGlobal
diff --git a/src/Equinox.Templates/Equinox.Templates.fsproj b/src/Equinox.Templates/Equinox.Templates.fsproj
index 722b5ae18..3eae2c8fb 100644
--- a/src/Equinox.Templates/Equinox.Templates.fsproj
+++ b/src/Equinox.Templates/Equinox.Templates.fsproj
@@ -3,9 +3,9 @@
@jet @bartelink contributors
Jet.com
- dotnet new eqxweb, eqxwebcs: Equinox Web, Domain templates
+ dotnet new eqxweb, eqxwebcs, eqxprojector: Equinox Web, Domain, Projector, Consumer templates
https://github.com/jet/dotnet-templates
- equinox fsharp eventsourcing cosmosdb eventstore
+ equinox fsharp eventsourcing cosmosdb eventstore changefeedprocessor kafka
Apache-2.0
Copyright © 2018-9