Skip to content

Commit

Permalink
fix: Support memoization on plugin node. Fixes #8553 (#8554)
Browse files Browse the repository at this point in the history
Signed-off-by: wujayway <[email protected]>
  • Loading branch information
wujayway authored May 2, 2022
1 parent 5005315 commit dc8fef3
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 0 deletions.
10 changes: 10 additions & 0 deletions workflow/controller/taskset.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

log "github.com/sirupsen/logrus"

"github.com/argoproj/argo-workflows/v3/errors"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/common"
controllercache "github.com/argoproj/argo-workflows/v3/workflow/controller/cache"
)

func (woc *wfOperationCtx) patchTaskSet(ctx context.Context, patch interface{}, pathTypeType types.PatchType) error {
Expand Down Expand Up @@ -134,6 +137,13 @@ func (woc *wfOperationCtx) reconcileTaskSet(ctx context.Context) error {
node.FinishedAt = metav1.Now()

woc.wf.Status.Nodes[nodeID] = node
if node.MemoizationStatus != nil && node.Succeeded() {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
}
}
woc.updated = true
}
}
Expand Down
78 changes: 78 additions & 0 deletions workflow/controller/taskset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -333,3 +334,80 @@ func TestNonHTTPTemplateScenario(t *testing.T) {
assert.NoError(t, err)
})
}

func TestReconcileTaskSetWithMemoization(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: http-template-1
namespace: default
spec:
entrypoint: main
templates:
- name: main
http:
url: "http://localhost"
memoize:
key: cache-demo-1
maxAge: "10s"
cache:
configMap:
name: cache-demo-1
status:
nodes:
http-template-fqgsf-2338098285:
boundaryID: http-template-fqgsf
displayName: main
id: http-template-fqgsf-2338098285
name: http-template-fqgsf[0].main
memoizationStatus:
hit: false
key: cache-demo-1
cacheName: cache-demo-1
outputs:
parameters:
- name: result
value: |
{ demo }
phase: Succeeded
templateName: http
type: HTTP
phase: Running
`)
ctx := context.Background()
var ts wfv1.WorkflowTaskSet
wfv1.MustUnmarshal(`apiVersion: argoproj.io/v1alpha1
kind: WorkflowTaskSet
metadata:
name: http-template-1
namespace: default
spec:
tasks:
http-template-fqgsf-2338098285:
http:
url: http://localhost
name: http
status:
nodes:
http-template-fqgsf-2338098285:
outputs:
parameters:
- name: result
value: |
{ demo }
phase: Succeeded
`, &ts)
t.Run("MemoizeOnTaskSetSucceeded", func(t *testing.T) {
cancel, controller := newController(wf, ts)
defer cancel()
_, err := controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets("default").Create(ctx, &ts, v1.CreateOptions{})
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)
time.Sleep(1 * time.Second)
err = woc.reconcileTaskSet(ctx)
assert.NoError(t, err)
memo, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Get(ctx, "cache-demo-1", v1.GetOptions{})
assert.NoError(t, err)
assert.NotEmpty(t, memo.Data["cache-demo-1"])
})
}

0 comments on commit dc8fef3

Please sign in to comment.