Skip to content

Commit

Permalink
Add eqxprojector template (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Mar 8, 2019
1 parent a5ea569 commit c5c781c
Show file tree
Hide file tree
Showing 12 changed files with 694 additions and 2 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This repo hosts the source for Jet's [`dotnet new`](https://docs.microsoft.com/e

- [`eqxweb`](equinox-web/README.md) - Boilerplate for an ASP .NET Core Web App, with an associated storage-independent Domain project.
- [`eqxwebcs`](equinox-web-csharp/README.md) - Boilerplate for an ASP .NET Core Web App, with an associated storage-independent Domain project _ported to C#_.
- [`eqxprojector`](equinox-projector-cosmos/README.md) - Boilerplate for a CosmosDb ChangeFeedProcessor, with optional projection to Apache Kafka and associated consumer logic.

## How to use

Expand All @@ -29,6 +30,9 @@ To use from the command line, the outline is:
# see readme.md in the generated code for further instructions regarding the TodoBackend the above -t switch above triggers the inclusion of
start readme.md

# to add a Projector and a Consumer (-k emits to Kafka and hence implies having a Consumer)
dotnet new eqxprojector -k

## CONTRIBUTING

Please don't hesitate to [create a GitHub issue](https://github.com/jet/dotnet-templates/issues/new) for any questions so others can benefit from the discussion. For any significant planned changes or additions, please err on the side of [reaching out early](https://github.com/jet/dotnet-templates/issues/new) so we can align expectationss - there's nothing more frustrating than having your hard work not yielding a mutually agreeable result ;)
Expand Down
48 changes: 48 additions & 0 deletions equinox-projector/.template.config/template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"$schema": "http://json.schemastore.org/template",
"author": "@jet @bartelink",
"classifications": [
"Equinox",
"Event Sourcing",
"CosmosDb",
"ChangeFeed",
"Kafka"
],
"tags": {
"language": "F#"
},
"identity": "Equinox.Template.Projector.Cosmos",
"name": "Equinox Projector (with optional consumer)",
"shortName": "eqxprojector",
"sourceName": "ProjectorTemplate",
"preferNameDirectory": true,

"symbols": {
"kafka": {
"type": "parameter",
"datatype": "bool",
"isRequired": false,
"defaultValue": "false",
"description": "Include a code projecting to, and consuming from Kafka."
}
},
"sources": [
{
"modifiers": [
{
"condition": "(!kafka)",
"exclude": [
"**/Consumer/**/*",
"equinox-projector-consumer-cosmos.sln"
]
},
{
"condition": "(kafka)",
"exclude": [
"equinox-projector-cosmos.sln"
]
}
]
}
]
}
23 changes: 23 additions & 0 deletions equinox-projector/Consumer/Consumer.fsproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<WarningLevel>5</WarningLevel>
</PropertyGroup>

<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="Program.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Argu" Version="5.2.0" />
<PackageReference Include="Destructurama.FSharp.NetCore" Version="1.0.14" />
<PackageReference Include="Equinox.Codec" Version="2.0.0-preview1" />
<PackageReference Include="Equinox.Projection.Codec" Version="2.0.0-preview1" />
<PackageReference Include="Jet.ConfluentKafka.FSharp" Version="1.0.0-preview1" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
</ItemGroup>

</Project>
47 changes: 47 additions & 0 deletions equinox-projector/Consumer/Infrastructure.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[<AutoOpen>]
module private ProjectorTemplate.Consumer.Infrastructure

open System
open System.Threading
open System.Threading.Tasks

type Async with
static member AwaitTaskCorrect (task : Task<'T>) : Async<'T> =
Async.FromContinuations <| fun (k,ek,_) ->
task.ContinueWith (fun (t:Task<'T>) ->
if t.IsFaulted then
let e = t.Exception
if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0]
else ek e
elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then k t.Result
else ek(Exception "invalid Task state!"))
|> ignore

static member AwaitTaskCorrect (task : Task) : Async<unit> =
Async.FromContinuations <| fun (k,ek,_) ->
task.ContinueWith (fun (t:Task) ->
if t.IsFaulted then
let e = t.Exception
if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0]
else ek e
elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then k ()
else ek(Exception "invalid Task state!"))
|> ignore

type SemaphoreSlim with
/// F# friendly semaphore await function
member semaphore.Await(?timeout : TimeSpan) = async {
let! ct = Async.CancellationToken
let timeout = defaultArg timeout Timeout.InfiniteTimeSpan
let task = semaphore.WaitAsync(timeout, ct)
return! Async.AwaitTaskCorrect task
}

/// Throttling wrapper which waits asynchronously until the semaphore has available capacity
member semaphore.Throttle(workflow : Async<'T>) : Async<'T> = async {
let! _ = semaphore.Await()
try return! workflow
finally semaphore.Release() |> ignore
}
169 changes: 169 additions & 0 deletions equinox-projector/Consumer/Program.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
module ProjectorTemplate.Consumer.Program

open Jet.ConfluentKafka.FSharp
open Serilog
open System
open System.Threading

module EventParser =
open Equinox.Projection.Codec
open Newtonsoft.Json

type SkuId = string

let settings = JsonSerializerSettings()

// NB - these schemas reflect the actual storage formats and hence need to be versioned with care
module SavedForLater =
type Item = { skuId : SkuId; dateSaved : DateTimeOffset }

type Added = { skus : SkuId []; dateSaved : DateTimeOffset }
type Removed = { skus : SkuId [] }
type Merged = { items : Item [] }

type Event =
/// Inclusion of another set of state in this one
| Merged of Merged
/// Removal of a set of skus
| Removed of Removed
/// Addition of a collection of skus to the list
| Added of Added
interface TypeShape.UnionContract.IUnionContract
let codec = Equinox.Codec.JsonNet.JsonUtf8.Create<Event>(settings)

// NB - these schemas reflect the actual storage formats and hence need to be versioned with care
module Favorites =
type Favorited = { date: DateTimeOffset; skuId: SkuId }
type Unfavorited = { skuId: SkuId }

type Event =
| Favorited of Favorited
| Unfavorited of Unfavorited
interface TypeShape.UnionContract.IUnionContract
let codec = Equinox.Codec.JsonNet.JsonUtf8.Create<Event>(settings)

let tryExtractCategory (x : RenderedEvent) =
x.s.Split([|'-'|], 2, StringSplitOptions.RemoveEmptyEntries)
|> Array.tryHead
let tryDecode (log : ILogger) (codec : Equinox.Codec.IUnionEncoder<_,_>) (x : RenderedEvent) =
match codec.TryDecode (Equinox.Codec.Core.EventData.Create(x.c, x.d)) with
| None ->
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then
log.ForContext("event", System.Text.Encoding.UTF8.GetString(x.d), true)
.Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, x.c, x.s);
None
| Some e -> Some e

// Example of filtering our relevant Events from the Kafka stream
// NB if the percentage of relevant events is low, one may wish to adjust the projector to project only a subset
type Interpreter() =
let log = Log.ForContext<Interpreter>()

/// Handles various category / eventType / payload types as produced by Equinox.Tool
member __.TryDecode(x : Confluent.Kafka.ConsumeResult<_,_>) =
let ke = JsonConvert.DeserializeObject<RenderedEvent>(x.Value)
match tryExtractCategory ke with
| Some "Favorites" -> tryDecode log Favorites.codec ke |> Option.map Choice1Of2
| Some "SavedForLater" -> tryDecode log SavedForLater.codec ke |> Option.map Choice2Of2
| x ->
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then
log.ForContext("event", ke).Debug("Event could not be interpreted due to unknown category {category}", x)
None

module Consumer =
open EventParser

/// Starts a consumer which will will be driven based on batches emanating from the supplied `cfg`
let start (cfg: KafkaConsumerConfig) (degreeOfParallelism: int) =
let log = Log.ForContext<KafkaConsumer>()
let dop = new SemaphoreSlim(degreeOfParallelism)
let decoder = Interpreter()
let consume msgs = async {
// TODO filter relevant events, fan our processing as appropriate
let mutable favorited, unfavorited, saved, cleared = 0, 0, 0, 0
let handleFave = function
| Favorites.Favorited _ -> Interlocked.Increment &favorited |> ignore
| Favorites.Unfavorited _ -> Interlocked.Increment &unfavorited |> ignore
let handleSave = function
| SavedForLater.Added e -> Interlocked.Add(&saved,e.skus.Length) |> ignore
| SavedForLater.Removed e -> Interlocked.Add(&cleared,e.skus.Length) |> ignore
| SavedForLater.Merged e -> Interlocked.Add(&saved,e.items.Length) |> ignore
// While this does not need to be async in this toy case, we illustrate here as any real example will need to deal with it
let handle e = async {
match e with
| Choice1Of2 fe -> handleFave fe
| Choice2Of2 se -> handleSave se
}
let! _ =
msgs
|> Seq.choose decoder.TryDecode
|> Seq.map (handle >> dop.Throttle)
|> Async.Parallel
log.Information("Consumed {b} Favorited {f} Unfavorited {u} Saved {s} Cleared {c}",
Array.length msgs, favorited, unfavorited, saved, cleared)
}
KafkaConsumer.Start log cfg consume

module CmdParser =
open Argu

exception MissingArg of string
let envBackstop msg key =
match Environment.GetEnvironmentVariable key with
| null -> raise <| MissingArg (sprintf "Please provide a %s, either as an argment or via the %s environment variable" msg key)
| x -> x

[<NoEquality; NoComparison>]
type Arguments =
| [<AltCommandLine("-b"); Unique>] Broker of string
| [<AltCommandLine("-t"); Unique>] Topic of string
| [<AltCommandLine("-g"); Unique>] Group of string
| [<AltCommandLine("-i"); Unique>] Parallelism of int
| [<AltCommandLine("-v"); Unique>] Verbose

interface IArgParserTemplate with
member a.Usage = a |> function
| Broker _ -> "specify Kafka Broker, in host:port format. (optional if environment variable EQUINOX_KAFKA_BROKER specified)"
| Topic _ -> "specify Kafka Topic name. (optional if environment variable EQUINOX_KAFKA_TOPIC specified)"
| Group _ -> "specify Kafka Consumer Group Id. (optional if environment variable EQUINOX_KAFKA_GROUP specified)"
| Parallelism _ -> "parallelism constraint when handling batches being consumed (default: 2 * Environment.ProcessorCount)"
| Verbose _ -> "request verbose logging."

/// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
let parse argv : ParseResults<Arguments> =
let programName = Reflection.Assembly.GetEntryAssembly().GetName().Name
let parser = ArgumentParser.Create<Arguments>(programName = programName)
parser.ParseCommandLine argv

type Parameters(args : ParseResults<Arguments>) =
member __.Broker = Uri(match args.TryGetResult Broker with Some x -> x | None -> envBackstop "Broker" "EQUINOX_KAFKA_BROKER")
member __.Topic = match args.TryGetResult Topic with Some x -> x | None -> envBackstop "Topic" "EQUINOX_KAFKA_TOPIC"
member __.Group = match args.TryGetResult Group with Some x -> x | None -> envBackstop "Group" "EQUINOX_KAFKA_GROUP"
member __.Parallelism = match args.TryGetResult Parallelism with Some x -> x | None -> 2 * Environment.ProcessorCount
member __.Verbose = args.Contains Verbose

module Logging =
let initialize verbose =
Log.Logger <-
LoggerConfiguration()
.Destructure.FSharpTypes()
.Enrich.FromLogContext()
|> fun c -> if verbose then c.MinimumLevel.Debug() else c
|> fun c -> let theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code
if not verbose then c.WriteTo.Console(theme=theme)
else c.WriteTo.Console(theme=theme, outputTemplate="[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}|{Properties}{NewLine}{Exception}")
|> fun c -> c.CreateLogger()

[<EntryPoint>]
let main argv =
try let parsed = CmdParser.parse argv
let args = CmdParser.Parameters(parsed)
Logging.initialize args.Verbose
let cfg = KafkaConsumerConfig.Create("ProjectorTemplate", args.Broker, [args.Topic], args.Group)

use c = Consumer.start cfg args.Parallelism
c.AwaitCompletion() |> Async.RunSynchronously
0
with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
| CmdParser.MissingArg msg -> eprintfn "%s" msg; 1
| e -> Log.Fatal(e, "Exiting"); 1
19 changes: 19 additions & 0 deletions equinox-projector/Projector/Infrastructure.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[<AutoOpen>]
module private ProjectorTemplate.Projector.Infrastructure

open System
open System.Threading
open System.Threading.Tasks

#nowarn "21" // re AwaitKeyboardInterrupt
#nowarn "40" // re AwaitKeyboardInterrupt

type Async with
static member Sleep(t : TimeSpan) : Async<unit> = Async.Sleep(int t.TotalMilliseconds)
/// Asynchronously awaits the next keyboard interrupt event
static member AwaitKeyboardInterrupt () : Async<unit> =
Async.FromContinuations(fun (sc,_,_) ->
let isDisposed = ref 0
let rec callback _ = Task.Run(fun () -> if Interlocked.Increment isDisposed = 1 then d.Dispose() ; sc ()) |> ignore
and d : IDisposable = Console.CancelKeyPress.Subscribe callback
in ())
Loading

0 comments on commit c5c781c

Please sign in to comment.