Skip to content

Commit

Permalink
Fix apache#560 - Check if the .spec.flow has changed before building (a…
Browse files Browse the repository at this point in the history
…pache#564)

* Fix apache#560 - Check if the .spec.flow has changed before building

Signed-off-by: Ricardo Zanini <[email protected]>

* Use CRC32 instead of comparing flows directly

Signed-off-by: Ricardo Zanini <[email protected]>

---------

Signed-off-by: Ricardo Zanini <[email protected]>
  • Loading branch information
ricardozanini authored and rgdoliveira committed Nov 18, 2024
1 parent 0d36c77 commit fa85d0a
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 37 deletions.
2 changes: 2 additions & 0 deletions api/v1alpha08/sonataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ type SonataFlowStatus struct {
// Triggers list of triggers created for the SonataFlow
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="triggers"
Triggers []SonataFlowTriggerRef `json:"triggers,omitempty"`
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="flowRevision"
FlowCRC uint32 `json:"flowCRC,omitempty"`
}

// SonataFlowTriggerRef defines a trigger created for the SonataFlow.
Expand Down
3 changes: 3 additions & 0 deletions bundle/manifests/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10050,6 +10050,9 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the workflow
type: string
flowCRC:
format: int32
type: integer
lastTimeRecoverAttempt:
format: date-time
type: string
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10050,6 +10050,9 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the workflow
type: string
flowCRC:
format: int32
type: integer
lastTimeRecoverAttempt:
format: date-time
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ spec:
- description: Endpoint is an externally accessible URL of the workflow
displayName: endpoint
path: endpoint
- displayName: flowRevision
path: flowCRC
- displayName: lastTimeRecoverAttempt
path: lastTimeRecoverAttempt
- description: Platform displays which platform is being used by this workflow
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/profiles/common/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"context"
"fmt"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"

"k8s.io/client-go/rest"

"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/discovery"
Expand Down Expand Up @@ -56,6 +58,10 @@ func (s *StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operat
return false, err
}
workflow.Status.ObservedGeneration = workflow.Generation
workflow.Status.FlowCRC, err = utils.Crc32Checksum(workflow.Spec.Flow)
if err != nil {
return false, err
}
services.SetServiceUrlsInWorkflowStatus(pl, workflow)
if workflow.Status.Platform == nil {
workflow.Status.Platform = &operatorapi.SonataFlowPlatformRef{}
Expand Down
13 changes: 6 additions & 7 deletions internal/controller/profiles/preview/profile_preview_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) {
assert.Equal(t, serviceMonitor.Spec.Endpoints[0].Path, "/q/metrics")
}

func Test_GenerationAnnotationCheck(t *testing.T) {
func Test_WorkflowChangedCheck(t *testing.T) {
// we load a workflow with metadata.generation to 0
workflow := test.GetBaseSonataFlow(t.Name())
platform := test.GetBasePlatformInReadyPhase(t.Name())
Expand All @@ -199,15 +199,14 @@ func Test_GenerationAnnotationCheck(t *testing.T) {
assert.NotNil(t, result)
assert.Len(t, objects, 3)

// then we load a workflow with metadata.generation set to 1
// then we load the current workflow
workflowChanged := &operatorapi.SonataFlow{}
err = client.Get(context.TODO(), clientruntime.ObjectKeyFromObject(workflow), workflowChanged)
assert.NoError(t, err)
//we set the generation to 1
workflowChanged.Generation = int64(1)
err = client.Update(context.TODO(), workflowChanged)
assert.NoError(t, err)
// reconcile
//we change something within the flow
workflowChanged.Spec.Flow.AutoRetries = true

// reconcile -> the one in the k8s DB is different, so there's a change.
handler = &deployWithBuildWorkflowState{
StateSupport: fakeReconcilerSupport(client),
ensurers: NewObjectEnsurers(&common.StateSupport{C: client}),
Expand Down
25 changes: 17 additions & 8 deletions internal/controller/profiles/preview/states_preview.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"sort"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -40,7 +41,6 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
)

Expand Down Expand Up @@ -209,7 +209,11 @@ func (h *deployWithBuildWorkflowState) Do(ctx context.Context, workflow *operato
return ctrl.Result{}, nil, err
}

if h.isWorkflowChanged(workflow) { // Let's check that the 2 resWorkflowDef definition are different
hasChanged, err := h.isWorkflowChanged(workflow)
if err != nil {
return ctrl.Result{}, nil, err
}
if hasChanged { // Let's check that the 2 resWorkflowDef definition are different
if err = buildManager.MarkToRestart(build); err != nil {
return ctrl.Result{}, nil, err
}
Expand All @@ -235,13 +239,18 @@ func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context, workfl
return h.cleanupOutdatedRevisions(ctx, workflow)
}

// isWorkflowChanged marks the workflow status as unknown to require a new build reconciliation
func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.SonataFlow) bool {
generation := kubeutil.GetLastGeneration(workflow.Namespace, workflow.Name, h.C, context.TODO())
if generation > workflow.Status.ObservedGeneration {
return true
// isWorkflowChanged checks whether the contents of .spec.flow of the given workflow has changed.
func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.SonataFlow) (bool, error) {
// Added this guard for backward compatibility for workflows deployed with a previous operator version, so we won't kick thousands of builds on users' cluster.
// After this reconciliation cycle, the CRC should be updated
if workflow.Status.FlowCRC == 0 {
return false, nil
}
return false
actualCRC, err := utils.Crc32Checksum(workflow.Spec.Flow)
if err != nil {
return false, err
}
return actualCRC != workflow.Status.FlowCRC, nil
}

func (h *deployWithBuildWorkflowState) cleanupOutdatedRevisions(ctx context.Context, workflow *operatorapi.SonataFlow) error {
Expand Down
54 changes: 54 additions & 0 deletions internal/controller/profiles/preview/states_preview_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package preview

import (
"testing"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"

"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
"github.com/serverlessworkflow/sdk-go/v2/model"
"github.com/stretchr/testify/assert"
)

func Test_deployWithBuildWorkflowState_isWorkflowChanged(t *testing.T) {
workflow1 := test.GetBaseSonataFlow(t.Name())
workflow2 := test.GetBaseSonataFlow(t.Name())
workflow1.Status.FlowCRC, _ = utils.Crc32Checksum(workflow1.Spec.Flow)
workflow2.Status.FlowCRC, _ = utils.Crc32Checksum(workflow2.Spec.Flow)
deployWithBuildWorkflowState := &deployWithBuildWorkflowState{
StateSupport: &common.StateSupport{C: test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow1).Build()},
}

hasChanged, err := deployWithBuildWorkflowState.isWorkflowChanged(workflow2)
assert.NoError(t, err)
assert.False(t, hasChanged)

// change workflow2
workflow2.Spec.Flow.Metadata = model.Metadata{
"string": model.Object{
StringValue: "test",
},
}

hasChanged, err = deployWithBuildWorkflowState.isWorkflowChanged(workflow2)
assert.NoError(t, err)
assert.True(t, hasChanged)
}
3 changes: 3 additions & 0 deletions operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27477,6 +27477,9 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the workflow
type: string
flowCRC:
format: int32
type: integer
lastTimeRecoverAttempt:
format: date-time
type: string
Expand Down
3 changes: 3 additions & 0 deletions test/yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"runtime"
"strings"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"

"github.com/apache/incubator-kie-kogito-serverless-operator/api"
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
Expand Down Expand Up @@ -71,6 +73,7 @@ func GetSonataFlow(testFile, namespace string) *operatorapi.SonataFlow {
GetKubernetesResource(testFile, ksw)
klog.V(log.D).InfoS("Successfully read KSW", "ksw", spew.Sprint(ksw))
ksw.Namespace = namespace
ksw.Status.FlowCRC, _ = utils.Crc32Checksum(ksw.Spec.Flow)
return ksw
}

Expand Down
33 changes: 33 additions & 0 deletions utils/crc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package utils

import (
"bytes"
"encoding/gob"
"hash/crc32"
)

func Crc32Checksum(v interface{}) (uint32, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(v); err != nil {
return 0, err
}
return crc32.ChecksumIEEE(buf.Bytes()), nil
}
22 changes: 0 additions & 22 deletions utils/kubernetes/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,11 @@
package kubernetes

import (
"context"
"strconv"

"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/client"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
)

func getWorkflow(namespace string, name string, c client.Client, ctx context.Context) *operatorapi.SonataFlow {
serverlessWorkflowType := &operatorapi.SonataFlow{}
serverlessWorkflowType.Namespace = namespace
serverlessWorkflowType.Name = name
serverlessWorkflow := &operatorapi.SonataFlow{}
if err := c.Get(ctx, client.ObjectKeyFromObject(serverlessWorkflowType), serverlessWorkflow); err != nil {
klog.V(log.E).ErrorS(err, "unable to retrieve SonataFlow definition")
}
return serverlessWorkflow
}

func GetLastGeneration(namespace string, name string, c client.Client, ctx context.Context) int64 {
workflow := getWorkflow(namespace, name, c, ctx)
return workflow.Generation
}

// GetAnnotationAsBool returns the boolean value from the given annotation.
// If the annotation is not present or is there an error in the ParseBool conversion, returns false.
func GetAnnotationAsBool(object client.Object, key string) bool {
Expand Down

0 comments on commit fa85d0a

Please sign in to comment.