Skip to content

Commit

Permalink
Using sync version of subscribe in HybridCachingProvider (dotnetcore#455
Browse files Browse the repository at this point in the history
)

* fix: using sync version of subscribe in HybridCachingProvider(dotnetcore#421)

* feat: enable SubscribeAsync
  • Loading branch information
Memoyu authored Feb 9, 2023
1 parent 3e321cc commit 5cf3e25
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 5 deletions.
15 changes: 15 additions & 0 deletions bus/EasyCaching.Bus.CSRedis/DefaultCSRedisBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
);
}

/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public override Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
{
_client.Subscribe(
(topic, msg => OnMessage(msg.Body))
);
return Task.CompletedTask;
}


/// <summary>
/// Ons the message.
/// </summary>
Expand Down
26 changes: 23 additions & 3 deletions bus/EasyCaching.Bus.ConfluentKafka/DefaultConfluentKafkaBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
using Microsoft.Extensions.Options;

public class DefaultConfluentKafkaBus : EasyCachingAbstractBus
{
{


/// <summary>
Expand Down Expand Up @@ -77,7 +77,7 @@ public override void BasePublish(string topic, EasyCachingMessage message)
/// <param name="cancellationToken">Cancellation token.</param>
public override async Task BasePublishAsync(string topic, EasyCachingMessage message, CancellationToken cancellationToken = default(CancellationToken))
{
var msg = _serializer.Serialize(message);
var msg = _serializer.Serialize(message);

await _producer.ProduceAsync(topic, new Message<Null, byte[]> { Value = msg });
}
Expand All @@ -89,7 +89,27 @@ public override void BasePublish(string topic, EasyCachingMessage message)
/// <param name="action">Action.</param>
public override void BaseSubscribe(string topic, Action<EasyCachingMessage> action)
{
Task.Factory.StartNew(() =>
_ = StartConsumer(topic);
}

/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public override async Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
{
await StartConsumer(topic);
}

/// <summary>
/// Ons the consumer.
/// </summary>
/// <param name="topic">Topic</param>
private Task StartConsumer(string topic)
{
return Task.Factory.StartNew(() =>
{
for (int i = 0; i < this._kafkaBusOptions.ConsumerCount; i++)
{
Expand Down
24 changes: 23 additions & 1 deletion bus/EasyCaching.Bus.RabbitMQ/DefaultRabbitMQBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ IPooledObjectPolicy<IModel> _objectPolicy
RequestedConnectionTimeout = System.TimeSpan.FromMilliseconds(_options.RequestedConnectionTimeout),
SocketReadTimeout = System.TimeSpan.FromMilliseconds(_options.SocketReadTimeout),
SocketWriteTimeout = System.TimeSpan.FromMilliseconds(_options.SocketWriteTimeout),
ClientProvidedName = _options.ClientProvidedName
ClientProvidedName = _options.ClientProvidedName,
};

_subConnection = factory.CreateConnection();
Expand Down Expand Up @@ -156,6 +156,28 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
}


/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public override Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
{
var queueName = string.Empty;
if (string.IsNullOrWhiteSpace(_options.QueueName))
{
queueName = $"rmq.queue.undurable.easycaching.subscriber.{_busId}";
}
else
{
queueName = _options.QueueName;
}

StartConsumer(queueName, topic);
return Task.CompletedTask;
}

private void StartConsumer(string queueName, string topic)
{
var model = _subConnection.CreateModel();
Expand Down
11 changes: 11 additions & 0 deletions bus/EasyCaching.Bus.Redis/DefaultRedisBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,16 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
{
_subscriber.Subscribe(topic, OnMessage);
}

/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public override async Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
{
await _subscriber.SubscribeAsync(topic, OnMessage);
}
}
}
12 changes: 12 additions & 0 deletions bus/EasyCaching.Bus.Zookeeper/DefaultZookeeperBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
}, TaskCreationOptions.LongRunning);
}

/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public override async Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken))
{
var path = $"{topic}";
await SubscribeDataChangeAsync(path, SubscribeDataChange);
}

/// <summary>
/// Ons the message.
/// </summary>
Expand Down
11 changes: 11 additions & 0 deletions src/EasyCaching.Core/Bus/IEasyCachingSubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace EasyCaching.Core.Bus
{
using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// EasyCaching subscriber.
Expand All @@ -14,5 +16,14 @@ public interface IEasyCachingSubscriber
/// <param name="action">Action.</param>
/// <param name="reconnectAction"> Reconnect Action.</param>
void Subscribe(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null);

/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="reconnectAction"> Reconnect Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
Task SubscribeAsync(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null, CancellationToken cancellationToken = default(CancellationToken));
}
}
12 changes: 12 additions & 0 deletions src/EasyCaching.Core/Bus/NullEasyCachingBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,17 @@ public void Subscribe(string topic, Action<EasyCachingMessage> action, Action re
{

}

/// <summary>
/// Subscribe the specified topic and action async.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="action">Action.</param>
/// <param name="reconnectAction">Reconnect Action.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public Task SubscribeAsync(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null, CancellationToken cancellationToken = default(CancellationToken))
{
return Task.CompletedTask;
}
}
}
8 changes: 8 additions & 0 deletions src/EasyCaching.Core/EasyCachingAbstractBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public abstract class EasyCachingAbstractBus : IEasyCachingBus
public abstract void BasePublish(string topic, EasyCachingMessage message);
public abstract Task BasePublishAsync(string topic, EasyCachingMessage message, CancellationToken cancellationToken = default(CancellationToken));
public abstract void BaseSubscribe(string topic, Action<EasyCachingMessage> action);
public abstract Task BaseSubscribeAsync(string topic, Action<EasyCachingMessage> action, CancellationToken cancellationToken = default(CancellationToken));

protected Action<EasyCachingMessage> _handler;

Expand Down Expand Up @@ -83,6 +84,13 @@ public void Subscribe(string topic, Action<EasyCachingMessage> action, Action re
BaseSubscribe(topic, action);
}

public async Task SubscribeAsync(string topic, Action<EasyCachingMessage> action, Action reconnectAction, CancellationToken cancellationToken = default(CancellationToken))
{
_handler = action;
_reconnectHandler = reconnectAction;
await BaseSubscribeAsync(topic, action);
}

public virtual void BaseOnMessage(EasyCachingMessage message)
{
var operationId = s_diagnosticListener.WriteSubscribeMessageBefore(new BeforeSubscribeMessageRequestEventData(message));
Expand Down
11 changes: 10 additions & 1 deletion src/EasyCaching.HybridCache/HybridCachingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ string name
else this._distributedCache = distributed;

this._bus = bus ?? NullEasyCachingBus.Instance;
this._bus.Subscribe(_options.TopicName, OnMessage, OnReconnect);
_ = SubscribeAsync();

this._cacheId = Guid.NewGuid().ToString("N");

Expand All @@ -117,6 +117,15 @@ string name
_busAsyncWrap = Policy.WrapAsync(fallbackAsyncPolicy, retryAsyncPolicy);
}

/// <summary>
/// Subscribe the topic
/// </summary>
/// <returns></returns>
private async Task SubscribeAsync()
{
await _bus.SubscribeAsync(_options.TopicName, OnMessage, OnReconnect);
}

/// <summary>
/// Ons the message.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions test/EasyCaching.UnitTests/Fake/FakeBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@ public void Subscribe(string topic, Action<EasyCachingMessage> action, Action re
{

}

public Task SubscribeAsync(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null, CancellationToken cancellationToken = default(CancellationToken))
{
return Task.CompletedTask;
}
}
}

0 comments on commit 5cf3e25

Please sign in to comment.