Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add eqxprojector template #11

merged 14 commits into from
Mar 8, 2019
4 changes: 4 additions & 0 deletions
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This repo hosts the source for Jet's [`dotnet new`](

- [`eqxweb`](equinox-web/ - Boilerplate for an ASP .NET Core Web App, with an associated storage-independent Domain project.
- [`eqxwebcs`](equinox-web-csharp/ - Boilerplate for an ASP .NET Core Web App, with an associated storage-independent Domain project _ported to C#_.
- [`eqxprojector`](equinox-projector-cosmos/ - Boilerplate for a CosmosDb ChangeFeedProcessor, with optional projection to Apache Kafka and associated consumer logic.

## How to use

Expand All @@ -29,6 +30,9 @@ To use from the command line, the outline is:
# see in the generated code for further instructions regarding the TodoBackend the above -t switch above triggers the inclusion of

# to add a Projector and a Consumer (-k emits to Kafka and hence implies having a Consumer)
dotnet new eqxprojector -k


Please don't hesitate to [create a GitHub issue]( for any questions so others can benefit from the discussion. For any significant planned changes or additions, please err on the side of [reaching out early]( so we can align expectationss - there's nothing more frustrating than having your hard work not yielding a mutually agreeable result ;)
Expand Down
48 changes: 48 additions & 0 deletions equinox-projector/.template.config/template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"$schema": "",
"author": "@jet @bartelink",
"classifications": [
"Event Sourcing",
"tags": {
"language": "F#"
"identity": "Equinox.Template.Projector.Cosmos",
"name": "Equinox Projector (with optional consumer)",
"shortName": "eqxprojector",
"sourceName": "ProjectorTemplate",
"preferNameDirectory": true,

"symbols": {
"kafka": {
"type": "parameter",
"datatype": "bool",
"isRequired": false,
"defaultValue": "false",
"description": "Include a code projecting to, and consuming from Kafka."
"sources": [
"modifiers": [
"condition": "(!kafka)",
"exclude": [
"condition": "(kafka)",
"exclude": [
23 changes: 23 additions & 0 deletions equinox-projector/Consumer/Consumer.fsproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">


<Compile Include="Infrastructure.fs" />
<Compile Include="Program.fs" />

<PackageReference Include="Argu" Version="5.2.0" />
<PackageReference Include="Destructurama.FSharp.NetCore" Version="1.0.14" />
<PackageReference Include="Equinox.Codec" Version="2.0.0-preview1" />
<PackageReference Include="Equinox.Projection.Codec" Version="2.0.0-preview1" />
<PackageReference Include="Jet.ConfluentKafka.FSharp" Version="1.0.0-preview1" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />

47 changes: 47 additions & 0 deletions equinox-projector/Consumer/Infrastructure.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
module private ProjectorTemplate.Consumer.Infrastructure

open System
open System.Threading
open System.Threading.Tasks

type Async with
static member AwaitTaskCorrect (task : Task<'T>) : Async<'T> =
Async.FromContinuations <| fun (k,ek,_) ->
task.ContinueWith (fun (t:Task<'T>) ->
if t.IsFaulted then
let e = t.Exception
if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0]
else ek e
elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then k t.Result
else ek(Exception "invalid Task state!"))
|> ignore

static member AwaitTaskCorrect (task : Task) : Async<unit> =
Async.FromContinuations <| fun (k,ek,_) ->
task.ContinueWith (fun (t:Task) ->
if t.IsFaulted then
let e = t.Exception
if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0]
else ek e
elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then k ()
else ek(Exception "invalid Task state!"))
|> ignore

type SemaphoreSlim with
/// F# friendly semaphore await function
member semaphore.Await(?timeout : TimeSpan) = async {
let! ct = Async.CancellationToken
let timeout = defaultArg timeout Timeout.InfiniteTimeSpan
let task = semaphore.WaitAsync(timeout, ct)
return! Async.AwaitTaskCorrect task

/// Throttling wrapper which waits asynchronously until the semaphore has available capacity
member semaphore.Throttle(workflow : Async<'T>) : Async<'T> = async {
let! _ = semaphore.Await()
try return! workflow
finally semaphore.Release() |> ignore
169 changes: 169 additions & 0 deletions equinox-projector/Consumer/Program.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
module ProjectorTemplate.Consumer.Program

open Jet.ConfluentKafka.FSharp
open Serilog
open System
open System.Threading

module EventParser =
open Equinox.Projection.Codec
open Newtonsoft.Json

type SkuId = string

let settings = JsonSerializerSettings()

// NB - these schemas reflect the actual storage formats and hence need to be versioned with care
module SavedForLater =
type Item = { skuId : SkuId; dateSaved : DateTimeOffset }

type Added = { skus : SkuId []; dateSaved : DateTimeOffset }
type Removed = { skus : SkuId [] }
type Merged = { items : Item [] }

type Event =
/// Inclusion of another set of state in this one
| Merged of Merged
/// Removal of a set of skus
| Removed of Removed
/// Addition of a collection of skus to the list
| Added of Added
interface TypeShape.UnionContract.IUnionContract
let codec = Equinox.Codec.JsonNet.JsonUtf8.Create<Event>(settings)

// NB - these schemas reflect the actual storage formats and hence need to be versioned with care
module Favorites =
type Favorited = { date: DateTimeOffset; skuId: SkuId }
type Unfavorited = { skuId: SkuId }

type Event =
| Favorited of Favorited
| Unfavorited of Unfavorited
interface TypeShape.UnionContract.IUnionContract
let codec = Equinox.Codec.JsonNet.JsonUtf8.Create<Event>(settings)

let tryExtractCategory (x : RenderedEvent) =
x.s.Split([|'-'|], 2, StringSplitOptions.RemoveEmptyEntries)
|> Array.tryHead
let tryDecode (log : ILogger) (codec : Equinox.Codec.IUnionEncoder<_,_>) (x : RenderedEvent) =
match codec.TryDecode (Equinox.Codec.Core.EventData.Create(x.c, x.d)) with
| None ->
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then
log.ForContext("event", System.Text.Encoding.UTF8.GetString(x.d), true)
.Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, x.c, x.s);
| Some e -> Some e

// Example of filtering our relevant Events from the Kafka stream
// NB if the percentage of relevant events is low, one may wish to adjust the projector to project only a subset
type Interpreter() =
let log = Log.ForContext<Interpreter>()

/// Handles various category / eventType / payload types as produced by Equinox.Tool
member __.TryDecode(x : Confluent.Kafka.ConsumeResult<_,_>) =
let ke = JsonConvert.DeserializeObject<RenderedEvent>(x.Value)
match tryExtractCategory ke with
| Some "Favorites" -> tryDecode log Favorites.codec ke |> Choice1Of2
| Some "SavedForLater" -> tryDecode log SavedForLater.codec ke |> Choice2Of2
| x ->
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then
log.ForContext("event", ke).Debug("Event could not be interpreted due to unknown category {category}", x)

module Consumer =
open EventParser

/// Starts a consumer which will will be driven based on batches emanating from the supplied `cfg`
let start (cfg: KafkaConsumerConfig) (degreeOfParallelism: int) =
let log = Log.ForContext<KafkaConsumer>()
let dop = new SemaphoreSlim(degreeOfParallelism)
let decoder = Interpreter()
let consume msgs = async {
// TODO filter relevant events, fan our processing as appropriate
let mutable favorited, unfavorited, saved, cleared = 0, 0, 0, 0
let handleFave = function
| Favorites.Favorited _ -> Interlocked.Increment &favorited |> ignore
| Favorites.Unfavorited _ -> Interlocked.Increment &unfavorited |> ignore
let handleSave = function
| SavedForLater.Added e -> Interlocked.Add(&saved,e.skus.Length) |> ignore
| SavedForLater.Removed e -> Interlocked.Add(&cleared,e.skus.Length) |> ignore
| SavedForLater.Merged e -> Interlocked.Add(&saved,e.items.Length) |> ignore
// While this does not need to be async in this toy case, we illustrate here as any real example will need to deal with it
let handle e = async {
match e with
| Choice1Of2 fe -> handleFave fe
| Choice2Of2 se -> handleSave se
let! _ =
|> Seq.choose decoder.TryDecode
|> (handle >> dop.Throttle)
|> Async.Parallel
log.Information("Consumed {b} Favorited {f} Unfavorited {u} Saved {s} Cleared {c}",
Array.length msgs, favorited, unfavorited, saved, cleared)
KafkaConsumer.Start log cfg consume

module CmdParser =
open Argu

exception MissingArg of string
let envBackstop msg key =
match Environment.GetEnvironmentVariable key with
| null -> raise <| MissingArg (sprintf "Please provide a %s, either as an argment or via the %s environment variable" msg key)
| x -> x

[<NoEquality; NoComparison>]
type Arguments =
| [<AltCommandLine("-b"); Unique>] Broker of string
| [<AltCommandLine("-t"); Unique>] Topic of string
| [<AltCommandLine("-g"); Unique>] Group of string
| [<AltCommandLine("-i"); Unique>] Parallelism of int
| [<AltCommandLine("-v"); Unique>] Verbose

interface IArgParserTemplate with
member a.Usage = a |> function
| Broker _ -> "specify Kafka Broker, in host:port format. (optional if environment variable EQUINOX_KAFKA_BROKER specified)"
| Topic _ -> "specify Kafka Topic name. (optional if environment variable EQUINOX_KAFKA_TOPIC specified)"
| Group _ -> "specify Kafka Consumer Group Id. (optional if environment variable EQUINOX_KAFKA_GROUP specified)"
| Parallelism _ -> "parallelism constraint when handling batches being consumed (default: 2 * Environment.ProcessorCount)"
| Verbose _ -> "request verbose logging."

/// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
let parse argv : ParseResults<Arguments> =
let programName = Reflection.Assembly.GetEntryAssembly().GetName().Name
let parser = ArgumentParser.Create<Arguments>(programName = programName)
parser.ParseCommandLine argv

type Parameters(args : ParseResults<Arguments>) =
member __.Broker = Uri(match args.TryGetResult Broker with Some x -> x | None -> envBackstop "Broker" "EQUINOX_KAFKA_BROKER")
member __.Topic = match args.TryGetResult Topic with Some x -> x | None -> envBackstop "Topic" "EQUINOX_KAFKA_TOPIC"
member __.Group = match args.TryGetResult Group with Some x -> x | None -> envBackstop "Group" "EQUINOX_KAFKA_GROUP"
member __.Parallelism = match args.TryGetResult Parallelism with Some x -> x | None -> 2 * Environment.ProcessorCount
member __.Verbose = args.Contains Verbose

module Logging =
let initialize verbose =
Log.Logger <-
|> fun c -> if verbose then c.MinimumLevel.Debug() else c
|> fun c -> let theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code
if not verbose then c.WriteTo.Console(theme=theme)
else c.WriteTo.Console(theme=theme, outputTemplate="[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}|{Properties}{NewLine}{Exception}")
|> fun c -> c.CreateLogger()

let main argv =
try let parsed = CmdParser.parse argv
let args = CmdParser.Parameters(parsed)
Logging.initialize args.Verbose
let cfg = KafkaConsumerConfig.Create("ProjectorTemplate", args.Broker, [args.Topic], args.Group)

use c = Consumer.start cfg args.Parallelism
c.AwaitCompletion() |> Async.RunSynchronously
with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
| CmdParser.MissingArg msg -> eprintfn "%s" msg; 1
| e -> Log.Fatal(e, "Exiting"); 1
19 changes: 19 additions & 0 deletions equinox-projector/Projector/Infrastructure.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module private ProjectorTemplate.Projector.Infrastructure

open System
open System.Threading
open System.Threading.Tasks

#nowarn "21" // re AwaitKeyboardInterrupt
#nowarn "40" // re AwaitKeyboardInterrupt

type Async with
static member Sleep(t : TimeSpan) : Async<unit> = Async.Sleep(int t.TotalMilliseconds)
/// Asynchronously awaits the next keyboard interrupt event
static member AwaitKeyboardInterrupt () : Async<unit> =
Async.FromContinuations(fun (sc,_,_) ->
let isDisposed = ref 0
let rec callback _ = Task.Run(fun () -> if Interlocked.Increment isDisposed = 1 then d.Dispose() ; sc ()) |> ignore
and d : IDisposable = Console.CancelKeyPress.Subscribe callback
in ())