Skip to content
This repository has been archived by the owner on Dec 25, 2024. It is now read-only.

Commit

Permalink
Rework realtime logic (#247)
Browse files Browse the repository at this point in the history
  • Loading branch information
mburumaxwell authored May 7, 2024
1 parent da5d56e commit 9f017ea
Show file tree
Hide file tree
Showing 14 changed files with 148 additions and 219 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 0 # Required for GitVersion
submodules: true

- name: Install GitVersion
uses: gittools/actions/gitversion/setup@v1
Expand Down
33 changes: 0 additions & 33 deletions src/FaluCli/Client/Realtime/RealtimeConnectionNegotiation.cs

This file was deleted.

This file was deleted.

88 changes: 88 additions & 0 deletions src/FaluCli/Client/Realtime/RealtimeNegotiation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System.Net;
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;

namespace Falu.Client.Realtime;

public class RealtimeNegotiation
{
[JsonPropertyName("url")]
public Uri Url { get; set; } = default!;

[JsonPropertyName("expires")]
public DateTimeOffset Expires { get; set; }

[JsonPropertyName("token")]
public string Token { get; set; } = default!;

[JsonPropertyName("state")]
public string State { get; set; } = default!;

[JsonPropertyName("workspace")]
public string Workspace { get; set; } = default!;

[JsonPropertyName("live")]
public bool Live { get; set; }

public CancellationTokenSource MakeCancellationTokenSource(CancellationToken other)
{
// create a CancellationToken sourced from the other and cancels when the token expires
var lifetime = Expires - DateTimeOffset.UtcNow - TimeSpan.FromSeconds(2);
var cts = CancellationTokenSource.CreateLinkedTokenSource(other);
cts.CancelAfter(lifetime);
return cts;
}
}

public abstract class RealtimeNegotiationOptions<TFilters>
{
[JsonPropertyName("ttl")]
public string Ttl { get; set; } = "PT1H";

[JsonPropertyName("filters")]
public TFilters? Filters { get; set; }
}

public class RealtimeNegotiationFiltersRequestLogs
{
[JsonPropertyName("ip_networks")]
public IPNetwork[]? IPNetworks { get; set; }

[JsonPropertyName("ip_addresses")]
public IPAddress[]? IPAddresses { get; set; }

[JsonPropertyName("sources")]
public string[]? Sources { get; set; }

[JsonPropertyName("paths")]
public string[]? Paths { get; set; }

[JsonPropertyName("methods")]
public string[]? Methods { get; set; }

[JsonPropertyName("status_codes")]
public int[]? StatusCodes { get; set; }
}

public class RealtimeNegotiationFiltersEvents
{
[JsonPropertyName("types")]
public string[]? Types { get; set; }
}

public class RealtimeNegotiationOptionsEvents
: RealtimeNegotiationOptions<RealtimeNegotiationFiltersEvents>
{ }

public class RealtimeNegotiationOptionsRequestLogs
: RealtimeNegotiationOptions<RealtimeNegotiationFiltersRequestLogs>
{ }

internal class RealtimeMessage
{
[JsonPropertyName("type")]
public string? Type { get; set; }

[JsonPropertyName("object")]
public JsonObject? Object { get; set; }
}
19 changes: 14 additions & 5 deletions src/FaluCli/Client/Realtime/RealtimeServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,21 @@ namespace Falu.Client.Realtime;

internal class RealtimeServiceClient(HttpClient backChannel, FaluClientOptions options) : BaseServiceClient(backChannel, options)
{
public virtual Task<ResourceResponse<RealtimeConnectionNegotiation>> NegotiateAsync(RealtimeConnectionNegotiationRequest request,
RequestOptions? options = null,
public virtual Task<ResourceResponse<RealtimeNegotiation>> NegotiateAsync(RealtimeNegotiationOptionsEvents options,
RequestOptions? requestOptions = null,
CancellationToken cancellationToken = default)
{
var uri = "/v1/realtime/negotiate";
var content = JsonContent.Create(request, SC.Default.RealtimeConnectionNegotiationRequest);
return RequestAsync(uri, HttpMethod.Post, SC.Default.RealtimeConnectionNegotiation, content, options, cancellationToken);
var uri = "/v1/realtime/negotiate/events";
var content = JsonContent.Create(options, SC.Default.RealtimeNegotiationOptionsEvents);
return RequestAsync(uri, HttpMethod.Post, SC.Default.RealtimeNegotiation, content, requestOptions, cancellationToken);
}

public virtual Task<ResourceResponse<RealtimeNegotiation>> NegotiateAsync(RealtimeNegotiationOptionsRequestLogs options,
RequestOptions? requestOptions = null,
CancellationToken cancellationToken = default)
{
var uri = "/v1/realtime/negotiate/request_logs";
var content = JsonContent.Create(options, SC.Default.RealtimeNegotiationOptionsRequestLogs);
return RequestAsync(uri, HttpMethod.Post, SC.Default.RealtimeNegotiation, content, requestOptions, cancellationToken);
}
}
37 changes: 12 additions & 25 deletions src/FaluCli/Commands/Events/EventsListenCommandHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using CloudNative.CloudEvents.Http;
using Falu.Client;
using Falu.Client.Realtime;
using Falu.Websockets;
using Spectre.Console;
using System.Text;
using System.Text.Json.Nodes;
Expand Down Expand Up @@ -73,10 +72,18 @@ public async Task<int> InvokeAsync(InvocationContext context)
forwardTo);
}

// negotiate a realtime connection
// prepare filters
var options = new RealtimeNegotiationOptionsEvents
{
Filters = new RealtimeNegotiationFiltersEvents
{
Types = types,
},
};

// negotiate a connection
logger.LogInformation("Negotiating connection information ...");
var request = new RealtimeConnectionNegotiationRequest { Type = "websocket", Purpose = "events", };
var response = await client.Realtime.NegotiateAsync(request, cancellationToken: cancellationToken);
var response = await client.Realtime.NegotiateAsync(options, cancellationToken: cancellationToken);
response.EnsureSuccess();
var negotiation = response.Resource ?? throw new InvalidOperationException("Response from negotiation cannot be null or empty");

Expand All @@ -85,34 +92,14 @@ public async Task<int> InvokeAsync(InvocationContext context)
cancellationToken = cts.Token;
await websocketHandler.StartAsync(negotiation, (msg, ct) => HandleIncomingMessage(workspaceId, live, forwardTo, secret, msg, ct), cts);

// prepare filters
var filters = new RealtimeConnectionFilters
{
Events = new RealtimeConnectionFilterEvents
{
Types = types,
}.NullIfEmpty(),
}.NullIfEmpty();

// send message
var message = new RealtimeConnectionOutgoingMessage("subscribe_events", filters);
await websocketHandler.SendMessageAsync(message, cancellationToken);

// run until cancelled
await Task.Delay(Timeout.Infinite, cancellationToken);

return 0;
}

private Task HandleIncomingMessage(string workspaceId, bool live, Uri? forwardTo, string? secret, RealtimeConnectionIncomingMessage message, CancellationToken cancellationToken)
private Task HandleIncomingMessage(string workspaceId, bool live, Uri? forwardTo, string? secret, RealtimeMessage message, CancellationToken cancellationToken)
{
var type = message.Type;
if (!string.Equals(type, "event", StringComparison.OrdinalIgnoreCase))
{
logger.LogWarning("Received unknown message of type {Type}", type);
return Task.CompletedTask;
}

var @object = message.Object ?? throw new InvalidOperationException("The message should have an object at this point");
var @event = System.Text.Json.JsonSerializer.Deserialize(@object, FaluCliJsonSerializerContext.Default.WebhookEvent)!;
var eventId = @event.Id!;
Expand Down
3 changes: 3 additions & 0 deletions src/FaluCli/Commands/RequestLogs/RequestLogsTailCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ internal class RequestLogsTailCommand : Command
{
public RequestLogsTailCommand() : base("tail", "Tail request logs")
{
this.AddOption<IPNetwork[]>(["--ip-network", "--network"],
description: "The IP network to filter for.");

this.AddOption<IPAddress[]>(["--ip-address", "--ip"],
description: "The IP address to filter for.");

Expand Down
45 changes: 17 additions & 28 deletions src/FaluCli/Commands/RequestLogs/RequestLogsTailCommandHandler.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Falu.Client;
using Falu.Client.Realtime;
using Falu.Websockets;
using Spectre.Console;
using System.Net;
using System.Text;
Expand All @@ -18,56 +17,46 @@ public async Task<int> InvokeAsync(InvocationContext context)

var workspaceId = context.ParseResult.ValueForOption<string>("--workspace")!;
var live = context.ParseResult.ValueForOption<bool?>("--live") ?? false;
var ipNetworks = context.ParseResult.ValueForOption<IPNetwork[]>("--ip-network").NullIfEmpty();
var ipAddresses = context.ParseResult.ValueForOption<IPAddress[]>("--ip-address").NullIfEmpty();
var methods = context.ParseResult.ValueForOption<string[]>("--http-method").NullIfEmpty();
var paths = context.ParseResult.ValueForOption<string[]>("--request-path").NullIfEmpty();
var statusCodes = context.ParseResult.ValueForOption<int[]>("--status-code").NullIfEmpty();
var sources = context.ParseResult.ValueForOption<string[]>("--source").NullIfEmpty();

// negotiate a realtime connection
logger.LogInformation("Negotiating connection information ...");
var request = new RealtimeConnectionNegotiationRequest { Type = "websocket", Purpose = "logs", };
var response = await client.Realtime.NegotiateAsync(request, cancellationToken: cancellationToken);
response.EnsureSuccess();
var negotiation = response.Resource ?? throw new InvalidOperationException("Response from negotiation cannot be null or empty");

// start the handler
using var cts = negotiation.MakeCancellationTokenSource(cancellationToken);
cancellationToken = cts.Token;
await websocketHandler.StartAsync(negotiation, (msg, _) => HandleIncomingMessage(workspaceId, live, msg), cts);

// prepare filters
var filters = new RealtimeConnectionFilters
var options = new RealtimeNegotiationOptionsRequestLogs
{
Logs = new RealtimeConnectionFilterLogs
Filters = new RealtimeNegotiationFiltersRequestLogs
{
IPNetworks = ipNetworks,
IPAddresses = ipAddresses,
Methods = methods,
Paths = paths,
StatusCodes = statusCodes,
Sources = sources,
}.NullIfEmpty(),
}.NullIfEmpty();
},
};

// send message
var message = new RealtimeConnectionOutgoingMessage("subscribe_request_logs", filters);
await websocketHandler.SendMessageAsync(message, cancellationToken);
// negotiate a connection
logger.LogInformation("Negotiating connection information ...");
var response = await client.Realtime.NegotiateAsync(options, cancellationToken: cancellationToken);
response.EnsureSuccess();
var negotiation = response.Resource ?? throw new InvalidOperationException("Response from negotiation cannot be null or empty");

// start the handler
using var cts = negotiation.MakeCancellationTokenSource(cancellationToken);
cancellationToken = cts.Token;
await websocketHandler.StartAsync(negotiation, (msg, _) => HandleIncomingMessage(workspaceId, live, msg), cts);

// run until cancelled
await Task.Delay(Timeout.Infinite, cancellationToken);

return 0;
}

private Task HandleIncomingMessage(string workspaceId, bool live, RealtimeConnectionIncomingMessage message)
private static Task HandleIncomingMessage(string workspaceId, bool live, RealtimeMessage message)
{
var type = message.Type;
if (!string.Equals(type, "request_log", StringComparison.OrdinalIgnoreCase))
{
logger.LogWarning("Received unknown message of type {Type}", type);
return Task.CompletedTask;
}

var @object = message.Object ?? throw new InvalidOperationException("The message should have an object at this point");
var log = System.Text.Json.JsonSerializer.Deserialize(@object, FaluCliJsonSerializerContext.Default.RequestLog)!;
var url = $"https://dashboard.falu.io/{workspaceId}/developer/logs/{log.Id}?live={live.ToString().ToLowerInvariant()}";
Expand Down
8 changes: 4 additions & 4 deletions src/FaluCli/FaluCliJsonSerializerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ namespace Falu;
[JsonSerializable(typeof(List<Client.MoneyStatements.MoneyStatement>))]
[JsonSerializable(typeof(Client.MoneyStatements.MoneyStatementUploadResponse))]

[JsonSerializable(typeof(Client.Realtime.RealtimeConnectionNegotiationRequest))]
[JsonSerializable(typeof(Client.Realtime.RealtimeConnectionNegotiation))]
[JsonSerializable(typeof(Client.Realtime.RealtimeMessage))]
[JsonSerializable(typeof(Client.Realtime.RealtimeNegotiationOptionsEvents))]
[JsonSerializable(typeof(Client.Realtime.RealtimeNegotiationOptionsRequestLogs))]
[JsonSerializable(typeof(Client.Realtime.RealtimeNegotiation))]

[JsonSerializable(typeof(Commands.RequestLogs.RequestLog))]
[JsonSerializable(typeof(Commands.Templates.TemplateInfo))]
Expand All @@ -21,8 +23,6 @@ namespace Falu;
[JsonSerializable(typeof(Oidc.OidcTokenResponse))]

[JsonSerializable(typeof(Updates.GitHubLatestRelease))]
[JsonSerializable(typeof(Websockets.RealtimeConnectionIncomingMessage))]
[JsonSerializable(typeof(Websockets.RealtimeConnectionOutgoingMessage))]

[JsonSourceGenerationOptions(
AllowTrailingCommas = true,
Expand Down
1 change: 0 additions & 1 deletion src/FaluCli/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Falu.Commands.Money.Statements;
using Falu.Commands.RequestLogs;
using Falu.Commands.Templates;
using Falu.Websockets;
using System.CommandLine.Builder;
using System.CommandLine.Hosting;
using Res = Falu.Properties.Resources;
Expand Down
1 change: 1 addition & 0 deletions src/FaluCli/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"$schema": "http://json.schemastore.org/launchsettings.json",
"profiles": {
"FaluCli": {
"commandName": "Project",
Expand Down
Loading

0 comments on commit 9f017ea

Please sign in to comment.