Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Samples for multiple transports #457

Merged
merged 4 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Tingle.EventBus.sln
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MultiEventsConsumer", "samp
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MultipleConsumers", "samples\MultipleConsumers\MultipleConsumers.csproj", "{B2043778-DDC9-4396-801C-442EFF0C7E73}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MultipleDifferentTransports", "samples\MultipleDifferentTransports\MultipleDifferentTransports.csproj", "{FC8C4CF2-16F7-4BBC-8A67-446AF35D3C19}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MultipleSimilarTransports", "samples\MultipleSimilarTransports\MultipleSimilarTransports.csproj", "{AC629321-9099-46F7-BD26-4205DA724D64}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SimpleConsumer", "samples\SimpleConsumer\SimpleConsumer.csproj", "{ED1E8BA4-9538-42AB-9759-4FC4233A36B1}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SimplePublisher", "samples\SimplePublisher\SimplePublisher.csproj", "{2FEC41FE-188B-4B02-943B-0FEAB0E1FA39}"
Expand Down Expand Up @@ -166,6 +170,14 @@ Global
{B2043778-DDC9-4396-801C-442EFF0C7E73}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B2043778-DDC9-4396-801C-442EFF0C7E73}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B2043778-DDC9-4396-801C-442EFF0C7E73}.Release|Any CPU.Build.0 = Release|Any CPU
{FC8C4CF2-16F7-4BBC-8A67-446AF35D3C19}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FC8C4CF2-16F7-4BBC-8A67-446AF35D3C19}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FC8C4CF2-16F7-4BBC-8A67-446AF35D3C19}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FC8C4CF2-16F7-4BBC-8A67-446AF35D3C19}.Release|Any CPU.Build.0 = Release|Any CPU
{AC629321-9099-46F7-BD26-4205DA724D64}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AC629321-9099-46F7-BD26-4205DA724D64}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AC629321-9099-46F7-BD26-4205DA724D64}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AC629321-9099-46F7-BD26-4205DA724D64}.Release|Any CPU.Build.0 = Release|Any CPU
{ED1E8BA4-9538-42AB-9759-4FC4233A36B1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{ED1E8BA4-9538-42AB-9759-4FC4233A36B1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{ED1E8BA4-9538-42AB-9759-4FC4233A36B1}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -202,6 +214,8 @@ Global
{C9293277-90BA-4F1A-BEA7-85CE41103B8D} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{E9D6F25A-0586-4409-A3EB-A72B6E89A71C} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{B2043778-DDC9-4396-801C-442EFF0C7E73} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{FC8C4CF2-16F7-4BBC-8A67-446AF35D3C19} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{AC629321-9099-46F7-BD26-4205DA724D64} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{ED1E8BA4-9538-42AB-9759-4FC4233A36B1} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{2FEC41FE-188B-4B02-943B-0FEAB0E1FA39} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
EndGlobalSection
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk.Worker">

<PropertyGroup>
<UserSecretsId>50e90b43-cf86-44cc-b5f8-2c8c5ff11362</UserSecretsId>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Tingle.EventBus.Transports.Azure.EventHubs\Tingle.EventBus.Transports.Azure.EventHubs.csproj" />
<ProjectReference Include="..\..\src\Tingle.EventBus.Transports.Azure.ServiceBus\Tingle.EventBus.Transports.Azure.ServiceBus.csproj" />
</ItemGroup>

</Project>
45 changes: 45 additions & 0 deletions samples/MultipleDifferentTransports/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using MultipleDifferentTransports;
using Tingle.EventBus.Configuration;
using Tingle.EventBus.Transports.Azure.EventHubs;
using Tingle.EventBus.Transports.Azure.ServiceBus;

var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
var configuration = hostContext.Configuration;

services.AddEventBus(builder =>
{
builder.Configure(o =>
{
o.DefaultTransportName = AzureServiceBusDefaults.Name;

o.ConfigureEvent<VehicleTelemetryEvent>(reg =>
{
reg.ConfigureAsIotHubEvent(configuration["IotHubEventHubName"])
.UseIotHubEventSerializer();

reg.TransportName = AzureEventHubsDefaults.Name; // you can also use EventTransportNameAttribute on the event type declaration
});
});

builder.AddConsumer<VehicleTelemetryEventsConsumer>();

// Transport specific configuration
builder.AddAzureEventHubsTransport(options =>
{
options.Credentials = configuration.GetConnectionString("EventHub");
options.BlobStorageCredentials = configuration.GetConnectionString("AzureStorage");
});

// Transport specific configuration
builder.AddAzureServiceBusTransport(options =>
{
options.Credentials = configuration.GetConnectionString("ServiceBus");
options.DefaultEntityKind = EntityKind.Queue; // required if using the basic SKU (does not support topics)
});
});
})
.Build();

await host.RunAsync();
10 changes: 10 additions & 0 deletions samples/MultipleDifferentTransports/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"profiles": {
"MultipleDifferentTransports": {
"commandName": "Project",
"environmentVariables": {
"DOTNET_ENVIRONMENT": "Development"
}
}
}
}
9 changes: 9 additions & 0 deletions samples/MultipleDifferentTransports/VehicleDoorOpenedEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace MultipleDifferentTransports;

public class VehicleDoorOpenedEvent
{
public string? VehicleId { get; set; }
public VehicleDoorKind Kind { get; set; }
public DateTimeOffset? Opened { get; set; }
public DateTimeOffset? Closed { get; set; }
}
45 changes: 45 additions & 0 deletions samples/MultipleDifferentTransports/VehicleTelemetryEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;
using Tingle.EventBus.Transports.Azure.EventHubs.IotHub;

namespace MultipleDifferentTransports;

internal record VehicleTelemetryEvent : IotHubEvent<VehicleTelemetry>
{
public VehicleTelemetryEvent(IotHubEventMessageSource source,
VehicleTelemetry? telemetry,
IotHubOperationalEvent<IotHubDeviceTwinChangeEvent>? twinEvent,
IotHubOperationalEvent<IotHubDeviceLifecycleEvent>? lifecycleEvent,
IotHubOperationalEvent<IotHubDeviceConnectionStateEvent>? connectionStateEvent)
: base(source, telemetry, twinEvent, lifecycleEvent, connectionStateEvent) { }
}

internal class VehicleTelemetry
{
public DateTimeOffset Timestamp { get; set; }

public string? Action { get; set; }

public VehicleDoorKind? VehicleDoorKind { get; set; }
public VehicleDoorStatus? VehicleDoorStatus { get; set; }

[JsonExtensionData]
public JsonObject? Extras { get; set; }
}

public enum VehicleDoorStatus
{
Unknown,
Open,
Closed,
}

public enum VehicleDoorKind
{
FrontLeft,
FrontRight,
RearLeft,
ReadRight,
Hood,
Trunk,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using Tingle.EventBus.Transports.Azure.EventHubs.IotHub;

namespace MultipleDifferentTransports;

internal class VehicleTelemetryEventsConsumer : IEventConsumer<VehicleTelemetryEvent>
{
private readonly ILogger logger;

public VehicleTelemetryEventsConsumer(ILogger<VehicleTelemetryEventsConsumer> logger)
{
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task ConsumeAsync(EventContext<VehicleTelemetryEvent> context, CancellationToken cancellationToken)
{
var evt = context.Event;
var source = evt.Source;
if (source != IotHubEventMessageSource.Telemetry) return;

var telemetry = evt.Telemetry!;
var action = telemetry.Action;
if (action is not "door-status-changed")
{
logger.LogWarning("Telemetry with action '{TelemetryAction}' is not yet supported", action);
return;
}

var status = telemetry.VehicleDoorStatus;
if (status is not VehicleDoorStatus.Open and not VehicleDoorStatus.Closed)
{
logger.LogWarning("Vehicle Door status '{VehicleDoorStatus}' is not yet supported", status);
return;
}

var kind = telemetry.VehicleDoorKind;
if (kind is null)
{
logger.LogWarning("Vehicle Door kind '{VehicleDoorKind}' cannot be null", kind);
return;
}

var deviceId = context.GetIotHubDeviceId();
var timestamp = telemetry.Timestamp;
var updateEvt = new VehicleDoorOpenedEvent
{
VehicleId = deviceId, // not the registration number
Kind = kind.Value,
Closed = status is VehicleDoorStatus.Closed ? timestamp : null,
Opened = status is VehicleDoorStatus.Open ? timestamp : null,
};

// the VehicleDoorOpenedEvent on a broadcast bus would notify all subscribers
await context.PublishAsync(updateEvt, cancellationToken: cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Microsoft": "Information",
"System": "Information"
}
}
}
15 changes: 15 additions & 0 deletions samples/MultipleDifferentTransports/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*",

"ConnectionStrings:AzureStorage": "UseDevelopmentStorage=true;",
"ConnectionStrings:EventHub": "Endpoint=sb://abcd.servicebus.windows.net/;SharedAccessKeyName=xyz;SharedAccessKey=AAAAAAAAAAAAAAAAAAAAAA==",
"ConnectionStrings:ServiceBus": "Endpoint=sb://abcd.servicebus.windows.net/;SharedAccessKeyName=xyz;SharedAccessKey=AAAAAAAAAAAAAAAAAAAAAA==",
"IotHubEventHubName": "iothub-ehub-test-dev-0000000-0aaaa000aa"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk.Worker">

<ItemGroup>
<ProjectReference Include="..\..\src\Tingle.EventBus.Transports.InMemory\Tingle.EventBus.Transports.InMemory.csproj" />
</ItemGroup>

</Project>
109 changes: 109 additions & 0 deletions samples/MultipleSimilarTransports/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
using Tingle.EventBus.Configuration;

var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddEventBus(builder =>
{
// Transport agnostic configuration
builder.Configure(o => o.Naming.UseFullTypeNames = false);

builder.AddConsumer<VisualsUploadedConsumer>();

// Add transports
builder.AddInMemoryTransport("in-memory-images");
builder.AddInMemoryTransport("in-memory-videos");
});

services.AddHostedService<VisualsProducerService>();
})
.Build();

await host.RunAsync();

class VisualsProducerService : BackgroundService
{
private readonly IEventPublisher publisher;
private readonly ILogger logger;

public VisualsProducerService(IEventPublisher publisher, ILogger<VisualsProducerService> logger)
{
this.publisher = publisher ?? throw new ArgumentNullException(nameof(publisher));
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken); // delays a little so that the logs are better visible in a better order (only ended for sample)

logger.LogInformation("Starting production ...");

var delay = TimeSpan.FromSeconds(10);
var times = 10;

var rnd = new Random(DateTimeOffset.UtcNow.Millisecond);

for (var i = 0; i < times; i++)
{
var id = Convert.ToUInt32(rnd.Next()).ToString();
var size = Convert.ToUInt32(rnd.Next());
var image = (i % 2) == 0;
var url = $"https://localhost:8080/{(image ? "images" : "videos")}/{id}.{(image ? "png" : "flv")}";

_ = image
? await DoPublishAsync(new VideoUploaded { VideoId = id, SizeBytes = size, Url = url, }, stoppingToken)
: await DoPublishAsync(new ImageUploaded { ImageId = id, SizeBytes = size, Url = url, }, stoppingToken);

await Task.Delay(delay, stoppingToken);
}
}

private async Task<ScheduledResult?> DoPublishAsync<T>(T @event, CancellationToken cancellationToken) where T : class
=> await publisher.PublishAsync(@event, cancellationToken: cancellationToken);
}

class VisualsUploadedConsumer : IEventConsumer<ImageUploaded>, IEventConsumer<VideoUploaded>
{
private static readonly TimeSpan SimulationDuration = TimeSpan.FromSeconds(1.3f);

private readonly ILogger logger;

public VisualsUploadedConsumer(ILogger<VisualsUploadedConsumer> logger)
{
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task ConsumeAsync(EventContext<ImageUploaded> context, CancellationToken cancellationToken)
{
var id = context.Event.ImageId;
var thumbnailUrl = $"https://localhost:8080/thumbnails/{id}.jpg";

await Task.Delay(SimulationDuration, cancellationToken);
logger.LogInformation("Generated thumbnail from image '{ImageId}' at '{ThumbnailUrl}'.", id, thumbnailUrl);
}

public async Task ConsumeAsync(EventContext<VideoUploaded> context, CancellationToken cancellationToken = default)
{
var id = context.Event.VideoId;
var thumbnailUrl = $"https://localhost:8080/thumbnails/{id}.jpg";

await Task.Delay(SimulationDuration, cancellationToken);
logger.LogInformation("Generated thumbnail from video '{VideoId}' at '{ThumbnailUrl}'.", id, thumbnailUrl);
}
}

[EventTransportName("in-memory-images")] // can also be configured in the builder
class ImageUploaded
{
public string? ImageId { get; set; }
public string? Url { get; set; }
public long SizeBytes { get; set; }
}

[EventTransportName("in-memory-videos")] // can also be configured in the builder
class VideoUploaded
{
public string? VideoId { get; set; }
public string? Url { get; set; }
public long SizeBytes { get; set; }
}
10 changes: 10 additions & 0 deletions samples/MultipleSimilarTransports/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"profiles": {
"MultipleSimilarTransports": {
"commandName": "Project",
"environmentVariables": {
"DOTNET_ENVIRONMENT": "Development"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Microsoft": "Information",
"System": "Information"
}
}
}
9 changes: 9 additions & 0 deletions samples/MultipleSimilarTransports/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
}
}