Skip to content

Commit

Permalink
Improve accuracy and stability of ActivationWorkingSet (#8321)
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond authored Mar 7, 2023
1 parent 1ae5430 commit b1f02d5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 33 deletions.
3 changes: 2 additions & 1 deletion src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -657,9 +657,10 @@ public TExtensionInterface GetExtension<TExtensionInterface>()

bool IActivationWorkingSetMember.IsCandidateForRemoval(bool wouldRemove)
{
const int IdlenessLowerBound = 10_000;
lock (this)
{
var inactive = IsInactive;
var inactive = IsInactive && _idleDuration.ElapsedMilliseconds > IdlenessLowerBound;

// This instance will remain in the working set if it is either not pending removal or if it is currently active.
_isInWorkingSet = !wouldRemove || !inactive;
Expand Down
68 changes: 36 additions & 32 deletions src/Orleans.Runtime/Catalog/ActivationWorkingSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.Internal;
using Orleans.Runtime.Internal;

namespace Orleans.Runtime
{
Expand All @@ -16,7 +17,7 @@ internal sealed class ActivationWorkingSet : IActivationWorkingSet, ILifecyclePa
{
private class MemberState
{
public bool IsMarkedForRemoval { get; set; }
public bool IsIdle { get; set; }
}

private readonly ConcurrentDictionary<IActivationWorkingSetMember, MemberState> _members = new();
Expand All @@ -33,7 +34,7 @@ public ActivationWorkingSet(
IEnumerable<IActivationWorkingSetObserver> observers)
{
_logger = logger;
_scanPeriodTimer = asyncTimerFactory.Create(TimeSpan.FromMilliseconds(100), nameof(ActivationWorkingSet) + "." + nameof(MonitorWorkingSet));
_scanPeriodTimer = asyncTimerFactory.Create(TimeSpan.FromMilliseconds(5_000), nameof(ActivationWorkingSet) + "." + nameof(MonitorWorkingSet));
_observers = observers.ToList();
CatalogInstruments.RegisterActivationWorkingSetObserve(() => Count);
}
Expand All @@ -42,23 +43,25 @@ public ActivationWorkingSet(

public void OnActivated(IActivationWorkingSetMember member)
{
if (!_members.TryAdd(member, new MemberState()))
if (_members.TryAdd(member, new MemberState()))
{
throw new InvalidOperationException($"Member {member} is already a member of the working set");
}
Interlocked.Increment(ref _activeCount);
foreach (var observer in _observers)
{
observer.OnAdded(member);
}

Interlocked.Increment(ref _activeCount);
foreach (var observer in _observers)
{
observer.OnAdded(member);
return;
}

throw new InvalidOperationException($"Member {member} is already a member of the working set");
}

public void OnActive(IActivationWorkingSetMember member)
{
if (_members.TryGetValue(member, out var state))
{
state.IsMarkedForRemoval = false;
state.IsIdle = false;
}
else if (_members.TryAdd(member, new()))
{
Expand Down Expand Up @@ -101,9 +104,27 @@ public void OnDeactivated(IActivationWorkingSetMember member)
}
}

private async Task MonitorWorkingSet()
{
while (await _scanPeriodTimer.NextTick())
{
foreach (var pair in _members)
{
try
{
VisitMember(pair.Key, pair.Value);
}
catch (Exception exception)
{
_logger.LogError(exception, "Exception visiting working set member {Member}", pair.Key);
}
}
}
}

private void VisitMember(IActivationWorkingSetMember member, MemberState state)
{
var wouldRemove = state.IsMarkedForRemoval;
var wouldRemove = state.IsIdle;
if (member.IsCandidateForRemoval(wouldRemove))
{
if (wouldRemove)
Expand All @@ -112,7 +133,7 @@ private void VisitMember(IActivationWorkingSetMember member, MemberState state)
}
else
{
state.IsMarkedForRemoval = true;
state.IsIdle = true;
foreach (var observer in _observers)
{
observer.OnIdle(member);
Expand All @@ -121,40 +142,23 @@ private void VisitMember(IActivationWorkingSetMember member, MemberState state)
}
else
{
state.IsMarkedForRemoval = false;
state.IsIdle = false;
foreach (var observer in _observers)
{
observer.OnActive(member);
}
}
}

private async Task MonitorWorkingSet()
{
while (await _scanPeriodTimer.NextTick())
{
foreach (var pair in _members)
{
try
{
VisitMember(pair.Key, pair.Value);
}
catch (Exception exception)
{
_logger.LogError(exception, "Exception visiting working set member {Member}", pair.Key);
}
}
}
}

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe(
nameof(ActivationWorkingSet),
ServiceLifecycleStage.BecomeActive,
ct =>
{
_runTask = Task.Run(this.MonitorWorkingSet);
using var _ = new ExecutionContextSuppressor();
_runTask = Task.Run(MonitorWorkingSet);
return Task.CompletedTask;
},
async ct =>
Expand Down

0 comments on commit b1f02d5

Please sign in to comment.