-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCheckpoint.fs
154 lines (127 loc) · 6.27 KB
/
Checkpoint.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
namespace Propulsion.EventStoreDB
open System
open System.Text
open Newtonsoft.Json
open EventStore.ClientAPI
open System.Collections.Concurrent
/// Payload written as the data of a checkpoint event.
/// `CLIMutable` gives the record a default constructor and settable properties so that
/// reflection-based serializers can populate it; `NoComparison` opts out of structural ordering.
[<CLIMutable>]
[<NoComparison>]
type CheckpointEntry =
    {
        /// Name of the source stream the checkpoint refers to
        Stream: string
        /// Name of the consumer group the checkpoint belongs to
        ConsumerGroup: string
        /// Checkpointed position; a `Nullable` without a value means no position is recorded
        Position: Nullable<int64>
    }
/// Abstraction over a store of per-stream, per-consumer-group checkpoint positions.
type ICheckpointer =

    /// Asynchronously reads the position recorded for `stream` on behalf of `consumerGroup`.
    /// The result is a `Nullable<int64>`, which carries no value when nothing is recorded.
    abstract GetPosition: stream: string -> consumerGroup: string -> Async<Nullable<int64>>

    /// Asynchronously records `position` for `stream` on behalf of `consumerGroup`.
    abstract CommitPosition: stream: string -> consumerGroup: string -> position: int64 -> Async<unit>
[<RequireQualifiedAccess>]
module EventStoreCheckpointer =

    /// Suffix appended to every default checkpoint stream name
    let [<Literal>] CheckpointSuffix = "checkpoint"

    /// Default naming scheme for checkpoint streams: `<stream>-<consumerGroup>_checkpoint`.
    /// Every leading `$` or `_` character is stripped from `stream` first (they are removed,
    /// not substituted), so the name does not conflict with EventStore built-in streams.
    let buildCheckpointStreamName (stream: string) (consumerGroup: string) =
        let trimmed = stream.TrimStart ('$', '_')
        sprintf "%s-%s_%s" trimmed consumerGroup CheckpointSuffix

    /// Serializes `object` to JSON and returns the UTF-8 bytes of that JSON text
    let encode (object: obj) =
        let json = JsonConvert.SerializeObject object
        Encoding.UTF8.GetBytes (json)

    /// Interprets `data` as UTF-8 JSON text and deserializes it to a value of type `'t`
    let decode<'t> (data: byte array) =
        let json = Encoding.UTF8.GetString (data)
        JsonConvert.DeserializeObject<'t> (json)
/// `ICheckpointer` backed by an in-process `ConcurrentDictionary`, keyed by checkpoint stream name.
/// `checkpointStreamNamer` overrides how the key is derived from the stream and consumer group
/// (defaults to `EventStoreCheckpointer.buildCheckpointStreamName`); `logger`, when supplied,
/// receives a Debug entry for every read and write.
type InMemoryCheckpointer
    (
        ?checkpointStreamNamer: string -> string -> string,
        ?logger: Serilog.ILogger
    ) =

    /// Committed positions, keyed by checkpoint stream name
    let positions = ConcurrentDictionary<string, int64>()

    /// Naming function in effect: the caller-supplied one, otherwise the module default
    let nameFor =
        match checkpointStreamNamer with
        | Some namer -> namer
        | None -> EventStoreCheckpointer.buildCheckpointStreamName

    /// Runs `write` against the logger only when one was provided
    let log (write: Serilog.ILogger -> unit) = Option.iter write logger

    interface ICheckpointer with

        member __.GetPosition (stream: string) (consumerGroup: string) = async {
            let key = nameFor stream consumerGroup
            let found, value = positions.TryGetValue key
            if found then
                log (fun l -> l.Debug ("Getting position for {checkpointStream}: {position}", key, value))
                return Nullable value
            else
                // Nothing committed under this key yet: report "no position"
                log (fun l -> l.Debug ("Getting position for {checkpointStream}: Null", key))
                return Nullable ()
        }

        member __.CommitPosition (stream: string) (consumerGroup: string) (position: int64) = async {
            let key = nameFor stream consumerGroup
            log (fun l -> l.Debug ("Committing position for {checkpointStream}: {position}", key, position))
            // The indexer setter adds the key or overwrites its current value
            positions.[key] <- position
            return ()
        }
/// `ICheckpointer` implementation that stores checkpoints in EventStore.
/// Each (stream, consumer group) pair gets its own checkpoint stream; the position is read back
/// from the last event of that stream and committed by appending a new `checkpoint` event.
/// `checkpointStreamNamer` overrides how the checkpoint stream name is derived
/// (defaults to `EventStoreCheckpointer.buildCheckpointStreamName`).
type EventStoreCheckpointer
    (
        conn: IEventStoreConnection,
        logger: Serilog.ILogger,
        ?checkpointStreamNamer: string -> string -> string
    ) =

    /// Use the provided `checkpointStreamNamer` function or use the default if not provided
    let buildCheckpointStreamName =
        checkpointStreamNamer
        |> Option.defaultValue EventStoreCheckpointer.buildCheckpointStreamName

    /// Create the associated checkpoint stream for the provided stream by writing its stream metadata.
    /// The metadata sets the max count to 1 (presumably so only the latest checkpoint event is
    /// retained) and records the consumer group and source stream as custom properties.
    member this.CreateCheckpointStream (stream: string) (consumerGroup: string) = async {
        // ExpectedVersion.Any: the write is not conditional on the current stream version
        let expectedVersion = int64 ExpectedVersion.Any
        let checkpointStream = buildCheckpointStreamName stream consumerGroup
        let metadata =
            StreamMetadata
                .Build()
                .SetMaxCount(1L)
                .SetCustomProperty("consumerGroup", consumerGroup)
                .SetCustomProperty("sourceStream", stream)
                .Build()
        // The result of the metadata write is discarded
        return!
            conn.SetStreamMetadataAsync (checkpointStream, expectedVersion, metadata)
            |> Async.AwaitTaskCorrect
            |> Async.Ignore
    }

    /// Build the `checkpoint` event carrying `position` for the given stream and consumer group.
    /// The event gets a fresh id, a JSON-encoded `CheckpointEntry` as data and empty metadata.
    member this.CreateCheckpointEvent stream consumerGroup position =
        let metadata = {| |}
        let data =
            {
                Stream = stream
                ConsumerGroup = consumerGroup
                Position = Nullable position
            }
        EventData (
            eventId=Guid.NewGuid (),
            ``type``="checkpoint",
            isJson=true,
            data=EventStoreCheckpointer.encode data,
            metadata=EventStoreCheckpointer.encode metadata
        )

    interface ICheckpointer with

        /// Read the last event of the checkpoint stream and return the position stored in it.
        /// When the event or the stream does not exist, the checkpoint stream is created and
        /// a `Nullable` without a value is returned.
        member this.GetPosition (stream: string) (consumerGroup: string) = async {
            // StreamPosition.End addresses the last event in the stream
            let lastEvent = int64 StreamPosition.End
            let checkpointStream = buildCheckpointStreamName stream consumerGroup
            let! event =
                conn.ReadEventAsync (checkpointStream, lastEvent, false)
                |> Async.AwaitTaskCorrect
            match event.Status with
            | EventReadStatus.NotFound
            | EventReadStatus.NoStream ->
                logger.Information ("Creating stream {checkpointStream}", checkpointStream)
                do! this.CreateCheckpointStream stream consumerGroup
                return Nullable ()
            | _ ->
                // NOTE(review): every other status lands here, including a deleted stream, where
                // `event.Event` presumably has no value and `.Value` would throw - confirm intent.
                let eventData = EventStoreCheckpointer.decode<CheckpointEntry> event.Event.Value.Event.Data
                let position = eventData.Position
                // Arguments follow the order of the placeholders in the message template
                logger.Debug ("Got position {position} for {checkpointStream}", position, checkpointStream)
                return position
        }

        /// Append a new `checkpoint` event holding `position` to the checkpoint stream.
        member this.CommitPosition (stream: string) (consumerGroup: string) (position: int64) = async {
            // ExpectedVersion.Any: the append is not conditional on the current stream version
            let expectedVersion = int64 ExpectedVersion.Any
            let checkpointStream = buildCheckpointStreamName stream consumerGroup
            let checkpointEvent = this.CreateCheckpointEvent stream consumerGroup position
            logger.Debug ("Committing position for {checkpointStream}: {position}", checkpointStream, position)
            // The result of the append is discarded
            return!
                conn.AppendToStreamAsync (checkpointStream, expectedVersion, checkpointEvent)
                |> Async.AwaitTaskCorrect
                |> Async.Ignore
        }