-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathV1JobExtensions.cs
154 lines (129 loc) · 5.23 KB
/
V1JobExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
using System;
using System.Collections.Generic;
using Arcane.Operator.Contracts;
using Arcane.Operator.Exceptions;
using Arcane.Operator.Models.Api;
using Arcane.Operator.Models.Resources.StreamClass.Base;
using Arcane.Operator.Models.StreamDefinitions.Base;
using k8s.Models;
using Snd.Sdk.Kubernetes;
namespace Arcane.Operator.Extensions;
/// <summary>
/// Extension methods for writing and reading the Arcane-specific labels and annotations
/// carried by a <see cref="V1Job"/>.
/// </summary>
public static class V1JobExtensions
{
    /// <summary>Label key under which the stream kind is stored on a job.</summary>
    public const string STREAM_KIND_LABEL = "arcane/stream-kind";

    /// <summary>Label key under which the stream id is stored on a job.</summary>
    public const string STREAM_ID_LABEL = "arcane/stream-id";

    /// <summary>Label key whose value ("true"/"false") marks a job as backfilling.</summary>
    public const string BACK_FILL_LABEL = "arcane/backfilling";

    /// <summary>
    /// Adds the stream id, stream kind and backfilling labels to the job.
    /// </summary>
    /// <param name="job">Job to label.</param>
    /// <param name="streamId">Value for the <see cref="STREAM_ID_LABEL"/> label.</param>
    /// <param name="isBackfilling">Written to <see cref="BACK_FILL_LABEL"/> as lower-case "true" or "false".</param>
    /// <param name="streamKind">Value for the <see cref="STREAM_KIND_LABEL"/> label.</param>
    /// <returns>The job returned by <c>WithLabels</c>.</returns>
    public static V1Job WithStreamingJobLabels(this V1Job job, string streamId, bool isBackfilling, string streamKind)
    {
        var labels = new Dictionary<string, string>
        {
            { STREAM_ID_LABEL, streamId },
            { STREAM_KIND_LABEL, streamKind },
            // Lower-cased so that the value matches the "true" literal checked by IsReloading.
            { BACK_FILL_LABEL, isBackfilling.ToString().ToLowerInvariant() },
        };

        return job.WithLabels(labels);
    }

    /// <summary>
    /// Adds the configuration checksum annotation to the job.
    /// </summary>
    /// <param name="job">Job to annotate.</param>
    /// <param name="configurationChecksum">Checksum value to store.</param>
    /// <returns>The job returned by <c>WithAnnotations</c>.</returns>
    public static V1Job WithStreamingJobAnnotations(this V1Job job, string configurationChecksum)
    {
        var annotations = new Dictionary<string, string>
        {
            { Annotations.CONFIGURATION_CHECKSUM_ANNOTATION_KEY, configurationChecksum },
        };

        return job.WithAnnotations(annotations);
    }

    /// <summary>
    /// Adds the API group, API version and plural name references of the stream class
    /// to the job annotations. These are the annotations read back by <see cref="ToOwnerApiRequest"/>.
    /// </summary>
    /// <param name="job">Job to annotate.</param>
    /// <param name="streamClass">Stream class providing the reference values.</param>
    /// <returns>The job returned by <c>WithAnnotations</c>.</returns>
    public static V1Job WithMetadataAnnotations(this V1Job job, IStreamClass streamClass)
    {
        var annotations = new Dictionary<string, string>
        {
            { Annotations.ARCANE_STREAM_API_GROUP, streamClass.ApiGroupRef },
            { Annotations.ARCANE_STREAM_API_VERSION, streamClass.VersionRef },
            { Annotations.ARCANE_STREAM_API_PLURAL_NAME, streamClass.PluralNameRef },
        };

        return job.WithAnnotations(annotations);
    }

    /// <summary>
    /// Returns the stream id of the job, which is the job's name.
    /// </summary>
    /// <param name="job">Job to inspect.</param>
    public static string GetStreamId(this V1Job job) => job.Name();

    /// <summary>
    /// Returns the value of the <see cref="STREAM_KIND_LABEL"/> label.
    /// </summary>
    /// <param name="job">Job to inspect.</param>
    /// <returns>The label value, or <see cref="string.Empty"/> when the job has no labels or no such label.</returns>
    public static string GetStreamKind(this V1Job job)
    {
        var labels = job.Labels();
        if (labels != null && labels.TryGetValue(STREAM_KIND_LABEL, out var streamKind))
        {
            return streamKind;
        }

        return string.Empty;
    }

    /// <summary>
    /// Builds a <see cref="CustomResourceApiRequest"/> from the job's namespace and its
    /// API group, API version and plural name annotations.
    /// </summary>
    /// <param name="job">Job to inspect.</param>
    /// <exception cref="JobListenerException">Any of the three annotations is missing.</exception>
    public static CustomResourceApiRequest ToOwnerApiRequest(this V1Job job)
    {
        return new CustomResourceApiRequest(
            job.Namespace(),
            job.GetApiGroup(),
            job.GetApiVersion(),
            job.GetPluralName());
    }

    /// <summary>
    /// Returns the value of the configuration checksum annotation.
    /// </summary>
    /// <param name="job">Job to inspect.</param>
    /// <returns>The annotation value, or <see cref="string.Empty"/> when the job has no annotations or no such annotation.</returns>
    public static string GetConfigurationChecksum(this V1Job job)
    {
        var annotations = job.Annotations();
        if (annotations != null
            && annotations.TryGetValue(Annotations.CONFIGURATION_CHECKSUM_ANNOTATION_KEY, out var checksum))
        {
            return checksum;
        }

        return string.Empty;
    }

    /// <summary>
    /// Returns true when the job's configuration checksum equals the checksum of the stream definition.
    /// </summary>
    /// <param name="job">Job to inspect.</param>
    /// <param name="streamDefinition">Stream definition to compare against.</param>
    public static bool ConfigurationMatches(this V1Job job, IStreamDefinition streamDefinition) =>
        job.GetConfigurationChecksum() == streamDefinition.GetConfigurationChecksum();

    /// <summary>
    /// Returns true when the state annotation equals the terminate-requested value.
    /// </summary>
    /// <param name="job">Job to inspect.</param>
    public static bool IsStopRequested(this V1Job job) =>
        HasStateAnnotation(job, Annotations.TERMINATE_REQUESTED_STATE_ANNOTATION_VALUE);

    /// <summary>
    /// Returns true when the state annotation equals the restarting value.
    /// </summary>
    /// <param name="job">Job to inspect.</param>
    public static bool IsRestartRequested(this V1Job job) =>
        HasStateAnnotation(job, Annotations.RESTARTING_STATE_ANNOTATION_VALUE);

    /// <summary>
    /// Returns true when the state annotation equals the reloading value.
    /// </summary>
    /// <param name="job">Job to inspect.</param>
    public static bool IsReloadRequested(this V1Job job) =>
        HasStateAnnotation(job, Annotations.RELOADING_STATE_ANNOTATION_VALUE);

    /// <summary>
    /// Returns true when the <see cref="BACK_FILL_LABEL"/> label is present and set to "true".
    /// Unlike the other state checks, this one reads a label rather than the state annotation.
    /// </summary>
    /// <param name="job">Job to inspect.</param>
    public static bool IsReloading(this V1Job job)
    {
        var labels = job.Labels();
        return labels != null
               && labels.TryGetValue(BACK_FILL_LABEL, out var backfilling)
               && backfilling == "true";
    }

    /// <summary>
    /// Returns true when the state annotation equals the schema-mismatch value.
    /// </summary>
    /// <param name="job">Job to inspect.</param>
    public static bool IsSchemaMismatch(this V1Job job) =>
        HasStateAnnotation(job, Annotations.SCHEMA_MISMATCH_STATE_ANNOTATION_VALUE);

    /// <summary>
    /// Returns true when the state annotation equals the terminating value.
    /// </summary>
    /// <param name="job">Job to inspect.</param>
    public static bool IsStopping(this V1Job job) =>
        HasStateAnnotation(job, Annotations.TERMINATING_STATE_ANNOTATION_VALUE);

    /// <summary>
    /// Reads the API group annotation; throws when it is absent.
    /// </summary>
    private static string GetApiGroup(this V1Job job) =>
        GetRequiredAnnotation(
            job,
            Annotations.ARCANE_STREAM_API_GROUP,
            "Api group not found in job annotations.");

    /// <summary>
    /// Reads the API version annotation; throws when it is absent.
    /// </summary>
    private static string GetApiVersion(this V1Job job) =>
        GetRequiredAnnotation(
            job,
            Annotations.ARCANE_STREAM_API_VERSION,
            "Api version not found in job annotations.");

    /// <summary>
    /// Reads the API plural name annotation; throws when it is absent.
    /// </summary>
    private static string GetPluralName(this V1Job job) =>
        GetRequiredAnnotation(
            job,
            Annotations.ARCANE_STREAM_API_PLURAL_NAME,
            "Api plural name not found in job annotations.");

    /// <summary>
    /// Returns true when the job has a state annotation whose value equals <paramref name="expectedState"/>.
    /// A job without annotations, or without the state annotation, yields false.
    /// </summary>
    private static bool HasStateAnnotation(V1Job job, string expectedState)
    {
        var annotations = job.Annotations();
        return annotations != null
               && annotations.TryGetValue(Annotations.STATE_ANNOTATION_KEY, out var state)
               && state == expectedState;
    }

    /// <summary>
    /// Returns the value of the annotation <paramref name="key"/>, or throws a
    /// <see cref="JobListenerException"/> carrying <paramref name="missingMessage"/>
    /// when the job has no annotations or the key is absent.
    /// </summary>
    private static string GetRequiredAnnotation(V1Job job, string key, string missingMessage)
    {
        var annotations = job.Annotations();
        if (annotations != null && annotations.TryGetValue(key, out var value))
        {
            return value;
        }

        throw new JobListenerException(missingMessage);
    }
}