diff --git a/src/System.Private.CoreLib/src/System.Private.CoreLib.csproj b/src/System.Private.CoreLib/src/System.Private.CoreLib.csproj index 027ede670a9..0b71ccc51f1 100644 --- a/src/System.Private.CoreLib/src/System.Private.CoreLib.csproj +++ b/src/System.Private.CoreLib/src/System.Private.CoreLib.csproj @@ -268,11 +268,12 @@ - + + diff --git a/src/System.Private.CoreLib/src/System/Threading/ClrThreadPool.WorkerThread.cs b/src/System.Private.CoreLib/src/System/Threading/ClrThreadPool.WorkerThread.cs index fc55b861228..f5b2b41dec8 100644 --- a/src/System.Private.CoreLib/src/System/Threading/ClrThreadPool.WorkerThread.cs +++ b/src/System.Private.CoreLib/src/System/Threading/ClrThreadPool.WorkerThread.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information. using System.Globalization; -using Internal.LowLevelLinq; using Internal.Runtime.Augments; namespace System.Threading @@ -18,8 +17,16 @@ private static class WorkerThread /// /// Semaphore for controlling how many threads are currently working. 
/// - private static LowLevelLifoSemaphore s_semaphore = new LowLevelLifoSemaphore(0, MaxPossibleThreadCount); - + private static LowLevelLifoSemaphore s_semaphore = new LowLevelLifoSemaphore(0, MaxPossibleThreadCount, SemaphoreSpinCount); + + /// + /// Maximum number of spins a thread pool worker thread performs before waiting for work + /// + private static int SemaphoreSpinCount + { + get => AppContextConfigHelper.GetInt16Config("ThreadPool_UnfairSemaphoreSpinLimit", 70, false); + } + private static void WorkerThreadStart() { ClrThreadPoolEventSource.Log.WorkerThreadStart(ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts).numExistingThreads); diff --git a/src/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.Unix.cs b/src/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.Unix.cs index ce3a097a1ac..d817d8c958b 100644 --- a/src/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.Unix.cs +++ b/src/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.Unix.cs @@ -2,33 +2,35 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Collections.Generic; + namespace System.Threading { /// /// A LIFO semaphore. /// Waits on this semaphore are uninterruptible. 
/// - internal sealed class LowLevelLifoSemaphore : IDisposable + internal sealed partial class LowLevelLifoSemaphore : IDisposable { private WaitSubsystem.WaitableObject _semaphore; - public LowLevelLifoSemaphore(int initialSignalCount, int maximumSignalCount) + private void Create(int maximumSignalCount) { - _semaphore = WaitSubsystem.WaitableObject.NewSemaphore(initialSignalCount, maximumSignalCount); + _semaphore = WaitSubsystem.WaitableObject.NewSemaphore(0, maximumSignalCount); } - public bool Wait(int timeoutMs) + public void Dispose() { - return WaitSubsystem.Wait(_semaphore, timeoutMs, false, true); } - public int Release(int count) + private bool WaitCore(int timeoutMs) { - return WaitSubsystem.ReleaseSemaphore(_semaphore, count); + return WaitSubsystem.Wait(_semaphore, timeoutMs, false, true); } - public void Dispose() + private void ReleaseCore(int count) { + WaitSubsystem.ReleaseSemaphore(_semaphore, count); } } } diff --git a/src/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.Windows.cs b/src/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.Windows.cs index 2ce93146242..34d2895dd7d 100644 --- a/src/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.Windows.cs +++ b/src/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.Windows.cs @@ -15,15 +15,13 @@ namespace System.Threading /// See https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx under How I/O Completion Ports Work. /// From the docs "Threads that block their execution on an I/O completion port are released in last-in-first-out (LIFO) order." 
/// - internal sealed class LowLevelLifoSemaphore : IDisposable + internal sealed partial class LowLevelLifoSemaphore : IDisposable { private IntPtr _completionPort; - public LowLevelLifoSemaphore(int initialSignalCount, int maximumSignalCount) + private void Create(int maximumSignalCount) { - Debug.Assert(initialSignalCount >= 0, "Windows LowLevelLifoSemaphore does not support a negative signal count"); // TODO: Track actual signal count to enable this Debug.Assert(maximumSignalCount > 0); - Debug.Assert(initialSignalCount <= maximumSignalCount); _completionPort = Interop.Kernel32.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, UIntPtr.Zero, maximumSignalCount); @@ -34,7 +32,6 @@ public LowLevelLifoSemaphore(int initialSignalCount, int maximumSignalCount) exception.HResult = error; throw exception; } - Release(initialSignalCount); } ~LowLevelLifoSemaphore() @@ -45,7 +42,7 @@ public LowLevelLifoSemaphore(int initialSignalCount, int maximumSignalCount) } } - public bool Wait(int timeoutMs) + public bool WaitCore(int timeoutMs) { Debug.Assert(timeoutMs >= -1); @@ -54,7 +51,7 @@ public bool Wait(int timeoutMs) return success; } - public int Release(int count) + public void ReleaseCore(int count) { Debug.Assert(count > 0); @@ -68,7 +65,6 @@ public int Release(int count) throw exception; } } - return 0; // TODO: Track actual signal count to calculate this } public void Dispose() diff --git a/src/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.cs b/src/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.cs new file mode 100644 index 00000000000..ce1c04026ee --- /dev/null +++ b/src/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.cs @@ -0,0 +1,296 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. 
+ +using System.Diagnostics; +using System.Runtime.InteropServices; +using Internal.Runtime.Augments; + +namespace System.Threading +{ + /// + /// A LIFO semaphore. + /// Waits on this semaphore are uninterruptible. + /// + internal sealed partial class LowLevelLifoSemaphore : IDisposable + { + private CacheLineSeparatedCounts _separated; + + private int _maximumSignalCount; + private int _spinCount; + + private const int SpinSleep0Threshold = 10; + + public LowLevelLifoSemaphore(int initialSignalCount, int maximumSignalCount, int spinCount) + { + Debug.Assert(initialSignalCount >= 0); + Debug.Assert(initialSignalCount <= maximumSignalCount); + Debug.Assert(maximumSignalCount > 0); + Debug.Assert(spinCount >= 0); + + _separated = new CacheLineSeparatedCounts(); + _separated._counts._signalCount = (uint)initialSignalCount; + _maximumSignalCount = maximumSignalCount; + _spinCount = spinCount; + + Create(maximumSignalCount); + } + + public bool Wait(int timeoutMs) + { + Debug.Assert(timeoutMs >= -1); + + // Try to acquire the semaphore or + // a) register as a spinner if spinCount > 0 and timeoutMs > 0 + // b) register as a waiter if there's already too many spinners or spinCount == 0 and timeoutMs > 0 + // c) bail out if timeoutMs == 0 and return false + Counts counts = _separated._counts; + while (true) + { + Debug.Assert(counts._signalCount <= _maximumSignalCount); + Counts newCounts = counts; + + if (counts._signalCount != 0) + { + newCounts._signalCount--; + } + else if (timeoutMs != 0) + { + if (_spinCount > 0 && newCounts._spinnerCount < byte.MaxValue) + { + newCounts._spinnerCount++; + } + else + { + // Maximum number of spinners reached, register as a waiter instead + newCounts._waiterCount++; + Debug.Assert(newCounts._waiterCount != 0); // overflow check, this many waiters is currently not supported + } + } + + Counts countsBeforeUpdate = _separated._counts.CompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + if (counts._signalCount 
!= 0) + { + return true; + } + if (newCounts._waiterCount != counts._waiterCount) + { + return WaitForSignal(timeoutMs); + } + if (timeoutMs == 0) + { + return false; + } + break; + } + + counts = countsBeforeUpdate; + } + + int processorCount = PlatformHelper.ProcessorCount; + int spinIndex = processorCount > 1 ? 0 : SpinSleep0Threshold; + while (spinIndex < _spinCount) + { + LowLevelSpinWaiter.Wait(spinIndex, SpinSleep0Threshold, processorCount); + spinIndex++; + + // Try to acquire the semaphore and unregister as a spinner + counts = _separated._counts; + while (counts._signalCount > 0) + { + Counts newCounts = counts; + newCounts._signalCount--; + newCounts._spinnerCount--; + + Counts countsBeforeUpdate = _separated._counts.CompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + return true; + } + + counts = countsBeforeUpdate; + } + } + + // Unregister as spinner and acquire the semaphore or register as a waiter + counts = _separated._counts; + while (true) + { + Counts newCounts = counts; + newCounts._spinnerCount--; + if (counts._signalCount != 0) + { + newCounts._signalCount--; + } + else + { + newCounts._waiterCount++; + Debug.Assert(newCounts._waiterCount != 0); // overflow check, this many waiters is currently not supported + } + + Counts countsBeforeUpdate = _separated._counts.CompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + return counts._signalCount != 0 || WaitForSignal(timeoutMs); + } + + counts = countsBeforeUpdate; + } + } + + + public void Release(int releaseCount) + { + Debug.Assert(releaseCount > 0); + Debug.Assert(releaseCount <= _maximumSignalCount); + + int countOfWaitersToWake; + Counts counts = _separated._counts; + while (true) + { + Counts newCounts = counts; + + // Increase the signal count. The addition doesn't overflow because of the limit on the max signal count in constructor. 
+ newCounts._signalCount += (uint)releaseCount; + Debug.Assert(newCounts._signalCount > counts._signalCount); + + // Determine how many waiters to wake, taking into account how many spinners and waiters there are and how many waiters + // have previously been signaled to wake but have not yet woken + countOfWaitersToWake = + (int)Math.Min(newCounts._signalCount, (uint)newCounts._waiterCount + newCounts._spinnerCount) - + newCounts._spinnerCount - + newCounts._countOfWaitersSignaledToWake; + if (countOfWaitersToWake > 0) + { + // Ideally, limiting to a maximum of releaseCount would not be necessary and could be an assert instead, but since + // WaitForSignal() does not have enough information to tell whether a woken thread was signaled, and due to the cap + // below, it's possible for countOfWaitersSignaledToWake to be less than the number of threads that have actually + // been signaled to wake. + if (countOfWaitersToWake > releaseCount) + { + countOfWaitersToWake = releaseCount; + } + + // Cap countOfWaitersSignaledToWake to its max value. It's ok to ignore some woken threads in this count, it just + // means some more threads will be woken next time. Typically, it won't reach the max anyway. + newCounts._countOfWaitersSignaledToWake += (byte)Math.Min(countOfWaitersToWake, byte.MaxValue); + if (newCounts._countOfWaitersSignaledToWake <= counts._countOfWaitersSignaledToWake) + { + newCounts._countOfWaitersSignaledToWake = byte.MaxValue; + } + } + + Counts countsBeforeUpdate = _separated._counts.CompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + Debug.Assert(releaseCount <= _maximumSignalCount - counts._signalCount); + if (countOfWaitersToWake > 0) + ReleaseCore(countOfWaitersToWake); + return; + } + + counts = countsBeforeUpdate; + } + } + + private bool WaitForSignal(int timeoutMs) + { + Debug.Assert(timeoutMs > 0 || timeoutMs == -1); + + while (true) + { + if (!WaitCore(timeoutMs)) + { + // Unregister the waiter. 
The wait subsystem used above guarantees that a thread that wakes due to a timeout does + // not observe a signal to the object being waited upon. + Counts toSubtract = new Counts(); + toSubtract._waiterCount++; + Counts newCounts = _separated._counts.Subtract(toSubtract); + Debug.Assert(newCounts._waiterCount != ushort.MaxValue); // Check for underflow + return false; + } + + // Unregister the waiter if this thread will not be waiting anymore, and try to acquire the semaphore + Counts counts = _separated._counts; + while (true) + { + Debug.Assert(counts._waiterCount != 0); + Counts newCounts = counts; + if (counts._signalCount != 0) + { + --newCounts._signalCount; + --newCounts._waiterCount; + } + + // This waiter has woken up and this needs to be reflected in the count of waiters signaled to wake + if (counts._countOfWaitersSignaledToWake != 0) + { + --newCounts._countOfWaitersSignaledToWake; + } + + Counts countsBeforeUpdate = _separated._counts.CompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + if (counts._signalCount != 0) + { + return true; + } + break; + } + + counts = countsBeforeUpdate; + } + } + } + + // All four counters are packed into the 8 bytes overlaid by _asLong so that they are read and updated together by a single interlocked operation + [StructLayout(LayoutKind.Explicit)] + private struct Counts + { + [FieldOffset(0)] + public uint _signalCount; + [FieldOffset(4)] + public ushort _waiterCount; + [FieldOffset(6)] + public byte _spinnerCount; + // Must be at offset 7 (the last byte of _asLong); any higher offset falls outside the word used by CompareExchange/Subtract + [FieldOffset(7)] + public byte _countOfWaitersSignaledToWake; + + [FieldOffset(0)] + private long _asLong; + + public Counts CompareExchange(Counts newCounts, Counts oldCounts) + { + return new Counts { _asLong = Interlocked.CompareExchange(ref _asLong, newCounts._asLong, oldCounts._asLong) }; + } + + public Counts Subtract(Counts subtractCounts) + { + return new Counts { _asLong = Interlocked.Add(ref _asLong, -subtractCounts._asLong) }; + } + + public static bool operator ==(Counts lhs, Counts rhs) => lhs._asLong == rhs._asLong; + + public static bool operator !=(Counts lhs, Counts rhs) => lhs._asLong != rhs._asLong; + +
public override bool Equals(object obj) + { + return obj is Counts counts && this._asLong == counts._asLong; + } + + public override int GetHashCode() + { + return (int)(_asLong >> 8); + } + } + + [StructLayout(LayoutKind.Sequential)] + private struct CacheLineSeparatedCounts + { + private Internal.PaddingFor32 _pad1; + public Counts _counts; + private Internal.PaddingFor32 _pad2; + } + } +} diff --git a/src/System.Private.CoreLib/src/System/Threading/LowLevelLock.cs b/src/System.Private.CoreLib/src/System/Threading/LowLevelLock.cs index 719541856fe..ea30900667b 100644 --- a/src/System.Private.CoreLib/src/System/Threading/LowLevelLock.cs +++ b/src/System.Private.CoreLib/src/System/Threading/LowLevelLock.cs @@ -36,7 +36,7 @@ internal sealed class LowLevelLock : IDisposable /// private bool _isAnyWaitingThreadSignaled; - private FirstLevelSpinWaiter _spinWaiter; + private LowLevelSpinWaiter _spinWaiter; private readonly Func _spinWaitTryAcquireCallback; private readonly LowLevelMonitor _monitor; @@ -46,8 +46,7 @@ public LowLevelLock() _ownerThread = null; #endif - _spinWaiter = new FirstLevelSpinWaiter(); - _spinWaiter.Initialize(); + _spinWaiter = new LowLevelSpinWaiter(); _spinWaitTryAcquireCallback = SpinWaitTryAcquireCallback; _monitor = new LowLevelMonitor(); } diff --git a/src/System.Private.CoreLib/src/System/Threading/FirstLevelSpinWaiter.cs b/src/System.Private.CoreLib/src/System/Threading/LowLevelSpinWaiter.cs similarity index 55% rename from src/System.Private.CoreLib/src/System/Threading/FirstLevelSpinWaiter.cs rename to src/System.Private.CoreLib/src/System/Threading/LowLevelSpinWaiter.cs index 936ffc20bb0..95354abcffd 100644 --- a/src/System.Private.CoreLib/src/System/Threading/FirstLevelSpinWaiter.cs +++ b/src/System.Private.CoreLib/src/System/Threading/LowLevelSpinWaiter.cs @@ -14,31 +14,19 @@ namespace System.Threading /// /// Used by the wait subsystem on Unix, so this class cannot have any dependencies on the wait subsystem. 
/// - internal struct FirstLevelSpinWaiter + internal struct LowLevelSpinWaiter { // TODO: Tune these values private const int SpinCount = 8; - private const int SpinYieldThreshold = 4; - private const int SpinSleep0Threshold = 6; - - private static int s_processorCount; + private const int SpinSleep0Threshold = 4; private int _spinningThreadCount; - public void Initialize() - { - if (s_processorCount == 0) - { - s_processorCount = Environment.ProcessorCount; - } - } - public bool SpinWaitForCondition(Func condition) { Debug.Assert(condition != null); - Debug.Assert(s_processorCount > 0); - int processorCount = s_processorCount; + int processorCount = Runtime.RuntimeImports.RhGetProcessCpuCount(); int spinningThreadCount = Interlocked.Increment(ref _spinningThreadCount); try { @@ -49,10 +37,10 @@ public bool SpinWaitForCondition(Func condition) { // For uniprocessor systems, start at the yield threshold since the pause instructions used for waiting // prior to that threshold would not help other threads make progress - for (int spinIndex = processorCount > 1 ? 0 : SpinYieldThreshold; spinIndex < SpinCount; ++spinIndex) + for (int spinIndex = processorCount > 1 ? 
0 : SpinSleep0Threshold; spinIndex < SpinCount; ++spinIndex) { // The caller should check the condition in a fast path before calling this method, so wait first - Wait(spinIndex); + Wait(spinIndex, SpinSleep0Threshold, processorCount); if (condition()) { @@ -69,24 +57,43 @@ public bool SpinWaitForCondition(Func condition) return false; } - private static void Wait(int spinIndex) + public static void Wait(int spinIndex, int sleep0Threshold, int processorCount) { - Debug.Assert(SpinYieldThreshold < SpinSleep0Threshold); + Debug.Assert(spinIndex >= 0); + Debug.Assert(sleep0Threshold >= 0); - if (spinIndex < SpinYieldThreshold) - { - RuntimeThread.SpinWait(1 << spinIndex); - return; - } - - if (spinIndex < SpinSleep0Threshold && RuntimeThread.Yield()) + // Wait + // + // (spinIndex - sleep0Threshold) % 2 != 0: The purpose of this check is to interleave Thread.Yield/Sleep(0) with + // Thread.SpinWait. Otherwise, the following issues occur: + // - When there are no threads to switch to, Yield and Sleep(0) become no-op and it turns the spin loop into a + // busy-spin that may quickly reach the max spin count and cause the thread to enter a wait state. Completing the + // spin loop too early can cause excessive context switching from the wait. + // - If there are multiple threads doing Yield and Sleep(0) (typically from the same spin loop due to contention), + // they may switch between one another, delaying work that can make progress. + if (processorCount > 1 && (spinIndex < sleep0Threshold || (spinIndex - sleep0Threshold) % 2 != 0)) { + // Cap the maximum spin count to a value such that many thousands of CPU cycles would not be wasted doing + // the equivalent of YieldProcessor(), as at that point SwitchToThread/Sleep(0) are more likely to be able to + // allow other useful work to run. Long YieldProcessor() loops can help to reduce contention, but Sleep(1) is + // usually better for that.
+ // + // RuntimeThread.OptimalMaxSpinWaitsPerSpinIteration: + // - See Thread::InitializeYieldProcessorNormalized(), which describes and calculates this value. + // + int n = RuntimeThread.OptimalMaxSpinWaitsPerSpinIteration; + if (spinIndex <= 30 && (1 << spinIndex) < n) + { + n = 1 << spinIndex; + } + RuntimeThread.SpinWait(n); return; } /// is interruptible. The current operation may not allow thread interrupt /// (for instance, as part of ). Use the - /// uninterruptible version of Sleep(0). + /// uninterruptible version of Sleep(0). Not doing , it does not seem to have any + /// benefit over Sleep(0). RuntimeThread.UninterruptibleSleep0(); // Don't want to Sleep(1) in this spin wait: diff --git a/src/System.Private.CoreLib/src/System/Threading/WaitHandle.cs b/src/System.Private.CoreLib/src/System/Threading/WaitHandle.cs index 4d96a88304e..f303af78003 100644 --- a/src/System.Private.CoreLib/src/System/Threading/WaitHandle.cs +++ b/src/System.Private.CoreLib/src/System/Threading/WaitHandle.cs @@ -137,8 +137,6 @@ private bool WaitOneCore(int millisecondsTimeout, bool interruptible = true) public virtual bool WaitOne(int millisecondsTimeout, bool exitContext) => WaitOne(millisecondsTimeout); public virtual bool WaitOne(TimeSpan timeout, bool exitContext) => WaitOne(timeout); - internal bool WaitOne(bool interruptible) => WaitOneCore(Timeout.Infinite, interruptible); - /// /// Obtains all of the corresponding safe wait handles and adds a ref to each. Since the /// property is publically modifiable, this makes sure that we add and release refs one the same set of safe wait