Skip to content
This repository has been archived by the owner on Dec 25, 2024. It is now read-only.

Rework realtime logic #247

Merged
merged 1 commit into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 0 # Required for GitVersion
submodules: true

- name: Install GitVersion
uses: gittools/actions/gitversion/setup@v1
Expand Down
33 changes: 0 additions & 33 deletions src/FaluCli/Client/Realtime/RealtimeConnectionNegotiation.cs

This file was deleted.

This file was deleted.

88 changes: 88 additions & 0 deletions src/FaluCli/Client/Realtime/RealtimeNegotiation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System.Net;
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;

namespace Falu.Client.Realtime;

public class RealtimeNegotiation
{
[JsonPropertyName("url")]
public Uri Url { get; set; } = default!;

[JsonPropertyName("expires")]
public DateTimeOffset Expires { get; set; }

[JsonPropertyName("token")]
public string Token { get; set; } = default!;

[JsonPropertyName("state")]
public string State { get; set; } = default!;

[JsonPropertyName("workspace")]
public string Workspace { get; set; } = default!;

[JsonPropertyName("live")]
public bool Live { get; set; }

public CancellationTokenSource MakeCancellationTokenSource(CancellationToken other)
{
// create a CancellationToken sourced from the other and cancels when the token expires
var lifetime = Expires - DateTimeOffset.UtcNow - TimeSpan.FromSeconds(2);
var cts = CancellationTokenSource.CreateLinkedTokenSource(other);
cts.CancelAfter(lifetime);
return cts;
}
}

public abstract class RealtimeNegotiationOptions<TFilters>
{
[JsonPropertyName("ttl")]
public string Ttl { get; set; } = "PT1H";

[JsonPropertyName("filters")]
public TFilters? Filters { get; set; }
}

public class RealtimeNegotiationFiltersRequestLogs
{
[JsonPropertyName("ip_networks")]
public IPNetwork[]? IPNetworks { get; set; }

[JsonPropertyName("ip_addresses")]
public IPAddress[]? IPAddresses { get; set; }

[JsonPropertyName("sources")]
public string[]? Sources { get; set; }

[JsonPropertyName("paths")]
public string[]? Paths { get; set; }

[JsonPropertyName("methods")]
public string[]? Methods { get; set; }

[JsonPropertyName("status_codes")]
public int[]? StatusCodes { get; set; }
}

public class RealtimeNegotiationFiltersEvents
{
[JsonPropertyName("types")]
public string[]? Types { get; set; }
}

public class RealtimeNegotiationOptionsEvents
: RealtimeNegotiationOptions<RealtimeNegotiationFiltersEvents>
{ }

public class RealtimeNegotiationOptionsRequestLogs
: RealtimeNegotiationOptions<RealtimeNegotiationFiltersRequestLogs>
{ }

internal class RealtimeMessage
{
[JsonPropertyName("type")]
public string? Type { get; set; }

[JsonPropertyName("object")]
public JsonObject? Object { get; set; }
}
19 changes: 14 additions & 5 deletions src/FaluCli/Client/Realtime/RealtimeServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,21 @@ namespace Falu.Client.Realtime;

internal class RealtimeServiceClient(HttpClient backChannel, FaluClientOptions options) : BaseServiceClient(backChannel, options)
{
public virtual Task<ResourceResponse<RealtimeConnectionNegotiation>> NegotiateAsync(RealtimeConnectionNegotiationRequest request,
RequestOptions? options = null,
public virtual Task<ResourceResponse<RealtimeNegotiation>> NegotiateAsync(RealtimeNegotiationOptionsEvents options,
RequestOptions? requestOptions = null,
CancellationToken cancellationToken = default)
{
var uri = "/v1/realtime/negotiate";
var content = JsonContent.Create(request, SC.Default.RealtimeConnectionNegotiationRequest);
return RequestAsync(uri, HttpMethod.Post, SC.Default.RealtimeConnectionNegotiation, content, options, cancellationToken);
var uri = "/v1/realtime/negotiate/events";
var content = JsonContent.Create(options, SC.Default.RealtimeNegotiationOptionsEvents);
return RequestAsync(uri, HttpMethod.Post, SC.Default.RealtimeNegotiation, content, requestOptions, cancellationToken);
}

public virtual Task<ResourceResponse<RealtimeNegotiation>> NegotiateAsync(RealtimeNegotiationOptionsRequestLogs options,
RequestOptions? requestOptions = null,
CancellationToken cancellationToken = default)
{
var uri = "/v1/realtime/negotiate/request_logs";
var content = JsonContent.Create(options, SC.Default.RealtimeNegotiationOptionsRequestLogs);
return RequestAsync(uri, HttpMethod.Post, SC.Default.RealtimeNegotiation, content, requestOptions, cancellationToken);
}
}
37 changes: 12 additions & 25 deletions src/FaluCli/Commands/Events/EventsListenCommandHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using CloudNative.CloudEvents.Http;
using Falu.Client;
using Falu.Client.Realtime;
using Falu.Websockets;
using Spectre.Console;
using System.Text;
using System.Text.Json.Nodes;
Expand Down Expand Up @@ -73,10 +72,18 @@ public async Task<int> InvokeAsync(InvocationContext context)
forwardTo);
}

// negotiate a realtime connection
// prepare filters
var options = new RealtimeNegotiationOptionsEvents
{
Filters = new RealtimeNegotiationFiltersEvents
{
Types = types,
},
};

// negotiate a connection
logger.LogInformation("Negotiating connection information ...");
var request = new RealtimeConnectionNegotiationRequest { Type = "websocket", Purpose = "events", };
var response = await client.Realtime.NegotiateAsync(request, cancellationToken: cancellationToken);
var response = await client.Realtime.NegotiateAsync(options, cancellationToken: cancellationToken);
response.EnsureSuccess();
var negotiation = response.Resource ?? throw new InvalidOperationException("Response from negotiation cannot be null or empty");

Expand All @@ -85,34 +92,14 @@ public async Task<int> InvokeAsync(InvocationContext context)
cancellationToken = cts.Token;
await websocketHandler.StartAsync(negotiation, (msg, ct) => HandleIncomingMessage(workspaceId, live, forwardTo, secret, msg, ct), cts);

// prepare filters
var filters = new RealtimeConnectionFilters
{
Events = new RealtimeConnectionFilterEvents
{
Types = types,
}.NullIfEmpty(),
}.NullIfEmpty();

// send message
var message = new RealtimeConnectionOutgoingMessage("subscribe_events", filters);
await websocketHandler.SendMessageAsync(message, cancellationToken);

// run until cancelled
await Task.Delay(Timeout.Infinite, cancellationToken);

return 0;
}

private Task HandleIncomingMessage(string workspaceId, bool live, Uri? forwardTo, string? secret, RealtimeConnectionIncomingMessage message, CancellationToken cancellationToken)
private Task HandleIncomingMessage(string workspaceId, bool live, Uri? forwardTo, string? secret, RealtimeMessage message, CancellationToken cancellationToken)
{
var type = message.Type;
if (!string.Equals(type, "event", StringComparison.OrdinalIgnoreCase))
{
logger.LogWarning("Received unknown message of type {Type}", type);
return Task.CompletedTask;
}

var @object = message.Object ?? throw new InvalidOperationException("The message should have an object at this point");
var @event = System.Text.Json.JsonSerializer.Deserialize(@object, FaluCliJsonSerializerContext.Default.WebhookEvent)!;
var eventId = @event.Id!;
Expand Down
3 changes: 3 additions & 0 deletions src/FaluCli/Commands/RequestLogs/RequestLogsTailCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ internal class RequestLogsTailCommand : Command
{
public RequestLogsTailCommand() : base("tail", "Tail request logs")
{
this.AddOption<IPNetwork[]>(["--ip-network", "--network"],
description: "The IP network to filter for.");

this.AddOption<IPAddress[]>(["--ip-address", "--ip"],
description: "The IP address to filter for.");

Expand Down
45 changes: 17 additions & 28 deletions src/FaluCli/Commands/RequestLogs/RequestLogsTailCommandHandler.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Falu.Client;
using Falu.Client.Realtime;
using Falu.Websockets;
using Spectre.Console;
using System.Net;
using System.Text;
Expand All @@ -18,56 +17,46 @@ public async Task<int> InvokeAsync(InvocationContext context)

var workspaceId = context.ParseResult.ValueForOption<string>("--workspace")!;
var live = context.ParseResult.ValueForOption<bool?>("--live") ?? false;
var ipNetworks = context.ParseResult.ValueForOption<IPNetwork[]>("--ip-network").NullIfEmpty();
var ipAddresses = context.ParseResult.ValueForOption<IPAddress[]>("--ip-address").NullIfEmpty();
var methods = context.ParseResult.ValueForOption<string[]>("--http-method").NullIfEmpty();
var paths = context.ParseResult.ValueForOption<string[]>("--request-path").NullIfEmpty();
var statusCodes = context.ParseResult.ValueForOption<int[]>("--status-code").NullIfEmpty();
var sources = context.ParseResult.ValueForOption<string[]>("--source").NullIfEmpty();

// negotiate a realtime connection
logger.LogInformation("Negotiating connection information ...");
var request = new RealtimeConnectionNegotiationRequest { Type = "websocket", Purpose = "logs", };
var response = await client.Realtime.NegotiateAsync(request, cancellationToken: cancellationToken);
response.EnsureSuccess();
var negotiation = response.Resource ?? throw new InvalidOperationException("Response from negotiation cannot be null or empty");

// start the handler
using var cts = negotiation.MakeCancellationTokenSource(cancellationToken);
cancellationToken = cts.Token;
await websocketHandler.StartAsync(negotiation, (msg, _) => HandleIncomingMessage(workspaceId, live, msg), cts);

// prepare filters
var filters = new RealtimeConnectionFilters
var options = new RealtimeNegotiationOptionsRequestLogs
{
Logs = new RealtimeConnectionFilterLogs
Filters = new RealtimeNegotiationFiltersRequestLogs
{
IPNetworks = ipNetworks,
IPAddresses = ipAddresses,
Methods = methods,
Paths = paths,
StatusCodes = statusCodes,
Sources = sources,
}.NullIfEmpty(),
}.NullIfEmpty();
},
};

// send message
var message = new RealtimeConnectionOutgoingMessage("subscribe_request_logs", filters);
await websocketHandler.SendMessageAsync(message, cancellationToken);
// negotiate a connection
logger.LogInformation("Negotiating connection information ...");
var response = await client.Realtime.NegotiateAsync(options, cancellationToken: cancellationToken);
response.EnsureSuccess();
var negotiation = response.Resource ?? throw new InvalidOperationException("Response from negotiation cannot be null or empty");

// start the handler
using var cts = negotiation.MakeCancellationTokenSource(cancellationToken);
cancellationToken = cts.Token;
await websocketHandler.StartAsync(negotiation, (msg, _) => HandleIncomingMessage(workspaceId, live, msg), cts);

// run until cancelled
await Task.Delay(Timeout.Infinite, cancellationToken);

return 0;
}

private Task HandleIncomingMessage(string workspaceId, bool live, RealtimeConnectionIncomingMessage message)
private static Task HandleIncomingMessage(string workspaceId, bool live, RealtimeMessage message)
{
var type = message.Type;
if (!string.Equals(type, "request_log", StringComparison.OrdinalIgnoreCase))
{
logger.LogWarning("Received unknown message of type {Type}", type);
return Task.CompletedTask;
}

var @object = message.Object ?? throw new InvalidOperationException("The message should have an object at this point");
var log = System.Text.Json.JsonSerializer.Deserialize(@object, FaluCliJsonSerializerContext.Default.RequestLog)!;
var url = $"https://dashboard.falu.io/{workspaceId}/developer/logs/{log.Id}?live={live.ToString().ToLowerInvariant()}";
Expand Down
8 changes: 4 additions & 4 deletions src/FaluCli/FaluCliJsonSerializerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ namespace Falu;
[JsonSerializable(typeof(List<Client.MoneyStatements.MoneyStatement>))]
[JsonSerializable(typeof(Client.MoneyStatements.MoneyStatementUploadResponse))]

[JsonSerializable(typeof(Client.Realtime.RealtimeConnectionNegotiationRequest))]
[JsonSerializable(typeof(Client.Realtime.RealtimeConnectionNegotiation))]
[JsonSerializable(typeof(Client.Realtime.RealtimeMessage))]
[JsonSerializable(typeof(Client.Realtime.RealtimeNegotiationOptionsEvents))]
[JsonSerializable(typeof(Client.Realtime.RealtimeNegotiationOptionsRequestLogs))]
[JsonSerializable(typeof(Client.Realtime.RealtimeNegotiation))]

[JsonSerializable(typeof(Commands.RequestLogs.RequestLog))]
[JsonSerializable(typeof(Commands.Templates.TemplateInfo))]
Expand All @@ -21,8 +23,6 @@ namespace Falu;
[JsonSerializable(typeof(Oidc.OidcTokenResponse))]

[JsonSerializable(typeof(Updates.GitHubLatestRelease))]
[JsonSerializable(typeof(Websockets.RealtimeConnectionIncomingMessage))]
[JsonSerializable(typeof(Websockets.RealtimeConnectionOutgoingMessage))]

[JsonSourceGenerationOptions(
AllowTrailingCommas = true,
Expand Down
1 change: 0 additions & 1 deletion src/FaluCli/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Falu.Commands.Money.Statements;
using Falu.Commands.RequestLogs;
using Falu.Commands.Templates;
using Falu.Websockets;
using System.CommandLine.Builder;
using System.CommandLine.Hosting;
using Res = Falu.Properties.Resources;
Expand Down
1 change: 1 addition & 0 deletions src/FaluCli/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"$schema": "http://json.schemastore.org/launchsettings.json",
"profiles": {
"FaluCli": {
"commandName": "Project",
Expand Down
Loading