-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathMetricsPublisherActor.cs
106 lines (90 loc) · 3.65 KB
/
MetricsPublisherActor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
using System.Collections.Generic;
using Akka.Actor;
using Akka.Event;
using Arcane.Operator.Configurations;
using Snd.Sdk.Metrics.Base;
namespace Arcane.Operator.Services.Metrics.Actors;
/// <summary>
/// Add stream class metrics message. Once received by the <see cref="MetricsPublisherActor"/>,
/// the metric is stored in the actor's metrics collection under the <see cref="StreamKindRef"/> key,
/// replacing any metric previously stored under the same key.
/// </summary>
/// <param name="StreamKindRef">Name of the stream kind referenced by the stream class</param>
/// <param name="MetricName">Name of the metric to report</param>
/// <param name="MetricTags">Tags of the metric to report</param>
public record AddStreamClassMetricsMessage(string StreamKindRef, string MetricName,
SortedDictionary<string, string> MetricTags);
/// <summary>
/// Remove stream class metrics message. Once received by the <see cref="MetricsPublisherActor"/>,
/// the metric stored under the <see cref="StreamKindRef"/> key is removed from the actor's
/// metrics collection; a warning is logged when no metric is stored under that key.
/// </summary>
/// <param name="StreamKindRef">Name of the stream kind referenced by the stream class</param>
public record RemoveStreamClassMetricsMessage(string StreamKindRef);
/// <summary>
/// Emit metrics message. Once received, every metric currently held in the metrics collection
/// is reported to the metrics service.
/// This message is scheduled on a periodic timer by the <see cref="MetricsPublisherActor"/>
/// when the actor starts.
/// </summary>
public record EmitMetricsMessage;
/// <summary>
/// A metric collection element for a stream class: the metric name, its tags and the value
/// that is reported for it.
/// </summary>
public class StreamClassMetric
{
/// <summary>
/// Name of the metric to report.
/// </summary>
public string MetricName { get; init; }
/// <summary>
/// Tags of the metric to report.
/// </summary>
public SortedDictionary<string, string> MetricTags { get; init; }
/// <summary>
/// Value passed to the metrics service each time the metric is emitted. Defaults to 1.
/// </summary>
public int MetricValue { get; set; } = 1;
}
/// <summary>
/// Actor that keeps a collection of stream class metrics, keyed by stream kind reference,
/// and reports every collected metric to the metrics service whenever an
/// <see cref="EmitMetricsMessage"/> is received. A periodic timer producing that message
/// is started when the actor starts.
/// </summary>
public class MetricsPublisherActor : ReceiveActor, IWithTimers
{
    private readonly MetricsPublisherActorConfiguration configuration;
    private readonly ILoggingAdapter Log = Context.GetLogger();
    private readonly Dictionary<string, StreamClassMetric> streamClassMetrics = new();

    /// <summary>
    /// Timer scheduler used to schedule the periodic <see cref="EmitMetricsMessage"/>.
    /// </summary>
    public ITimerScheduler Timers { get; set; }

    /// <summary>
    /// Creates the actor and registers its message handlers.
    /// </summary>
    /// <param name="configuration">Provides the initial delay and the interval of the emit timer.</param>
    /// <param name="metricsService">Service the collected metrics are reported to.</param>
    public MetricsPublisherActor(MetricsPublisherActorConfiguration configuration, MetricsService metricsService)
    {
        this.configuration = configuration;

        // Register (or overwrite) the metric tracked for a stream kind.
        this.Receive<AddStreamClassMetricsMessage>(message =>
        {
            this.Log.Debug("Adding stream class metrics for {streamKindRef}", message.StreamKindRef);
            var metric = new StreamClassMetric
            {
                MetricName = message.MetricName,
                MetricTags = message.MetricTags,
            };
            this.streamClassMetrics[message.StreamKindRef] = metric;
        });

        // Stop tracking a stream kind; an unknown key is only worth a warning.
        this.Receive<RemoveStreamClassMetricsMessage>(message =>
        {
            var removed = this.streamClassMetrics.Remove(message.StreamKindRef);
            if (removed)
            {
                return;
            }

            this.Log.Warning("Stream class {streamKindRef} not found in metrics collection", message.StreamKindRef);
        });

        // Report every tracked metric to the metrics service.
        this.Receive<EmitMetricsMessage>(_ =>
        {
            this.Log.Debug("Start emitting stream class metrics");
            foreach (var metric in this.streamClassMetrics.Values)
            {
                metricsService.Count(metric.MetricName, metric.MetricValue, metric.MetricTags);
            }
        });
    }

    /// <summary>
    /// Starts the periodic timer that delivers <see cref="EmitMetricsMessage"/>, using the
    /// initial delay and update interval taken from the actor configuration.
    /// </summary>
    protected override void PreStart()
    {
        var initialDelay = this.configuration.InitialDelay;
        var updateInterval = this.configuration.UpdateInterval;
        this.Timers.StartPeriodicTimer(
            nameof(EmitMetricsMessage),
            new EmitMetricsMessage(),
            initialDelay,
            updateInterval);
    }
}