Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-threaded parsing #51

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ bin/
/GameTracking-CSGO/
*.dem
.idea/
.vs/
*.DotSettings.user
8 changes: 7 additions & 1 deletion src/DemoFile.Benchmark/DemoParserBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void GlobalSetup()
[IterationSetup]
public void Setup()
{
_fileStream = new MemoryStream(_demoBytes);
_fileStream = new MemoryStream(_demoBytes, 0, _demoBytes.Length, false, true);
}

[Benchmark]
Expand All @@ -48,4 +48,10 @@ public async Task ParseDemo()
await _demoParser.ReadAllAsync(_fileStream, default);
#endif
}

/// <summary>
/// Benchmarks the multi-threaded parsing entry point without registering any per-section callbacks.
/// </summary>
[Benchmark]
public async Task ParseDemoMT()
{
    // No setup action: only the raw parsing cost is measured.
    await DemoParser.ReadAllMultiThreadedAsync(
        setupAction: null,
        stream: _fileStream,
        cancellationToken: default);
}
}
9 changes: 9 additions & 0 deletions src/DemoFile.Test/GlobalUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ public static class GlobalUtil

public static MemoryStream GotvProtocol13990Deathmatch => new(File.ReadAllBytes(Path.Combine(DemoBase, "13990_dm.dem")));

/// <summary>
/// Enumerates the public properties of <see cref="GlobalUtil"/> whose declared type is exactly
/// <see cref="MemoryStream"/> and reads each one as a static property.
/// </summary>
/// <returns>
/// One entry per matching property: the key is the property name, the value is the stream
/// returned by that property's getter.
/// </returns>
public static KeyValuePair<string, MemoryStream>[] GetAllFiles()
{
    var files = new List<KeyValuePair<string, MemoryStream>>();

    foreach (var property in typeof(GlobalUtil).GetProperties())
    {
        if (property.PropertyType != typeof(MemoryStream))
        {
            continue;
        }

        // The properties are read without an instance (static access).
        var demoStream = (MemoryStream)property.GetValue(null)!;
        files.Add(new KeyValuePair<string, MemoryStream>(property.Name, demoStream));
    }

    return files.ToArray();
}

public static byte[] ToBitStream(string input)
{
var bitArray = new BitArray(input.Length);
Expand Down
72 changes: 72 additions & 0 deletions src/DemoFile.Test/Integration/MultiThreadingIntegrationTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using System.Text;
using System.Text.Json;

namespace DemoFile.Test.Integration;

[TestFixture]
public class MultiThreadingIntegrationTest
{
/// <summary>
/// Parses the same demo once with <c>DemoParser.ReadAllMultiThreadedAsync</c> and once with a single
/// parser, and checks that the concatenated multi-threaded output equals the single-threaded output.
/// </summary>
/// <param name="testCase">Name of the demo property and a stream holding the demo bytes.</param>
// The member name is qualified so that it resolves without relying on a `using static` for GlobalUtil.
[TestCaseSource(typeof(GlobalUtil), nameof(GlobalUtil.GetAllFiles))]
public async Task MTTest(KeyValuePair<string, MemoryStream> testCase)
{
    // Copy the bytes into a stream whose buffer is publicly visible (last argument),
    // because the multi-threaded entry point rejects streams without an exposed buffer.
    byte[] buffer = testCase.Value.ToArray();
    var stream = new MemoryStream(buffer, 0, buffer.Length, false, true);

    // MT parsing
    var mtResult = await DemoParser.ReadAllMultiThreadedAsync(SetupParser, stream, default);

    // Concatenate the per-section outputs in section order.
    var mtSB = new StringBuilder(mtResult.Sections.Sum(s => s.StringBuilder.Length));
    foreach (var section in mtResult.Sections)
        mtSB.Append(section.StringBuilder);

    // ST parsing (reference output), using the same callbacks on a single parser
    var stParser = new DemoParser();
    var stSection = new MultiThreadedDemoParserSection(stParser);
    SetupParser(stSection);
    stream.Position = 0;
    await stParser.ReadAllAsync(stream, default);

    // compare: the single-threaded output is the expected value, the multi-threaded output is
    // the value under test, so a failure message reports them the right way round.
    Assert.That(mtSB.ToString(), Is.EqualTo(stSection.StringBuilder.ToString()));
}

/// <summary>
/// Registers the event handlers used by the comparison test. Every handler writes into the
/// <see cref="StringBuilder"/> that the section holds at the time this method is called.
/// </summary>
/// <param name="section">Section whose parser receives the handlers and whose builder collects the output.</param>
void SetupParser(MultiThreadedDemoParserSection section)
{
    var parser = section.DemoParser;

    // The builder reference is captured once; handlers keep writing to this instance.
    var output = section.StringBuilder;
    output.EnsureCapacity(512);

    // Plain event logging.
    parser.Source1GameEvents.RoundStart += evt => LogEvent(evt.GameEventName, evt, output, parser);
    parser.Source1GameEvents.RoundEnd += evt => LogEvent(evt.GameEventName, evt, output, parser);
    parser.Source1GameEvents.PlayerDeath += evt => LogEvent(evt.GameEventName, evt, output, parser);

    // Second PlayerDeath handler: dumps the entities referenced by the event, then a name summary.
    parser.Source1GameEvents.PlayerDeath += death =>
    {
        JsonAppend(death.Attacker, output);
        JsonAppend(death.AttackerPawn, output);
        JsonAppend(death.Assister, output);
        JsonAppend(death.AssisterPawn, output);
        JsonAppend(death.Player, output);
        JsonAppend(death.PlayerPawn, output);

        var attackerName = death.Attacker?.PlayerName;
        var assisterName = death.Assister?.PlayerName;
        var victimName = death.Player?.PlayerName;
        output.AppendLine($"{attackerName} [{assisterName}] {victimName}");
    };
}

/// <summary>
/// Serializes <paramref name="obj"/> to JSON with <c>DemoJson.SerializerOptions</c> and appends it
/// as one line. Does nothing when <paramref name="obj"/> is null.
/// </summary>
/// <param name="obj">Object to serialize; may be null.</param>
/// <param name="stringBuilder">Destination for the serialized line.</param>
static void JsonAppend(object? obj, StringBuilder stringBuilder)
{
    if (obj != null)
    {
        stringBuilder.AppendLine(JsonSerializer.Serialize(obj, DemoJson.SerializerOptions));
    }
}

/// <summary>
/// Appends two lines for an event: a header with the parser's current demo tick and the event name,
/// followed by the event serialized to JSON with <c>DemoJson.SerializerOptions</c>.
/// </summary>
/// <typeparam name="T">Type of the event object being serialized.</typeparam>
/// <param name="eventName">Name written in the header line.</param>
/// <param name="evt">Event to serialize.</param>
/// <param name="snapshot">Destination for both lines.</param>
/// <param name="demo">Parser whose current demo tick is written in the header line.</param>
static void LogEvent<T>(string eventName, T evt, StringBuilder snapshot, DemoParser demo)
{
    // The header is appended before the event is serialized, so the order of effects is unchanged.
    snapshot
        .AppendLine($"[{demo.CurrentDemoTick.Value}] {eventName}:")
        .AppendLine($" {JsonSerializer.Serialize(evt, DemoJson.SerializerOptions)}");
}
}
186 changes: 186 additions & 0 deletions src/DemoFile/DemoParser.MultiThreading.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
using System.Text;

namespace DemoFile
{
/// <summary>
/// Outcome of a multi-threaded parse: the sections the demo was divided into.
/// </summary>
public class MultiThreadedDemoParsingResult
{
    /// <summary>
    /// All sections that were parsed, in the order they were supplied to the constructor.
    /// </summary>
    public List<MultiThreadedDemoParserSection> Sections { get; }

    /// <summary>
    /// Creates a result holding the given sections.
    /// </summary>
    /// <param name="sections">Parsed sections, copied into <see cref="Sections"/> in order.</param>
    public MultiThreadedDemoParsingResult(params MultiThreadedDemoParserSection[] sections)
    {
        var parsedSections = new List<MultiThreadedDemoParserSection>();
        parsedSections.AddRange(sections);
        Sections = parsedSections;
    }
}

/// <summary>
/// One section of a demo: the parser responsible for it plus storage for that section's output.
/// </summary>
public class MultiThreadedDemoParserSection
{
    /// <summary>
    /// Creates a section bound to the given parser.
    /// </summary>
    /// <param name="demoParser">Parser that parses this section.</param>
    public MultiThreadedDemoParserSection(DemoParser demoParser) => DemoParser = demoParser;

    /// <summary>
    /// The parser that parses this section.
    /// </summary>
    public DemoParser DemoParser { get; }

    /// <summary>
    /// Convenience text buffer for this section's output. Starts out empty and may be replaced.
    /// </summary>
    public StringBuilder StringBuilder { get; set; } = new();

    /// <summary>
    /// Optional user data attached to this section, for outputting data in a custom format
    /// instead of (or in addition to) <see cref="StringBuilder"/>.
    /// </summary>
    public object? UserData { get; set; }
}

public partial class DemoParser
{
/// <summary>
/// Read an entire demo in a multi-threaded way. The demo is split into multiple sections at full-packet
/// boundaries and each section is parsed by its own parser in a separate <see cref="Task.Run(Func{Task}, CancellationToken)"/>
/// task. When parsing is finished, the returned sections are already in demo order, so their data can be
/// consumed without sorting.
/// </summary>
/// <remarks>
/// When sections are parsed in parallel, <paramref name="setupAction"/> is invoked from the background tasks,
/// so it must be thread-safe; callbacks may freely access the data of their own section.
/// If the demo reports no tick count, or no full packet was recorded, the demo is parsed by a single parser
/// on the calling flow and the result contains exactly one section.
/// </remarks>
/// <param name="setupAction">Optional callback used to register handlers on each section's parser.</param>
/// <param name="stream">
/// Stream containing the demo. Its internal buffer must be exposed (see <see cref="MemoryStream.TryGetBuffer"/>),
/// because the background parsers read from the same underlying array.
/// </param>
/// <param name="cancellationToken">Token passed to every parser and to the background tasks.</param>
/// <returns>The parsed sections, in demo order.</returns>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
/// <exception cref="ArgumentException">The buffer of <paramref name="stream"/> is not exposed.</exception>
public static async ValueTask<MultiThreadedDemoParsingResult> ReadAllMultiThreadedAsync(
    Action<MultiThreadedDemoParserSection>? setupAction, MemoryStream stream, CancellationToken cancellationToken)
{
    if (stream is null)
    {
        throw new ArgumentNullException(nameof(stream));
    }

    if (!stream.TryGetBuffer(out var arraySegment))
    {
        throw new ArgumentException("Can't get MemoryStream's internal buffer - this is a waste of performance. " +
                                    "Make sure that you create MemoryStream with exposed buffer", nameof(stream));
    }

    // Every parser (scout, fallback and background) starts from where the caller positioned the stream.
    long startPosition = stream.Position;

    // Single-threaded fallback: one parser, one section, parsed on the caller's stream.
    async ValueTask<MultiThreadedDemoParsingResult> ParseSingleThreadedAsync()
    {
        stream.Position = startPosition;
        var singleParser = new DemoParser();
        var singleSection = new MultiThreadedDemoParserSection(singleParser);
        setupAction?.Invoke(singleSection);
        await singleParser.ReadAllAsync(stream, cancellationToken).ConfigureAwait(false);
        return new MultiThreadedDemoParsingResult(singleSection);
    }

    var parser = new DemoParser();

    await parser.StartReadingAsync(stream, cancellationToken).ConfigureAwait(false);

    // currently, no way to find snapshot positions in incomplete demos
    // => fallback to single-threaded parsing
    if (parser.TickCount.Value <= 0)
    {
        return await ParseSingleThreadedAsync().ConfigureAwait(false);
    }

    // seek to end of demo to find all snapshot positions
    await parser.SeekToTickAsync(parser.TickCount, cancellationToken).ConfigureAwait(false);

    var fullPacketPositions = parser._fullPacketPositions;

    // Without any recorded full packet there is nothing to split on (and indexing below would throw).
    if (fullPacketPositions.Count == 0)
    {
        return await ParseSingleThreadedAsync().ConfigureAwait(false);
    }

    // Distribute the full packets that were actually found, rather than an estimate derived from the
    // tick count: with ceil(count / parsers) packets per parser the last parser always reaches the end
    // of the demo, so no trailing part is left unparsed.
    int numParsers = Math.Max(1, Environment.ProcessorCount);
    int numSectionsPerParser = (fullPacketPositions.Count + numParsers - 1) / numParsers;

    var tasks = new List<Task>(numParsers);
    var sections = new List<MultiThreadedDemoParserSection>(numParsers);

    for (int p = 0; p < numParsers; p++)
    {
        int startFullPacketPositionIndex = p * numSectionsPerParser;
        int endFullPacketPositionIndex = startFullPacketPositionIndex + numSectionsPerParser;

        bool reachedLastTick = endFullPacketPositionIndex >= fullPacketPositions.Count;

        // NOTE(review): the first section starts at the first recorded full packet; confirm that
        // nothing of interest precedes it in the demo.
        DemoTick startTick = fullPacketPositions[startFullPacketPositionIndex].Tick;
        DemoTick endTick = reachedLastTick
            ? parser.TickCount
            : fullPacketPositions[endFullPacketPositionIndex].Tick;

        var backgroundParser = new DemoParser();

        // Wrap exactly the segment the source stream exposes (same origin and length), so that the
        // recorded full-packet positions mean the same thing in every background stream.
        var backgroundParserStream = new MemoryStream(
            arraySegment.Array!, arraySegment.Offset, arraySegment.Count, writable: false)
        {
            Position = startPosition,
        };
        var section = new MultiThreadedDemoParserSection(backgroundParser);

        Func<Task> taskFunc = () => backgroundParser.ParseSectionInBackgroundAsync(
            section,
            backgroundParserStream,
            startTick,
            endTick,
            fullPacketPositions,
            setupAction,
            cancellationToken);

        tasks.Add(Task.Run(taskFunc, cancellationToken));

        sections.Add(section);

        if (reachedLastTick)
            break;
    }

    // wait for all parsers to finish
    await Task.WhenAll(tasks).ConfigureAwait(false);

    return new MultiThreadedDemoParsingResult(sections.ToArray());
}

/// <summary>
/// Parses one section of the demo with this parser: starts reading the given stream, adopts the already
/// known full-packet positions, seeks to <paramref name="startTick"/> (unless it is <c>DemoTick.Zero</c>),
/// lets the caller register callbacks, and then advances command by command until the next command's tick
/// reaches <paramref name="endTick"/> or the parser reports that there is nothing more to read.
/// </summary>
/// <param name="section">Section handed to <paramref name="setupAction"/>.</param>
/// <param name="backgroundParserStream">Stream this parser reads from.</param>
/// <param name="startTick">Tick at which this section begins.</param>
/// <param name="endTick">Tick at which this section stops; commands at or past it are not processed.</param>
/// <param name="fullPacketPositions">Full-packet positions that replace this parser's own list.</param>
/// <param name="setupAction">Optional callback invoked once, after seeking to the start tick.</param>
/// <param name="cancellationToken">Token forwarded to the parser's asynchronous operations.</param>
private async Task ParseSectionInBackgroundAsync(
    MultiThreadedDemoParserSection section,
    MemoryStream backgroundParserStream,
    DemoTick startTick,
    DemoTick endTick,
    IReadOnlyList<FullPacketRecord> fullPacketPositions,
    Action<MultiThreadedDemoParserSection>? setupAction,
    CancellationToken cancellationToken)
{
    await StartReadingAsync(backgroundParserStream, cancellationToken).ConfigureAwait(false);

    // Replace whatever positions this parser holds with the ones supplied by the caller.
    _fullPacketPositions.Clear();
    _fullPacketPositions.AddRange(fullPacketPositions);

    if (startTick != DemoTick.Zero)
    {
        await SeekToTickAsync(startTick, cancellationToken).ConfigureAwait(false);
    }

    // Callbacks are registered only once the parser sits at the section start.
    setupAction?.Invoke(section);

    for (var hasNext = true; hasNext;)
    {
        // Look ahead without consuming: stop as soon as the upcoming command belongs to the next section.
        if (PeekNext().nextTick >= endTick)
        {
            break;
        }

        // A false result means the end of the file was reached.
        hasNext = await MoveNextAsync(cancellationToken).ConfigureAwait(false);
    }
}

/// <summary>
/// Peek at the next command: reads its header, reports the resulting tick and command type, and then
/// restores the command start position, the current demo tick and the stream position to what they were
/// before the call (also when reading the header throws). Per the original contract, the parser's state
/// is left unmodified and callbacks are not invoked.
/// </summary>
/// <returns>The tick of the next command and the command type from its header.</returns>
public (DemoTick nextTick, EDemoCommands command) PeekNext()
{
    // Snapshot everything that is restored below.
    var savedStreamPosition = _stream.Position;
    var savedCommandStartPosition = _commandStartPosition;
    var savedDemoTick = CurrentDemoTick;

    try
    {
        var header = ReadCommandHeader();

        // CurrentDemoTick is read after the header has been read, and before the restore in finally.
        return (CurrentDemoTick, header.Command);
    }
    finally
    {
        _commandStartPosition = savedCommandStartPosition;
        CurrentDemoTick = savedDemoTick;
        _stream.Position = savedStreamPosition;
    }
}
}
}