Skip to content

Commit

Permalink
feat: dynamic namespace parallelism. Fixes #14194 (#14188)
Browse files Browse the repository at this point in the history
Signed-off-by: isubasinghe <[email protected]>
Signed-off-by: Isitha Subasinghe <[email protected]>
  • Loading branch information
isubasinghe authored Feb 24, 2025
1 parent e9e7c43 commit dbfedbd
Show file tree
Hide file tree
Showing 12 changed files with 325 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/parallelism.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ When namespace parallelism is enabled, it is plausible for a workflow with a low
!!! Note
Workflows that are executing but restricted from running more nodes due to other mechanisms will still count toward parallelism limits.
In addition to the global default, you can set an individual parallelism limit on a namespace by adding a `workflows.argoproj.io/parallelism-limit` label to the namespace object. Note that an individual namespace limit overrides the global namespace limit.

### Priority

You can set a `priority` on workflows:
Expand Down
1 change: 1 addition & 0 deletions test/e2e/fixtures/e2e_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,5 +244,6 @@ func (s *E2ESuite) Given() *Given {
hydrator: s.hydrator,
kubeClient: s.KubeClient,
bearerToken: bearerToken,
restConfig: s.RestConfig,
}
}
3 changes: 3 additions & 0 deletions test/e2e/fixtures/given.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/yaml"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
Expand All @@ -33,6 +34,7 @@ type Given struct {
cronWf *wfv1.CronWorkflow
kubeClient kubernetes.Interface
bearerToken string
restConfig *rest.Config
}

// creates a workflow based on the parameter, this may be:
Expand Down Expand Up @@ -250,5 +252,6 @@ func (g *Given) When() *When {
hydrator: g.hydrator,
kubeClient: g.kubeClient,
bearerToken: g.bearerToken,
restConfig: g.restConfig,
}
}
3 changes: 3 additions & 0 deletions test/e2e/fixtures/then.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
Expand All @@ -33,6 +34,7 @@ type Then struct {
hydrator hydrator.Interface
kubeClient kubernetes.Interface
bearerToken string
restConfig *rest.Config
}

func (t *Then) ExpectWorkflow(block func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus)) *Then {
Expand Down Expand Up @@ -301,5 +303,6 @@ func (t *Then) When() *When {
wf: t.wf,
kubeClient: t.kubeClient,
bearerToken: t.bearerToken,
restConfig: t.restConfig,
}
}
27 changes: 27 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/utils/ptr"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
Expand All @@ -39,6 +40,7 @@ type When struct {
hydrator hydrator.Interface
kubeClient kubernetes.Interface
bearerToken string
restConfig *rest.Config
}

func (w *When) SubmitWorkflow() *When {
Expand Down Expand Up @@ -219,6 +221,7 @@ var (
return node.Type == wfv1.NodeTypePod && node.Phase == wfv1.NodeFailed
}), "to have failed pod"
}
ToBePending = ToHavePhase(wfv1.WorkflowPending)
)

// `ToBeDone` replaces `ToFinish` which also makes sure the workflow is both complete not pending archiving.
Expand Down Expand Up @@ -488,6 +491,28 @@ func (w *When) RemoveFinalizers(shouldErr bool) *When {
return w
}

// AddNamespaceLimit labels the test namespace with a
// `workflows.argoproj.io/parallelism-limit` value so that the controller
// enforces a per-namespace parallelism limit. It fails the test on any
// marshalling or patch error.
func (w *When) AddNamespaceLimit(limit string) *When {
	w.t.Helper()
	ctx := context.Background()

	// Merge-patch only the parallelism-limit label; all other namespace
	// metadata is left untouched by the merge patch semantics.
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"labels": map[string]interface{}{
				"workflows.argoproj.io/parallelism-limit": limit,
			},
		},
	}

	bs, err := json.Marshal(patch)
	if err != nil {
		w.t.Fatal(err)
	}

	// json.Marshal already returns []byte, so no conversion is needed.
	if _, err := w.kubeClient.CoreV1().Namespaces().Patch(ctx, Namespace, types.MergePatchType, bs, metav1.PatchOptions{}); err != nil {
		w.t.Fatal(err)
	}
	return w
}

type PodCondition func(p *corev1.Pod) bool

var (
Expand Down Expand Up @@ -717,6 +742,7 @@ func (w *When) Then() *Then {
hydrator: w.hydrator,
kubeClient: w.kubeClient,
bearerToken: w.bearerToken,
restConfig: w.restConfig,
}
}

Expand All @@ -736,5 +762,6 @@ func (w *When) Given() *Given {
cwfTemplates: w.cwfTemplates,
cronWf: w.cronWf,
kubeClient: w.kubeClient,
restConfig: w.restConfig,
}
}
60 changes: 60 additions & 0 deletions test/e2e/ns_parallelism_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//go:build functional

package e2e

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
)

// NamespaceParallelismSuite exercises the per-namespace parallelism limit
// (the workflows.argoproj.io/parallelism-limit namespace label) end to end.
type NamespaceParallelismSuite struct {
	fixtures.E2ESuite
}

// wf is a minimal workflow manifest whose single pod sleeps for 60 seconds,
// long enough for a second submission to be held back by the namespace
// parallelism limit while this one is still running.
const wf = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-world-
  labels:
    workflows.argoproj.io/archive-strategy: "false"
  annotations:
    workflows.argoproj.io/description: |
      This is a simple hello world example.
spec:
  entrypoint: hello-world
  templates:
  - name: hello-world
    container:
      image: "argoproj/argosay:v2"
      command: [sleep]
      args: ["60"]
`

// TestNamespaceParallelism verifies that with a namespace parallelism limit
// of 1, a second workflow submitted while the first is running stays Pending
// with the expected postponement message in its status.
func (s *NamespaceParallelismSuite) TestNamespaceParallelism() {

	s.Given().
		Workflow(wf).
		When().
		AddNamespaceLimit("1").
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToStart)

	// Give the controller time to observe the first workflow as running
	// before the second submission races it.
	time.Sleep(time.Second * 5)
	// Named pendingWf so the package-level manifest const `wf` is not shadowed.
	pendingWf := s.Given().
		Workflow(wf).
		When().
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToBePending).GetWorkflow()
	assert.Equal(s.T(), "Workflow processing has been postponed because too many workflows are already running", pendingWf.Status.Message)
}

// TestNamespaceParallelismSuite is the testify entry point for the suite.
func TestNamespaceParallelismSuite(t *testing.T) {
	suite.Run(t, new(NamespaceParallelismSuite))
}
4 changes: 4 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ const (
// AnnotationKeyArtifactGCStrategy is listed as an annotation on the Artifact GC Pod to identify
// the strategy whose artifacts are being deleted
AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy"

// LabelParallelismLimit is a label applied on namespace objects to control the per namespace parallelism.
LabelParallelismLimit = workflow.WorkflowFullName + "/parallelism-limit"

// AnnotationKeyPodGCStrategy is listed as an annotation on the Pod
// the strategy for the pod, in case the pod is orphaned from its workflow
AnnotationKeyPodGCStrategy = workflow.WorkflowFullName + "/pod-gc-strategy"
Expand Down
10 changes: 9 additions & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type WorkflowController struct {

// datastructures to support the processing of workflows and workflow pods
wfInformer cache.SharedIndexInformer
nsInformer cache.SharedIndexInformer
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
PodController *pod.Controller // Currently public for woc to access, but would rather an accessor
Expand Down Expand Up @@ -298,12 +299,17 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
Info("Current Worker Numbers")

wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
nsInformer, err := wfc.newNamespaceInformer(ctx, wfc.kubeclientset)
if err != nil {
log.Fatal(err)
}
wfc.nsInformer = nsInformer
wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace)

wfc.wfTaskSetInformer = wfc.newWorkflowTaskSetInformer()
wfc.artGCTaskInformer = wfc.newArtGCTaskInformer()
wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer()
err := wfc.addWorkflowInformerHandlers(ctx)
err = wfc.addWorkflowInformerHandlers(ctx)
if err != nil {
log.Fatal(err)
}
Expand All @@ -324,6 +330,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
go wfc.runConfigMapWatcher(ctx)
}

go wfc.nsInformer.Run(ctx.Done())
go wfc.wfInformer.Run(ctx.Done())
go wfc.wftmplInformer.Informer().Run(ctx.Done())
go wfc.configMapInformer.Run(ctx.Done())
Expand All @@ -337,6 +344,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
if !cache.WaitForCacheSync(
ctx.Done(),
wfc.wfInformer.HasSynced,
wfc.nsInformer.HasSynced,
wfc.wftmplInformer.Informer().HasSynced,
wfc.PodController.HasSynced(),
wfc.configMapInformer.HasSynced,
Expand Down
143 changes: 143 additions & 0 deletions workflow/controller/ns_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package controller

import (
"context"
"strconv"
"time"

"errors"

"github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/argoproj/argo-workflows/v3/workflow/common"
)

var (
	// limitReq selects namespaces that carry the parallelism-limit label.
	// The error is deliberately ignored: the key and the Exists operator
	// (which takes no values) are statically valid.
	limitReq, _ = labels.NewRequirement(common.LabelParallelismLimit, selection.Exists, nil)
	// nsResyncPeriod is how often the namespace informer performs a full resync.
	nsResyncPeriod = 5 * time.Minute
	// errUnableToExtract signals that the namespace has no parallelism-limit label.
	errUnableToExtract = errors.New("was unable to extract limit")
)

// updateFunc applies a new parallelism limit to the named namespace.
type updateFunc = func(string, int)
// resetFunc clears a namespace's individual limit, reverting it to the default.
type resetFunc = func(string)

// newNamespaceInformer builds a SharedIndexInformer that watches only
// namespaces carrying the parallelism-limit label and keeps the controller's
// throttler in sync: add/update events propagate the label value, delete
// events reset the namespace to the default limit.
func (wfc *WorkflowController) newNamespaceInformer(ctx context.Context, kubeclientset kubernetes.Interface) (cache.SharedIndexInformer, error) {

	c := kubeclientset.CoreV1().Namespaces()
	logger := logrus.WithField("scope", "ns_watcher")

	// Server-side filter: only namespaces with the parallelism-limit label
	// are listed/watched, keeping the informer's cache small.
	labelSelector := labels.NewSelector().
		Add(*limitReq)

	listFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		opts.LabelSelector = labelSelector.String()
		return c.List(ctx, opts)
	}

	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
		opts.Watch = true
		opts.LabelSelector = labelSelector.String()
		return c.Watch(ctx, opts)
	}

	source := &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
	informer := cache.NewSharedIndexInformer(source, &apiv1.Namespace{}, nsResyncPeriod, cache.Indexers{})

	_, err := informer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				ns, err := nsFromObj(obj)
				if err != nil {
					// Conversion failures are silently skipped by design.
					return
				}
				updateNS(logger, ns, wfc.throttler.UpdateNamespaceParallelism, wfc.throttler.ResetNamespaceParallelism)
			},

			UpdateFunc: func(old, newVal interface{}) {
				ns, err := nsFromObj(newVal)
				if err != nil {
					return
				}
				// Skip no-op updates: only act when the limit label changed
				// (or the old object could not be interpreted).
				oldNs, err := nsFromObj(old)
				if err == nil && !limitChanged(oldNs, ns) {
					return
				}
				updateNS(logger, ns, wfc.throttler.UpdateNamespaceParallelism, wfc.throttler.ResetNamespaceParallelism)
			},

			DeleteFunc: func(obj interface{}) {
				ns, err := nsFromObj(obj)
				if err != nil {
					return
				}
				deleteNS(logger, ns, wfc.throttler.ResetNamespaceParallelism)
			},
		},
	)
	if err != nil {
		return nil, err
	}
	return informer, nil
}

// deleteNS reverts the namespace to the default parallelism limit when the
// namespace object is deleted.
func deleteNS(log *logrus.Entry, ns *apiv1.Namespace, resetFn resetFunc) {
	// Fix typo in log message: "reseting" -> "resetting".
	log.Infof("resetting the namespace parallelism limits for %s due to deletion event", ns.Name)
	resetFn(ns.Name)
}

// updateNS propagates the namespace's parallelism-limit label to the
// throttler: an absent label resets the namespace to the default limit, a
// malformed value is logged and ignored, and a valid integer is applied.
func updateNS(log *logrus.Entry, ns *apiv1.Namespace, updateFn updateFunc, resetFn resetFunc) {
	limit, err := extractLimit(ns)
	if errors.Is(err, errUnableToExtract) {
		// No label on the namespace: revert to the controller-wide default.
		resetFn(ns.Name)
		log.Infof("removing per-namespace parallelism for %s, reverting to default", ns.Name)
		return
	}
	// Early return instead of `else if` keeps the happy path left-aligned.
	if err != nil {
		// Label present but not a valid integer; keep the current limit.
		log.Errorf("was unable to extract the limit due to: %s", err)
		return
	}
	log.Infof("changing namespace parallelism in %s to %d", ns.Name, limit)
	updateFn(ns.Name, limit)
}

// nsFromObj converts an informer event object to a *apiv1.Namespace. Delete
// events may deliver a cache.DeletedFinalStateUnknown tombstone wrapping the
// last known object state; unwrap it so deletions observed after a missed
// watch event still reset the namespace limit.
func nsFromObj(obj interface{}) (*apiv1.Namespace, error) {
	if ns, ok := obj.(*apiv1.Namespace); ok {
		return ns, nil
	}
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		if ns, ok := tombstone.Obj.(*apiv1.Namespace); ok {
			return ns, nil
		}
	}
	return nil, errors.New("was unable to convert to namespace")
}

// limitChanged reports whether the parallelism-limit label differs between
// the old and new versions of a namespace (a missing label reads as "").
func limitChanged(old *apiv1.Namespace, newNS *apiv1.Namespace) bool {
	oldLimit := old.GetLabels()[common.LabelParallelismLimit]
	newLimit := newNS.GetLabels()[common.LabelParallelismLimit]
	// `a != b` is the idiomatic form of `!(a == b)`.
	return oldLimit != newLimit
}

// extractLimit parses the namespace's parallelism limit from its
// parallelism-limit label. It returns errUnableToExtract when the label is
// absent, or the strconv error when the value is not an integer.
func extractLimit(ns *apiv1.Namespace) (int, error) {
	// Direct map lookup replaces the original range-over-all-labels loop
	// (which also took the address of the loop variable); lookups on a nil
	// label map are safe and report !ok.
	limitString, ok := ns.GetLabels()[common.LabelParallelismLimit]
	if !ok {
		return 0, errUnableToExtract
	}
	// strconv.Atoi returns (0, err) on failure, matching the original contract.
	return strconv.Atoi(limitString)
}
Loading

0 comments on commit dbfedbd

Please sign in to comment.