-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathStreamingJobCommandHandler.cs
37 lines (33 loc) · 1.56 KB
/
StreamingJobCommandHandler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
using System;
using System.Threading.Tasks;
using Arcane.Operator.Services.Base;
using Arcane.Operator.Services.Commands;
using k8s.Models;
using Snd.Sdk.Tasks;
namespace Arcane.Operator.Services.CommandHandlers;
/// <inheritdoc cref="ICommandHandler{T}" />
public class StreamingJobCommandHandler : ICommandHandler<StreamingJobCommand>
{
private readonly IStreamClassRepository streamClassRepository;
private readonly IStreamingJobOperatorService streamingJobOperatorService;
public StreamingJobCommandHandler(
IStreamClassRepository streamClassRepository,
IStreamingJobOperatorService streamingJobOperatorService)
{
this.streamClassRepository = streamClassRepository;
this.streamingJobOperatorService = streamingJobOperatorService;
}
/// <inheritdoc cref="ICommandHandler{T}.Handle" />
public Task Handle(StreamingJobCommand command) => command switch
{
StartJob startJob => this.streamClassRepository
.Get(startJob.streamDefinition.Namespace(), startJob.streamDefinition.Kind)
.Map(maybeSc => maybeSc switch
{
{ HasValue: true, Value: var sc } => this.streamingJobOperatorService.StartRegisteredStream(startJob.streamDefinition, startJob.IsBackfilling, sc),
_ => throw new ArgumentOutOfRangeException(nameof(command), command, null)
}),
StopJob stopJob => this.streamingJobOperatorService.DeleteJob(stopJob.streamKind, stopJob.streamId),
_ => throw new ArgumentOutOfRangeException(nameof(command), command, null)
};
}