Event statistics battle hardening
- Removed the stdole COM reference as it was broken and did not seem to
impact debug/attach functionality
- Standardized formatting of CSV values on InvariantCulture and
ISO 8601 dates
- Added a dynamically scaled timeout, based on the expected
parallelization, to the AcousticWorkbench Service class: when parallel
requests are made and are backlogged for resources, they sit in a queue
and time out very quickly
- Increased the number of concurrent connections allowed by the
AcousticWorkbench Service (assuming a server-to-server comms scenario)
- Added better support for non-JSON errors in the AcousticWorkbench
Service (these can happen when load balancers respond with generic
500 HTML pages)
- MAJOR bug fixed in EventMetadataResolver: race conditions between the two
workflows no longer occur. Previously, only a fraction of the
processed events from the first workflow were handed off to the second
workflow.
- Added ordering to the EventStatistics data class to ensure the input
sequence matches the output sequence
- Added statistics to EventStatisticsEntry.cs to give useful feedback to
users
- Added delay, cool-off, and retry logic to RemoteSourcePreparer. This
should fix the occasional faulty 0-byte responses sent by the
AcousticWorkbench and will also cope with load/demand failures suffered
by servers (a rough sketch of this pattern follows this summary)
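
The RemoteSourcePreparer changes themselves are not part of the diff shown below, so here is only a minimal sketch of the kind of delay/cool-off/retry loop described in the last point, assuming a plain HttpClient download. The class, method, and parameter names (RetryDownloadSketch, DownloadWithRetryAsync, maxAttempts) are illustrative assumptions, not the committed API.

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public static class RetryDownloadSketch
{
    // Sketch only: retry a download a few times, cooling off between attempts,
    // and treat an empty (0 byte) payload as a transient failure to retry.
    public static async Task<byte[]> DownloadWithRetryAsync(
        HttpClient client,
        Uri uri,
        int maxAttempts = 3,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        var cooloff = TimeSpan.FromSeconds(5);

        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                var response = await client.GetAsync(uri, cancellationToken);
                response.EnsureSuccessStatusCode();

                var bytes = await response.Content.ReadAsByteArrayAsync();

                // a 0 byte body is treated as a faulty response and retried
                if (bytes.Length > 0)
                {
                    return bytes;
                }
            }
            catch (HttpRequestException) when (attempt < maxAttempts)
            {
                // transient failure (e.g. a server under load); fall through and retry
            }

            // cool off before the next attempt, doubling the delay each time
            await Task.Delay(cooloff, cancellationToken);
            cooloff = TimeSpan.FromTicks(cooloff.Ticks * 2);
        }

        throw new HttpRequestException($"Download of {uri} failed after {maxAttempts} attempts");
    }
}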
atruskie committed Oct 4, 2017
1 parent dd7202c commit 68ffe4c
Showing 15 changed files with 321 additions and 68 deletions.
2 changes: 1 addition & 1 deletion Acoustics/AcousticWorkbench/AcousticEventService.cs
@@ -38,7 +38,7 @@ public async Task<AudioEvent> GetAudioEvent(long audioEventId)
},
out var stringBody);

var response = await this.Client.PostAsync(uri, body);
var response = await this.HttpClient.PostAsync(uri, body);

var audioEvents = await this.ProcessApiResult<AudioEvent[]>(response, stringBody);

2 changes: 1 addition & 1 deletion Acoustics/AcousticWorkbench/AudioRecordingService.cs
@@ -22,7 +22,7 @@ public async Task<AudioRecording> GetAudioRecording(long audioRecordingId)
{
var uri = this.AuthenticatedApi.GetAudioRecordingUri(audioRecordingId);

var response = await this.Client.GetAsync(uri);
var response = await this.HttpClient.GetAsync(uri);

return await this.ProcessApiResult<AudioRecording>(response);
}
4 changes: 2 additions & 2 deletions Acoustics/AcousticWorkbench/AuthenticationService.cs
@@ -32,7 +32,7 @@ public async Task<IAuthenticatedApi> CheckLogin(string token)
};
AddAuthTokenHeader(request.Headers, token);

var result = await this.Client.SendAsync(request);
var result = await this.HttpClient.SendAsync(request);

return await this.ProcessResult(result);
}
@@ -51,7 +51,7 @@ public async Task<IAuthenticatedApi> Login(string username, string password)
body = this.SerializeContent(new LoginRequest() { Login = username, Password = password });
}

var result = await this.Client.PostAsync(uri, body);
var result = await this.HttpClient.PostAsync(uri, body);

return await this.ProcessResult(result);
}
4 changes: 2 additions & 2 deletions Acoustics/AcousticWorkbench/MediaService.cs
@@ -34,7 +34,7 @@ public async Task<Media> GetMetaData(long audioRecordingId)
Method = HttpMethod.Get,
};

var response = await this.Client.SendAsync(request);
var response = await this.HttpClient.SendAsync(request);

return await this.ProcessApiResult<Media>(response);
}
@@ -61,7 +61,7 @@ public async Task<Media> GetMetaData(long audioRecordingId)

// this is just blocking until the response is ready to start streaming
// and NOT blocking until the entire response has been read!
var response = await this.Client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);

if (response.IsSuccessStatusCode)
{
59 changes: 49 additions & 10 deletions Acoustics/AcousticWorkbench/Service.cs
@@ -5,6 +5,7 @@
namespace AcousticWorkbench
{
using System;
using System.Diagnostics.Contracts;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
@@ -16,9 +17,16 @@ namespace AcousticWorkbench

public abstract class Service
{
public static readonly TimeSpan ClientTimeout = TimeSpan.FromSeconds(120);

protected readonly HttpClient Client;
/// <summary>
/// The amount of time to wait for a request to be processed.
/// </summary>
/// <remarks>
/// We dynamically scale this value to the number of CPU cores multiplied by 10 seconds each so that in cases
/// where there is back pressure the async requests do not prematurely timeout. This is important because the
/// timeout calculation for HttpClient *includes* the time the request is waiting to be sent as well as the
/// total request time.
/// </remarks>
public static readonly TimeSpan ClientTimeout = TimeSpan.FromSeconds(120 + (Environment.ProcessorCount * 10));

private const string ApplicationJson = "application/json";

@@ -29,13 +37,23 @@ public abstract class Service

private readonly JsonSerializerSettings jsonSerializerSettings;

static Service()
{
// control application wide HTTP concurrency. The limit is applied per host and is automatically used
// by HttpClient.
// https://docs.microsoft.com/en-us/dotnet/framework/network-programming/managing-connections
// Since typically the acoustic workbench servers should be able to handle a significant amount of work, we
// up the number of allowed connections so we can make better use of parallel CPUs.
ServicePointManager.DefaultConnectionLimit = Math.Min(64, Environment.ProcessorCount * 4);
}

protected Service(IApi api)
{
this.Client = new HttpClient();
this.HttpClient = new HttpClient();

this.Client.Timeout = ClientTimeout;
this.Client.DefaultRequestHeaders.Accept.Add(MediaTypeWithQualityHeaderValue.Parse(ApplicationJson));
this.Client.BaseAddress = api.Base();
this.HttpClient.Timeout = ClientTimeout;
this.HttpClient.DefaultRequestHeaders.Accept.Add(MediaTypeWithQualityHeaderValue.Parse(ApplicationJson));
this.HttpClient.BaseAddress = api.Base();

this.jsonSerializerSettings = new JsonSerializerSettings { ContractResolver = this.defaultContractResolver };
}
@@ -44,11 +62,13 @@ protected Service(IAuthenticatedApi authenticatedApi)
: this((IApi)authenticatedApi)
{
this.AuthenticatedApi = authenticatedApi;
AddAuthTokenHeader(this.Client.DefaultRequestHeaders, authenticatedApi.Token);
AddAuthTokenHeader(this.HttpClient.DefaultRequestHeaders, authenticatedApi.Token);
}

protected IAuthenticatedApi AuthenticatedApi { get; }

protected HttpClient HttpClient { get; }

protected static void AddAuthTokenHeader(HttpRequestHeaders headers, string token)
{
headers.Authorization = new AuthenticationHeaderValue("Token", "token=" + token);
@@ -78,11 +98,30 @@ protected AcousticWorkbenchResponse<T> Deserialize<T>(string json)
protected async Task<T> ProcessApiResult<T>(HttpResponseMessage response, string requestBody = "")
{
var json = await response.Content.ReadAsStringAsync();
var result = this.Deserialize<T>(json);

AcousticWorkbenchResponse<T> result = null;
try
{
result = this.Deserialize<T>(json);
}
catch (JsonReaderException)
{
// throw if it was meant to work... if not then we're already in an error case... best effort to get to
// error handling block below.
if (response.IsSuccessStatusCode)
{
throw;
}
}

if (!response.IsSuccessStatusCode)
{
throw new HttpResponseException(response, result.Meta, requestBody);
throw new HttpResponseException(response, result?.Meta, requestBody);
}

if (result == null)
{
throw new InvalidOperationException("Service has a null data blob that was not caught by error handling");
}

return result.Data;
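As a quick worked illustration of the scaling introduced in Service.cs above (the core counts are hypothetical examples, not measurements):

// Illustration only: how the two knobs above scale with core count.
//
//   ClientTimeout          = 120 s + (Environment.ProcessorCount * 10 s)
//   DefaultConnectionLimit = Math.Min(64, Environment.ProcessorCount * 4)
//
//   4 cores  -> timeout 160 s, connection limit 16
//   16 cores -> timeout 280 s, connection limit 64
//   32 cores -> timeout 440 s, connection limit 64 (capped)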
11 changes: 0 additions & 11 deletions Acoustics/Acoustics.Shared/Acoustics.Shared.csproj
@@ -306,17 +306,6 @@
<Install>true</Install>
</BootstrapperPackage>
</ItemGroup>
<ItemGroup Condition="'$(Configuration)' == 'Debug'">
<COMReference Include="stdole">
<Guid>{00020430-0000-0000-C000-000000000046}</Guid>
<VersionMajor>2</VersionMajor>
<VersionMinor>0</VersionMinor>
<Lcid>0</Lcid>
<WrapperTool>primary</WrapperTool>
<Isolated>False</Isolated>
<EmbedInteropTypes>True</EmbedInteropTypes>
</COMReference>
</ItemGroup>
<ItemGroup>
<AdditionalFiles Include="..\..\stylecop.json">
<Link>stylecop.json</Link>
36 changes: 24 additions & 12 deletions Acoustics/Acoustics.Shared/Csv/Csv.cs
@@ -14,6 +14,7 @@ namespace Acoustics.Shared.Csv
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Drawing;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection;
@@ -80,11 +81,22 @@ public static CsvConfiguration DefaultConfiguration
{
// change the defaults here if you want
var settings = new CsvConfiguration()
{
HasHeaderRecord = true,
TrimHeaders = true,

};
{
HasHeaderRecord = true,

// acoustic workbench outputs faulty data with padded headers
TrimHeaders = true,

// ensure we always use InvariantCulture - only reliable way to serialize data
// Additionally R can parse invariant representations of Double.Infinity and
// Double.NaN (whereas it can't in other cultures).
CultureInfo = CultureInfo.InvariantCulture,
};

// ensure dates are always formatted as ISO8601 dates - note: R cannot by default parse proper ISO8601 dates
TypeConverterOptionsFactory.AddOptions<DateTimeOffset>(new TypeConverterOptions() { Format = "O" });
TypeConverterOptionsFactory.AddOptions<DateTime>(new TypeConverterOptions() { Format = "O" });

foreach (var classMap in ClassMapsToRegister)
{
settings.RegisterClassMap(classMap);
@@ -95,11 +107,12 @@ public static CsvConfiguration DefaultConfiguration
}

/// <summary>
/// Serialize results to CSV - if you want the concrete type to be serialized you need to ensure it is downcast before using this method.
/// Serialize results to CSV - if you want the concrete type to be serialized you need to ensure
/// it is downcast before using this method.
/// </summary>
/// <typeparam name="T">The type to serialize.</typeparam>
/// <param name="destination">The file to create.</param>
/// <param name="results"></param>
/// <param name="results">The data to serialize.</param>
public static void WriteToCsv<T>(FileInfo destination, IEnumerable<T> results)
{
Contract.Requires(destination != null);
Expand All @@ -112,7 +125,6 @@ public static void WriteToCsv<T>(FileInfo destination, IEnumerable<T> results)
}
}


/// <summary>
/// Read an object from a CSV file.
/// </summary>
@@ -124,8 +136,8 @@ public static void WriteToCsv<T>(FileInfo destination, IEnumerable<T> results)
///
/// </remarks>
public static IEnumerable<T> ReadFromCsv<T>(
FileInfo source,
bool throwOnMissingField = true,
FileInfo source,
bool throwOnMissingField = true,
Action<CsvReader> readerHook = null)
{
Contract.Requires(source != null);
@@ -138,8 +150,8 @@ public static IEnumerable<T> ReadFromCsv<T>(
}

public static IEnumerable<T> ReadFromCsv<T>(
string csvText,
bool throwOnMissingField = true,
string csvText,
bool throwOnMissingField = true,
Action<CsvReader> readerHook = null)
{
Contract.Requires(csvText != null);
41 changes: 40 additions & 1 deletion Acoustics/Acoustics.Test/Shared/CsvTests.cs
@@ -24,6 +24,7 @@ namespace Acoustics.Test.Shared
using global::AnalysisBase.ResultBases;
using global::AnalysisPrograms.EventStatistics;
using global::AudioAnalysisTools;
using global::AudioAnalysisTools.EventStatistics;
using global::AudioAnalysisTools.Indices;
using global::TowseyLibrary;
using Microsoft.VisualStudio.TestTools.UnitTesting;
@@ -298,6 +299,7 @@ public void TestCsvClassMapsAreAutomaticallyRegistered()
var partialExpected = new[]
{
typeof(AcousticEvent.AcousticEventClassMap),
typeof(EventStatisticsClassMap),
typeof(ImportedEvent.ImportedEventNameClassMap),
};

@@ -373,7 +375,8 @@ public void TestBaseTypesAreNotSerializedAsArray()
Assert.That.StringEqualWithDiff(childExpected, childText);
}

[TestMethod] public void TestBaseTypesAreSerializedAsEnumerable()
[TestMethod]
public void TestBaseTypesAreSerializedAsEnumerable()
{
var exampleIndices = new SummaryIndexValues();
IEnumerable<SummaryIndexValues> childArray = exampleIndices.AsArray().AsEnumerable();
@@ -390,6 +393,42 @@ [TestMethod] public void TestBaseTypesAreSerializedAsEnumerable()
Assert.AreEqual(childText, baseText);
}

[TestMethod]
public void TestInvariantCultureIsUsed()
{
var now = new DateTime(1234567891011121314);
var nowOffset = new DateTimeOffset(1234567891011121314, TimeSpan.FromHours(10));

Csv.WriteToCsv(
this.testFile,
new[] { new { value = -789123.456, infinity = double.NegativeInfinity, nan = double.NaN, date = now, dateOffset = nowOffset } });

var actual = File.ReadAllText(this.testFile.FullName);
var expected = $@"value,infinity,nan,date,dateOffset
-789123.456,-Infinity,NaN,3913-03-12T00:31:41.1121314,3913-03-12T00:31:41.1121314+10:00
".NormalizeToCrLf();

Assert.AreEqual(expected, actual);

}

[TestMethod]
public void TestInvariantCultureIsUsedMatrix()
{
Csv.WriteMatrixToCsv(
this.testFile,
new[,] { { -789123.456, double.NegativeInfinity, double.NaN } });

var actual = File.ReadAllText(this.testFile.FullName);

var expected = $@"Index,c000000,c000001,c000002
0,-789123.456,-Infinity,NaN
".NormalizeToCrLf();

Assert.AreEqual(expected, actual);

}

private void AssertCsvEqual(string expected, FileInfo actual)
{
var lines = File.ReadAllText(actual.FullName);
@@ -106,13 +106,13 @@ public async Task<RemoteSegmentWithData[]> GetRemoteMetadata(ImportedEvent[] events

// the transform block can't `Complete` unless it's output is empty
// so add a buffer block to store the transform block's output
var buffer = new BufferBlock<RemoteSegmentWithData>();
var bufferBlock = new BufferBlock<RemoteSegmentWithData>();

// link the two parts of block A
getRecordingIdBlock.LinkTo(groupRecordingsBlock);

// link the two parts of block B
createSegmentsBlock.LinkTo(buffer);
createSegmentsBlock.LinkTo(bufferBlock);

// kick off the chain, resolve audio recording ids and group
foreach (var record in events)
@@ -124,9 +124,14 @@ public async Task<RemoteSegmentWithData[]> GetRemoteMetadata(ImportedEvent[] events
Log.Trace("Finished posting messages to recording id resolver");
getRecordingIdBlock.Complete();

Log.Trace("Begin waiting for ids to resolve");
Log.Trace("Waiting for getRecordingIdBlock to resolve");
await getRecordingIdBlock.Completion;
Log.Trace("Finish waiting for ids to resolve");
Log.Trace("Waiting for groupRecordingsBlock to resolve");
groupRecordingsBlock.Complete();
await groupRecordingsBlock.Completion;

var eventCount = groupedEvents.Sum(kvp => kvp.Value.Count);
Log.Trace($"Finished waiting for recording ids to resolve, {eventCount} events grouped into {groupedEvents.Count} recordings");

// now post the grouped audio recordings to the segment generating block
foreach (var keyValuePair in groupedEvents)
@@ -142,9 +147,28 @@ public async Task<RemoteSegmentWithData[]> GetRemoteMetadata(ImportedEvent[] events
await createSegmentsBlock.Completion;
Log.Trace("Finished waiting for metadata downloader");

if (buffer.TryReceiveAll(out var segments))
if (bufferBlock.TryReceiveAll(out var segments))
{
return segments.ToArray();
RemoteSegmentWithData[] segmentsArray;
int finalEventCount;
lock (segments)
{
segmentsArray = segments.ToArray();

// do some excessive logic checking because we used to have race conditions
finalEventCount = segmentsArray.Sum(x => x.Data.Count);
if (events.Length != finalEventCount)
{
throw new InvalidOperationException(
$"The number of supplied events ({events.Length}) did" +
$" not match the number of events that had metadata resolved ({finalEventCount})" +
" - a race condition has occurred");
}
}

Log.Info($"Metadata generated for {finalEventCount} events, {segmentsArray.Length} segments created");

return segmentsArray;
}
else
{
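The essence of the race-condition fix in GetRemoteMetadata above is that the first workflow's blocks are completed and awaited before anything is posted to the second workflow. Below is a minimal standalone sketch of that ordering; the block names, delegates, and grouping key are simplifying assumptions, not the real pipeline.

using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionOrderingSketch
{
    // Stage A resolves a value for each item and groups the results; stage B
    // is meant to consume the *whole* grouped collection.
    public static async Task<int> RunAsync(int[] items)
    {
        var groups = new ConcurrentDictionary<int, ConcurrentBag<int>>();

        var resolveBlock = new TransformBlock<int, int>(x => x * 2);
        var groupBlock = new ActionBlock<int>(
            x => groups.GetOrAdd(x % 10, _ => new ConcurrentBag<int>()).Add(x));
        resolveBlock.LinkTo(groupBlock);

        foreach (var item in items)
        {
            resolveBlock.Post(item);
        }

        // the fix: complete and await BOTH stages of the first workflow
        // before reading `groups` and handing it to the second workflow
        resolveBlock.Complete();
        await resolveBlock.Completion;
        groupBlock.Complete();
        await groupBlock.Completion;

        // only now is the grouped data guaranteed to contain every item
        return groups.Sum(kvp => kvp.Value.Count);
    }
}

Without the two Complete()/Completion pairs, the grouped collection is read while the first workflow is still running, which is exactly the fraction-of-events symptom described in the commit message.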