-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathStreamClassOperatorService.cs
106 lines (94 loc) · 4.21 KB
/
StreamClassOperatorService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Util;
using Arcane.Operator.Configurations;
using Arcane.Operator.Models.StreamClass.Base;
using Arcane.Operator.Services.Base;
using Arcane.Operator.Services.Commands;
using Arcane.Operator.Services.Models;
using k8s;
using k8s.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Snd.Sdk.ActorProviders;
using Directive = Akka.Streams.Supervision.Directive;
namespace Arcane.Operator.Services.Operator;
/// <inheritdoc cref="IStreamClassOperatorService"/>
public class StreamClassOperatorService : IStreamClassOperatorService
{
private const int parallelism = 1;
private readonly StreamClassOperatorServiceConfiguration configuration;
private readonly ILogger<StreamClassOperatorService> logger;
private readonly IStreamClassRepository streamClassRepository;
private readonly IMetricsReporter metricsService;
private readonly IStreamOperatorService streamOperatorService;
private readonly CustomResourceApiRequest request;
private readonly ICommandHandler<SetStreamClassStatusCommand> streamClassStatusCommandHandler;
public StreamClassOperatorService(IOptions<StreamClassOperatorServiceConfiguration> streamOperatorServiceOptions,
IStreamClassRepository streamClassRepository,
IMetricsReporter metricsService,
ILogger<StreamClassOperatorService> logger,
ICommandHandler<SetStreamClassStatusCommand> streamClassStatusCommandHandler,
IStreamOperatorService streamOperatorService)
{
this.configuration = streamOperatorServiceOptions.Value;
this.logger = logger;
this.streamClassRepository = streamClassRepository;
this.metricsService = metricsService;
this.streamOperatorService = streamOperatorService;
this.streamClassStatusCommandHandler = streamClassStatusCommandHandler;
this.request = new CustomResourceApiRequest(
this.configuration.NameSpace,
this.configuration.ApiGroup,
this.configuration.Version,
this.configuration.Plural
);
}
/// <inheritdoc cref="IStreamClassOperatorService.GetStreamClassEventsGraph"/>
public IRunnableGraph<Task> GetStreamClassEventsGraph(CancellationToken cancellationToken)
{
var sink = Sink.ForEachAsync<SetStreamClassStatusCommand>(parallelism, command =>
{
this.streamClassStatusCommandHandler.Handle(command);
return this.streamClassRepository.InsertOrUpdate(command.streamClass, command.phase, command.conditions, command.request.PluralName);
});
return this.streamClassRepository.GetEvents(request, this.configuration.MaxBufferCapacity)
.Via(cancellationToken.AsFlow<ResourceEvent<IStreamClass>>(true))
.Select(this.OnEvent)
.CollectOption()
.Select(streamClass => this.metricsService.ReportStatusMetrics(streamClass))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(this.HandleError))
.ToMaterialized(sink, Keep.Right);
}
private Option<SetStreamClassStatusCommand> OnEvent(ResourceEvent<IStreamClass> resourceEvent)
{
return resourceEvent switch
{
(WatchEventType.Added, var streamClass) => this.Attach(streamClass),
(WatchEventType.Deleted, var streamClass) => this.Detach(streamClass),
_ => Option<SetStreamClassStatusCommand>.None
};
}
private SetStreamClassReady Attach(IStreamClass streamClass)
{
this.streamOperatorService.Attach(streamClass);
return new SetStreamClassReady(streamClass.Name(), this.request, streamClass);
}
private SetStreamClassStopped Detach(IStreamClass streamClass)
{
this.streamOperatorService.Detach(streamClass);
return new SetStreamClassStopped(streamClass.Name(), this.request, streamClass);
}
private Directive HandleError(Exception exception)
{
this.logger.LogError(exception, "Failed to handle stream definition event");
return exception switch
{
BufferOverflowException => Directive.Stop,
_ => Directive.Resume
};
}
}