diff --git a/docs/parallelism.md b/docs/parallelism.md
index f94b68fccf14..986e01960e56 100644
--- a/docs/parallelism.md
+++ b/docs/parallelism.md
@@ -23,6 +23,8 @@ When namespace parallelism is enabled, it is plausible for a workflow with a low
 !!! Note
     Workflows that are executing but restricted from running more nodes due to other mechanisms will still count toward parallelism limits.
 
+In addition to the default parallelism, you can set an individual parallelism limit on a namespace by adding a `workflows.argoproj.io/parallelism-limit` label to the namespace object. An individual namespace limit overrides the global namespace parallelism limit.
+
 ### Priority
 
 You can set a `priority` on workflows:
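For example, a namespace capped at two concurrent workflows could be declared as follows; a minimal sketch, assuming a hypothetical namespace name of `my-team` (the label is the only part this feature reads):

```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: my-team  # example name
  labels:
    # At most two workflows may run concurrently in this namespace;
    # this overrides the controller's global namespace parallelism limit.
    workflows.argoproj.io/parallelism-limit: "2"
```

The same label can be applied to an existing namespace with `kubectl label namespace my-team workflows.argoproj.io/parallelism-limit=2`.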
diff --git a/test/e2e/fixtures/e2e_suite.go b/test/e2e/fixtures/e2e_suite.go
index 9f4dab8b149a..95e6597b9e3e 100644
--- a/test/e2e/fixtures/e2e_suite.go
+++ b/test/e2e/fixtures/e2e_suite.go
@@ -244,5 +244,6 @@ func (s *E2ESuite) Given() *Given {
 		hydrator:    s.hydrator,
 		kubeClient:  s.KubeClient,
 		bearerToken: bearerToken,
+		restConfig:  s.RestConfig,
 	}
 }
diff --git a/test/e2e/fixtures/given.go b/test/e2e/fixtures/given.go
index d5fc4674ce57..03729568f58d 100644
--- a/test/e2e/fixtures/given.go
+++ b/test/e2e/fixtures/given.go
@@ -10,6 +10,7 @@ import (
 	"github.com/stretchr/testify/require"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 	"sigs.k8s.io/yaml"
 
 	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
@@ -33,6 +34,7 @@ type Given struct {
 	cronWf      *wfv1.CronWorkflow
 	kubeClient  kubernetes.Interface
 	bearerToken string
+	restConfig  *rest.Config
 }
 
 // creates a workflow based on the parameter, this may be:
@@ -250,5 +252,6 @@ func (g *Given) When() *When {
 		hydrator:    g.hydrator,
 		kubeClient:  g.kubeClient,
 		bearerToken: g.bearerToken,
+		restConfig:  g.restConfig,
 	}
 }
diff --git a/test/e2e/fixtures/then.go b/test/e2e/fixtures/then.go
index c4692e784dde..20ebabd978e4 100644
--- a/test/e2e/fixtures/then.go
+++ b/test/e2e/fixtures/then.go
@@ -12,6 +12,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 
 	"github.com/minio/minio-go/v7"
 	"github.com/minio/minio-go/v7/pkg/credentials"
@@ -33,6 +34,7 @@ type Then struct {
 	hydrator    hydrator.Interface
 	kubeClient  kubernetes.Interface
 	bearerToken string
+	restConfig  *rest.Config
 }
 
 func (t *Then) ExpectWorkflow(block func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus)) *Then {
@@ -301,5 +303,6 @@ func (t *Then) When() *When {
 		wf:          t.wf,
 		kubeClient:  t.kubeClient,
 		bearerToken: t.bearerToken,
+		restConfig:  t.restConfig,
 	}
 }
diff --git a/test/e2e/fixtures/when.go b/test/e2e/fixtures/when.go
index e42034029473..95aadec12dea 100644
--- a/test/e2e/fixtures/when.go
+++ b/test/e2e/fixtures/when.go
@@ -15,6 +15,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 	"k8s.io/utils/ptr"
 
 	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
@@ -39,6 +40,7 @@ type When struct {
 	hydrator    hydrator.Interface
 	kubeClient  kubernetes.Interface
 	bearerToken string
+	restConfig  *rest.Config
 }
 
 func (w *When) SubmitWorkflow() *When {
@@ -219,6 +221,7 @@ var (
 			return node.Type == wfv1.NodeTypePod && node.Phase == wfv1.NodeFailed
 		}), "to have failed pod"
 	}
+	ToBePending = ToHavePhase(wfv1.WorkflowPending)
 )
 
 // `ToBeDone` replaces `ToFinish`, which also makes sure the workflow is both complete and not pending archiving.
@@ -488,6 +491,29 @@ func (w *When) RemoveFinalizers(shouldErr bool) *When {
 	return w
 }
 
+// AddNamespaceLimit labels the test namespace with a per-namespace parallelism limit.
+func (w *When) AddNamespaceLimit(limit string) *When {
+	w.t.Helper()
+	ctx := context.Background()
+	patchMap := make(map[string]interface{})
+	metadata := make(map[string]interface{})
+	labels := make(map[string]interface{})
+	labels["workflows.argoproj.io/parallelism-limit"] = limit
+	metadata["labels"] = labels
+	patchMap["metadata"] = metadata
+
+	bs, err := json.Marshal(patchMap)
+	if err != nil {
+		w.t.Fatal(err)
+	}
+
+	_, err = w.kubeClient.CoreV1().Namespaces().Patch(ctx, Namespace, types.MergePatchType, bs, metav1.PatchOptions{})
+	if err != nil {
+		w.t.Fatal(err)
+	}
+	return w
+}
+
 type PodCondition func(p *corev1.Pod) bool
 
 var (
@@ -717,6 +742,7 @@ func (w *When) Then() *Then {
 		hydrator:    w.hydrator,
 		kubeClient:  w.kubeClient,
 		bearerToken: w.bearerToken,
+		restConfig:  w.restConfig,
 	}
 }
 
@@ -736,5 +762,6 @@ func (w *When) Given() *Given {
 		cwfTemplates: w.cwfTemplates,
 		cronWf:       w.cronWf,
 		kubeClient:   w.kubeClient,
+		restConfig:   w.restConfig,
 	}
 }
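For reference, the nested maps built in `AddNamespaceLimit` marshal into a small JSON merge patch. A runnable sketch of the payload (the limit `"1"` is only an example value):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// The same merge-patch body that AddNamespaceLimit assembles before
	// sending it with types.MergePatchType.
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"labels": map[string]interface{}{
				"workflows.argoproj.io/parallelism-limit": "1", // example limit
			},
		},
	}
	bs, err := json.Marshal(patch)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(bs))
	// {"metadata":{"labels":{"workflows.argoproj.io/parallelism-limit":"1"}}}
}
```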
diff --git a/test/e2e/ns_parallelism_test.go b/test/e2e/ns_parallelism_test.go
new file mode 100644
index 000000000000..0d342b6c621f
--- /dev/null
+++ b/test/e2e/ns_parallelism_test.go
@@ -0,0 +1,60 @@
+//go:build functional
+
+package e2e
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
+)
+
+type NamespaceParallelismSuite struct {
+	fixtures.E2ESuite
+}
+
+const wf = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  generateName: hello-world-
+  labels:
+    workflows.argoproj.io/archive-strategy: "false"
+  annotations:
+    workflows.argoproj.io/description: |
+      This is a simple hello world example.
+spec:
+  entrypoint: hello-world
+  templates:
+    - name: hello-world
+      container:
+        image: "argoproj/argosay:v2"
+        command: [sleep]
+        args: ["60"]
+`
+
+func (s *NamespaceParallelismSuite) TestNamespaceParallelism() {
+	s.Given().
+		Workflow(wf).
+		When().
+		AddNamespaceLimit("1").
+		SubmitWorkflow().
+		WaitForWorkflow(fixtures.ToStart)
+
+	// Give the controller a moment to count the first workflow as running.
+	time.Sleep(time.Second * 5)
+	wf := s.Given().
+		Workflow(wf).
+		When().
+		SubmitWorkflow().
+		WaitForWorkflow(fixtures.ToBePending).GetWorkflow()
+	t := s.T()
+	assert.Equal(t, "Workflow processing has been postponed because too many workflows are already running", wf.Status.Message)
+}
+
+func TestNamespaceParallelismSuite(t *testing.T) {
+	suite.Run(t, new(NamespaceParallelismSuite))
+}
diff --git a/workflow/common/common.go b/workflow/common/common.go
index 91e8822ea9bd..279367dcdfd4 100644
--- a/workflow/common/common.go
+++ b/workflow/common/common.go
@@ -52,6 +52,10 @@ const (
 	// AnnotationKeyArtifactGCStrategy is listed as an annotation on the Artifact GC Pod to identify
 	// the strategy whose artifacts are being deleted
 	AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy"
+
+	// LabelParallelismLimit is a label applied to namespace objects to control per-namespace parallelism.
+	LabelParallelismLimit = workflow.WorkflowFullName + "/parallelism-limit"
+
 	// AnnotationKeyPodGCStrategy is listed as an annotation on the Pod
 	// the strategy for the pod, in case the pod is orphaned from its workflow
 	AnnotationKeyPodGCStrategy = workflow.WorkflowFullName + "/pod-gc-strategy"
diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go
index 48a416284174..27aefaa7796e 100644
--- a/workflow/controller/controller.go
+++ b/workflow/controller/controller.go
@@ -116,6 +116,7 @@ type WorkflowController struct {
 	// datastructures to support the processing of workflows and workflow pods
 	wfInformer      cache.SharedIndexInformer
+	nsInformer      cache.SharedIndexInformer
 	wftmplInformer  wfextvv1alpha1.WorkflowTemplateInformer
 	cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
 	PodController   *pod.Controller // Currently public for woc to access, but would rather an accessor
@@ -298,12 +299,17 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
 		Info("Current Worker Numbers")
 
 	wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
+	nsInformer, err := wfc.newNamespaceInformer(ctx, wfc.kubeclientset)
+	if err != nil {
+		log.Fatal(err)
+	}
+	wfc.nsInformer = nsInformer
 	wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace)
 	wfc.wfTaskSetInformer = wfc.newWorkflowTaskSetInformer()
 	wfc.artGCTaskInformer = wfc.newArtGCTaskInformer()
 	wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer()
 
-	err := wfc.addWorkflowInformerHandlers(ctx)
+	err = wfc.addWorkflowInformerHandlers(ctx)
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -324,6 +330,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
 		go wfc.runConfigMapWatcher(ctx)
 	}
 
+	go wfc.nsInformer.Run(ctx.Done())
 	go wfc.wfInformer.Run(ctx.Done())
 	go wfc.wftmplInformer.Informer().Run(ctx.Done())
 	go wfc.configMapInformer.Run(ctx.Done())
@@ -337,6 +344,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
 	if !cache.WaitForCacheSync(
 		ctx.Done(),
 		wfc.wfInformer.HasSynced,
+		wfc.nsInformer.HasSynced,
 		wfc.wftmplInformer.Informer().HasSynced,
 		wfc.PodController.HasSynced(),
 		wfc.configMapInformer.HasSynced,
diff --git a/workflow/controller/ns_watcher.go b/workflow/controller/ns_watcher.go
new file mode 100644
index 000000000000..ad9866a7aea8
--- /dev/null
+++ b/workflow/controller/ns_watcher.go
@@ -0,0 +1,130 @@
+package controller
+
+import (
+	"context"
+	"errors"
+	"strconv"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/selection"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+
+	"github.com/argoproj/argo-workflows/v3/workflow/common"
+)
+
+var (
+	limitReq, _        = labels.NewRequirement(common.LabelParallelismLimit, selection.Exists, nil)
+	nsResyncPeriod     = 5 * time.Minute
+	errUnableToExtract = errors.New("unable to extract limit")
+)
+
+type updateFunc = func(string, int)
+type resetFunc = func(string)
+
+func (wfc *WorkflowController) newNamespaceInformer(ctx context.Context, kubeclientset kubernetes.Interface) (cache.SharedIndexInformer, error) {
+	c := kubeclientset.CoreV1().Namespaces()
+	logger := logrus.WithField("scope", "ns_watcher")
+
+	// Only list/watch namespaces that carry the parallelism-limit label.
+	labelSelector := labels.NewSelector().Add(*limitReq)
+
+	listFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
+		opts.LabelSelector = labelSelector.String()
+		return c.List(ctx, opts)
+	}
+
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		opts.LabelSelector = labelSelector.String()
+		return c.Watch(ctx, opts)
+	}
+
+	source := &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
+	informer := cache.NewSharedIndexInformer(source, &apiv1.Namespace{}, nsResyncPeriod, cache.Indexers{})
+
+	_, err := informer.AddEventHandler(
+		cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				ns, err := nsFromObj(obj)
+				if err != nil {
+					return
+				}
+				updateNS(logger, ns, wfc.throttler.UpdateNamespaceParallelism, wfc.throttler.ResetNamespaceParallelism)
+			},
+
+			UpdateFunc: func(old, newVal interface{}) {
+				ns, err := nsFromObj(newVal)
+				if err != nil {
+					return
+				}
+				oldNs, err := nsFromObj(old)
+				if err == nil && !limitChanged(oldNs, ns) {
+					return
+				}
+				updateNS(logger, ns, wfc.throttler.UpdateNamespaceParallelism, wfc.throttler.ResetNamespaceParallelism)
+			},
+
+			DeleteFunc: func(obj interface{}) {
+				ns, err := nsFromObj(obj)
+				if err != nil {
+					return
+				}
+				deleteNS(logger, ns, wfc.throttler.ResetNamespaceParallelism)
+			},
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+	return informer, nil
+}
+
+func deleteNS(log *logrus.Entry, ns *apiv1.Namespace, resetFn resetFunc) {
+	log.Infof("resetting the namespace parallelism limit for %s due to deletion event", ns.Name)
+	resetFn(ns.Name)
+}
+
+func updateNS(log *logrus.Entry, ns *apiv1.Namespace, updateFn updateFunc, resetFn resetFunc) {
+	limit, err := extractLimit(ns)
+	if errors.Is(err, errUnableToExtract) {
+		resetFn(ns.Name)
+		log.Infof("removing per-namespace parallelism for %s, reverting to default", ns.Name)
+		return
+	} else if err != nil {
+		log.Errorf("unable to extract the parallelism limit: %s", err)
+		return
+	}
+	log.Infof("changing namespace parallelism in %s to %d", ns.Name, limit)
+	updateFn(ns.Name, limit)
+}
+
+func nsFromObj(obj interface{}) (*apiv1.Namespace, error) {
+	ns, ok := obj.(*apiv1.Namespace)
+	if !ok {
+		return nil, errors.New("unable to convert object to namespace")
+	}
+	return ns, nil
+}
+
+func limitChanged(old *apiv1.Namespace, newNS *apiv1.Namespace) bool {
+	oldLimit := old.GetLabels()[common.LabelParallelismLimit]
+	newLimit := newNS.GetLabels()[common.LabelParallelismLimit]
+	return oldLimit != newLimit
+}
+
+// extractLimit reads the parallelism-limit label from the namespace;
+// it returns errUnableToExtract when the label is absent.
+func extractLimit(ns *apiv1.Namespace) (int, error) {
+	limitString, ok := ns.GetLabels()[common.LabelParallelismLimit]
+	if !ok {
+		return 0, errUnableToExtract
+	}
+	return strconv.Atoi(limitString)
+}
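As an aside, the `Exists` requirement above serializes to a bare label key, so the informer's List/Watch calls only ever see namespaces that carry the label at all. A standalone sketch (not part of the PR) of the selector string being sent:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
)

func main() {
	// Same requirement as in ns_watcher.go: the label must exist, any value.
	req, err := labels.NewRequirement("workflows.argoproj.io/parallelism-limit", selection.Exists, nil)
	if err != nil {
		panic(err)
	}
	sel := labels.NewSelector().Add(*req)
	// An Exists selector is just the key; the API server matches any value.
	fmt.Println(sel.String()) // workflows.argoproj.io/parallelism-limit
}
```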
diff --git a/workflow/sync/mocks/Throttler.go b/workflow/sync/mocks/Throttler.go
index d4273b265322..ba004a64c274 100644
--- a/workflow/sync/mocks/Throttler.go
+++ b/workflow/sync/mocks/Throttler.go
@@ -61,6 +61,21 @@ func (_m *Throttler) Remove(key string) {
 	_m.Called(key)
 }
 
+// ResetNamespaceParallelism provides a mock function with given fields: namespace
+func (_m *Throttler) ResetNamespaceParallelism(namespace string) {
+	_m.Called(namespace)
+}
+
+// UpdateNamespaceParallelism provides a mock function with given fields: namespace, limit
+func (_m *Throttler) UpdateNamespaceParallelism(namespace string, limit int) {
+	_m.Called(namespace, limit)
+}
+
+// UpdateParallelism provides a mock function with given fields: limit
+func (_m *Throttler) UpdateParallelism(limit int) {
+	_m.Called(limit)
+}
+
 // NewThrottler creates a new instance of Throttler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
 // The first argument is typically a *testing.T value.
 func NewThrottler(t interface {
diff --git a/workflow/sync/multi_throttler.go b/workflow/sync/multi_throttler.go
index 434201e148e8..cb0238776c90 100644
--- a/workflow/sync/multi_throttler.go
+++ b/workflow/sync/multi_throttler.go
@@ -23,6 +23,12 @@ type Throttler interface {
 	Admit(key Key) bool
 	// Remove notifies throttler that item processing is no longer needed
 	Remove(key Key)
+	// UpdateParallelism updates the total parallelism limit
+	UpdateParallelism(limit int)
+	// UpdateNamespaceParallelism updates the parallelism limit for the given namespace
+	UpdateNamespaceParallelism(namespace string, limit int)
+	// ResetNamespaceParallelism sets the namespace parallelism back to the default value
+	ResetNamespaceParallelism(namespace string)
 }
 
 type Key = string
@@ -137,6 +143,28 @@ func (m *multiThrottler) Remove(key Key) {
 	m.queueThrottled()
 }
 
+func (m *multiThrottler) UpdateParallelism(limit int) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	m.totalParallelism = limit
+	m.queueThrottled()
+}
+
+func (m *multiThrottler) UpdateNamespaceParallelism(namespace string, limit int) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	m.namespaceParallelism[namespace] = limit
+	m.queueThrottled()
+}
+
+func (m *multiThrottler) ResetNamespaceParallelism(namespace string) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	delete(m.namespaceParallelism, namespace)
+	// Re-run admission so workflows queued under the old limit can start.
+	m.queueThrottled()
+}
+
 func (m *multiThrottler) queueThrottled() {
 	if m.totalParallelism != 0 && len(m.running) >= m.totalParallelism {
 		return
diff --git a/workflow/sync/multi_throttler_test.go b/workflow/sync/multi_throttler_test.go
index d321c31c0766..25280a5036ca 100644
--- a/workflow/sync/multi_throttler_test.go
+++ b/workflow/sync/multi_throttler_test.go
@@ -202,3 +202,35 @@ func TestPriorityAcrossNamespaces(t *testing.T) {
 	assert.True(t, throttler.Admit("b/1"))
 	assert.False(t, throttler.Admit("a/2"))
 }
+
+func TestParallelismUpdate(t *testing.T) {
+	assert := assert.New(t)
+	throttler := NewMultiThrottler(4, 0, func(Key) {})
+	throttler.Add("a/0", 0, time.Now())
+	throttler.Add("b/0", 0, time.Now())
+	throttler.Add("c/0", 0, time.Now())
+	throttler.Add("d/0", 0, time.Now())
+	throttler.Add("e/0", 0, time.Now())
+	throttler.Add("f/0", 0, time.Now())
+
+	assert.True(throttler.Admit("a/0"))
+	assert.True(throttler.Admit("b/0"))
+	assert.True(throttler.Admit("c/0"))
+	assert.True(throttler.Admit("d/0"))
+	assert.False(throttler.Admit("e/0"))
+	assert.False(throttler.Admit("f/0"))
+
+	throttler.UpdateParallelism(5)
+	assert.True(throttler.Admit("e/0"))
+	assert.False(throttler.Admit("f/0"))
+}
+
+func TestNamespaceParallelismUpdate(t *testing.T) {
+	assert := assert.New(t)
+	throttler := NewMultiThrottler(4, 0, func(Key) {})
+	throttler.UpdateNamespaceParallelism("argo", 1)
+	throttler.Add("argo/a", 0, time.Now())
+	throttler.Add("argo/b", 0, time.Now())
+	assert.True(throttler.Admit("argo/a"))
+	assert.False(throttler.Admit("argo/b"))
+}
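Finally, a hypothetical companion test, not part of the PR, sketching how `ResetNamespaceParallelism` composes with the methods exercised above: once the per-namespace entry is dropped, the namespace falls back to the default limit, which is unlimited (`0`) in this configuration, and the next `Add` re-runs admission:

```go
package sync

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// TestNamespaceParallelismReset is a sketch: it assumes, as the tests above
// suggest, that a default namespace parallelism of 0 means "no cap" and that
// adding an item re-evaluates queued workflows.
func TestNamespaceParallelismReset(t *testing.T) {
	throttler := NewMultiThrottler(4, 0, func(Key) {})
	throttler.UpdateNamespaceParallelism("argo", 1)
	throttler.Add("argo/a", 0, time.Now())
	throttler.Add("argo/b", 0, time.Now())
	assert.True(t, throttler.Admit("argo/a"))
	assert.False(t, throttler.Admit("argo/b"))

	// Dropping the per-namespace entry reverts "argo" to the default (no cap);
	// both queued workflows now fit within the total limit of 4.
	throttler.ResetNamespaceParallelism("argo")
	throttler.Add("argo/c", 0, time.Now())
	assert.True(t, throttler.Admit("argo/b"))
	assert.True(t, throttler.Admit("argo/c"))
}
```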