Skip to content
This repository was archived by the owner on Nov 17, 2023. It is now read-only.

Service Bus persisted connection for ISubscriptionClient #1521

Merged
merged 2 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,27 +1,51 @@
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Logging;
using System;

namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
{
public class DefaultServiceBusPersisterConnection : IServiceBusPersisterConnection
{
private readonly ILogger<DefaultServiceBusPersisterConnection> _logger;
private readonly ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder;
private readonly string _subscriptionClientName;
private SubscriptionClient _subscriptionClient;
private ITopicClient _topicClient;

bool _disposed;

public DefaultServiceBusPersisterConnection(ServiceBusConnectionStringBuilder serviceBusConnectionStringBuilder,
ILogger<DefaultServiceBusPersisterConnection> logger)
string subscriptionClientName)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

_serviceBusConnectionStringBuilder = serviceBusConnectionStringBuilder ??
throw new ArgumentNullException(nameof(serviceBusConnectionStringBuilder));
_subscriptionClientName = subscriptionClientName;
_subscriptionClient = new SubscriptionClient(_serviceBusConnectionStringBuilder, subscriptionClientName);
_topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default);
}

public ITopicClient TopicClient
{
get
{
if (_topicClient.IsClosedOrClosing)
{
_topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy.Default);
}
return _topicClient;
}
}

public ISubscriptionClient SubscriptionClient
{
get
{
if (_subscriptionClient.IsClosedOrClosing)
{
_subscriptionClient = new SubscriptionClient(_serviceBusConnectionStringBuilder, _subscriptionClientName);
}
return _subscriptionClient;
}
}

public ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder => _serviceBusConnectionStringBuilder;

public ITopicClient CreateModel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@ public class EventBusServiceBus : IEventBus
private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection;
private readonly ILogger<EventBusServiceBus> _logger;
private readonly IEventBusSubscriptionsManager _subsManager;
private readonly SubscriptionClient _subscriptionClient;
private readonly ILifetimeScope _autofac;
private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent";

public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection,
ILogger<EventBusServiceBus> logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName,
ILifetimeScope autofac)
ILogger<EventBusServiceBus> logger, IEventBusSubscriptionsManager subsManager, ILifetimeScope autofac)
{
_serviceBusPersisterConnection = serviceBusPersisterConnection;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();

_subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder,
subscriptionClientName);
_autofac = autofac;

RemoveDefaultRule();
Expand All @@ -51,9 +46,7 @@ public void Publish(IntegrationEvent @event)
Label = eventName,
};

var topicClient = _serviceBusPersisterConnection.CreateModel();

topicClient.SendAsync(message)
_serviceBusPersisterConnection.TopicClient.SendAsync(message)
.GetAwaiter()
.GetResult();
}
Expand All @@ -77,7 +70,7 @@ public void Subscribe<T, TH>()
{
try
{
_subscriptionClient.AddRuleAsync(new RuleDescription
_serviceBusPersisterConnection.SubscriptionClient.AddRuleAsync(new RuleDescription
{
Filter = new CorrelationFilter { Label = eventName },
Name = eventName
Expand All @@ -102,10 +95,11 @@ public void Unsubscribe<T, TH>()

try
{
_subscriptionClient
.RemoveRuleAsync(eventName)
.GetAwaiter()
.GetResult();
_serviceBusPersisterConnection
.SubscriptionClient
.RemoveRuleAsync(eventName)
.GetAwaiter()
.GetResult();
}
catch (MessagingEntityNotFoundException)
{
Expand All @@ -132,7 +126,7 @@ public void Dispose()

private void RegisterSubscriptionClientMessageHandler()
{
_subscriptionClient.RegisterMessageHandler(
_serviceBusPersisterConnection.SubscriptionClient.RegisterMessageHandler(
async (message, token) =>
{
var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFFIX}";
Expand All @@ -141,7 +135,7 @@ private void RegisterSubscriptionClientMessageHandler()
// Complete the message so that it is not received again.
if (await ProcessEvent(eventName, messageData))
{
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
await _serviceBusPersisterConnection.SubscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
}
},
new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false });
Expand Down Expand Up @@ -194,10 +188,11 @@ private void RemoveDefaultRule()
{
try
{
_subscriptionClient
.RemoveRuleAsync(RuleDescription.DefaultRuleName)
.GetAwaiter()
.GetResult();
_serviceBusPersisterConnection
.SubscriptionClient
.RemoveRuleAsync(RuleDescription.DefaultRuleName)
.GetAwaiter()
.GetResult();
}
catch (MessagingEntityNotFoundException)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

public interface IServiceBusPersisterConnection : IDisposable
{
ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder { get; }

ITopicClient CreateModel();
ITopicClient TopicClient { get; }
ISubscriptionClient SubscriptionClient { get; }
}
}
10 changes: 4 additions & 6 deletions src/Services/Basket/Basket.API/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,11 @@ public virtual IServiceProvider ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();

var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);

return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
var subscriptionClientName = Configuration["SubscriptionClientName"];
return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});
}
else
Expand Down Expand Up @@ -278,8 +277,6 @@ protected virtual void ConfigureAuth(IApplicationBuilder app)

private void RegisterEventBus(IServiceCollection services)
{
var subscriptionClientName = Configuration["SubscriptionClientName"];

if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
Expand All @@ -290,13 +287,14 @@ private void RegisterEventBus(IServiceCollection services)
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();

return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
eventBusSubcriptionsManager, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var subscriptionClientName = Configuration["SubscriptionClientName"];
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
Expand Down
10 changes: 4 additions & 6 deletions src/Services/Catalog/Catalog.API/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,10 @@ public static IServiceCollection AddIntegrationServices(this IServiceCollection
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var settings = sp.GetRequiredService<IOptions<CatalogSettings>>().Value;
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();

var serviceBusConnection = new ServiceBusConnectionStringBuilder(settings.EventBusConnection);
var subscriptionClientName = configuration["SubscriptionClientName"];

return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});
}
else
Expand Down Expand Up @@ -325,8 +324,6 @@ public static IServiceCollection AddIntegrationServices(this IServiceCollection

public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
var subscriptionClientName = configuration["SubscriptionClientName"];

if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
Expand All @@ -337,14 +334,15 @@ public static IServiceCollection AddEventBus(this IServiceCollection services, I
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();

return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
eventBusSubcriptionsManager, iLifetimeScope);
});

}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var subscriptionClientName = configuration["SubscriptionClientName"];
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
Expand Down
10 changes: 4 additions & 6 deletions src/Services/Ordering/Ordering.API/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,11 @@ public static IServiceCollection AddCustomIntegrations(this IServiceCollection s
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();

var serviceBusConnectionString = configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
var subscriptionClientName = configuration["SubscriptionClientName"];

return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});
}
else
Expand Down Expand Up @@ -368,8 +367,6 @@ public static IServiceCollection AddCustomConfiguration(this IServiceCollection

public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
var subscriptionClientName = configuration["SubscriptionClientName"];

if (configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
Expand All @@ -380,13 +377,14 @@ public static IServiceCollection AddEventBus(this IServiceCollection services, I
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();

return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
eventBusSubcriptionsManager, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var subscriptionClientName = configuration["SubscriptionClientName"];
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,10 @@ public static IServiceCollection AddEventBus(this IServiceCollection services, I
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();

var serviceBusConnectionString = configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);

return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});

services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
Expand All @@ -68,7 +66,7 @@ public static IServiceCollection AddEventBus(this IServiceCollection services, I
var logger = sp.GetRequiredService<ILogger<EventBusServiceBus>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();

return new EventBusServiceBus(serviceBusPersisterConnection, logger, eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
return new EventBusServiceBus(serviceBusPersisterConnection, logger, eventBusSubcriptionsManager, iLifetimeScope);
});
}
else
Expand Down
11 changes: 5 additions & 6 deletions src/Services/Ordering/Ordering.SignalrHub/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public IServiceProvider ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();

var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);

return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
var subscriptionClientName = Configuration["SubscriptionClientName"];

return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});
}
else
Expand Down Expand Up @@ -205,8 +205,6 @@ private void ConfigureAuthService(IServiceCollection services)

private void RegisterEventBus(IServiceCollection services)
{
var subscriptionClientName = Configuration["SubscriptionClientName"];

if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
Expand All @@ -217,13 +215,14 @@ private void RegisterEventBus(IServiceCollection services)
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();

return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
eventBusSubcriptionsManager, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var subscriptionClientName = Configuration["SubscriptionClientName"];
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
Expand Down
10 changes: 4 additions & 6 deletions src/Services/Payment/Payment.API/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,11 @@ public IServiceProvider ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IServiceBusPersisterConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultServiceBusPersisterConnection>>();

var serviceBusConnectionString = Configuration["EventBusConnection"];
var serviceBusConnection = new ServiceBusConnectionStringBuilder(serviceBusConnectionString);
var subscriptionClientName = Configuration["SubscriptionClientName"];

return new DefaultServiceBusPersisterConnection(serviceBusConnection, logger);
return new DefaultServiceBusPersisterConnection(serviceBusConnection, subscriptionClientName);
});
}
else
Expand Down Expand Up @@ -123,8 +122,6 @@ private void RegisterAppInsights(IServiceCollection services)

private void RegisterEventBus(IServiceCollection services)
{
var subscriptionClientName = Configuration["SubscriptionClientName"];

if (Configuration.GetValue<bool>("AzureServiceBusEnabled"))
{
services.AddSingleton<IEventBus, EventBusServiceBus>(sp =>
Expand All @@ -135,13 +132,14 @@ private void RegisterEventBus(IServiceCollection services)
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();

return new EventBusServiceBus(serviceBusPersisterConnection, logger,
eventBusSubcriptionsManager, subscriptionClientName, iLifetimeScope);
eventBusSubcriptionsManager, iLifetimeScope);
});
}
else
{
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var subscriptionClientName = Configuration["SubscriptionClientName"];
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
Expand Down
Loading