Skip to content

Commit

Permalink
[otlp] OTLP Exporter Custom serializer - (Part 2) Histogram and Expon…
Browse files Browse the repository at this point in the history
…ential Histogram (#5962)
  • Loading branch information
TimothyMothra authored Nov 12, 2024
1 parent d113ecf commit ae3feb9
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ private static int WriteMetric(byte[] buffer, int writePosition, Metric metric)
writePosition = ProtobufSerializer.WriteStringWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Metric_Unit, metric.Unit);
}

var aggregationValue = metric.Temporality == AggregationTemporality.Cumulative
? ProtobufOtlpMetricFieldNumberConstants.Aggregation_Temporality_Cumulative
: ProtobufOtlpMetricFieldNumberConstants.Aggregation_Temporality_Delta;

switch (metric.MetricType)
{
case MetricType.LongSum:
Expand All @@ -150,7 +154,7 @@ private static int WriteMetric(byte[] buffer, int writePosition, Metric metric)
writePosition += ReserveSizeForLength;

writePosition = ProtobufSerializer.WriteBoolWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Sum_Is_Monotonic, metric.MetricType == MetricType.LongSum);
writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Sum_Aggregation_Temporality, metric.Temporality == AggregationTemporality.Cumulative ? 2 : 1);
writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Sum_Aggregation_Temporality, aggregationValue);

foreach (ref readonly var metricPoint in metric.GetMetricPoints())
{
Expand All @@ -170,7 +174,7 @@ private static int WriteMetric(byte[] buffer, int writePosition, Metric metric)
writePosition += ReserveSizeForLength;

writePosition = ProtobufSerializer.WriteBoolWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Sum_Is_Monotonic, metric.MetricType == MetricType.DoubleSum);
writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Sum_Aggregation_Temporality, metric.Temporality == AggregationTemporality.Cumulative ? 2 : 1);
writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Sum_Aggregation_Temporality, aggregationValue);

foreach (ref readonly var metricPoint in metric.GetMetricPoints())
{
Expand Down Expand Up @@ -216,11 +220,134 @@ private static int WriteMetric(byte[] buffer, int writePosition, Metric metric)

case MetricType.Histogram:
{
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Metric_Data_Histogram, ProtobufWireType.LEN);
int metricTypeLengthPosition = writePosition;
writePosition += ReserveSizeForLength;

writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Histogram_Aggregation_Temporality, aggregationValue);

foreach (ref readonly var metricPoint in metric.GetMetricPoints())
{
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Histogram_Data_Points, ProtobufWireType.LEN);
int dataPointLengthPosition = writePosition;
writePosition += ReserveSizeForLength;

var startTime = (ulong)metricPoint.StartTime.ToUnixTimeNanoseconds();
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Start_Time_Unix_Nano, startTime);

var endTime = (ulong)metricPoint.EndTime.ToUnixTimeNanoseconds();
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Time_Unix_Nano, endTime);

foreach (var tag in metricPoint.Tags)
{
writePosition = WriteTag(buffer, writePosition, tag, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Attributes);
}

var count = (ulong)metricPoint.GetHistogramCount();
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Count, count);

var sum = metricPoint.GetHistogramSum();
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Sum, sum);

if (metricPoint.TryGetHistogramMinMaxValues(out double min, out double max))
{
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Min, min);
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Max, max);
}

foreach (var histogramMeasurement in metricPoint.GetHistogramBuckets())
{
var bucketCount = (ulong)histogramMeasurement.BucketCount;
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Bucket_Counts, bucketCount);

if (histogramMeasurement.ExplicitBound != double.PositiveInfinity)
{
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Explicit_Bounds, histogramMeasurement.ExplicitBound);
}
}

if (metricPoint.TryGetExemplars(out var exemplars))
{
foreach (ref readonly var exemplar in exemplars)
{
writePosition = WriteExemplar(buffer, writePosition, in exemplar, exemplar.DoubleValue, ProtobufOtlpMetricFieldNumberConstants.HistogramDataPoint_Exemplars);
}
}

ProtobufSerializer.WriteReservedLength(buffer, dataPointLengthPosition, writePosition - (dataPointLengthPosition + ReserveSizeForLength));
}

ProtobufSerializer.WriteReservedLength(buffer, metricTypeLengthPosition, writePosition - (metricTypeLengthPosition + ReserveSizeForLength));
break;
}

case MetricType.ExponentialHistogram:
{
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.Metric_Data_Exponential_Histogram, ProtobufWireType.LEN);
int metricTypeLengthPosition = writePosition;
writePosition += ReserveSizeForLength;

writePosition = ProtobufSerializer.WriteEnumWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogram_Aggregation_Temporality, aggregationValue);

foreach (ref readonly var metricPoint in metric.GetMetricPoints())
{
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogram_Data_Points, ProtobufWireType.LEN);
int dataPointLengthPosition = writePosition;
writePosition += ReserveSizeForLength;

var startTime = (ulong)metricPoint.StartTime.ToUnixTimeNanoseconds();
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Start_Time_Unix_Nano, startTime);

var endTime = (ulong)metricPoint.EndTime.ToUnixTimeNanoseconds();
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Time_Unix_Nano, endTime);

foreach (var tag in metricPoint.Tags)
{
writePosition = WriteTag(buffer, writePosition, tag, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Attributes);
}

var sum = metricPoint.GetHistogramSum();
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Sum, sum);

var count = (ulong)metricPoint.GetHistogramCount();
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Count, count);

if (metricPoint.TryGetHistogramMinMaxValues(out double min, out double max))
{
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Min, min);
writePosition = ProtobufSerializer.WriteDoubleWithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Max, max);
}

var exponentialHistogramData = metricPoint.GetExponentialHistogramData();

writePosition = ProtobufSerializer.WriteSInt32WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Scale, exponentialHistogramData.Scale);
writePosition = ProtobufSerializer.WriteFixed64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Zero_Count, (ulong)exponentialHistogramData.ZeroCount);

writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Positive, ProtobufWireType.LEN);
int positiveBucketsLengthPosition = writePosition;
writePosition += ReserveSizeForLength;

writePosition = ProtobufSerializer.WriteSInt32WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Buckets_Offset, exponentialHistogramData.PositiveBuckets.Offset);

foreach (var bucketCount in exponentialHistogramData.PositiveBuckets)
{
writePosition = ProtobufSerializer.WriteInt64WithTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Buckets_Bucket_Counts, (ulong)bucketCount);
}

ProtobufSerializer.WriteReservedLength(buffer, positiveBucketsLengthPosition, writePosition - (positiveBucketsLengthPosition + ReserveSizeForLength));

if (metricPoint.TryGetExemplars(out var exemplars))
{
foreach (ref readonly var exemplar in exemplars)
{
writePosition = WriteExemplar(buffer, writePosition, in exemplar, exemplar.DoubleValue, ProtobufOtlpMetricFieldNumberConstants.ExponentialHistogramDataPoint_Exemplars);
}
}

ProtobufSerializer.WriteReservedLength(buffer, dataPointLengthPosition, writePosition - (dataPointLengthPosition + ReserveSizeForLength));
}

ProtobufSerializer.WriteReservedLength(buffer, metricTypeLengthPosition, writePosition - (metricTypeLengthPosition + ReserveSizeForLength));
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,17 @@ internal static int WriteFixed64WithTag(byte[] buffer, int writePosition, int fi
return writePosition;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static int WriteSInt32WithTag(byte[] buffer, int writePosition, int fieldNumber, int value)
{
writePosition = WriteTag(buffer, writePosition, fieldNumber, ProtobufWireType.VARINT);

// https://protobuf.dev/programming-guides/encoding/#signed-ints
writePosition = WriteVarInt32(buffer, writePosition, (uint)((value << 1) ^ (value >> 31)));

return writePosition;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static int WriteVarInt32(byte[] buffer, int writePosition, uint value)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,24 @@ public void WriteDoubleWithTag_WritesCorrectly()
Assert.Equal(0x40, buffer[8]);
}

[Fact]
public void WriteSignedInt32_WritesCorrectly()
{
byte[] buffer = new byte[10];
int position = ProtobufSerializer.WriteSInt32WithTag(buffer, 0, 1, 300);
Assert.Equal(3, position);
Assert.Equal(8, buffer[0]); // Tag
Assert.Equal(0xD8, buffer[1]);
Assert.Equal(0x04, buffer[2]);

buffer = new byte[10];
position = ProtobufSerializer.WriteSInt32WithTag(buffer, 0, 1, -300);
Assert.Equal(3, position);
Assert.Equal(8, buffer[0]); // Tag
Assert.Equal(0xD7, buffer[1]);
Assert.Equal(0x04, buffer[2]);
}

[Fact]
public void WriteVarInt32_WritesCorrectly()
{
Expand Down
Loading

0 comments on commit ae3feb9

Please sign in to comment.