-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathStreamClassOperatorService.cs
130 lines (115 loc) · 5.26 KB
/
StreamClassOperatorService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Supervision;
using Akka.Util;
using Arcane.Operator.Configurations;
using Arcane.Operator.Models;
using Arcane.Operator.Models.StreamClass.Base;
using Arcane.Operator.Services.Base;
using Arcane.Operator.Services.Metrics;
using Arcane.Operator.Services.Models;
using k8s;
using k8s.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Snd.Sdk.ActorProviders;
using Snd.Sdk.Metrics.Base;
namespace Arcane.Operator.Services.Operator;
/// <inheritdoc cref="IStreamClassOperatorService"/>
public class StreamClassOperatorService : IStreamClassOperatorService
{
    // Parallelism passed to the sink that persists stream class status updates.
    private const int parallelism = 1;

    private readonly StreamClassOperatorServiceConfiguration configuration;

    // Workers currently registered by this service, keyed by the stream class id.
    // An entry exists only between a successful StartStreamWorker and the matching StopStreamWorker.
    private readonly Dictionary<string, StreamOperatorServiceWorker> streams = new();

    private readonly ILogger<StreamClassOperatorService> logger;
    private readonly IStreamClassRepository streamClassRepository;
    private readonly IStreamOperatorServiceWorkerFactory streamOperatorServiceWorkerFactory;
    private readonly IMetricsReporter metricsService;

    /// <summary>
    /// Creates the service from its configuration and collaborators.
    /// </summary>
    /// <param name="streamOperatorServiceOptions">Options wrapper holding the service configuration.</param>
    /// <param name="streamOperatorServiceWorkerFactory">Factory used to create a worker for each stream class.</param>
    /// <param name="streamClassRepository">Repository used to read stream class events and to store status updates.</param>
    /// <param name="metricsService">Metrics reporter invoked for incoming events and outgoing responses.</param>
    /// <param name="logger">Logger for this service.</param>
    public StreamClassOperatorService(IOptions<StreamClassOperatorServiceConfiguration> streamOperatorServiceOptions,
        IStreamOperatorServiceWorkerFactory streamOperatorServiceWorkerFactory,
        IStreamClassRepository streamClassRepository,
        IMetricsReporter metricsService,
        ILogger<StreamClassOperatorService> logger)
    {
        this.configuration = streamOperatorServiceOptions.Value;
        this.logger = logger;
        this.streamClassRepository = streamClassRepository;
        this.streamOperatorServiceWorkerFactory = streamOperatorServiceWorkerFactory;
        this.metricsService = metricsService;
    }

    /// <inheritdoc cref="IStreamClassOperatorService.GetStreamClassEventsGraph"/>
    public IRunnableGraph<Task> GetStreamClassEventsGraph(CancellationToken cancellationToken)
    {
        // Terminal stage: log every phase change and persist it through the repository.
        var sink = Sink.ForEachAsync<StreamClassOperatorResponse>(parallelism, response =>
        {
            this.logger.LogInformation("The phase of the stream class {namespace}/{name} changed to {status}",
                response.StreamClass.Metadata.Namespace(),
                response.StreamClass.Metadata.Name,
                response.Phase);
            return this.streamClassRepository.InsertOrUpdate(response.StreamClass, response.Phase, response.Conditions, this.configuration.Plural);
        });

        var request = new CustomResourceApiRequest(
            this.configuration.NameSpace,
            this.configuration.ApiGroup,
            this.configuration.Version,
            this.configuration.Plural
        );

        return this.streamClassRepository.GetEvents(request, this.configuration.MaxBufferCapacity)
            // NOTE(review): presumably ends the stream once the token is cancelled - confirm against the AsFlow extension.
            .Via(cancellationToken.AsFlow<ResourceEvent<IStreamClass>>(true))
            .Select(this.metricsService.ReportTrafficMetrics)
            .Select(this.OnEvent)
            .WithAttributes(ActorAttributes.CreateSupervisionStrategy(this.HandleError))
            // Events that produced no response (Option.None) are dropped here.
            .CollectOption()
            .Select(this.metricsService.ReportStatusMetrics)
            .ToMaterialized(sink, Keep.Right);
    }

    /// <summary>
    /// Dispatches a watch event to the handler matching its type.
    /// </summary>
    /// <param name="resourceEvent">Event received for a stream class resource.</param>
    /// <returns>The handler's response, or <c>None</c> for event types other than Added, Modified and Deleted.</returns>
    private Option<StreamClassOperatorResponse> OnEvent(ResourceEvent<IStreamClass> resourceEvent)
    {
        return resourceEvent switch
        {
            (WatchEventType.Added, var streamClass) => this.OnAdded(streamClass),
            (WatchEventType.Modified, var streamClass) => this.OnModified(streamClass),
            (WatchEventType.Deleted, var streamClass) => this.OnDeleted(streamClass),
            _ => Option<StreamClassOperatorResponse>.None
        };
    }

    /// <summary>
    /// Supervision decider for the event graph: logs the failure and selects the directive to apply.
    /// </summary>
    /// <param name="exception">Exception raised while processing an event.</param>
    /// <returns><see cref="Directive.Stop"/> for a <see cref="BufferOverflowException"/>, <see cref="Directive.Resume"/> otherwise.</returns>
    private Directive HandleError(Exception exception)
    {
        this.logger.LogError(exception, "Failed to handle stream definition event");
        return exception switch
        {
            BufferOverflowException => Directive.Stop,
            _ => Directive.Resume
        };
    }

    /// <summary>Handles an Added event by starting a worker for the stream class.</summary>
    private Option<StreamClassOperatorResponse> OnAdded(IStreamClass streamClass) => this.StartStreamWorker(streamClass);

    /// <summary>Handles a Deleted event by stopping the worker of the stream class.</summary>
    private Option<StreamClassOperatorResponse> OnDeleted(IStreamClass streamClass) => this.StopStreamWorker(streamClass);

    /// <summary>
    /// Handles a Modified event by restarting the worker: the current worker (if any) is stopped and
    /// unregistered, then a new one is created from the updated stream class.
    /// </summary>
    private Option<StreamClassOperatorResponse> OnModified(IStreamClass streamClass)
    {
        // The Stopped response is intentionally discarded; only the final Ready state is reported.
        this.StopStreamWorker(streamClass);
        return this.StartStreamWorker(streamClass);
    }

    /// <summary>
    /// Creates, registers and starts a worker for the stream class unless one is already registered.
    /// </summary>
    /// <param name="streamClass">Stream class to start a worker for.</param>
    /// <returns>A Ready response for the stream class.</returns>
    private Option<StreamClassOperatorResponse> StartStreamWorker(IStreamClass streamClass)
    {
        var streamClassId = streamClass.ToStreamClassId();
        if (!this.streams.ContainsKey(streamClassId))
        {
            var listener = this.streamOperatorServiceWorkerFactory.Create(streamClass);
            this.streams[streamClassId] = listener;
            listener.Start(streamClassId);
        }

        return StreamClassOperatorResponse.Ready(streamClass);
    }

    /// <summary>
    /// Stops the worker registered for the stream class, if any, and removes it from the registry.
    /// </summary>
    /// <param name="streamClass">Stream class whose worker should be stopped.</param>
    /// <returns>A Stopped response for the stream class.</returns>
    private Option<StreamClassOperatorResponse> StopStreamWorker(IStreamClass streamClass)
    {
        // The entry must be removed, not just stopped: StartStreamWorker only creates a worker when the
        // id is absent, so a leftover stopped worker would block every later restart (Modified, or
        // Deleted followed by Added) while still being reported as Ready.
        if (this.streams.Remove(streamClass.ToStreamClassId(), out var worker))
        {
            worker.Stop();
        }

        return StreamClassOperatorResponse.Stopped(streamClass);
    }
}