Skip to content

Commit

Permalink
chore(Packages): Inline Feed, Extract Prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 2, 2024
1 parent 5d7b0c5 commit 11c7e56
Show file tree
Hide file tree
Showing 30 changed files with 90 additions and 86 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner`: Exposed `*Stats` [#226](https://github.com/jet/propulsion/pull/226)
- `Propulsion.EventStore`: Pinned to target `Equinox.EventStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.EventStoreDb`** [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.EventStoreDb.EventStoreSource`: Changed API to match`Propulsion.SqlStreamStore` API rather than`Propulsion.EventStore` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.Feed`: Moved implementations into main `Propulsion` library. While this adds a `FSharp.Control.TaskSeq` dependency, it makes maintenance and navigation easier
- `Propulsion.Feed`,`Kafka`: Replaced `Async` with `task` for supervision [#158](https://github.com/jet/propulsion/pull/158), [#159](https://github.com/jet/propulsion/pull/159)
- `Propulsion.Kafka`: Target `FsCodec.NewtonsoftJson` v `3.0.0` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.Prometheus`: Extracted `Propulsion.Prometheus` and `Propulsion.Feed.Prometheus` in order to remove `Prometheus` dependency from core package
- `Propulsion.Tool`: `project` renamed to `sync`; sources now have a `from` prefix [#252](https://github.com/jet/propulsion/pull/252)

### Removed
Expand Down
2 changes: 2 additions & 0 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
<!-- suppress false positive warning FS2003 about invalid version of AssemblyInformationalVersionAttribute -->
<!-- supress NU5105 triggered by trailing dotted elements such as .43 and .2 in e.g.: pr.43-rc1.2: The package version '<X>' uses SemVer 2.0.0 or components of SemVer 1.0.0 that are not supported on legacy clients. Change the package version to a SemVer 1.0.0 string. If the version contains a release label it must start with a letter. This message can be ignored if the package is not intended for older clients. -->
<NoWarn>$(NoWarn);FS2003;NU5105</NoWarn>
<!-- <PinnedBaselineRootPackage Condition=" '$(Configuration)' == 'Release' ">[3.0.0-rc.14, 4.0.0)</PinnedBaselineRootPackage>-->
<!-- <PinnedDynamoStoreRootPackage Condition=" '$(Configuration)' == 'Release' ">[3.0.0-rc.14, 4.0.0)</PinnedDynamoStoreRootPackage>-->
</PropertyGroup>
<ItemGroup>
<!-- SourceLink etc -->
Expand Down
2 changes: 1 addition & 1 deletion Propulsion.sln
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.SqlStreamStore",
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.CosmosStore", "src\Propulsion.CosmosStore\Propulsion.CosmosStore.fsproj", "{356294D8-DF59-4903-9A9C-03F0F459B2A3}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.Feed", "src\Propulsion.Feed\Propulsion.Feed.fsproj", "{B6C1C225-940C-425C-A2F7-A728AEBFCB31}"
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.Prometheus", "src\Propulsion.Prometheus\Propulsion.Prometheus.fsproj", "{B6C1C225-940C-425C-A2F7-A728AEBFCB31}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.CosmosStore3", "src\Propulsion.CosmosStore3\Propulsion.CosmosStore3.fsproj", "{8A2D7DDF-ED05-4871-A2B1-66D22A581C95}"
EndProject
Expand Down
37 changes: 18 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,26 @@ If you're looking for a good discussion forum on these kinds of topics, look no

## Core Components

- `Propulsion` [![NuGet](https://img.shields.io/nuget/v/Propulsion.svg)](https://www.nuget.org/packages/Propulsion/) Implements core functionality in a channel-independent fashion. [Depends](https://www.fuget.org/packages/Propulsion) on `MathNet.Numerics`
- `Propulsion` [![NuGet](https://img.shields.io/nuget/v/Propulsion.svg)](https://www.nuget.org/packages/Propulsion/) Implements core functionality in a channel-independent fashion. [Depends](https://www.fuget.org/packages/Propulsion) on `Serilog`, `MathNet.Numerics`, `FSharp.Control.TaskSeq`:

1. `StreamsSink`: High performance pipeline that handles parallelized event processing. Ingestion of events, and checkpointing of progress are handled asynchronously. Each aspect of the pipeline is decoupled such that it can be customized as desired.
2. `Streams.Prometheus`: Helper that exposes per-scheduler metrics for Prometheus scraping.
3. `ParallelProjector`: Scaled down variant of `StreamsSink` that does not preserve stream level ordering semantics
4. `FeedSource`: Handles continual reading and checkpointing of events from a set of feeds ('tranches') of a 'source' that collectively represent a change data capture source for a given system (roughly analogous to how a CosmosDB Container presents a changefeed). A `readTranches` function is used to identify the Tranches (sub-feeds) on startup. The Feed Source then operates a logical reader thread per Tranche. Tranches represent content as an incrementally retrievable change feed consisting of batches of `FsCodec.ITimelineEvent` records. Each batch has an optional associated checkpointing callback that's triggered only when the Sink has handled all events within it.
5. `Monitor.AwaitCompletion`: Enables efficient waiting for completion of reaction processing within an integration test.
6. `PeriodicSource`: Handles regular crawling of an external datasource (such as a SQL database) where there is no way to save progress and then resume from that saved token (based on either the intrinsic properties of the data, or of the store itself). The source is expected to present its content as an `IAsyncEnumerable` of `FsCodec.StreamName * FsCodec.IEventData * context`. Checkpointing occurs only when all events have been deemed handled by the Sink.
7. `SinglePassFeedSource`: Handles single pass loading of large datasets (such as a SQL database), completing when the full data has been ingested.
8. `JsonSource`: Simple source that feeds items from a File containing JSON (such a file can be generated via `eqx query -o JSONFILE from cosmos` etc)

- `Propulsion.Feed` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Feed.svg)](https://www.nuget.org/packages/Propulsion.Feed/) Provides helpers for checkpointed consumption of a feed of stream-based inputs. Provides for custom bindings (e.g. a third-party Feed API) or various other input configurations (e.g. periodically correlating with inputs from a non-streamed source such as a SQL Database). Provides a generic API for checkpoint storage, with diverse implementations hosted in the sibling packages associated with each concrete store (supported stores include DynamoStore, CosmosStore, SQL Server, Postgres). [Depends](https://www.fuget.org/packages/Propulsion.Feed) on `Propulsion`, a `IFeedCheckpointStore` implementation (from e.g., `Propulsion.CosmosStore|DynamoStore|MessageDb|SqlStreamStore`)
NOTE `Propulsion.Feed` is a namespace within the main `Propulsion` package that provides helpers for checkpointed consumption of a feed of stream-based inputs.
- Supported inputs include custom bindings (e.g. a third-party Feed API) or various other input configurations (e.g. periodically correlating with inputs from a non-streamed source such as a SQL Database).
- Provides a generic API for checkpoint storage, with diverse implementations hosted in the sibling packages associated with each concrete store (supported stores include DynamoStore, CosmosStore, SQL Server, Postgres).
- Using a feed normally requires a checkpoint store that inmplements `IFeedCheckpointStore` from e.g., `Propulsion.CosmosStore|DynamoStore|MessageDb|SqlStreamStore`

1. `FeedSource`: Handles continual reading and checkpointing of events from a set of feeds ('tranches') of a 'source' that collectively represent a change data capture source for a given system (roughly analogous to how a CosmosDB Container presents a changefeed). A `readTranches` function is used to identify the Tranches (sub-feeds) on startup. The Feed Source then operates a logical reader thread per Tranche. Tranches represent content as an incrementally retrievable change feed consisting of batches of `FsCodec.ITimelineEvent` records. Each batch has an optional associated checkpointing callback that's triggered only when the Sink has handled all events within it.
2. `Monitor.AwaitCompletion`: Enables efficient waiting for completion of reaction processing within an integration test.
3. `PeriodicSource`: Handles regular crawling of an external datasource (such as a SQL database) where there is no way to save progress and then resume from that saved token (based on either the intrinsic properties of the data, or of the store itself). The source is expected to present its content as an `IAsyncEnumerable` of `FsCodec.StreamName * FsCodec.IEventData * context`. Checkpointing occurs only when all events have been deemed handled by the Sink.
4. `JsonSource`: Simple source that feeds items from a File containing JSON (such a file can be generated via `eqx query -o JSONFILE from cosmos` etc)
5. `SinglePassFeedSource`: Handles single pass loading of large datasets (such as a SQL database), completing when the full data has been ingested.
6. `Prometheus`: Exposes reading statistics to Prometheus (including metrics from `DynamoStore.DynamoStoreSource`, `EventStoreDb.EventStoreSource`, `MessageDb.MessageDbSource` and `SqlStreamStore.SqlStreamStoreSource`). (NOTE all other statistics relating to processing throughput and latency etc are exposed from the Scheduler component on the Sink side)
- `Propulsion.Prometheus` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Prometheus.svg)](https://www.nuget.org/packages/Propulsion.Prometheus/) Provides helpers for checkpointed consumption of a feed of stream-based inputs. Provides for custom bindings (e.g. a third-party Feed API) or various other input configurations (e.g. periodically correlating with inputs from a non-streamed source such as a SQL Database). Provides a generic API for checkpoint storage, with diverse implementations hosted in the sibling packages associated with each concrete store (supported stores include DynamoStore, CosmosStore, SQL Server, Postgres). [Depends](https://www.fuget.org/packages/Propulsion.Prometheus) on `Propulsion`, a `IFeedCheckpointStore` implementation (from e.g., `Propulsion.CosmosStore|DynamoStore|MessageDb|SqlStreamStore`)

1. `Propulsion.Prometheus`: Exposes processing throughput statistics to Prometheus.
2. `Propulsion.Feed.Prometheus`: Exposes reading statistics to Prometheus (including metrics from `DynamoStore.DynamoStoreSource`, `EventStoreDb.EventStoreSource`, `MessageDb.MessageDbSource` and `SqlStreamStore.SqlStreamStoreSource`).

- `Propulsion.MemoryStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MemoryStore.svg)](https://www.nuget.org/packages/Propulsion.MemoryStore/). Provides bindings to `Equinox.MemoryStore`. [Depends](https://www.fuget.org/packages/Propulsion.MemoryStore) on `Equinox.MemoryStore` v `4.0.0`, `FsCodec.Box`, `Propulsion`

Expand All @@ -51,9 +57,8 @@ If you're looking for a good discussion forum on these kinds of topics, look no
2. `DynamoStoreIndexer`: writes to `AppendsIndex`/`AppendsEpoch` (used by `Propulsion.DynamoStore.Indexer`, `Propulsion.Tool`)
3. `DynamoStoreSource`: reads from `AppendsIndex`/`AppendsEpoch` (see `DynamoStoreIndexer`)
4. `ReaderCheckpoint`: checkpoint storage for `Propulsion.DynamoStore`/`EventStoreDb`/`Feed`/`MessageDb`/`SqlStreamSteamStore` using `Equinox.DynamoStore` v `4.0.0`.
5. `Monitor.AwaitCompletion`: See `Propulsion.Feed`

(Reading and position metrics are exposed via `Propulsion.Feed.Prometheus`)
(Reading and position metrics are exposed via `Propulsion.Prometheus`)

- `Propulsion.DynamoStore.Indexer` [![NuGet](https://img.shields.io/nuget/v/Propulsion.DynamoStore.Indexer.svg)](https://www.nuget.org/packages/Propulsion.DynamoStore.Indexer/) AWS Lambda to index appends into an Index Table. [Depends](https://www.fuget.org/packages/Propulsion.DynamoStore.Indexer) on `Propulsion.DynamoStore`, `Amazon.Lambda.Core`, `Amazon.Lambda.DynamoDBEvents`, `Amazon.Lambda.Serialization.SystemTextJson`

Expand All @@ -76,7 +81,7 @@ If you're looking for a good discussion forum on these kinds of topics, look no
2. `DynamoStoreNotifierLambda`: CDK wiring for `Propulsion.DynamoStore.Notifier`
3. `DynamoStoreReactorLambda`: CDK wiring for a Reactor that's triggered based on messages supplied by `Propulsion.DynamoStore.Notifier`

- `Propulsion.DynamoStore.Lambda` [![NuGet](https://img.shields.io/nuget/v/Propulsion.DynamoStore.Lambda.svg)](https://www.nuget.org/packages/Propulsion.DynamoStore.Lambda/) Helpers for implementing Lambda Reactors. [Depends](https://www.fuget.org/packages/Propulsion.DynamoStore.Lambda) on `Amazon.Lambda.SQSEvents`, `Propulsion.Feed`
- `Propulsion.DynamoStore.Lambda` [![NuGet](https://img.shields.io/nuget/v/Propulsion.DynamoStore.Lambda.svg)](https://www.nuget.org/packages/Propulsion.DynamoStore.Lambda/) Helpers for implementing Lambda Reactors. [Depends](https://www.fuget.org/packages/Propulsion.DynamoStore.Lambda) on `Amazon.Lambda.SQSEvents`

1. `SqsNotificationBatch.parse`: parses a batch of notification events (queued by a `Notifier`) in a `Amazon.Lambda.SQSEvents.SQSEvent`
2. `SqsNotificationBatch.batchResponseWithFailuresForPositionsNotReached`: Correlates the updated checkpoints with the input `SQSEvent`, generating a `SQSBatchResponse` that will requeue any notifications that have not yet been serviced.
Expand All @@ -86,23 +91,17 @@ If you're looking for a good discussion forum on these kinds of topics, look no
- `Propulsion.EventStoreDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStoreDb.svg)](https://www.nuget.org/packages/Propulsion.EventStoreDb/). Provides bindings to [EventStoreDB](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink`. [Depends](https://www.fuget.org/packages/Propulsion.EventStoreDb) on `Equinox.EventStoreDb` v `4.0.0`
1. `EventStoreSource`: reading from an EventStoreDB >= `20.10` `$all` stream using the gRPC interface into a `Propulsion.Sink`.
2. `EventStoreSink`: writing to `Equinox.EventStoreDb` v `4.0.0`
3. `Monitor.AwaitCompletion`: See `Propulsion.Feed`

(Reading and position metrics are exposed via `Propulsion.Feed.Prometheus`)

- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v `1.7.0`-`1.9.99`

- `Propulsion.MessageDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MessageDb.svg)](https://www.nuget.org/packages/Propulsion.MessageDb/). Provides bindings to [MessageDb](http://docs.eventide-project.org/user-guide/message-db/), maintaining checkpoints in a postgres table [Depends](https://www.fuget.org/packages/Propulsion.MessageDb) on `Propulsion.Feed`, `Npgsql` >= `7.0.7` [#181](https://github.com/jet/propulsion/pull/181) :pray: [@nordfjord](https://github.com/nordfjord)
- `Propulsion.MessageDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MessageDb.svg)](https://www.nuget.org/packages/Propulsion.MessageDb/). Provides bindings to [MessageDb](http://docs.eventide-project.org/user-guide/message-db/), maintaining checkpoints in a postgres table [Depends](https://www.fuget.org/packages/Propulsion.MessageDb) on `Propulsion`, `Npgsql` >= `7.0.7` [#181](https://github.com/jet/propulsion/pull/181) :pray: [@nordfjord](https://github.com/nordfjord)
1. `MessageDbSource`: reading from one or more MessageDB categories into a `Propulsion.Sink`
2. `CheckpointStore`: checkpoint storage for `Propulsion.Feed` using `Npgsql` (can be initialized via `propulsion initpg -c connstr -s schema`)

- `Propulsion.SqlStreamStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.SqlStreamStore.svg)](https://www.nuget.org/packages/Propulsion.SqlStreamStore/). Provides bindings to [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore), maintaining checkpoints in a SQL Server table. [Depends](https://www.fuget.org/packages/Propulsion.SqlStreamStore) on `Propulsion.Feed`, `SqlStreamStore`, `Dapper` v `2.0`, `Microsoft.Data.SqlClient` v `1.1.3`
- `Propulsion.SqlStreamStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.SqlStreamStore.svg)](https://www.nuget.org/packages/Propulsion.SqlStreamStore/). Provides bindings to [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore), maintaining checkpoints in a SQL Server table. [Depends](https://www.fuget.org/packages/Propulsion.SqlStreamStore) on `Propulsion`, `SqlStreamStore`, `Dapper` v `2.0`, `Microsoft.Data.SqlClient` v `1.1.3`

1. `SqlStreamStoreSource`: reading from a SqlStreamStore `$all` stream into a `Propulsion.Sink`
2. `ReaderCheckpoint`: checkpoint storage for `Propulsion.EventStoreDb`/`Feed`/`SqlStreamSteamStore` using `Dapper`, `Microsoft.Data.SqlClient`
3. `Monitor.AwaitCompletion`: See `Propulsion.Feed`

(Reading and position metrics are exposed via `Propulsion.Feed.Prometheus`)

The ubiquitous `Serilog` dependency is solely on the core module, not any sinks.

Expand Down
2 changes: 1 addition & 1 deletion build.proj
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
<Exec Command="dotnet pack src/Propulsion.DynamoStore.Lambda $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.DynamoStore.Notifier $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.EventStoreDb $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.Feed $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.Kafka $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.MemoryStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.MessageDb $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.Prometheus $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.SqlStreamStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack tools/Propulsion.Tool $(Cfg) $(PackOptions)" />

Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="..\Propulsion\Propulsion.fsproj" />
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.14, 4.0.0)" />
<ProjectReference Condition=" '$(PinnedBaselineRootPackage)' == '' " Include="..\Propulsion.Prometheus\Propulsion.Prometheus.fsproj" />
<PackageReference Condition=" '$(PinnedBaselineRootPackage)' != '' " Include="Propulsion.Prometheus" Version="$(PinnedBaselineRootPackage)" />
</ItemGroup>

</Project>
5 changes: 2 additions & 3 deletions src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Propulsion\Propulsion.fsproj" />
<!-- <ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="..\Propulsion\Propulsion.fsproj" />-->
<!-- <PackageReference Condition=" '$(Configuration)' == 'Release' " Include="Propulsion" Version="[3.0.0-rc.14, 4.0.0)" />-->
<ProjectReference Condition=" '$(PinnedBaselineRootPackage)' == '' " Include="..\Propulsion.Prometheus\Propulsion.Prometheus.fsproj" />
<PackageReference Condition=" '$(PinnedBaselineRootPackage)' != '' " Include="Propulsion.Prometheus" Version="$(PinnedBaselineRootPackage)" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<PackageValidationBaselineVersion>3.0.0-rc.12</PackageValidationBaselineVersion>
<!-- <PackageValidationBaselineVersion>3.0.0-rc.12</PackageValidationBaselineVersion>-->
</PropertyGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit 11c7e56

Please sign in to comment.