Skip to content

Commit

Permalink
wip: proof of concept for datapoint batching.
Browse files Browse the repository at this point in the history
  • Loading branch information
alxbl committed Mar 7, 2024
1 parent e75608a commit eaf5419
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,49 @@ internal static class MetricItemExtensions
private static readonly ConcurrentBag<OtlpMetrics.ScopeMetrics> MetricListPool = new();
private static readonly Action<RepeatedField<OtlpMetrics.Metric>, int> RepeatedFieldOfMetricSetCountAction = CreateRepeatedFieldOfMetricSetCountAction();

// [abeaulieu] Overload which takes a pre-converted OtlpMetrics object.
internal static void AddMetrics(
this OtlpCollector.ExportMetricsServiceRequest request,
OtlpResource.Resource processResource,
in OtlpMetrics.Metric otlpMetric,
in Metric metric) // [abeaulieu] [PERF] need the original metric to set the resources stuff. Technically we could build this once and clone the message.
{
var metricsByLibrary = new Dictionary<string, OtlpMetrics.ScopeMetrics>();

// Only add the process resource once per request.
if (request.ResourceMetrics.Count == 0)
{
var r = new OtlpMetrics.ResourceMetrics
{
Resource = processResource,
};

request.ResourceMetrics.Add(r);
}

var resourceMetrics = request.ResourceMetrics[0]; // FIXME: This is ugly but it works for testing.

// TODO: Replace null check with exception handling.
if (otlpMetric == null)
{
OpenTelemetryProtocolExporterEventSource.Log.CouldNotTranslateMetric(
nameof(MetricItemExtensions),
nameof(AddMetrics));
return;
}

var meterName = metric.MeterName;
if (!metricsByLibrary.TryGetValue(meterName, out var scopeMetrics))
{
scopeMetrics = GetMetricListFromPool(meterName, metric.MeterVersion);

metricsByLibrary.Add(meterName, scopeMetrics);
resourceMetrics.ScopeMetrics.Add(scopeMetrics);
}

scopeMetrics.Metrics.Add(otlpMetric);
}

internal static void AddMetrics(
this OtlpCollector.ExportMetricsServiceRequest request,
OtlpResource.Resource processResource,
Expand Down Expand Up @@ -109,6 +152,201 @@ internal static OtlpMetrics.ScopeMetrics GetMetricListFromPool(string name, stri
return metrics;
}

internal static OtlpMetrics.Metric GetMetric(Metric metric)
{
var otlpMetric = new OtlpMetrics.Metric
{
Name = metric.Name,
};

if (metric.Description != null)
{
otlpMetric.Description = metric.Description;
}

if (metric.Unit != null)
{
otlpMetric.Unit = metric.Unit;
}

return otlpMetric;
}

// [abeaulieu] Attempting to batch data points per metric.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static IEnumerable<OtlpMetrics.Metric> ToBatchedOtlpMetric(this Metric metric, int batchSize = 15000)
{
List<OtlpMetrics.Metric> metrics = new();

// Keep this here since it won't change for this metric.
OtlpMetrics.AggregationTemporality temporality;
if (metric.Temporality == AggregationTemporality.Delta)
{
temporality = OtlpMetrics.AggregationTemporality.Delta;
}
else
{
temporality = OtlpMetrics.AggregationTemporality.Cumulative;
}

var otlpMetric = GetMetric(metric); // In the best case, this only requires one metric message.

switch (metric.MetricType)
{
case MetricType.LongSum:
case MetricType.LongSumNonMonotonic:
{
var sum = new OtlpMetrics.Sum
{
IsMonotonic = metric.MetricType == MetricType.LongSum,
AggregationTemporality = temporality,
};

foreach (ref readonly var metricPoint in metric.GetMetricPoints())
{
var dataPoint = new OtlpMetrics.NumberDataPoint
{
StartTimeUnixNano = (ulong)metricPoint.StartTime.ToUnixTimeNanoseconds(),
TimeUnixNano = (ulong)metricPoint.EndTime.ToUnixTimeNanoseconds(),
};

AddAttributes(metricPoint.Tags, dataPoint.Attributes);

dataPoint.AsInt = metricPoint.GetSumLong();
sum.DataPoints.Add(dataPoint);
}

otlpMetric.Sum = sum;
break;
}

case MetricType.DoubleSum:
case MetricType.DoubleSumNonMonotonic:
{
var sum = new OtlpMetrics.Sum
{
IsMonotonic = metric.MetricType == MetricType.DoubleSum,
AggregationTemporality = temporality,
};

foreach (ref readonly var metricPoint in metric.GetMetricPoints())
{
var dataPoint = new OtlpMetrics.NumberDataPoint
{
StartTimeUnixNano = (ulong)metricPoint.StartTime.ToUnixTimeNanoseconds(),
TimeUnixNano = (ulong)metricPoint.EndTime.ToUnixTimeNanoseconds(),
};

AddAttributes(metricPoint.Tags, dataPoint.Attributes);

dataPoint.AsDouble = metricPoint.GetSumDouble();
sum.DataPoints.Add(dataPoint);
}

otlpMetric.Sum = sum;
break;
}

case MetricType.LongGauge:
{
// FIXME: Cleanup
var gauge = new OtlpMetrics.Gauge();
var curBatch = 0; // Keep a local copy here for performance.

foreach (ref readonly var metricPoint in metric.GetMetricPoints())
{
var dataPoint = new OtlpMetrics.NumberDataPoint
{
StartTimeUnixNano = (ulong)metricPoint.StartTime.ToUnixTimeNanoseconds(),
TimeUnixNano = (ulong)metricPoint.EndTime.ToUnixTimeNanoseconds(),
};
curBatch++;

AddAttributes(metricPoint.Tags, dataPoint.Attributes);

dataPoint.AsInt = metricPoint.GetGaugeLastValueLong();
gauge.DataPoints.Add(dataPoint);

if (curBatch >= batchSize)
{
// This batch is full, let's offload it and create a new metric message.
otlpMetric.Gauge = gauge;
metrics.Add(otlpMetric);
gauge = new OtlpMetrics.Gauge(); // Create a fresh gauge to store the data points.
otlpMetric = GetMetric(metric); // Get a fresh message.
curBatch = 0;
}
}

break;
}

case MetricType.DoubleGauge:
{
var gauge = new OtlpMetrics.Gauge();
foreach (ref readonly var metricPoint in metric.GetMetricPoints())
{
var dataPoint = new OtlpMetrics.NumberDataPoint
{
StartTimeUnixNano = (ulong)metricPoint.StartTime.ToUnixTimeNanoseconds(),
TimeUnixNano = (ulong)metricPoint.EndTime.ToUnixTimeNanoseconds(),
};

AddAttributes(metricPoint.Tags, dataPoint.Attributes);

dataPoint.AsDouble = metricPoint.GetGaugeLastValueDouble();
gauge.DataPoints.Add(dataPoint);
}

otlpMetric.Gauge = gauge;
break;
}

case MetricType.Histogram:
{
var histogram = new OtlpMetrics.Histogram
{
AggregationTemporality = temporality,
};

foreach (ref readonly var metricPoint in metric.GetMetricPoints())
{
var dataPoint = new OtlpMetrics.HistogramDataPoint
{
StartTimeUnixNano = (ulong)metricPoint.StartTime.ToUnixTimeNanoseconds(),
TimeUnixNano = (ulong)metricPoint.EndTime.ToUnixTimeNanoseconds(),
};

AddAttributes(metricPoint.Tags, dataPoint.Attributes);
dataPoint.Count = (ulong)metricPoint.GetHistogramCount();
dataPoint.Sum = metricPoint.GetHistogramSum();

if (metricPoint.TryGetHistogramMinMaxValues(out double min, out double max))
{
dataPoint.Min = min;
dataPoint.Max = max;
}

foreach (var histogramMeasurement in metricPoint.GetHistogramBuckets())
{
dataPoint.BucketCounts.Add((ulong)histogramMeasurement.BucketCount);
if (histogramMeasurement.ExplicitBound != double.PositiveInfinity)
{
dataPoint.ExplicitBounds.Add(histogramMeasurement.ExplicitBound);
}
}

histogram.DataPoints.Add(dataPoint);
}

otlpMetric.Histogram = histogram;
break;
}
}

return metrics;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static OtlpMetrics.Metric ToOtlpMetric(this Metric metric)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Metrics;
using OtlpCollector = OpenTelemetry.Proto.Collector.Metrics.V1;
using OtlpMetrics = OpenTelemetry.Proto.Metrics.V1;
using OtlpResource = OpenTelemetry.Proto.Resource.V1;

namespace OpenTelemetry.Exporter
Expand Down Expand Up @@ -66,26 +67,71 @@ public override ExportResult Export(in Batch<Metric> metrics)
// Prevents the exporter's gRPC and HTTP operations from being instrumented.
using var scope = SuppressInstrumentationScope.Begin();

var request = new OtlpCollector.ExportMetricsServiceRequest();
// [Performance] Need to allow for a message with partial data points but several metrics so we can still batch metrics in a single message
// and minimize the amount of requests to transmit all the data.
var perMetricBatches = new List<OtlpMetrics.Metric>[metrics.Count];
var meterNames = new Metric[metrics.Count]; // [abeaulieu] Would be nice to be able to do that without copying the metrics.

try
var i = 0;
foreach (var metric in metrics)
{
request.AddMetrics(this.ProcessResource, metrics);
perMetricBatches[i] = metric.ToBatchedOtlpMetric().ToList();
meterNames[i] = metric;
++i;
}

// [abeaulieu] Now loop through all the metrics until we've sent all the messages to send
var done = false;
var num_msg = 0;
var wire_size = 0;
while (!done)
{
var request = new OtlpCollector.ExportMetricsServiceRequest();

if (!this.exportClient.SendExportRequest(request))
// Try to make progress on every metric that still has data to send.
for (i = 0; i < perMetricBatches.Length; ++i)
{
done = true; // If all metrics have flushed their data, done will remain true and the loop exits.

var messages = perMetricBatches[i];
if (messages.Count > 0)
{
var m = messages[messages.Count - 1]; // Take the last one so there's no need to list copy to shift the list left.
messages.RemoveAt(messages.Count - 1); // [Perf]: Use a queue or just index manipulation to avoid memory shenanigans.
request.AddMetrics(this.ProcessResource, m, meterNames[i]);
done = false; // Not done until every metric has nothing to send.
num_msg++;
}
}

try
{
wire_size += request.CalculateSize();

// [abeaulieu] This need to be refactored since we will need several requests now.
// request.AddMetrics(this.ProcessResource, metrics);
// [abeaulieu] TODO: Spread the request load?
if (!this.exportClient.SendExportRequest(request))
{
// Fail as soon as one export fails.
// [abeaulieu] Should we try the rest of the messages and only return the result at the end?
// Need to handle partial success here... what happens if only some payloads made it?
// Ideally the telemetry should be split into individual messages that get processed using the retry mechanism/
return ExportResult.Failure;
}
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
return ExportResult.Failure;
}
finally
{
request.Return();
}
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
return ExportResult.Failure;
}
finally
{
request.Return();
}

Console.WriteLine($"\nSent {num_msg} messages totaling {wire_size / 1024.0f / 1024.0f} MB of data");

return ExportResult.Success;
}
Expand Down
6 changes: 6 additions & 0 deletions src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,12 @@ public void InvalidEnvironmentVariable(string key, string value)
this.WriteEvent(47, key, value);
}

[Event(48, Message = "PeriodicExportingMeterReader call to Collect() took {0} ms", Level = EventLevel.Informational)]
public void MetricCollectCompleted(string elapsedMs)
{
this.WriteEvent(48, elapsedMs);
}

#if DEBUG
public class OpenTelemetryEventListener : EventListener
{
Expand Down
1 change: 1 addition & 0 deletions src/OpenTelemetry/Metrics/MetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ protected virtual bool OnCollect(int timeoutMilliseconds)
return false;
}

OpenTelemetrySdkEventSource.Log.MetricCollectCompleted(sw.ElapsedMilliseconds.ToString());
OpenTelemetrySdkEventSource.Log.MetricReaderEvent("ProcessMetrics called.");
result = this.ProcessMetrics(metrics, (int)timeout);
if (result)
Expand Down

0 comments on commit eaf5419

Please sign in to comment.