Skip to content

Commit

Permalink
fix: FasterKv cache expiration time is not effective
Browse files Browse the repository at this point in the history
  • Loading branch information
Memoyu committed Sep 26, 2024
1 parent 04b17e8 commit 022f9c8
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 72 deletions.
82 changes: 51 additions & 31 deletions src/EasyCaching.FasterKv/DefaultFasterKvCachingProvider.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using System.Threading;
using System.Threading.Tasks;
using EasyCaching.Core;
using EasyCaching.Core.Internal;
using EasyCaching.FasterKv.Internal;
using Microsoft.Extensions.Logging;

namespace EasyCaching.FasterKv
Expand All @@ -13,7 +15,7 @@ public override async Task<bool> BaseExistsAsync(string cacheKey, CancellationTo
{
ArgumentCheck.NotNullOrWhiteSpace(cacheKey, nameof(cacheKey));
using var session = GetSession();
var result = (await session.Session.ReadAsync(GetSpanByte(cacheKey), token: cancellationToken)).Complete();
var result = (await session.Session.ReadAsync(GetKeySpanByte(cacheKey), token: cancellationToken)).Complete();
return result.status.Found;
}

Expand All @@ -27,7 +29,7 @@ public override async Task BaseFlushAsync(CancellationToken cancellationToken =
{
await session.Session.DeleteAsync(ref iter.GetKey(), token: cancellationToken).ConfigureAwait(false);
}
}
}

public override Task<IEnumerable<string>> BaseGetAllKeysByPrefixAsync(string prefix, CancellationToken cancellationToken = default)
{
Expand All @@ -39,7 +41,7 @@ public override async Task<IDictionary<string, CacheValue<T>>> BaseGetAllAsync<T
{
ArgumentCheck.NotNullAndCountGTZero(cacheKeys, nameof(cacheKeys));
EnsureNotDispose();

using var session = GetSession();
var dic = new Dictionary<string, CacheValue<T>>();
foreach (var cacheKey in cacheKeys)
Expand All @@ -57,14 +59,14 @@ public override async Task<CacheValue<T>> BaseGetAsync<T>(string cacheKey, Func<
ArgumentCheck.NotNullOrWhiteSpace(cacheKey, nameof(cacheKey));
ArgumentCheck.NotNegativeOrZero(expiration, nameof(expiration));
EnsureNotDispose();

using var session = GetSession();
var result = await BaseGetInternalAsync<T>(session, cacheKey, cancellationToken);
if (result.HasValue)
{
return result;
}

var item = await dataRetriever();
if (item is not null || _options.CacheNulls)
{
Expand All @@ -80,7 +82,7 @@ public override async Task<object> BaseGetAsync(string cacheKey, Type type,
{
ArgumentCheck.NotNullOrWhiteSpace(cacheKey, nameof(cacheKey));
EnsureNotDispose();

using var session = GetSession();
var result = await BaseGetAsync<object>(cacheKey, cancellationToken);
return Convert.ChangeType(result.Value, type);
Expand All @@ -91,7 +93,7 @@ public override async Task<CacheValue<T>> BaseGetAsync<T>(string cacheKey,
{
ArgumentCheck.NotNullOrWhiteSpace(cacheKey, nameof(cacheKey));
EnsureNotDispose();

using var session = GetSession();
return await BaseGetInternalAsync<T>(session, cacheKey, cancellationToken);
}
Expand All @@ -104,17 +106,17 @@ public override async Task BaseRemoveAllAsync(IEnumerable<string> cacheKeys, Can
using var session = GetSession();
foreach (var cacheKey in cacheKeys)
{
await session.Session.DeleteAsync(GetSpanByte(cacheKey), token: cancellation).ConfigureAwait(false);
await session.Session.DeleteAsync(GetKeySpanByte(cacheKey), token: cancellation).ConfigureAwait(false);
}
}

public override async Task BaseRemoveAsync(string cacheKey, CancellationToken cancellationToken = default)
{
ArgumentCheck.NotNullOrWhiteSpace(cacheKey, nameof(cacheKey));
EnsureNotDispose();

using var session = GetSession();
await session.Session.DeleteAsync(GetSpanByte(cacheKey), token: cancellationToken).ConfigureAwait(false);
await session.Session.DeleteAsync(GetKeySpanByte(cacheKey), token: cancellationToken).ConfigureAwait(false);
}


Expand All @@ -128,7 +130,7 @@ public override async Task BaseSetAllAsync<T>(IDictionary<string, T> values, Tim
using var session = GetSession();
foreach (var kp in values)
{
await BaseSetInternalAsync<T>(session, kp.Key, kp.Value, cancellationToken);
await BaseSetInternalAsync<T>(session, kp.Key, kp.Value, expiration, cancellationToken);
}
}

Expand All @@ -141,7 +143,7 @@ public override async Task BaseSetAsync<T>(string cacheKey, T cacheValue, TimeSp
EnsureNotDispose();

using var session = GetSession();
await BaseSetInternalAsync(session, cacheKey, cacheValue, cancellationToken);
await BaseSetInternalAsync(session, cacheKey, cacheValue, expiration, cancellationToken);
}

public override async Task<bool> BaseTrySetAsync<T>(string cacheKey, T cacheValue, TimeSpan expiration,
Expand All @@ -156,35 +158,53 @@ public override async Task<bool> BaseTrySetAsync<T>(string cacheKey, T cacheValu
var result = await BaseGetAsync<T>(cacheKey, cancellationToken);
if (result.HasValue == false)
{
await BaseSetInternalAsync<T>(session, cacheKey, cacheValue, cancellationToken);
await BaseSetInternalAsync<T>(session, cacheKey, cacheValue, expiration, cancellationToken);
return true;
}

return false;
}
private async Task BaseSetInternalAsync<T>(ClientSessionWrap sessionWarp, string cacheKey, T cacheValue,

private async Task BaseSetInternalAsync<T>(ClientSessionWrap sessionWarp, string cacheKey, T cacheValue, TimeSpan expiration,
CancellationToken cancellationToken)
{
_ = await (await sessionWarp.Session.UpsertAsync(GetSpanByte(cacheKey),
GetSpanByte(cacheValue), token: cancellationToken)
.ConfigureAwait(false)).CompleteAsync(cancellationToken);
if (MaxRdSecond > 0)
{
var addSec = RandomHelper.GetNext(1, MaxRdSecond);
expiration.Add(new TimeSpan(0, 0, addSec));
}

var key = GetKeySpanByte(cacheKey);
var value = GetValueSpanByte(cacheValue, expiration);
_ = await (await sessionWarp.Session.UpsertAsync(key, value, token: cancellationToken).ConfigureAwait(false))
.CompleteAsync(cancellationToken);
}


private async Task<CacheValue<T>> BaseGetInternalAsync<T>(ClientSessionWrap session, string cacheKey, CancellationToken cancellationToken)
{
var result = (await session.Session.ReadAsync(GetSpanByte(cacheKey),
token: cancellationToken)
var key = GetKeySpanByte(cacheKey);
var result = (await session.Session.ReadAsync(key, token: cancellationToken)
.ConfigureAwait(false)).Complete();
if (result.status.Found)
{
if (_options.EnableLogging)
_logger?.LogInformation("Cache Hit : cacheKey = {CacheKey}", cacheKey);

CacheStats.OnHit();

return new CacheValue<T>(GetTValue<T>(ref result.output), true);
var cached = GetTValue<FasterKvCacheValue>(ref result.output);

if (cached.Expiration > DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
{
if (_options.EnableLogging)
_logger?.LogInformation("Cache Hit : cacheKey = {CacheKey}", cacheKey);

CacheStats.OnHit();

var value = _serializer.Deserialize<T>(cached.Value);
return new CacheValue<T>(value, true);
}
else
{
// delete expired cache
await session.Session.DeleteAsync(key, token: cancellationToken).ConfigureAwait(false);
}
}

CacheStats.OnMiss();
Expand All @@ -201,22 +221,22 @@ public override Task<IDictionary<string, CacheValue<T>>> BaseGetByPrefixAsync<T>
{
throw new NotSupportedException("BaseGetByPrefixAsync is not supported in FasterKv provider.");
}

public override Task<int> BaseGetCountAsync(string prefix = "", CancellationToken cancellationToken = default)
{
throw new NotSupportedException("BaseGetCountAsync is not supported in FasterKv provider.");
}

public override Task BaseRemoveByPrefixAsync(string prefix, CancellationToken cancellationToken = default)
{
throw new NotSupportedException("BaseRemoveByPrefixAsync is not supported in FasterKv provider.");
}

public override Task BaseRemoveByPatternAsync(string pattern, CancellationToken cancellationToken = default)
{
throw new NotSupportedException("BaseRemoveByPatternAsync is not supported in FasterKv provider.");
}

public override Task<TimeSpan> BaseGetExpirationAsync(string cacheKey,
CancellationToken cancellationToken = default)
{
Expand Down
Loading

0 comments on commit 022f9c8

Please sign in to comment.