Skip to content

Commit

Permalink
move Package method to MessageDelivery
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Yolokhov committed Jan 29, 2024
1 parent 41ae5ca commit 22468ec
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/OpenSmc.Messaging.Contract/IMessageDelivery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface IMessageDelivery
object Message { get; }

string AccessObject { get; }
object Context { get; set; }
IMessageDelivery Package();

internal IMessageDelivery SetAccessObject(string accessObject, object address);
internal IMessageDelivery ChangeState(string state);
Expand Down
19 changes: 15 additions & 4 deletions src/OpenSmc.Messaging.Hub/MessageDelivery.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
using System.Collections.Immutable;
using System.Reflection;
using System.Text.Json.Serialization;
using Microsoft.Extensions.DependencyInjection;
using OpenSmc.Serialization;
using OpenSmc.ShortGuid;

namespace OpenSmc.Messaging;

public record MaskedRequest(IMessageDelivery Request, object HostAddress);
public abstract record MessageDelivery(object Sender, object Target) : IMessageDelivery
{
protected IMessageHub SenderHub;

public string Id { get; init; } = Guid.NewGuid().AsString();
public ImmutableDictionary<string, object> Properties { get; init; } = ImmutableDictionary<string, object>.Empty;
public string State { get; init; } = MessageDeliveryState.Submitted;
Expand All @@ -22,8 +25,6 @@ IMessageDelivery IMessageDelivery.ChangeState(string state)

public object AccessProvidedBy { get; init; }
public string AccessObject { get; init; } // TODO SMCv2: later on we might think about accessibility for this property (2023/10/04, Dmitry Kalabin)
[JsonIgnore]
public object Context { get; set; }

IMessageDelivery IMessageDelivery.SetAccessObject(string accessObject, object address) => this with { AccessObject = accessObject, AccessProvidedBy = address, };

Expand Down Expand Up @@ -76,6 +77,16 @@ private IMessageDelivery<TMessage> WithMessageImpl<TMessage>(TMessage message)
ForwardedTo = ForwardedTo,
};
}

public IMessageDelivery Package()
{
var serializationService = SenderHub.ServiceProvider.GetService<ISerializationService>();
if (serializationService == null)
return this;

return serializationService.SerializeDelivery(this);
}

}

public record MessageDelivery<TMessage>(object Sender, object Target, TMessage Message) : MessageDelivery(Sender, Target), IMessageDelivery<TMessage>
Expand All @@ -89,7 +100,7 @@ public MessageDelivery(TMessage message, PostOptions options)
: this(options.Sender, options.Target, message)
{
Properties = options.Properties;
Context = options.Context;
SenderHub = options.SenderHub;
}

protected override object GetMessage()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
using Microsoft.Extensions.DependencyInjection;
using OpenSmc.Serialization;
using OpenSmc.Serialization;

namespace OpenSmc.Messaging;

public static class MessageDeliverySerializationExtension
{
public static IMessageDelivery Package(this IMessageDelivery delivery)
{
var senderHub = (IMessageHub)delivery.Context;
var serializationService = senderHub.ServiceProvider.GetService<ISerializationService>();
if (serializationService == null)
return delivery;

return SerializeDelivery(serializationService, delivery);
}

public static IMessageDelivery SerializeDelivery(this ISerializationService serializationService, IMessageDelivery delivery)
{
try
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Collections.Immutable;

namespace OpenSmc.Messaging;

public record PostOptions(object Sender, object Context)
public record PostOptions(object Sender, IMessageHub SenderHub)
{
public const string RequestId = nameof(RequestId);
internal object Target { get; init; }
Expand Down
6 changes: 3 additions & 3 deletions test/OpenSmc.Messaging.Hub.Test/MessageHubTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async Task HelloWorld()
{
var host = GetHost();
var response = await host.AwaitResponse(new SayHelloRequest(), o => o.WithTarget(new HostAddress()));
response.Should().BeAssignableTo<IMessageDelivery<HelloEvent>>();
response.Should().BeOfType<HelloEvent>();
}


Expand All @@ -41,7 +41,7 @@ public async Task HelloWorldFromClient()
{
var client = Router.GetHostedHub(new ClientAddress(), c => c);
var response = await client.AwaitResponse(new SayHelloRequest(), o => o.WithTarget(new HostAddress()));
response.Should().BeAssignableTo<IMessageDelivery<HelloEvent>>();
response.Should().BeOfType<HelloEvent>();
}

[Fact]
Expand All @@ -53,7 +53,7 @@ public async Task ClientToServerWithMessageTraffic()
var overallMessageTask = clientOut.ToArray().GetAwaiter();

var response = await client.AwaitResponse(new SayHelloRequest(), o => o.WithTarget(new HostAddress()));
response.Should().BeAssignableTo<IMessageDelivery<HelloEvent>>();
response.Should().BeOfType<HelloEvent>();

await DisposeAsync();

Expand Down
41 changes: 25 additions & 16 deletions test/OpenSmc.Serialization.Test/SerializationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,39 @@ public SerializationTest(ITestOutputHelper output) : base(output)
Services.AddMessageHubs(new RouterAddress(), hubConf => hubConf
.WithForwards(f => f
.RouteAddress<HostAddress>(d =>
{
var hostHub = f.Hub.GetHostedHub((HostAddress)d.Target, c => c);
var packagedDelivery = d.Package();
hostHub.DeliverMessage(packagedDelivery);
})
.RouteAddressToHub<ClientAddress>(d => f.Hub.GetHostedHub((ClientAddress)d.Target,
c => c
.AddSerialization(conf =>
conf.ForType<MyEvent>(s =>
s.WithMutation((value, context) => context.SetProperty("NewProp", "New"))))
))
{
var hostHub = f.Hub.GetHostedHub((HostAddress)d.Target, ConfigureHost);
var packagedDelivery = d.Package();
hostHub.DeliverMessage(packagedDelivery);
})
.RouteAddressToHub<ClientAddress>(d => f.Hub.GetHostedHub((ClientAddress)d.Target, ConfigureClient))
));
}

private static MessageHubConfiguration ConfigureHost(MessageHubConfiguration c)
{
return c;
}

private static MessageHubConfiguration ConfigureClient(MessageHubConfiguration c)
{
return c
.AddSerialization(conf =>
conf.ForType<MyEvent>(s =>
s.WithMutation((value, context) => context.SetProperty("NewProp", "New"))));
}

[Fact]
public async Task SimpleTest()
{
var client = Router.GetHostedHub(new ClientAddress(), c => c);
var clientOut = client.AddObservable();
var messageTask = clientOut.ToArray().GetAwaiter();
var host = Router.GetHostedHub(new HostAddress(), ConfigureHost);
var client = Router.GetHostedHub(new ClientAddress(), ConfigureClient);
var hostOut = host.AddObservable();
var messageTask = hostOut.ToArray().GetAwaiter();

client.Post(new MyEvent("Hello"), o => o.WithTarget(new HostAddress()));
await Task.Delay(2000.Milliseconds());
clientOut.OnCompleted();
await Task.Delay(200.Milliseconds());
hostOut.OnCompleted();

var events = await messageTask;
events.Should().HaveCount(1);
Expand Down

0 comments on commit 22468ec

Please sign in to comment.