-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathStreamDefinitionCommands.cs
128 lines (110 loc) · 4.53 KB
/
StreamDefinitionCommands.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
using Arcane.Operator.Contracts;
using Arcane.Operator.Models.Base;
using Arcane.Operator.Models.Resources.Status.V1Alpha1;
using Arcane.Operator.Models.StreamDefinitions.Base;
using Arcane.Operator.Services.Base;
using Arcane.Operator.Services.Base.CommandHandlers;
namespace Arcane.Operator.Models.Commands;
/// <summary>
/// Lifecycle phases a stream can be reported in.
/// </summary>
/// <remarks>
/// The numeric values are spelled out explicitly; they are the same values the
/// compiler assigns implicitly from declaration order.
/// </remarks>
public enum StreamPhase
{
    /// <summary>
    /// The stream is running.
    /// </summary>
    RUNNING = 0,

    /// <summary>
    /// The stream is in a data backfill process.
    /// </summary>
    RELOADING = 1,

    /// <summary>
    /// The stream has been suspended.
    /// </summary>
    SUSPENDED = 2,

    /// <summary>
    /// The stream has failed and cannot be automatically recovered.
    /// </summary>
    FAILED = 3
}
/// <summary>
/// Common base record for every command that targets a stream definition.
/// Declares no members of its own beyond what <see cref="KubernetesCommand"/> provides.
/// </summary>
public abstract record StreamDefinitionCommand : KubernetesCommand
{
}
/// <summary>
/// Base command for updating the status of a stream definition.
/// </summary>
/// <param name="affectedResource">The stream definition whose status is updated.</param>
/// <param name="conditions">The conditions carried by the status update.</param>
/// <param name="phase">The <see cref="StreamPhase"/> carried by the status update.</param>
public abstract record UpdateStatusCommand(
    IStreamDefinition affectedResource,
    V1Alpha1StreamCondition[] conditions,
    StreamPhase phase)
    : StreamDefinitionCommand;
/// <summary>
/// Base command for error statuses: always carries
/// <c>V1Alpha1StreamCondition.ErrorCondition</c> together with the
/// <see cref="StreamPhase.FAILED"/> phase.
/// </summary>
/// <param name="affectedResource">The stream definition whose status is updated.</param>
public abstract record SetErrorStatus(IStreamDefinition affectedResource)
    : UpdateStatusCommand(
        affectedResource: affectedResource,
        conditions: V1Alpha1StreamCondition.ErrorCondition,
        phase: StreamPhase.FAILED);
/// <summary>
/// Base command for error statuses with caller-supplied conditions.
/// The phase is always <see cref="StreamPhase.FAILED"/>.
/// </summary>
/// <param name="affectedResource">The stream definition whose status is updated.</param>
/// <param name="conditions">The conditions to carry in place of the default error condition.</param>
public abstract record SetCustomErrorStatus(
    IStreamDefinition affectedResource,
    V1Alpha1StreamCondition[] conditions)
    : UpdateStatusCommand(
        affectedResource: affectedResource,
        conditions: conditions,
        phase: StreamPhase.FAILED);
/// <summary>
/// Base command for warning statuses: carries
/// <c>V1Alpha1StreamCondition.WarningCondition</c> together with a caller-chosen phase.
/// </summary>
/// <param name="affectedResource">The stream definition whose status is updated.</param>
/// <param name="phase">The <see cref="StreamPhase"/> carried alongside the warning condition.</param>
public abstract record SetWarningStatus(
    IStreamDefinition affectedResource,
    StreamPhase phase)
    : UpdateStatusCommand(
        affectedResource: affectedResource,
        conditions: V1Alpha1StreamCondition.WarningCondition,
        phase: phase);
/// <summary>
/// Base command for ready statuses: carries
/// <c>V1Alpha1StreamCondition.ReadyCondition</c> together with a caller-chosen phase.
/// </summary>
/// <param name="affectedResource">The stream definition whose status is updated.</param>
/// <param name="phase">The <see cref="StreamPhase"/> carried alongside the ready condition.</param>
public abstract record SetReadyStatus(
    IStreamDefinition affectedResource,
    StreamPhase phase)
    : UpdateStatusCommand(
        affectedResource: affectedResource,
        conditions: V1Alpha1StreamCondition.ReadyCondition,
        phase: phase);
/// <summary>
/// Sets the stream definition status to CrashLoop. Adds nothing to
/// <see cref="SetErrorStatus"/>, so the status carries the error condition and the
/// <see cref="StreamPhase.FAILED"/> phase.
/// </summary>
/// <param name="affectedResource">The stream definition whose status is updated.</param>
public record SetCrashLoopStatusCommand(IStreamDefinition affectedResource)
    : SetErrorStatus(affectedResource);
/// <summary>
/// Used to set error statuses in case of internal stream operator errors.
/// The supplied conditions are forwarded unchanged to <see cref="SetCustomErrorStatus"/>.
/// </summary>
/// <param name="affectedResource">The stream definition whose status is updated.</param>
/// <param name="conditions">The conditions describing the internal error.</param>
public record SetInternalErrorStatus(
    IStreamDefinition affectedResource,
    V1Alpha1StreamCondition[] conditions)
    : SetCustomErrorStatus(
        affectedResource: affectedResource,
        conditions: conditions);
/// <summary>
/// Sets the stream definition status to Suspended: a warning status with the
/// <see cref="StreamPhase.SUSPENDED"/> phase.
/// </summary>
/// <param name="affectedResource">The stream definition whose status is updated.</param>
public record Suspended(IStreamDefinition affectedResource)
    : SetWarningStatus(
        affectedResource: affectedResource,
        phase: StreamPhase.SUSPENDED);
/// <summary>
/// Sets the stream definition status to Reloading: a ready status with the
/// <see cref="StreamPhase.RELOADING"/> phase.
/// </summary>
/// <param name="affectedResource">The stream definition whose status is updated.</param>
public record Reloading(IStreamDefinition affectedResource)
    : SetReadyStatus(
        affectedResource: affectedResource,
        phase: StreamPhase.RELOADING);
/// <summary>
/// Sets the stream definition status to Running: a ready status with the
/// <see cref="StreamPhase.RUNNING"/> phase.
/// </summary>
/// <param name="affectedResource">The stream definition whose status is updated.</param>
public record Running(IStreamDefinition affectedResource)
    : SetReadyStatus(
        affectedResource: affectedResource,
        phase: StreamPhase.RUNNING);
/// <summary>
/// Annotates the stream definition to indicate that the stream is in a crash loop,
/// using the key <c>Annotations.STATE_ANNOTATION_KEY</c> and the value
/// <c>Annotations.CRASH_LOOP_STATE_ANNOTATION_VALUE</c>.
/// </summary>
/// <param name="affectedResource">The resource to update.</param>
public record SetCrashLoopStatusAnnotationCommand(IStreamDefinition affectedResource)
    : SetAnnotationCommand<IStreamDefinition>(
        affectedResource,
        Annotations.STATE_ANNOTATION_KEY,
        Annotations.CRASH_LOOP_STATE_ANNOTATION_VALUE);
/// <summary>
/// Removes the stream definition annotation that indicates the stream should
/// restart in backfill mode. The annotation removed is the one stored under
/// <c>Annotations.STATE_ANNOTATION_KEY</c>.
/// </summary>
/// <param name="affectedResource">The resource to update.</param>
public record RemoveReloadRequestedAnnotation(IStreamDefinition affectedResource)
    : RemoveAnnotationCommand<IStreamDefinition>(
        affectedResource,
        Annotations.STATE_ANNOTATION_KEY);