-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathresource_manager.go
1935 lines (1781 loc) · 79.4 KB
/
resource_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package resource
import (
"context"
"encoding/json"
"fmt"
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"io"
"net"
"reflect"
"strconv"
"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/apiserver/archive"
kfpauth "github.com/kubeflow/pipelines/backend/src/apiserver/auth"
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/list"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/apiserver/template"
exec "github.com/kubeflow/pipelines/backend/src/common"
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"
authorizationv1 "k8s.io/api/authorization/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
)
// Metric variables. Please prefix the metric names with resource_manager_.
var (
// extraLabels lists the label names shared by the per-run gauge vectors
// declared below.
extraLabels = []string{
// Kubeflow namespace (profile) in which the run was triggered.
"profile",
// Name of the workflow.
"workflow",
}
// workflowGCCounter counts the workflows removed by garbage collection.
workflowGCCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "resource_manager_workflow_gc",
Help: "The number of gabarage-collected workflows",
})
// workflowSuccessCounter tracks the successful workflow runs. Despite its
// name it is a GaugeVec (labelled by extraLabels), so it can also be
// decremented; DeleteRun does so.
workflowSuccessCounter = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "resource_manager_workflow_runs_success",
Help: "The current number of successfully workflows runs",
}, extraLabels)
// workflowFailedCounter tracks the failed workflow runs. Like
// workflowSuccessCounter it is a GaugeVec labelled by extraLabels.
workflowFailedCounter = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "resource_manager_workflow_runs_failed",
Help: "The current number of failed workflows runs",
}, extraLabels)
)
// ClientManagerInterface exposes the stores, clients and helpers that
// NewResourceManager copies into a ResourceManager. Each accessor returns
// the dependency of the corresponding name.
type ClientManagerInterface interface {
// Persistent stores.
ExperimentStore() storage.ExperimentStoreInterface
PipelineStore() storage.PipelineStoreInterface
JobStore() storage.JobStoreInterface
RunStore() storage.RunStoreInterface
TaskStore() storage.TaskStoreInterface
ResourceReferenceStore() storage.ResourceReferenceStoreInterface
DBStatusStore() storage.DBStatusStoreInterface
DefaultExperimentStore() storage.DefaultExperimentStoreInterface
ObjectStore() storage.ObjectStoreInterface
// Cluster-facing clients.
ExecClient() util.ExecutionClient
SwfClient() client.SwfClientInterface
KubernetesCoreClient() client.KubernetesCoreInterface
SubjectAccessReviewClient() client.SubjectAccessReviewInterface
TokenReviewClient() client.TokenReviewInterface
LogArchive() archive.LogArchiveInterface
// Time and UUID sources.
Time() util.TimeInterface
UUID() util.UUIDGeneratorInterface
Authenticators() []kfpauth.Authenticator
}
// ResourceManagerOptions holds optional settings for a ResourceManager.
type ResourceManagerOptions struct {
// CollectMetrics, when true, makes DeleteRun adjust the workflow
// success/failure gauges.
CollectMetrics bool `json:"collect_metrics,omitempty"`
}
// ResourceManager bundles the stores and clients used by the methods in this
// file to manage experiments, pipelines, runs, jobs and tasks. All fields are
// populated by NewResourceManager from a ClientManagerInterface.
type ResourceManager struct {
// Persistent stores.
experimentStore storage.ExperimentStoreInterface
pipelineStore storage.PipelineStoreInterface
jobStore storage.JobStoreInterface
runStore storage.RunStoreInterface
taskStore storage.TaskStoreInterface
resourceReferenceStore storage.ResourceReferenceStoreInterface
dBStatusStore storage.DBStatusStoreInterface
defaultExperimentStore storage.DefaultExperimentStoreInterface
objectStore storage.ObjectStoreInterface
// Cluster-facing clients.
execClient util.ExecutionClient
swfClient client.SwfClientInterface
k8sCoreClient client.KubernetesCoreInterface
subjectAccessReviewClient client.SubjectAccessReviewInterface
tokenReviewClient client.TokenReviewInterface
logArchive archive.LogArchiveInterface
// Time and UUID sources (CreateRun uses them for timestamps and run ids).
time util.TimeInterface
uuid util.UUIDGeneratorInterface
authenticators []kfpauth.Authenticator
// options is stored exactly as passed to NewResourceManager.
options *ResourceManagerOptions
}
// NewResourceManager builds a ResourceManager by copying every store, client
// and helper exposed by clientManager. The options pointer is stored as given.
func NewResourceManager(clientManager ClientManagerInterface, options *ResourceManagerOptions) *ResourceManager {
	manager := new(ResourceManager)

	// Persistent stores.
	manager.experimentStore = clientManager.ExperimentStore()
	manager.pipelineStore = clientManager.PipelineStore()
	manager.jobStore = clientManager.JobStore()
	manager.runStore = clientManager.RunStore()
	manager.taskStore = clientManager.TaskStore()
	manager.resourceReferenceStore = clientManager.ResourceReferenceStore()
	manager.dBStatusStore = clientManager.DBStatusStore()
	manager.defaultExperimentStore = clientManager.DefaultExperimentStore()
	manager.objectStore = clientManager.ObjectStore()

	// Cluster-facing clients.
	manager.execClient = clientManager.ExecClient()
	manager.swfClient = clientManager.SwfClient()
	manager.k8sCoreClient = clientManager.KubernetesCoreClient()
	manager.subjectAccessReviewClient = clientManager.SubjectAccessReviewClient()
	manager.tokenReviewClient = clientManager.TokenReviewClient()
	manager.logArchive = clientManager.LogArchive()

	// Helpers and configuration.
	manager.time = clientManager.Time()
	manager.uuid = clientManager.UUID()
	manager.authenticators = clientManager.Authenticators()
	manager.options = options

	return manager
}
// getWorkflowClient returns the execution client scoped to namespace.
func (r *ResourceManager) getWorkflowClient(namespace string) util.ExecutionInterface {
	workflowClient := r.execClient.Execution(namespace)
	return workflowClient
}
// getScheduledWorkflowClient returns the ScheduledWorkflow client scoped to
// namespace.
func (r *ResourceManager) getScheduledWorkflowClient(namespace string) scheduledworkflowclient.ScheduledWorkflowInterface {
	swfClient := r.swfClient.ScheduledWorkflow(namespace)
	return swfClient
}
// CreateExperiment stores a new experiment through the experiment store.
// In multi-user mode an experiment without a namespace is rejected with an
// invalid-input error.
func (r *ResourceManager) CreateExperiment(experiment *model.Experiment) (*model.Experiment, error) {
	// The namespace is only mandatory in multi-user mode.
	if common.IsMultiUserMode() && experiment.Namespace == "" {
		return nil, util.NewInvalidInputError("Namespace cannot be empty")
	}
	created, err := r.experimentStore.CreateExperiment(experiment)
	return created, err
}
// GetExperiment looks up the experiment with the given id in the experiment
// store and returns the store's result unchanged.
func (r *ResourceManager) GetExperiment(experimentId string) (*model.Experiment, error) {
	experiment, err := r.experimentStore.GetExperiment(experimentId)
	return experiment, err
}
// ListExperiments forwards the filter context and listing options to the
// experiment store and returns its four results (experiments, an int, a
// string and an error) unchanged.
func (r *ResourceManager) ListExperiments(filterContext *model.FilterContext, opts *list.Options) ([]*model.Experiment, int, string, error) {
	experiments, totalSize, nextPageToken, err := r.experimentStore.ListExperiments(filterContext, opts)
	return experiments, totalSize, nextPageToken, err
}
// DeleteExperiment removes the experiment with the given id.
// It refuses to delete the default experiment and fails if the experiment
// cannot be fetched before deletion.
func (r *ResourceManager) DeleteExperiment(experimentId string) error {
	defaultID, err := r.GetDefaultExperimentId()
	if err != nil {
		return util.Wrapf(err, "Failed to delete experiment %v due to error fetching the default experiment id", experimentId)
	}

	// The default experiment (when one is configured) must never be deleted.
	targetsDefault := defaultID != "" && experimentId == defaultID
	if targetsDefault {
		cause := util.NewInvalidInputError("Experiment id cannot be equal to the default id %v", defaultID)
		return util.NewBadRequestError(cause, "Failed to delete experiment %v. The default experiment cannot be deleted", experimentId)
	}

	// Make sure the experiment can be fetched before attempting the delete.
	_, err = r.experimentStore.GetExperiment(experimentId)
	if err != nil {
		return util.Wrapf(err, "Failed to delete experiment %v due to error fetching it", experimentId)
	}
	return r.experimentStore.DeleteExperiment(experimentId)
}
// ArchiveExperiment archives the experiment with the given id.
//
// It first pages through the jobs that reference the experiment (50 per page,
// ordered by name) and patches each job's ScheduledWorkflow resource so that
// spec.enabled is false. Once every job has been patched, the experiment store
// is asked to archive the experiment. Per the original author's notes, the
// store update is expected to archive the experiment, archive its runs and
// disable its jobs in the database.
func (r *ResourceManager) ArchiveExperiment(ctx context.Context, experimentId string) error {
	const pageSize = 50

	opts, err := list.NewOptions(&model.Job{}, pageSize, "name", nil)
	if err != nil {
		return util.NewInternalServerError(err,
			"Failed to archive experiment %v", experimentId)
	}

	// Merge patch that switches a ScheduledWorkflow off.
	disablePatch := fmt.Sprintf(`{"spec":{"enabled":%s}}`, strconv.FormatBool(false))

	for {
		jobFilter := &model.FilterContext{
			ReferenceKey: &model.ReferenceKey{Type: model.ExperimentResourceType, ID: experimentId},
		}
		jobs, _, nextToken, err := r.jobStore.ListJobs(jobFilter, opts)
		if err != nil {
			return util.NewInternalServerError(err,
				"Failed to list jobs of to-be-archived experiment %v", experimentId)
		}

		for _, job := range jobs {
			// Jobs without a namespace live in the API server's own namespace.
			targetNamespace := job.Namespace
			if targetNamespace == "" {
				targetNamespace = common.GetPodNamespace()
			}
			swfClient := r.getScheduledWorkflowClient(targetNamespace)
			if _, err := swfClient.Patch(ctx, job.K8SName, types.MergePatchType, []byte(disablePatch)); err != nil {
				return util.NewInternalServerError(err,
					"Failed to disable job %v while archiving experiment %v", job.UUID, experimentId)
			}
		}

		// An empty token means the last page has been processed.
		if nextToken == "" {
			break
		}
		opts, err = list.NewOptionsFromToken(nextToken, pageSize)
		if err != nil {
			return util.NewInternalServerError(err,
				"Failed to create list jobs options from page token when archiving experiment %v", experimentId)
		}
	}

	return r.experimentStore.ArchiveExperiment(experimentId)
}
// UnarchiveExperiment asks the experiment store to un-archive the experiment
// with the given id and returns the store's error unchanged.
func (r *ResourceManager) UnarchiveExperiment(experimentId string) error {
	err := r.experimentStore.UnarchiveExperiment(experimentId)
	return err
}
// ListPipelines returns the pipelines matching the filter context and listing
// options. On failure the store's error is wrapped with context; the other
// results are still passed through exactly as the store returned them.
func (r *ResourceManager) ListPipelines(filterContext *model.FilterContext, opts *list.Options) ([]*model.Pipeline, int, string, error) {
	pipelines, totalSize, nextPageToken, err := r.pipelineStore.ListPipelines(filterContext, opts)
	if err == nil {
		return pipelines, totalSize, nextPageToken, nil
	}
	wrapped := util.Wrapf(err, "Failed to list pipelines with context %v, options %v", filterContext, opts)
	return pipelines, totalSize, nextPageToken, wrapped
}
// TODO(gkcalat): consider removing after KFP v2 GA if users are not affected.
//
// ListPipelinesV1 returns pipelines together with pipeline versions via the
// store's ListPipelinesV1. According to the original author this variant uses
// a LEFT JOIN in SQL, which could be more performant for a large number of
// pipeline versions. On failure the error is wrapped; the other results are
// passed through exactly as the store returned them.
func (r *ResourceManager) ListPipelinesV1(filterContext *model.FilterContext, opts *list.Options) ([]*model.Pipeline, []*model.PipelineVersion, int, string, error) {
	pipelines, versions, totalSize, nextPageToken, err := r.pipelineStore.ListPipelinesV1(filterContext, opts)
	if err == nil {
		return pipelines, versions, totalSize, nextPageToken, nil
	}
	wrapped := util.Wrapf(err, "ResourceManager (v1beta1): Failed to list pipelines with context %v, options %v", filterContext, opts)
	return pipelines, versions, totalSize, nextPageToken, wrapped
}
// GetPipeline fetches the pipeline with the given id from the pipeline store.
// A store error is wrapped with the pipeline id and returned with a nil
// pipeline.
func (r *ResourceManager) GetPipeline(pipelineId string) (*model.Pipeline, error) {
	pipeline, err := r.pipelineStore.GetPipeline(pipelineId)
	if err != nil {
		return nil, util.Wrapf(err, "Failed to get a pipeline with id %v", pipelineId)
	}
	return pipeline, nil
}
// GetPipelineByNameAndNamespace fetches the pipeline identified by name and
// namespace from the pipeline store. A store error is wrapped and returned
// with a nil pipeline.
func (r *ResourceManager) GetPipelineByNameAndNamespace(name string, namespace string) (*model.Pipeline, error) {
	pipeline, err := r.pipelineStore.GetPipelineByNameAndNamespace(name, namespace)
	if err != nil {
		return nil, util.Wrapf(err, "Failed to get a pipeline named %v in namespace %v", name, namespace)
	}
	return pipeline, nil
}
// TODO(gkcalat): consider removing after KFP v2 GA if users are not affected.
//
// GetPipelineByNameAndNamespaceV1 fetches a pipeline and a pipeline version
// by name and namespace via the store's V1 lookup. According to the original
// author this variant uses a LEFT JOIN in SQL, which could be more performant
// for a large number of pipeline versions. A store error is wrapped and
// returned with nil results.
func (r *ResourceManager) GetPipelineByNameAndNamespaceV1(name string, namespace string) (*model.Pipeline, *model.PipelineVersion, error) {
	pipeline, version, err := r.pipelineStore.GetPipelineByNameAndNamespaceV1(name, namespace)
	if err != nil {
		return nil, nil, util.Wrapf(err, "ResourceManager (v1beta1): Failed to get a pipeline named %v in namespace %v", name, namespace)
	}
	return pipeline, version, nil
}
// DeletePipeline deletes the pipeline with the given id. It does not delete
// the pipeline spec in the object storage.
//
// The call fails if the pipeline cannot be fetched, if the store reports an
// existing pipeline version for it, or if the version lookup fails with
// anything other than a NotFound user error. Before the DB entry is removed
// the pipeline is marked as DELETING so it's not visible to the user.
func (r *ResourceManager) DeletePipeline(pipelineId string) error {
	// Check if pipeline exists.
	if _, err := r.pipelineStore.GetPipeline(pipelineId); err != nil {
		return util.Wrapf(err, "Failed to delete pipeline with id %v as it was not found", pipelineId)
	}

	// Check that it has no pipeline versions.
	latestPipelineVersion, err := r.pipelineStore.GetLatestPipelineVersion(pipelineId)
	if latestPipelineVersion != nil {
		return util.NewInvalidInputError("Failed to delete pipeline with id %v as it has existing pipeline versions (e.g. %v)", pipelineId, latestPipelineVersion.UUID)
	}
	if err != nil {
		// Only a NotFound user error means "no versions". Use the checked
		// form of the type assertion so that a nil error or an error of a
		// different concrete type cannot panic here.
		userErr, isUserErr := err.(*util.UserError)
		if !isUserErr || userErr.ExternalStatusCode() != codes.NotFound {
			return util.Wrapf(err, "Failed to delete pipeline with id %v as it failed to check existing pipeline versions", pipelineId)
		}
	}

	// Mark pipeline as deleting so it's not visible to user.
	if err := r.pipelineStore.UpdatePipelineStatus(pipelineId, model.PipelineDeleting); err != nil {
		return util.Wrapf(err, "Failed to change the status of pipeline id %v to DELETING", pipelineId)
	}

	// Delete the pipeline's DB entry.
	if err := r.pipelineStore.DeletePipeline(pipelineId); err != nil {
		return util.Wrapf(err, "Failed to delete pipeline DB entry for pipeline id %v", pipelineId)
	}
	return nil
}
// TODO(gkcalat): consider removing before v2beta1 GA as default version is deprecated. This requires changes to v1beta1 proto.
//
// UpdatePipelineDefaultVersion sets the default pipeline version of a
// pipeline by delegating to the pipeline store. Supports v1beta1 behavior.
func (r *ResourceManager) UpdatePipelineDefaultVersion(pipelineId string, versionId string) error {
	err := r.pipelineStore.UpdatePipelineDefaultVersion(pipelineId, versionId)
	return err
}
// CreatePipeline creates a pipeline without creating a pipeline version;
// call CreatePipelineVersion for that. The pipeline name must be non-empty.
// After the record is created its status is set to PipelineReady, both on
// the returned object and in the store.
func (r *ResourceManager) CreatePipeline(p *model.Pipeline) (*model.Pipeline, error) {
	if p.Name == "" {
		return nil, util.NewInvalidInputError("pipeline's name cannot be empty")
	}

	// Create a record in KFP DB (only pipelines table).
	created, err := r.pipelineStore.CreatePipeline(p)
	if err != nil {
		return nil, util.Wrap(err, "Failed to create a pipeline in PipelineStore")
	}

	// Flip the freshly created pipeline to the ready state.
	created.Status = model.PipelineReady
	if err := r.pipelineStore.UpdatePipelineStatus(created.UUID, created.Status); err != nil {
		return nil, util.Wrap(err, "Failed to update status of a pipeline after creation")
	}
	return created, nil
}
// CreatePipelineAndPipelineVersion creates a pipeline and a pipeline version
// with a single call to the pipeline store. Per the original author, this is
// used when the two resources need to be created in a single DB transaction.
//
// The pipeline spec is fetched for pv, parsed into a template and, for v2
// templates, its pipeline name is validated. Missing names on p and pv are
// filled in from each other or, when both are empty, from the v2 spec name.
// Note that p and pv are modified in place. After creation both records are
// switched to their ready status.
func (r *ResourceManager) CreatePipelineAndPipelineVersion(p *model.Pipeline, pv *model.PipelineVersion) (*model.Pipeline, *model.PipelineVersion, error) {
	// Fetch pipeline spec and record it (and its URI, if any) on the version.
	specBytes, specURI, err := r.fetchTemplateFromPipelineVersion(pv)
	if err != nil {
		return nil, nil, util.Wrap(err, "Failed to create a pipeline and a pipeline version as template is broken")
	}
	pv.PipelineSpec = string(specBytes)
	if specURI != "" {
		pv.PipelineSpecURI = specURI
	}

	tmpl, err := template.New(specBytes)
	if err != nil {
		return nil, nil, util.Wrap(err, "Failed to create a pipeline and a pipeline version due to template creation error")
	}

	// Validate pipeline's name in:
	// 1. pipeline spec for v2 pipelines and v2-compatible pipeline must comply with MLMD requirements
	// 2. display name must be non-empty
	specName := ""
	if tmpl.IsV2() {
		specName = tmpl.V2PipelineName()
		if nameErr := common.ValidatePipelineName(specName); nameErr != nil {
			return nil, nil, nameErr
		}
	}

	// Fill in whichever display names are missing.
	switch {
	case pv.Name == "" && p.Name == "":
		if specName == "" {
			return nil, nil, util.NewInvalidInputError("pipeline's name cannot be empty")
		}
		pv.Name = specName
		p.Name = specName
	case pv.Name == "":
		pv.Name = p.Name
	case p.Name == "":
		p.Name = pv.Name
	}

	// Parse parameters.
	parameters, err := tmpl.ParametersJSON()
	if err != nil {
		return nil, nil, util.Wrap(err, "Failed to create a pipeline and a pipeline version due to error converting parameters to json")
	}
	pv.Parameters = parameters
	pv.PipelineSpec = string(tmpl.Bytes())

	// Create records in KFP DB (both pipelines and pipeline_versions tables).
	createdPipeline, createdVersion, err := r.pipelineStore.CreatePipelineAndPipelineVersion(p, pv)
	if err != nil {
		return nil, nil, util.Wrap(err, "Failed to create a pipeline and a pipeline version")
	}

	createdPipeline.Status = model.PipelineReady
	if err := r.pipelineStore.UpdatePipelineStatus(createdPipeline.UUID, createdPipeline.Status); err != nil {
		return nil, nil, util.Wrap(err, "Failed to update status of a new pipeline after creation")
	}

	createdVersion.Status = model.PipelineVersionReady
	if err := r.pipelineStore.UpdatePipelineVersionStatus(createdVersion.UUID, createdVersion.Status); err != nil {
		return nil, nil, util.Wrap(err, "Failed to update status of a new pipeline version after creation")
	}

	return createdPipeline, createdVersion, nil
}
// UpdatePipelineStatus sets the status of the pipeline with the given id.
// A store error is wrapped with the pipeline id and the requested status.
func (r *ResourceManager) UpdatePipelineStatus(pipelineId string, status model.PipelineStatus) error {
	if err := r.pipelineStore.UpdatePipelineStatus(pipelineId, status); err != nil {
		return util.Wrapf(err, "Failed to update the status of pipeline id %v to %v", pipelineId, status)
	}
	return nil
}
// UpdatePipelineVersionStatus sets the status of the pipeline version with
// the given id. A store error is wrapped with the version id and the
// requested status.
func (r *ResourceManager) UpdatePipelineVersionStatus(pipelineVersionId string, status model.PipelineVersionStatus) error {
	if err := r.pipelineStore.UpdatePipelineVersionStatus(pipelineVersionId, status); err != nil {
		return util.Wrapf(err, "Failed to update the status of pipeline version id %v to %v", pipelineVersionId, status)
	}
	return nil
}
// GetPipelineLatestTemplate returns the template bytes of the latest pipeline
// version of the pipeline with the given id. It fails if the pipeline or its
// latest version cannot be fetched, or if the template cannot be loaded.
func (r *ResourceManager) GetPipelineLatestTemplate(pipelineId string) ([]byte, error) {
	// Verify pipeline exists.
	if _, err := r.pipelineStore.GetPipeline(pipelineId); err != nil {
		return nil, util.Wrap(err, "Failed to get the latest template as pipeline was not found")
	}

	// Get the latest pipeline version.
	latestVersion, err := r.pipelineStore.GetLatestPipelineVersion(pipelineId)
	if err != nil {
		return nil, util.Wrap(err, "Failed to get the latest template for a pipeline")
	}

	// Fetch the template bytes; the second return value is not needed here.
	templateBytes, _, err := r.fetchTemplateFromPipelineVersion(latestVersion)
	if err != nil {
		return nil, util.Wrapf(err, "Failed to get the latest template for pipeline with id %v", pipelineId)
	}
	return templateBytes, nil
}
// CreateRun creates a run and schedules a workflow CR for it.
//
// Steps, in order:
//  1. Build a template from run.PipelineSpec.
//  2. Assign a UUID (if the run has none) and the creation timestamp.
//  3. Generate and validate the execution spec.
//  4. Create the workflow in run.Namespace, falling back to the pod namespace;
//     the manifest's namespace gets overwritten with that value.
//  5. Copy name, service account and state from the created workflow onto run.
//  6. Persist the run and update the owning experiment's last-run timestamp.
//
// The run argument is modified in place. Note that the workflow is created
// before the run is stored, so a failure in step 6 leaves the workflow behind.
//
// Creating a run from recurring run prioritizes recurring run's pipeline spec over the run's one.
func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model.Run, error) {
// Create a template based on the manifest of an existing pipeline version or user-provided manifest.
// Update the run.PipelineSpec if an existing pipeline version is used.
tmpl, manifest, err := r.fetchTemplateFromPipelineSpec(&run.PipelineSpec)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to create a run due to error fetching manifest")
}
// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Proposed flow:
// 1. Create an entry and assign creation timestamp and uuid.
// 2. Create a workflow CR.
// 3. Update a record in the DB with scheduled timestamp, state, etc.
// 4. Persistence agent will call apiserver to update the records later.
if run.UUID == "" {
uuid, err := r.uuid.NewRandom()
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to generate run ID")
}
run.UUID = uuid.String()
}
run.RunDetails.CreatedAtInSec = r.time.Now().Unix()
// The execution spec is generated with the run's id and creation time.
runWorkflowOptions := template.RunWorkflowOptions{
RunId: run.UUID,
RunAt: run.RunDetails.CreatedAtInSec,
}
executionSpec, err := tmpl.RunWorkflow(run, runWorkflowOptions)
if err != nil {
return nil, util.Wrap(err, "Failed to generate the ExecutionSpec")
}
err = executionSpec.Validate(false, false)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to validate workflow for (%+v)", executionSpec.ExecutionName())
}
// Create argo workflow CR resource
// The target namespace is run.Namespace, or the pod namespace when the run has none.
k8sNamespace := run.Namespace
if k8sNamespace == "" {
k8sNamespace = common.GetPodNamespace()
}
if k8sNamespace == "" {
return nil, util.NewInternalServerError(util.NewInvalidInputError("Namespace cannot be empty when creating an Argo workflow. Check if you have specified POD_NAMESPACE or try adding the parent namespace to the request"), "Failed to create a run due to empty namespace")
}
executionSpec.SetExecutionNamespace(k8sNamespace)
// assign OwnerReference to scheduledworkflow
if run.RecurringRunId != "" {
// NOTE(review): the errors returned by GetJob and Get below are discarded and
// replaced with a generic invalid-input error, so the real cause is lost.
job, err := r.jobStore.GetJob(run.RecurringRunId)
if err != nil {
return nil, util.NewInternalServerError(util.NewInvalidInputError("RecurringRunId doesn't exist: %s", run.RecurringRunId), "Failed to create a run due to invalid recurring run id")
}
// NOTE(review): job.Namespace is used as-is here, without the pod-namespace
// fallback applied to run.Namespace above — confirm this is intended.
swf, err := r.swfClient.ScheduledWorkflow(job.Namespace).Get(ctx, job.K8SName, v1.GetOptions{})
if err != nil {
return nil, util.NewInternalServerError(util.NewInvalidInputError("ScheduledWorkflow doesn't exist: %s", job.K8SName), "Failed to create a run due to invalid name")
}
executionSpec.SetOwnerReferences(swf)
}
newExecSpec, err := r.getWorkflowClient(k8sNamespace).Create(ctx, executionSpec, v1.CreateOptions{})
if err != nil {
// A network timeout is reported as "unavailable" so the caller can retry.
if err, ok := err.(net.Error); ok && err.Timeout() {
return nil, util.NewUnavailableServerError(err, "Failed to create a workflow for (%s) - try again later", executionSpec.ExecutionName())
}
return nil, util.NewInternalServerError(err, "Failed to create a workflow for (%s)", executionSpec.ExecutionName())
}
// Update the run with the new scheduled workflow
run.K8SName = newExecSpec.ExecutionName()
run.ServiceAccount = newExecSpec.ServiceAccount()
run.RunDetails.State = model.RuntimeState(string(newExecSpec.ExecutionStatus().Condition())).ToV2()
run.RunDetails.Conditions = string(run.RunDetails.State.ToV1())
// TODO(gkcalat): consider to avoid updating runtime manifest at create time and let
// persistence agent update the runtime data.
// Store the manifest in the field matching the template type. A V1 template whose
// runtime manifest is already set falls through to the final else branch.
if tmpl.GetTemplateType() == template.V1 && run.RunDetails.WorkflowRuntimeManifest == "" {
run.RunDetails.WorkflowRuntimeManifest = newExecSpec.ToStringForStore()
run.PipelineSpec.WorkflowSpecManifest = manifest
} else if tmpl.GetTemplateType() == template.V2 {
run.RunDetails.PipelineRuntimeManifest = newExecSpec.ToStringForStore()
run.PipelineSpec.PipelineSpecManifest = manifest
} else {
run.PipelineSpec.PipelineSpecManifest = manifest
}
// Assign the scheduled at time
if run.RunDetails.ScheduledAtInSec == 0 {
// if there is no scheduled time, then we assume this run is scheduled at the same time it is created
run.RunDetails.ScheduledAtInSec = run.RunDetails.CreatedAtInSec
}
// NOTE(review): if State is promoted from the embedded RunDetails, this overwrites
// the state derived from the workflow above while Conditions keeps the derived value — confirm.
run.State = model.RuntimeStatePending
newRun, err := r.runStore.CreateRun(run)
if err != nil {
return nil, util.Wrap(err, "Failed to create a run")
}
// Upon run creation, update owning experiment
err = r.experimentStore.SetLastRunTimestamp(newRun)
if err != nil {
return nil, util.Wrap(err, fmt.Sprintf("Failed to set last run timestamp on experiment %s for run %s", newRun.ExperimentId, newRun.UUID))
}
return newRun, nil
}
// ReconcileSwfCrs reconciles the ScheduledWorkflow CRs based on existing jobs.
//
// It lists the jobs referenced by the pod namespace and, for each one,
// regenerates the ScheduledWorkflow from the job's pipeline spec. If the CR in
// the cluster has a different Spec, the CR is updated with the regenerated
// Spec. CRs that no longer exist are skipped. The first unexpected error
// aborts the reconciliation.
//
// If ctx is cancelled between jobs the function stops early and returns nil
// (not ctx.Err()).
func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
filterContext := &model.FilterContext{
ReferenceKey: &model.ReferenceKey{Type: model.NamespaceResourceType, ID: common.GetPodNamespace()},
}
opts := list.EmptyOptions()
// The page token and total size returned by ListJobs are ignored here.
jobs, _, _, err := r.jobStore.ListJobs(filterContext, opts)
if err != nil {
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
}
for i := range jobs {
// Non-blocking cancellation check before processing each job.
select {
case <-ctx.Done():
return nil
default:
}
tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec)
if err != nil {
return failedToReconcileSwfCrsError(err)
}
newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i])
if err != nil {
return failedToReconcileSwfCrsError(err)
}
// Retry loop: re-read the CR and retry the update for as long as the update
// fails with a conflict. Every other outcome leaves the loop.
for {
currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{})
if err != nil {
// The CR is gone; nothing to reconcile for this job.
if util.IsNotFound(err) {
break
}
return failedToReconcileSwfCrsError(err)
}
if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) {
currentScheduledWorkflow.Spec = newScheduledWorkflow.Spec
err = r.updateSwfCrSpec(ctx, jobs[i].Namespace, currentScheduledWorkflow)
if err != nil {
// NOTE(review): the conflict check unwraps with errors.Unwrap while the
// not-found check uses errors.Cause — confirm both reach the underlying
// API error produced inside updateSwfCrSpec.
if apierrors.IsConflict(errors.Unwrap(err)) {
continue
} else if util.IsNotFound(errors.Cause(err)) {
break
}
return failedToReconcileSwfCrsError(err)
}
}
break
}
}
return nil
}
// failedToReconcileSwfCrsError wraps err with the standard message used for
// ScheduledWorkflow reconciliation failures.
func failedToReconcileSwfCrsError(err error) error {
	wrapped := util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
	return wrapped
}
// updateSwfCrSpec sends scheduledWorkflow to the ScheduledWorkflow client of
// k8sNamespace as an update. The updated object returned by the client is
// discarded; a client error is wrapped and returned.
func (r *ResourceManager) updateSwfCrSpec(ctx context.Context, k8sNamespace string, scheduledWorkflow *scheduledworkflow.ScheduledWorkflow) error {
	swfClient := r.getScheduledWorkflowClient(k8sNamespace)
	if _, err := swfClient.Update(ctx, scheduledWorkflow); err != nil {
		return util.Wrap(err, "Failed to update ScheduledWorkflow")
	}
	return nil
}
// GetRun fetches the run with the given id from the run store. A store error
// is wrapped with the run id and returned with a nil run.
func (r *ResourceManager) GetRun(runId string) (*model.Run, error) {
	if run, err := r.runStore.GetRun(runId); err == nil {
		return run, nil
	} else {
		return nil, util.Wrapf(err, "Failed to fetch run %v", runId)
	}
}
// ListRuns returns the runs matching the filter context and listing options,
// together with the store's total size and next page token. On failure only
// a wrapped error is returned; the other results are zero values.
func (r *ResourceManager) ListRuns(filterContext *model.FilterContext, opts *list.Options) ([]*model.Run, int, string, error) {
	runs, totalSize, nextPageToken, err := r.runStore.ListRuns(filterContext, opts)
	if err == nil {
		return runs, totalSize, nextPageToken, nil
	}
	return nil, 0, "", util.Wrap(err, "Failed to list runs")
}
// ArchiveRun archives the run with the given id. The run must be retrievable
// first; errors from the lookup and from the store are wrapped with the run id.
func (r *ResourceManager) ArchiveRun(runId string) error {
	// Confirm the run can be fetched before touching the store.
	_, err := r.GetRun(runId)
	if err != nil {
		return util.Wrapf(err, "Failed to archive run %v as it failed to be retrieved", runId)
	}

	err = r.runStore.ArchiveRun(runId)
	if err != nil {
		return util.Wrapf(err, "Failed to archive run %v", runId)
	}
	return nil
}
// UnarchiveRun un-archives the run with the given id.
//
// If the run has no experiment id, it is resolved through the resource
// reference store. The call is rejected with a failed-precondition error
// while the owning experiment is still archived.
func (r *ResourceManager) UnarchiveRun(runId string) error {
	run, err := r.GetRun(runId)
	if err != nil {
		return util.Wrapf(err, "Failed to unarchive run %v as it does not exist", runId)
	}

	// Resolve the owning experiment from resource references when the run
	// itself does not carry it.
	if run.ExperimentId == "" {
		ref, refErr := r.resourceReferenceStore.GetResourceReference(runId, model.RunResourceType, model.ExperimentResourceType)
		if refErr != nil {
			return util.Wrapf(refErr, "Failed to unarchive run %v due to resource references fetching error", runId)
		}
		run.ExperimentId = ref.ReferenceUUID
	}

	experiment, err := r.GetExperiment(run.ExperimentId)
	if err != nil {
		return util.Wrapf(err, "Failed to unarchive run %v due to experiment fetching error", runId)
	}

	// A run cannot be restored into an archived experiment.
	experimentArchived := experiment.StorageState.ToV2() == model.StorageStateArchived
	if experimentArchived {
		cause := errors.New("Unarchive the experiment first to allow the run to be restored")
		message := fmt.Sprintf("Failed to unarchive run %v as experiment %v must be un-archived first", runId, run.ExperimentId)
		return util.NewFailedPreconditionError(cause, message)
	}

	err = r.runStore.UnarchiveRun(runId)
	if err != nil {
		return util.Wrapf(err, "Failed to unarchive run %v", runId)
	}
	return nil
}
// DeleteRun deletes the run entry with the given id.
//
// The run's namespace is resolved from its experiment when missing, falling
// back to the pod namespace for the workflow client. Deleting the workflow CR
// is best-effort: a failure is only logged. The run is then removed from the
// run store. When metrics collection is enabled, the success gauge (for runs
// whose Conditions equal ExecutionSucceeded) or the failed gauge (for every
// other run) is decremented, provided its current value is positive.
func (r *ResourceManager) DeleteRun(ctx context.Context, runId string) error {
	run, err := r.GetRun(runId)
	if err != nil {
		return util.Wrapf(err, "Failed to delete run %v as it does not exist", runId)
	}

	if run.Namespace == "" {
		namespace, err := r.GetNamespaceFromExperimentId(run.ExperimentId)
		if err != nil {
			return util.Wrapf(err, "Failed to delete a run %v due to namespace fetching error", runId)
		}
		run.Namespace = namespace
	}
	k8sNamespace := run.Namespace
	if k8sNamespace == "" {
		k8sNamespace = common.GetPodNamespace()
	}

	if err := r.getWorkflowClient(k8sNamespace).Delete(ctx, run.K8SName, v1.DeleteOptions{}); err != nil {
		// API won't need to delete the workflow CR
		// once persistent agent sync the state to DB and set TTL for it.
		glog.Warningf("Failed to delete run %v. Error: %v", run.K8SName, err.Error())
	}

	if err := r.runStore.DeleteRun(runId); err != nil {
		return util.Wrapf(err, "Failed to delete a run %v", runId)
	}

	// options is a pointer supplied by the caller of NewResourceManager and
	// may be nil. The run is already deleted at this point, so a nil options
	// value must not cause a panic; treat it as "metrics disabled".
	if r.options == nil || !r.options.CollectMetrics {
		return nil
	}

	if run.Conditions == string(exec.ExecutionSucceeded) {
		if util.GetMetricValue(workflowSuccessCounter) > 0 {
			workflowSuccessCounter.WithLabelValues(run.Namespace, run.DisplayName).Dec()
		}
	} else {
		if util.GetMetricValue(workflowFailedCounter) > 0 {
			workflowFailedCounter.WithLabelValues(run.Namespace, run.DisplayName).Dec()
		}
	}
	return nil
}
// CreateTask stores a new task for the run referenced by t.RunId.
// A missing experiment id on the run is replaced by the default experiment,
// and a missing task namespace is derived from that experiment. In multi-user
// mode the task must end up with a non-empty namespace. The experiment must
// belong to the task's namespace. Returns the task as created by the task store.
func (r *ResourceManager) CreateTask(t *model.Task) (*model.Task, error) {
	parentRun, err := r.GetRun(t.RunId)
	if err != nil {
		return nil, util.Wrapf(err, "Failed to create a task for run %v", t.RunId)
	}

	if parentRun.ExperimentId == "" {
		fallbackId, lookupErr := r.GetDefaultExperimentId()
		if lookupErr != nil {
			return nil, util.Wrapf(lookupErr, "Failed to create a task in run %v. Specify experiment id for the run or check if the default experiment exists", t.RunId)
		}
		parentRun.ExperimentId = fallbackId
	}

	// Validate namespace
	if t.Namespace == "" {
		ns, nsErr := r.GetNamespaceFromExperimentId(parentRun.ExperimentId)
		if nsErr != nil {
			return nil, util.Wrapf(nsErr, "Failed to create a task in run %v", t.RunId)
		}
		t.Namespace = ns
	}
	if common.IsMultiUserMode() && t.Namespace == "" {
		return nil, util.NewInternalServerError(util.NewInvalidInputError("Task cannot have an empty namespace in multi-user mode"), "Failed to create a task in run %v", t.RunId)
	}
	if err = r.CheckExperimentBelongsToNamespace(parentRun.ExperimentId, t.Namespace); err != nil {
		return nil, util.Wrapf(err, "Failed to create a task in run %v", t.RunId)
	}

	created, err := r.taskStore.CreateTask(t)
	if err != nil {
		return nil, util.Wrapf(err, "Failed to create a task in run %v", t.RunId)
	}
	return created, nil
}
// ListTasks returns the tasks matching the filter context and listing options,
// together with the total size and the next page token reported by the task
// store. On failure it returns zero values and a wrapped error.
func (r *ResourceManager) ListTasks(filterContext *model.FilterContext, opts *list.Options) ([]*model.Task, int, string, error) {
	found, total, token, listErr := r.taskStore.ListTasks(filterContext, opts)
	if listErr != nil {
		return nil, 0, "", util.Wrap(listErr, "Failed to list tasks")
	}
	return found, total, token, nil
}
// ListJobs returns the recurring runs matching the filter context and listing
// options. The job store's results and error are passed through unchanged.
func (r *ResourceManager) ListJobs(filterContext *model.FilterContext, opts *list.Options) ([]*model.Job, int, string, error) {
	jobs, total, token, err := r.jobStore.ListJobs(filterContext, opts)
	return jobs, total, token, err
}
// TerminateWorkflow terminates the workflow called name by sending the
// terminate merge patch for the current execution type through wfClient.
// The patch is retried up to maxRetries times with a constant pause between
// attempts; if every attempt fails, the last error is returned wrapped.
func TerminateWorkflow(ctx context.Context, wfClient util.ExecutionInterface, name string) error {
	const (
		// Pause between attempts: 100ms, expressed in nanoseconds (the unit of
		// time.Duration). A bare 100 here would mean 100ns, i.e. no real back-off.
		retryInterval = 100 * 1000 * 1000
		maxRetries    = 10
	)

	patchObj := util.GetTerminatePatch(util.CurrentExecutionType())
	patch, err := json.Marshal(patchObj)
	if err != nil {
		return util.NewInternalServerError(err, "Failed to terminate workflow %s due to error parsing the patch", name)
	}

	// Keep the attempt's error local instead of writing to the captured outer err.
	operation := func() error {
		_, patchErr := wfClient.Patch(ctx, name, types.MergePatchType, patch, v1.PatchOptions{})
		return util.Wrapf(patchErr, "Failed to terminate workflow %s due to patching error", name)
	}

	backoffPolicy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryInterval), maxRetries)
	if err := backoff.Retry(operation, backoffPolicy); err != nil {
		return util.Wrapf(err, "Failed to terminate workflow %s due to patching error after multiple retries", name)
	}
	return nil
}
// TerminateRun marks the run with the given id as terminated in the run store
// and then terminates its workflow. The workflow is looked up in the run's
// namespace, or in the pod namespace when the run has none.
func (r *ResourceManager) TerminateRun(ctx context.Context, runId string) error {
	run, err := r.GetRun(runId)
	if err != nil {
		return util.Wrapf(err, "Failed to terminate run %s due to error fetching the run", runId)
	}

	// TODO(gkcalat): consider using run.Namespace after migration logic will be available.
	ns, err := r.getNamespaceFromRunId(runId)
	if err != nil {
		return util.Wrapf(err, "Failed to terminate run %s due to error fetching its namespace", runId)
	}

	// Update the stored run first; the workflow is only touched once that succeeds.
	if storeErr := r.runStore.TerminateRun(runId); storeErr != nil {
		return util.Wrapf(storeErr, "Failed to terminate run %s", runId)
	}

	if ns == "" {
		ns = common.GetPodNamespace()
	}
	if wfErr := TerminateWorkflow(ctx, r.getWorkflowClient(ns), run.K8SName); wfErr != nil {
		return util.NewInternalServerError(wfErr, "Failed to terminate run %s due to error terminating its workflow", runId)
	}
	return nil
}
// RetryRun retries the run with the given id.
//
// It parses the run's stored workflow runtime manifest, checks that the
// execution can be retried, generates the retry execution, and deletes the
// pods that the retry replaces. It then tries to update the existing workflow
// in the cluster; if the workflow cannot be fetched or updated, it creates the
// workflow anew. Finally the run entry is updated with the new condition,
// state and manifest, and its finish time is reset.
//
// A timeout while creating the workflow yields an unavailable-server error;
// most other failures are reported as internal server errors.
func (r *ResourceManager) RetryRun(ctx context.Context, runId string) error {
	run, err := r.GetRun(runId)
	if err != nil {
		return util.Wrapf(err, "Failed to retry run %s due to error fetching the run", runId)
	}
	// TODO(gkcalat): consider using run.Namespace after migration logic will be available.
	namespace, err := r.getNamespaceFromRunId(runId)
	if err != nil {
		return util.Wrapf(err, "Failed to retry run %s due to error fetching its namespace", runId)
	}

	if run.RunDetails.WorkflowRuntimeManifest == "" {
		return util.NewBadRequestError(util.NewInvalidInputError("Workflow manifest cannot be empty"), "Failed to retry run %s due to error fetching workflow manifest", runId)
	}
	execSpec, err := util.NewExecutionSpecJSON(util.ArgoWorkflow, []byte(run.RunDetails.WorkflowRuntimeManifest))
	if err != nil {
		return util.NewInternalServerError(err, "Failed to retry run %s due to error parsing the workflow manifest", runId)
	}
	if err := execSpec.Decompress(); err != nil {
		return util.NewInternalServerError(err, "Failed to retry run %s due to error decompressing execution spec", runId)
	}
	if err := execSpec.CanRetry(); err != nil {
		return util.NewInternalServerError(err, "Failed to retry run %s as it does not allow retries", runId)
	}
	newExecSpec, podsToDelete, err := execSpec.GenerateRetryExecution()
	if err != nil {
		return util.Wrapf(err, "Failed to retry run %s", runId)
	}

	// Kubernetes clients need a concrete namespace; fall back to the pod's own.
	if namespace == "" {
		namespace = common.GetPodNamespace()
	}
	if err = deletePods(ctx, r.k8sCoreClient, podsToDelete, namespace); err != nil {
		return util.NewInternalServerError(err, "Failed to retry run %s due to error cleaning up the failed pods from the previous attempt", runId)
	}

	// First try to update the workflow in place. A failure to fetch it counts
	// as an update failure and triggers the create fallback below.
	latestWorkflow, updateError := r.getWorkflowClient(namespace).Get(ctx, newExecSpec.ExecutionName(), v1.GetOptions{})
	if updateError == nil {
		// Update the workflow's resource version to latest.
		newExecSpec.SetVersion(latestWorkflow.Version())
		_, updateError = r.getWorkflowClient(namespace).Update(ctx, newExecSpec, v1.UpdateOptions{})
	}
	if updateError != nil {
		// Remove resource version so the spec can be submitted as a new object.
		newExecSpec.SetVersion("")
		newCreatedWorkflow, createError := r.getWorkflowClient(namespace).Create(ctx, newExecSpec, v1.CreateOptions{})
		if createError != nil {
			// errors.As also recognizes a timeout that has been wrapped by an
			// intermediate layer, which a direct type assertion would miss.
			var netErr net.Error
			if errors.As(createError, &netErr) && netErr.Timeout() {
				return util.NewUnavailableServerError(createError, "Failed to retry run %s due to error creating and updating a workflow - try again later. Update error: %s", runId, updateError.Error())
			}
			return util.NewInternalServerError(createError, "Failed to retry run %s due to error updating and creating a workflow. Update error: %s", runId, updateError.Error())
		}
		newExecSpec = newCreatedWorkflow
	}

	// Persist the new condition/state and manifest, and reset the finish time.
	condition := string(newExecSpec.ExecutionStatus().Condition())
	updatedRun := &model.Run{
		UUID: runId,
		RunDetails: model.RunDetails{
			Conditions:              condition,
			FinishedAtInSec:         0,
			WorkflowRuntimeManifest: newExecSpec.ToStringForStore(),
			State:                   model.RuntimeState(condition).ToV2(),
		},
	}
	if err = r.runStore.UpdateRun(updatedRun); err != nil {
		return util.NewInternalServerError(err, "Failed to retry run %s due to error updating entry", runId)
	}
	return nil
}
// ReadLog fetches the execution logs of node nodeId in run runId and writes
// them to dst.
//  1. Attempts to read logs directly from the pod, looked up in the run's
//     namespace or in the pod namespace when the run has none.
//  2. Attempts to read logs from the archive if reading from the pod fails and
//     a log archive is configured.
//
// Any failure is returned as a bad-request error.
func (r *ResourceManager) ReadLog(ctx context.Context, runId string, nodeId string, follow bool, dst io.Writer) error {
	run, err := r.GetRun(runId)
	if err != nil {
		return util.NewBadRequestError(err, "Failed to read logs for run %v due to run fetching error", runId)
	}
	// TODO(gkcalat): consider using run.Namespace after migration logic will be available.
	namespace, err := r.getNamespaceFromRunId(runId)
	if err != nil {
		return util.NewBadRequestError(err, "Failed to read logs for run %v due to namespace fetching error", runId)
	}
	// Same fallback as DeleteRun, TerminateRun and RetryRun: the pod client
	// needs a concrete namespace to locate the pod.
	if namespace == "" {
		namespace = common.GetPodNamespace()
	}

	err = r.readRunLogFromPod(ctx, namespace, nodeId, follow, dst)
	if err != nil && r.logArchive != nil {
		// The pod logs are unavailable; fall back to the archived copy.
		err = r.readRunLogFromArchive(run.WorkflowRuntimeManifest, nodeId, dst)
	}
	if err != nil {
		return util.NewBadRequestError(err, "Failed to read logs for run %v", runId)
	}
	return nil
}
// readRunLogFromPod streams the logs of the "main" container of pod nodeId in
// the given namespace into dst, without timestamps. When follow is set, the
// Follow option is passed on to the log request. Failures to open or copy the
// stream are returned as internal server errors.
func (r *ResourceManager) readRunLogFromPod(ctx context.Context, namespace string, nodeId string, follow bool, dst io.Writer) error {
	opts := &corev1.PodLogOptions{
		Container:  "main",
		Timestamps: false,
		Follow:     follow,
	}

	stream, err := r.k8sCoreClient.PodClient(namespace).GetLogs(nodeId, opts).Stream(ctx)
	if err != nil {
		// A missing pod is not logged here; every other failure is.
		if !apierrors.IsNotFound(err) {
			glog.Errorf("Failed to read logs from pod %v: %v", nodeId, err)
		}
		return util.NewInternalServerError(err, "Failed to read logs from pod %v due to error opening log stream", nodeId)
	}
	defer stream.Close()

	if _, copyErr := io.Copy(dst, stream); copyErr != nil && !errors.Is(copyErr, io.EOF) {
		return util.NewInternalServerError(copyErr, "Failed to read logs from pod %v due to error in streaming the log", nodeId)
	}
	return nil
}
// readRunLogFromArchive fetches the archived logs of node nodeId and writes
// them to dst as plain text without timestamps.
// The log object key is derived from the execution spec parsed out of
// workflowManifest, and the file is read from the object store. An empty
// manifest or any failing step is returned as an internal server error.
func (r *ResourceManager) readRunLogFromArchive(workflowManifest string, nodeId string, dst io.Writer) error {
	if workflowManifest == "" {
		return util.NewInternalServerError(util.NewInvalidInputError("Runtime workflow manifest cannot be empty"), "Failed to read logs from archive %v due to empty runtime workflow manifest", nodeId)
	}

	execSpec, err := util.NewExecutionSpecJSON(util.CurrentExecutionType(), []byte(workflowManifest))
	if err != nil {
		return util.NewInternalServerError(err, "Failed to read logs from archive %v due to error reading execution spec", nodeId)
	}

	logPath, err := r.logArchive.GetLogObjectKey(execSpec, nodeId)
	if err != nil {
		return util.NewInternalServerError(err, "Failed to read logs from archive %v", nodeId)
	}

	logContent, err := r.objectStore.GetFile(logPath)
	if err != nil {
		return util.NewInternalServerError(err, "Failed to read logs from archive %v due to error fetching the log file", nodeId)
	}

	extractOpts := archive.ExtractLogOptions{LogFormat: archive.LogFormatText, Timestamps: false}
	if err = r.logArchive.CopyLogFromArchive(logContent, dst, extractOpts); err != nil {
		return util.NewInternalServerError(err, "Failed to read logs from archive %v due to error copying the log file", nodeId)
	}
	return nil
}
// GetJob returns the recurring run with the given id.
// The job store's result and error are passed through unchanged.
func (r *ResourceManager) GetJob(id string) (*model.Job, error) {
	job, err := r.jobStore.GetJob(id)
	return job, err
}
// Fetches or creates a new pipeline version based on internal PipelineSpec representation.
// Returns a pipeline version if any of the following is present in pipeline spec:
// 1. Pipeline version with the given pipeline version id
// 2. The latest pipeline version with given pipeline id
// 3. Repeats 1 and 2 for pipeline version id and pipeline id parsed from the pipeline name
func (r *ResourceManager) fetchPipelineVersionFromPipelineSpec(pipelineSpec model.PipelineSpec) (*model.PipelineVersion, error) {
// Fetch or create a pipeline version