-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathStreamDefinitionCommands.cs
89 lines (77 loc) · 3.43 KB
/
StreamDefinitionCommands.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
using Arcane.Operator.Models;
using Arcane.Operator.Models.StreamDefinitions.Base;
using Arcane.Operator.Models.StreamStatuses.StreamStatus.V1Beta1;
using Arcane.Operator.Services.Base;
using Arcane.Operator.StreamingJobLifecycle;
using k8s;
using k8s.Models;
namespace Arcane.Operator.Services.Commands;
/// <summary>
/// Abstract class for stream definition commands
/// </summary>
public abstract record StreamDefinitionCommand : KubernetesCommand;
/// <summary>
/// Update the stream definition status
/// </summary>
/// <param name="affectedResource"></param>
/// <param name="conditions"></param>
/// <param name="phase"></param>
public abstract record UpdateStatusCommand(IStreamDefinition affectedResource,
V1Beta1StreamCondition[] conditions,
StreamPhase phase) : StreamDefinitionCommand;
/// <summary>
/// Abstract class for setting error status
/// </summary>
/// <param name="affectedResource"></param>
public abstract record SetErrorStatus(IStreamDefinition affectedResource) : UpdateStatusCommand(affectedResource,
V1Beta1StreamCondition.ErrorCondition,
StreamPhase.FAILED);
/// <summary>
/// Abstract class for setting error status
/// </summary>
/// <param name="affectedResource"></param>
public abstract record SetWarningStatus(IStreamDefinition affectedResource, StreamPhase phase) : UpdateStatusCommand(affectedResource,
V1Beta1StreamCondition.WarningCondition, phase);
/// <summary>
/// Abstract class for setting error status
/// </summary>
/// <param name="affectedResource"></param>
public abstract record SetReadyStatus(IStreamDefinition affectedResource, StreamPhase phase) : UpdateStatusCommand(affectedResource,
V1Beta1StreamCondition.ReadyCondition, phase);
/// <summary>
/// Sets the stream definition status to CrashLoop
/// </summary>
/// <param name="affectedResource"></param>
public record SetCrashLoopStatusCommand(IStreamDefinition affectedResource) : SetErrorStatus(affectedResource);
/// <summary>
/// Sets the stream definition status to Suspended
/// </summary>
/// <param name="affectedResource"></param>
public record Suspended(IStreamDefinition affectedResource) : SetWarningStatus(affectedResource, StreamPhase.SUSPENDED);
/// <summary>
/// Sets the stream definition status to Reloading
/// </summary>
/// <param name="affectedResource"></param>
public record Reloading(IStreamDefinition affectedResource) : SetReadyStatus(affectedResource, StreamPhase.RELOADING);
/// <summary>
/// Sets the stream definition status to Running
/// </summary>
/// <param name="affectedResource"></param>
public record Running(IStreamDefinition affectedResource) : SetReadyStatus(affectedResource, StreamPhase.RELOADING);
/// <summary>
/// Sets the stream definition annotation to indicate that the stream is in a crash loop
/// </summary>
/// <param name="affectedResource">The resource to update</param>
public record SetCrashLoopStatusAnnotationCommand(IStreamDefinition affectedResource) :
SetAnnotationCommand<IStreamDefinition>(
affectedResource,
Annotations.STATE_ANNOTATION_KEY,
Annotations.CRASH_LOOP_STATE_ANNOTATION_VALUE);
/// <summary>
/// Removes the stream definition annotation to indicate that the stream is should restart in backfill mode
/// </summary>
/// <param name="affectedResource">The resource to update</param>
public record RemoveReloadRequestedAnnotation(IStreamDefinition affectedResource) :
RemoveAnnotationCommand<IStreamDefinition>(
affectedResource,
Annotations.STATE_ANNOTATION_KEY);