Skip to content

Commit

Permalink
Update EventPipeEventDispatcher.cs
Browse files Browse the repository at this point in the history
  • Loading branch information
davmason committed Aug 7, 2023
1 parent 2300d0e commit 9c2e4ee
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2075,6 +2075,9 @@
<data name="EventSource_ChannelTypeDoesNotMatchEventChannelValue" xml:space="preserve">
<value>Channel {0} does not match event channel value {1}.</value>
</data>
<data name="EventSource_CouldNotEnableEventPipe" xml:space="preserve">
<value>Failed to open an EventPipe session for NativeRuntimeEventSource.</value>
</data>
<data name="EventSource_DataDescriptorsOutOfRange" xml:space="preserve">
<value>Data descriptors are out of range.</value>
</data>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ internal EventListenerSubscription(EventKeywords matchAnyKeywords, EventLevel le
private readonly IntPtr m_RuntimeProviderID;

private ulong m_sessionID;
private DateTime m_syncTimeUtc;
private long m_syncTimeQPC;
private long m_timeQPCFrequency;

private bool m_stopDispatchTask;
private Task? m_dispatchTask;
private ManualResetEvent m_stoppedEvent = new ManualResetEvent(true);
private readonly object m_dispatchControlLock = new object();
private readonly Dictionary<EventListener, EventListenerSubscription> m_subscriptions = new Dictionary<EventListener, EventListenerSubscription>();

Expand All @@ -46,49 +44,48 @@ private EventPipeEventDispatcher()

internal void SendCommand(EventListener eventListener, EventCommand command, bool enable, EventLevel level, EventKeywords matchAnyKeywords)
{
lock (EventListener.EventListenersLock)
while (true)
{
if (command == EventCommand.Update && enable)
if (m_stopDispatchTask)
{
lock (m_dispatchControlLock)
m_stoppedEvent.WaitOne();
}

lock (m_dispatchControlLock)
{
if (m_stopDispatchTask)
{
// We happened to end up here after the task was marked as stopping, give up the lock and try again later
continue;
}

Debug.Assert(!m_stopDispatchTask);
if (command == EventCommand.Update && enable)
{
// Add the new subscription. This will overwrite an existing subscription for the listener if one exists.
m_subscriptions[eventListener] = new EventListenerSubscription(matchAnyKeywords, level);

// Commit the configuration change.
CommitDispatchConfiguration();
}
}
else if (command == EventCommand.Update && !enable)
{
RemoveEventListener(eventListener);
}
}
}
else if (command == EventCommand.Update && !enable)
{
// Remove the event listener from the list of subscribers.
m_subscriptions.Remove(eventListener);
}

internal void RemoveEventListener(EventListener listener)
{
lock (m_dispatchControlLock)
{
// Remove the event listener from the list of subscribers.
m_subscriptions.Remove(listener);
// Commit the configuration change.
CommitDispatchConfiguration();

// Commit the configuration change.
CommitDispatchConfiguration();
// We were successful, break out of the loop
break;
}
}
}

private void CommitDispatchConfiguration()
{
Debug.Assert(Monitor.IsEntered(m_dispatchControlLock));

// Ensure that the dispatch task is stopped.
// This is a no-op if the task is already stopped.
StopDispatchTask();

// Stop tracing.
// This is a no-op if it's already disabled.
EventPipeInternal.Disable(m_sessionID);
// Signal that the thread should shut down
SetStopDispatchTask();

// Check to see if tracing should be enabled.
if (m_subscriptions.Count <= 0)
Expand Down Expand Up @@ -122,7 +119,10 @@ private void CommitDispatchConfiguration()
};

m_sessionID = EventPipeInternal.Enable(null, EventPipeSerializationFormat.NetTrace, DefaultEventListenerCircularMBSize, providerConfiguration);
Debug.Assert(m_sessionID != 0);
if (m_sessionID == 0)
{
throw new EventSourceException(SR.EventSource_CouldNotEnableEventPipe);
}

// Get the session information that is required to properly dispatch events.
EventPipeSessionInfo sessionInfo;
Expand All @@ -134,39 +134,37 @@ private void CommitDispatchConfiguration()
}
}

m_syncTimeUtc = DateTime.FromFileTimeUtc(sessionInfo.StartTimeAsUTCFileTime);
m_syncTimeQPC = sessionInfo.StartTimeStamp;
m_timeQPCFrequency = sessionInfo.TimeStampFrequency;

DateTime syncTimeUtc = DateTime.FromFileTimeUtc(sessionInfo.StartTimeAsUTCFileTime);
long syncTimeQPC = sessionInfo.StartTimeStamp;
long timeQPCFrequency = sessionInfo.TimeStampFrequency;

// Start the dispatch task.
StartDispatchTask();
StartDispatchTask(m_sessionID, syncTimeUtc, syncTimeQPC, timeQPCFrequency);
}

private void StartDispatchTask()
private void StartDispatchTask(ulong sessionID, DateTime syncTimeUtc, long syncTimeQPC, long timeQPCFrequency)
{
Debug.Assert(Monitor.IsEntered(m_dispatchControlLock));
Debug.Assert(m_dispatchTask == null);
Debug.Assert(m_stopDispatchTask == false);

if (m_dispatchTask == null)
{
m_stopDispatchTask = false;
m_dispatchTask = Task.Factory.StartNew(DispatchEventsToEventListeners, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
m_dispatchTask = Task.Factory.StartNew(() => DispatchEventsToEventListeners(sessionID, syncTimeUtc, syncTimeQPC, timeQPCFrequency), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}

private void StopDispatchTask()
private void SetStopDispatchTask()
{
Debug.Assert(Monitor.IsEntered(m_dispatchControlLock));

if (m_dispatchTask != null)
{
m_stoppedEvent.Reset();
m_stopDispatchTask = true;
EventPipeInternal.SignalSession(m_sessionID);
m_dispatchTask.Wait();
m_dispatchTask = null;
}
}

private unsafe void DispatchEventsToEventListeners()
private unsafe void DispatchEventsToEventListeners(ulong sessionID, DateTime syncTimeUtc, long syncTimeQPC, long timeQPCFrequency)
{
// Struct to fill with the call to GetNextEvent.
EventPipeEventInstanceData instanceData;
Expand All @@ -175,7 +173,7 @@ private unsafe void DispatchEventsToEventListeners()
{
bool eventsReceived = false;
// Get the next event.
while (!m_stopDispatchTask && EventPipeInternal.GetNextEvent(m_sessionID, &instanceData))
while (!m_stopDispatchTask && EventPipeInternal.GetNextEvent(sessionID, &instanceData))
{
eventsReceived = true;

Expand All @@ -184,7 +182,7 @@ private unsafe void DispatchEventsToEventListeners()
{
// Dispatch the event.
ReadOnlySpan<byte> payload = new ReadOnlySpan<byte>((void*)instanceData.Payload, (int)instanceData.PayloadLength);
DateTime dateTimeStamp = TimeStampToDateTime(instanceData.TimeStamp);
DateTime dateTimeStamp = TimeStampToDateTime(instanceData.TimeStamp, syncTimeUtc, syncTimeQPC, timeQPCFrequency);
NativeRuntimeEventSource.Log.ProcessEvent(instanceData.EventID, instanceData.ThreadID, dateTimeStamp, instanceData.ActivityId, instanceData.ChildActivityId, payload);
}
}
Expand All @@ -200,20 +198,28 @@ private unsafe void DispatchEventsToEventListeners()
Thread.Sleep(10);
}
}

m_dispatchTask = null;
m_stopDispatchTask = false;
// Signal to threads that they can stop waiting since we are done
m_stoppedEvent.Set();

// Disable the old session. This can happen asynchronously since we aren't using the old session anymore
EventPipeInternal.Disable(sessionID);
}

/// <summary>
/// Converts a QueryPerformanceCounter (QPC) timestamp to a UTC DateTime.
/// </summary>
private DateTime TimeStampToDateTime(long timeStamp)
private static DateTime TimeStampToDateTime(long timeStamp, DateTime syncTimeUtc, long syncTimeQPC, long timeQPCFrequency)
{
if (timeStamp == long.MaxValue)
{
return DateTime.MaxValue;
}

Debug.Assert((m_syncTimeUtc.Ticks != 0) && (m_syncTimeQPC != 0) && (m_timeQPCFrequency != 0));
long inTicks = (long)((timeStamp - m_syncTimeQPC) * 10000000.0 / m_timeQPCFrequency) + m_syncTimeUtc.Ticks;
Debug.Assert((syncTimeUtc.Ticks != 0) && (syncTimeQPC != 0) && (timeQPCFrequency != 0));
long inTicks = (long)((timeStamp - syncTimeQPC) * 10000000.0 / timeQPCFrequency) + syncTimeUtc.Ticks;
if ((inTicks < 0) || (DateTime.MaxTicks < inTicks))
{
inTicks = DateTime.MaxTicks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4075,6 +4075,12 @@ public virtual void Dispose()
}
Validate();
}

#if FEATURE_PERFTRACING
// Remove the listener from the EventPipe dispatcher. EventCommand.Update with enable==false removes it.
// Have to send outside of EventListenersLock, or we risk deadlocks.
EventPipeEventDispatcher.Instance.SendCommand(this, EventCommand.Update, false, EventLevel.LogAlways, (EventKeywords)0);
#endif // FEATURE_PERFTRACING
}
// We don't expose a Dispose(bool), because the contract is that you don't have any non-syncronous
// 'cleanup' associated with this object
Expand Down Expand Up @@ -4391,11 +4397,6 @@ private static void RemoveReferencesToListenerInEventSources(EventListener liste
}
}
}

#if FEATURE_PERFTRACING
// Remove the listener from the EventPipe dispatcher.
EventPipeEventDispatcher.Instance.RemoveEventListener(listenerToRemove);
#endif // FEATURE_PERFTRACING
}


Expand Down

0 comments on commit 9c2e4ee

Please sign in to comment.