Skip to content

Commit

Permalink
Fix | Rework timer to ensure guaranteed execution
Browse files Browse the repository at this point in the history
  • Loading branch information
Wraith2 authored Feb 25, 2021
1 parent c92b2ca commit 717ceda
Show file tree
Hide file tree
Showing 12 changed files with 522 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2710,7 +2710,7 @@ private void CleanUpStateObject(bool isCancelRequested = true)
{
_stateObj.CancelRequest();
}
_stateObj._internalTimeout = false;
_stateObj.SetTimeoutStateStopped();
_stateObj.CloseSession();
_stateObj._bulkCopyOpperationInProgress = false;
_stateObj._bulkCopyWriteTimeout = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ private bool TryCloseInternal(bool closeReader)
{
_sharedState._dataReady = true; // set _sharedState._dataReady to not confuse CleanPartialRead
}
_stateObj._internalTimeout = false;
_stateObj.SetTimeoutStateStopped();
if (_sharedState._dataReady)
{
cleanDataFailed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1896,7 +1896,7 @@ internal bool TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataRead
// If there is data ready, but we didn't exit the loop, then something is wrong
Debug.Assert(!dataReady, "dataReady not expected - did we forget to skip the row?");

if (stateObj._internalTimeout)
if (stateObj.IsTimeoutStateExpired)
{
runBehavior = RunBehavior.Attention;
}
Expand Down Expand Up @@ -2520,7 +2520,7 @@ internal bool TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataRead
stateObj._attentionSent = false;
stateObj.HasReceivedAttention = false;

if (RunBehavior.Clean != (RunBehavior.Clean & runBehavior) && !stateObj._internalTimeout)
if (RunBehavior.Clean != (RunBehavior.Clean & runBehavior) && !stateObj.IsTimeoutStateExpired)
{
// Add attention error to collection - if not RunBehavior.Clean!
stateObj.AddError(new SqlError(0, 0, TdsEnums.MIN_ERROR_CLASS, _server, SQLMessage.OperationCancelled(), "", 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ internal enum SnapshottedStateFlags : byte
AttentionReceived = 1 << 5 // NOTE: Received is not volatile as it is only ever accessed\modified by TryRun its callees (i.e. single threaded access)
}

private sealed class TimeoutState
{
public const int Stopped = 0;
public const int Running = 1;
public const int ExpiredAsync = 2;
public const int ExpiredSync = 3;

private readonly int _value;

public TimeoutState(int value)
{
_value = value;
}

public int IdentityValue => _value;
}

private const int AttentionTimeoutSeconds = 5;

private static readonly ContextCallback s_readAdyncCallbackComplete = ReadAsyncCallbackComplete;
Expand Down Expand Up @@ -113,9 +130,17 @@ internal enum SnapshottedStateFlags : byte
// Timeout variables
private long _timeoutMilliseconds;
private long _timeoutTime; // variable used for timeout computations, holds the value of the hi-res performance counter at which this request should expire
private int _timeoutState; // expected to be one of the constant values TimeoutStopped, TimeoutRunning, TimeoutExpiredAsync, TimeoutExpiredSync
private int _timeoutIdentitySource;
private volatile int _timeoutIdentityValue;
internal volatile bool _attentionSent; // true if we sent an Attention to the server
internal volatile bool _attentionSending;
internal bool _internalTimeout; // an internal timeout occurred

// Below 2 properties are used to enforce timeout delays in code to
// reproduce issues related to theadpool starvation and timeout delay.
// It should always be set to false by default, and only be enabled during testing.
internal bool _enforceTimeoutDelay = false;
internal int _enforcedTimeoutDelayInMilliSeconds = 5000;

private readonly LastIOTimer _lastSuccessfulIOTimer;

Expand Down Expand Up @@ -760,7 +785,7 @@ private void ResetCancelAndProcessAttention()
// operations.
Parser.ProcessPendingAck(this);
}
_internalTimeout = false;
SetTimeoutStateStopped();
}
}

Expand Down Expand Up @@ -1042,7 +1067,7 @@ internal bool TryProcessHeader()
return false;
}

if (_internalTimeout)
if (IsTimeoutStateExpired)
{
ThrowExceptionAndWarning();
return true;
Expand Down Expand Up @@ -1447,7 +1472,7 @@ internal bool TryReadInt16(out short value)
{
// The entire int16 is in the packet and in the buffer, so just return it
// and take care of the counters.
buffer = _inBuff.AsSpan(_inBytesUsed,2);
buffer = _inBuff.AsSpan(_inBytesUsed, 2);
_inBytesUsed += 2;
_inBytesPacket -= 2;
}
Expand Down Expand Up @@ -1481,7 +1506,7 @@ internal bool TryReadInt32(out int value)
}

AssertValidState();
value = (buffer[3] << 24) + (buffer[2] <<16) + (buffer[1] << 8) + buffer[0];
value = (buffer[3] << 24) + (buffer[2] << 16) + (buffer[1] << 8) + buffer[0];
return true;

}
Expand Down Expand Up @@ -2247,11 +2272,62 @@ internal void OnConnectionClosed()
}
}

private void OnTimeout(object state)
public void SetTimeoutStateStopped()
{
Interlocked.Exchange(ref _timeoutState, TimeoutState.Stopped);
_timeoutIdentityValue = 0;
}

public bool IsTimeoutStateExpired
{
get
{
int state = _timeoutState;
return state == TimeoutState.ExpiredAsync || state == TimeoutState.ExpiredSync;
}
}

private void OnTimeoutAsync(object state)
{
if (!_internalTimeout)
if (_enforceTimeoutDelay)
{
_internalTimeout = true;
Thread.Sleep(_enforcedTimeoutDelayInMilliSeconds);
}

int currentIdentityValue = _timeoutIdentityValue;
TimeoutState timeoutState = (TimeoutState)state;
if (timeoutState.IdentityValue == _timeoutIdentityValue)
{
// the return value is not useful here because no choice is going to be made using it
// we only want to make this call to set the state knowing that it will be seen later
OnTimeoutCore(TimeoutState.Running, TimeoutState.ExpiredAsync);
}
else
{
Debug.WriteLine($"OnTimeoutAsync called with identity state={timeoutState.IdentityValue} but current identity is {currentIdentityValue} so it is being ignored");
}
}

private bool OnTimeoutSync()
{
return OnTimeoutCore(TimeoutState.Running, TimeoutState.ExpiredSync);
}

/// <summary>
/// attempts to change the timout state from the expected state to the target state and if it succeeds
/// will setup the the stateobject into the timeout expired state
/// </summary>
/// <param name="expectedState">the state that is the expected current state, state will change only if this is correct</param>
/// <param name="targetState">the state that will be changed to if the expected state is correct</param>
/// <returns>boolean value indicating whether the call changed the timeout state</returns>
private bool OnTimeoutCore(int expectedState, int targetState)
{
Debug.Assert(targetState == TimeoutState.ExpiredAsync || targetState == TimeoutState.ExpiredSync, "OnTimeoutCore must have an expiry state as the targetState");

bool retval = false;
if (Interlocked.CompareExchange(ref _timeoutState, targetState, expectedState) == expectedState)
{
retval = true;
// lock protects against Close and Cancel
lock (this)
{
Expand Down Expand Up @@ -2349,6 +2425,7 @@ private void OnTimeout(object state)
}
}
}
return retval;
}

internal void ReadSni(TaskCompletionSource<object> completion)
Expand Down Expand Up @@ -2383,19 +2460,32 @@ internal void ReadSni(TaskCompletionSource<object> completion)
{
Debug.Assert(completion != null, "Async on but null asyncResult passed");

if (_networkPacketTimeout == null)
// if the state is currently stopped then change it to running and allocate a new identity value from
// the identity source. The identity value is used to correlate timer callback events to the currently
// running timeout and prevents a late timer callback affecting a result it does not relate to
int previousTimeoutState = Interlocked.CompareExchange(ref _timeoutState, TimeoutState.Running, TimeoutState.Stopped);
if (previousTimeoutState == TimeoutState.Stopped)
{
_networkPacketTimeout = ADP.UnsafeCreateTimer(
new TimerCallback(OnTimeout),
null,
Timeout.Infinite,
Timeout.Infinite);
Debug.Assert(_timeoutIdentityValue == 0, "timer was previously stopped without resetting the _identityValue");
_timeoutIdentityValue = Interlocked.Increment(ref _timeoutIdentitySource);
}

_networkPacketTimeout?.Dispose();

_networkPacketTimeout = ADP.UnsafeCreateTimer(
new TimerCallback(OnTimeoutAsync),
new TimeoutState(_timeoutIdentityValue),
Timeout.Infinite,
Timeout.Infinite
);


// -1 == Infinite
// 0 == Already timed out (NOTE: To simulate the same behavior as sync we will only timeout on 0 if we receive an IO Pending from SNI)
// >0 == Actual timeout remaining
int msecsRemaining = GetTimeoutRemaining();

Debug.Assert(previousTimeoutState == TimeoutState.Stopped, "previous timeout state was not Stopped");
if (msecsRemaining > 0)
{
ChangeNetworkPacketTimeout(msecsRemaining, Timeout.Infinite);
Expand Down Expand Up @@ -2445,12 +2535,15 @@ internal void ReadSni(TaskCompletionSource<object> completion)
_networkPacketTaskSource.TrySetResult(null);
}
// Disable timeout timer on error
SetTimeoutStateStopped();
ChangeNetworkPacketTimeout(Timeout.Infinite, Timeout.Infinite);
}
else if (msecsRemaining == 0)
{ // Got IO Pending, but we have no time left to wait
// Immediately schedule the timeout timer to fire
ChangeNetworkPacketTimeout(0, Timeout.Infinite);
{
// Got IO Pending, but we have no time left to wait
// disable the timer and set the error state by calling OnTimeoutSync
ChangeNetworkPacketTimeout(Timeout.Infinite, Timeout.Infinite);
OnTimeoutSync();
}
// DO NOT HANDLE PENDING READ HERE - which is TdsEnums.SNI_SUCCESS_IO_PENDING state.
// That is handled by user who initiated async read, or by ReadNetworkPacket which is sync over async.
Expand Down Expand Up @@ -2565,13 +2658,13 @@ private void ReadSniError(TdsParserStateObject stateObj, uint error)
Debug.Assert(_syncOverAsync, "Should never reach here with async on!");
bool fail = false;

if (_internalTimeout)
if (IsTimeoutStateExpired)
{ // This is now our second timeout - time to give up.
fail = true;
}
else
{
stateObj._internalTimeout = true;
stateObj.SetTimeoutStateStopped();
Debug.Assert(_parser.Connection != null, "SqlConnectionInternalTds handler can not be null at this point.");
AddError(new SqlError(TdsEnums.TIMEOUT_EXPIRED, (byte)0x00, TdsEnums.MIN_ERROR_CLASS, _parser.Server, _parser.Connection.TimeoutErrorInternal.GetErrorMessage(), "", 0, TdsEnums.SNI_WAIT_TIMEOUT));

Expand Down Expand Up @@ -2794,6 +2887,25 @@ public void ReadAsyncCallback(IntPtr key, PacketHandle packet, uint error)

ChangeNetworkPacketTimeout(Timeout.Infinite, Timeout.Infinite);

// The timer thread may be unreliable under high contention scenarios. It cannot be
// assumed that the timeout has happened on the timer thread callback. Check the timeout
// synchrnously and then call OnTimeoutSync to force an atomic change of state.
if (TimeoutHasExpired)
{
OnTimeoutSync();
}

// try to change to the stopped state but only do so if currently in the running state
// and use cmpexch so that all changes out of the running state are atomic
int previousState = Interlocked.CompareExchange(ref _timeoutState, TimeoutState.Running, TimeoutState.Stopped);

// if the state is anything other than running then this query has reached an end so
// set the correlation _timeoutIdentityValue to 0 to prevent late callbacks executing
if (_timeoutState != TimeoutState.Running)
{
_timeoutIdentityValue = 0;
}

ProcessSniPacket(packet, error);
}
catch (Exception e)
Expand Down Expand Up @@ -3454,7 +3566,6 @@ internal void SendAttention(bool mustTakeWriteLock = false)
// Set _attentionSending to true before sending attention and reset after setting _attentionSent
// This prevents a race condition between receiving the attention ACK and setting _attentionSent
_attentionSending = true;

#if DEBUG
if (!_skipSendAttention)
{
Expand Down Expand Up @@ -3489,7 +3600,7 @@ internal void SendAttention(bool mustTakeWriteLock = false)
}
}
#if DEBUG
}
}
#endif

SetTimeoutSeconds(AttentionTimeoutSeconds); // Initialize new attention timeout of 5 seconds.
Expand Down Expand Up @@ -3862,7 +3973,7 @@ internal void AssertStateIsClean()
// Attention\Cancellation\Timeouts
Debug.Assert(!HasReceivedAttention && !_attentionSent && !_attentionSending, $"StateObj is still dealing with attention: Sent: {_attentionSent}, Received: {HasReceivedAttention}, Sending: {_attentionSending}");
Debug.Assert(!_cancelled, "StateObj still has cancellation set");
Debug.Assert(!_internalTimeout, "StateObj still has internal timeout set");
Debug.Assert(_timeoutState == TimeoutState.Stopped, "StateObj still has internal timeout set");
// Errors and Warnings
Debug.Assert(!_hasErrorOrWarning, "StateObj still has stored errors or warnings");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2858,7 +2858,7 @@ private void CleanUpStateObject(bool isCancelRequested = true)
{
_stateObj.CancelRequest();
}
_stateObj._internalTimeout = false;
_stateObj.SetTimeoutStateStopped();
_stateObj.CloseSession();
_stateObj._bulkCopyOpperationInProgress = false;
_stateObj._bulkCopyWriteTimeout = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ private bool TryCloseInternal(bool closeReader)
{
_sharedState._dataReady = true; // set _sharedState._dataReady to not confuse CleanPartialRead
}
_stateObj._internalTimeout = false;
_stateObj.SetTimeoutStateStopped();
if (_sharedState._dataReady)
{
cleanDataFailed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2262,7 +2262,7 @@ internal bool TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataRead
// If there is data ready, but we didn't exit the loop, then something is wrong
Debug.Assert(!dataReady, "dataReady not expected - did we forget to skip the row?");

if (stateObj._internalTimeout)
if (stateObj.IsTimeoutStateExpired)
{
runBehavior = RunBehavior.Attention;
}
Expand Down Expand Up @@ -2891,7 +2891,7 @@ internal bool TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataRead
stateObj._attentionSent = false;
stateObj._attentionReceived = false;

if (RunBehavior.Clean != (RunBehavior.Clean & runBehavior) && !stateObj._internalTimeout)
if (RunBehavior.Clean != (RunBehavior.Clean & runBehavior) && !stateObj.IsTimeoutStateExpired)
{
// Add attention error to collection - if not RunBehavior.Clean!
stateObj.AddError(new SqlError(0, 0, TdsEnums.MIN_ERROR_CLASS, _server, SQLMessage.OperationCancelled(), "", 0));
Expand Down
Loading

0 comments on commit 717ceda

Please sign in to comment.