Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 158 additions & 24 deletions src/TraceEvent/EventPipe/EventCache.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.InteropServices;

namespace Microsoft.Diagnostics.Tracing.EventPipe
Expand Down Expand Up @@ -68,6 +67,10 @@ public void ProcessEventBlock(Block block)
}
else
{
if (thread.Events.Count == 0)
{
_activeThreadQueues.Add(thread.Events);
}
thread.Events.Enqueue(eventMarker);
}

Expand Down Expand Up @@ -165,49 +168,178 @@ private void CheckForPendingThreadRemoval()

private unsafe void SortAndDispatch(long stopTimestamp)
{
// This sort could be made faster by using a min-heap but this is a simple place to start
List<Queue<EventMarker>> threadQueues = new List<Queue<EventMarker>>(_threads.Values.Select(t => t.Events));
while(true)
// Build a min-heap from active thread queues (those with pending events) whose
// front event is before stopTimestamp. Using _activeThreadQueues avoids iterating
// all threads in the dictionary — only threads that have had events enqueued are checked.
// This gives O(N * log(T)) merge performance where N is the number of events and
// T is the number of active threads.
_heap.Clear();
foreach (Queue<EventMarker> q in _activeThreadQueues)
{
long lowestTimestamp = stopTimestamp;
Queue<EventMarker> oldestEventQueue = null;
foreach(Queue<EventMarker> threadQueue in threadQueues)
if (q.Count > 0)
{
if(threadQueue.Count == 0)
long ts = q.Peek().Header.TimeStamp;
if (ts < stopTimestamp)
{
continue;
_heap.Add(ts, q);
}
long eventTimestamp = threadQueue.Peek().Header.TimeStamp;
if (eventTimestamp < lowestTimestamp)
}
}

if (_heap.Count == 0)
{
return;
}

_heap.Build();

// Merge events in timestamp order using the min-heap.
while (_heap.Count > 0)
{
Queue<EventMarker> minQueue = _heap.PeekValue;
EventMarker eventMarker = minQueue.Dequeue();
OnEvent?.Invoke(ref eventMarker.Header);

if (minQueue.Count > 0)
{
long nextTs = minQueue.Peek().Header.TimeStamp;
if (nextTs < stopTimestamp)
{
oldestEventQueue = threadQueue;
lowestTimestamp = eventTimestamp;
// Update the root with the next timestamp and restore the heap property.
_heap.ReplaceRoot(nextTs, minQueue);
}
else
{
_heap.RemoveRoot();
}
}
if(oldestEventQueue == null)
else
{
break;
_heap.RemoveRoot();
// Remove from active set and free internal storage to prevent unbounded
// memory growth when the application creates and destroys threads.
_activeThreadQueues.Remove(minQueue);
minQueue.TrimExcess();
}
}
}

#region Min-heap Implementation

/// <summary>
/// A min-heap that pairs a long key with a value of type <typeparamref name="TValue"/>.
/// Entries are ordered by key so the minimum key is always at the root.
/// </summary>
internal class MinHeap<TValue>
{
private struct Entry
{
public long Key;
public TValue Value;

public Entry(long key, TValue value)
{
Key = key;
Value = value;
}
}

private readonly List<Entry> _entries = new List<Entry>();

public int Count => _entries.Count;

public TValue PeekValue => _entries[0].Value;

public void Clear() => _entries.Clear();

public void Add(long key, TValue value)
{
_entries.Add(new Entry(key, value));
}

/// <summary>
/// Establishes the heap property over all entries. Call once after adding all
/// entries via Add, before extracting from the heap.
/// </summary>
/// <remarks>
/// Starts from the last non-leaf node (_entries.Count / 2 - 1) and sifts each
/// node down to its correct position. Leaves (the second half of the array) are
/// already trivially valid heaps of size 1, so they are skipped.
/// </remarks>
public void Build()
{
// Start from the last non-leaf node and work backwards to the root.
// Nodes at indices [Count/2 .. Count-1] are leaves that need no adjustment.
for (int i = _entries.Count / 2 - 1; i >= 0; i--)
{
SiftDown(i);
}
}

/// <summary>
/// Replaces the root entry with a new key/value pair and restores the heap property.
/// Use when the root element has been consumed but its source still has more items.
/// </summary>
public void ReplaceRoot(long newKey, TValue value)
{
_entries[0] = new Entry(newKey, value);
SiftDown(0);
}

/// <summary>
/// Removes the root (minimum) entry from the heap and restores the heap property.
/// </summary>
public void RemoveRoot()
{
int lastIndex = _entries.Count - 1;
if (lastIndex == 0)
{
_entries.Clear();
}
else
{
EventMarker eventMarker = oldestEventQueue.Dequeue();
OnEvent?.Invoke(ref eventMarker.Header);
_entries[0] = _entries[lastIndex];
_entries.RemoveAt(lastIndex);
SiftDown(0);
}
}

// If the app creates and destroys threads over time we need to flush old threads
// from the cache or memory usage will grow unbounded. AddThread handles the
// the thread objects but the storage for the queue elements also does not shrink
// below the high water mark unless we free it explicitly.
foreach (Queue<EventMarker> q in threadQueues)
/// <summary>
/// Restores the min-heap property by moving the element at index i down the tree
/// until it is smaller than both children or reaches a leaf position.
/// </summary>
private void SiftDown(int i)
{
if(q.Count == 0)
int count = _entries.Count;
while (true)
{
q.TrimExcess();
int smallest = i;

// In a binary heap stored as an array, the children of node i are at
// indices 2i+1 (left) and 2i+2 (right).
int left = 2 * i + 1;
int right = 2 * i + 2;

if (left < count && _entries[left].Key < _entries[smallest].Key)
{
smallest = left;
}
if (right < count && _entries[right].Key < _entries[smallest].Key)
{
smallest = right;
}
if (smallest == i)
{
break;
}
(_entries[i], _entries[smallest]) = (_entries[smallest], _entries[i]);
i = smallest;
}
}
}

#endregion

private void FreeOldEventBuffers(long stopTimestamp)
{
while (_buffers.Count > 0)
Expand Down Expand Up @@ -277,6 +409,8 @@ public EventBlockBuffer(FixedBuffer buffer, long maxTimestamp)
EventPipeEventSource _source;
ThreadCache _threads;
Queue<EventBlockBuffer> _buffers = new Queue<EventBlockBuffer>();
MinHeap<Queue<EventMarker>> _heap = new MinHeap<Queue<EventMarker>>();
HashSet<Queue<EventMarker>> _activeThreadQueues = new HashSet<Queue<EventMarker>>();
}

internal class EventMarker
Expand Down
25 changes: 24 additions & 1 deletion src/TraceEvent/Parsers/UniversalSystemTraceEventParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,20 @@ public sealed class ProcessMappingMetadataTraceData : TraceEvent

public string SymbolMetadata {get { return GetShortUTF8StringAt(SkipVarInt(0)); } }

internal ProcessMappingSymbolMetadata ParsedSymbolMetadata { get { return ProcessMappingSymbolMetadataParser.TryParse(SymbolMetadata); } }
internal ProcessMappingSymbolMetadata ParsedSymbolMetadata
{
get
{
if (!_parsedSymbolMetadataCached)
{
_parsedSymbolMetadata = ProcessMappingSymbolMetadataParser.TryParse(SymbolMetadata);
_parsedSymbolMetadataCached = true;
}
return _parsedSymbolMetadata;
}
}
private ProcessMappingSymbolMetadata _parsedSymbolMetadata;
private bool _parsedSymbolMetadataCached;

public string VersionMetadata {get {return GetShortUTF8StringAt(SkipShortUTF8String(SkipVarInt(0))); } }

Expand All @@ -367,9 +380,19 @@ internal ProcessMappingMetadataTraceData(Action<ProcessMappingMetadataTraceData>
}
protected internal override void Dispatch()
{
_parsedSymbolMetadataCached = false;
_parsedSymbolMetadata = null;
Action(this);
}

public override unsafe TraceEvent Clone()
{
var clone = (ProcessMappingMetadataTraceData)base.Clone();
clone._parsedSymbolMetadata = _parsedSymbolMetadata;
clone._parsedSymbolMetadataCached = _parsedSymbolMetadataCached;
return clone;
}

protected internal override Delegate Target
{
get { return Action; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,16 @@ public void BeforeProcess(TraceLog traceLog, TraceEventDispatcher source)
}
processes.Add(process);

if (!string.IsNullOrEmpty(data.FileName) && data.FileName.StartsWith(DotnetJittedCodeMappingName, StringComparison.Ordinal))
string fileName = data.FileName;
if (!string.IsNullOrEmpty(fileName) && fileName.StartsWith(DotnetJittedCodeMappingName, StringComparison.Ordinal))
{
// Don't create a module for jitted code.
// These will be created for each jitted code symbol.
return;
}

_mappingMetadata.TryGetValue(data.MetadataId, out ProcessMappingMetadataTraceData metadata);
TraceModuleFile moduleFile = process.LoadedModules.UniversalMapping(data, metadata);
TraceModuleFile moduleFile = process.LoadedModules.UniversalMapping(fileName, data.StartAddress, data.EndAddress, data.TimeStampQPC, metadata);
};
universalSystemParser.ProcessMappingMetadata += delegate (ProcessMappingMetadataTraceData data)
{
Expand Down
Loading