Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor controller logic #784

Merged
merged 12 commits into from
Jul 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/apis/camel/v1alpha1/integrationkit_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ const (
// IntegrationKitTypeExternal --
IntegrationKitTypeExternal = "external"

// IntegrationKitPhaseInitial --
IntegrationKitPhaseInitial IntegrationKitPhase = ""
// IntegrationKitPhaseWaitingForPlatform --
IntegrationKitPhaseWaitingForPlatform IntegrationKitPhase = "Waiting For Platform"
// IntegrationKitPhaseBuildSubmitted --
IntegrationKitPhaseBuildSubmitted IntegrationKitPhase = "Build Submitted"
// IntegrationKitPhaseBuildRunning --
Expand Down
6 changes: 2 additions & 4 deletions pkg/controller/build/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
// Action --
type Action interface {
client.Injectable
log.Injectable

// a user friendly name for the action
Name() string
Expand All @@ -36,10 +37,7 @@ type Action interface {
CanHandle(build *v1alpha1.Build) bool

// executes the handling function
Handle(ctx context.Context, build *v1alpha1.Build) error

// Inject integration logger
InjectLogger(log.Logger)
Handle(ctx context.Context, build *v1alpha1.Build) (*v1alpha1.Build, error)
}

type baseAction struct {
Expand Down
70 changes: 47 additions & 23 deletions pkg/controller/build/build_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ func (r *ReconcileBuild) Reconcile(request reconcile.Request) (reconcile.Result,
ctx := context.TODO()

// Fetch the Integration instance
instance := &v1alpha1.Build{}
err := r.client.Get(ctx, request.NamespacedName, instance)
if err != nil {
var instance v1alpha1.Build

if err := r.client.Get(ctx, request.NamespacedName, &instance); err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
Expand All @@ -161,7 +161,7 @@ func (r *ReconcileBuild) Reconcile(request reconcile.Request) (reconcile.Result,
return reconcile.Result{}, err
}

buildActionPool := []Action{
actions := []Action{
NewInitializeAction(),
NewScheduleRoutineAction(r.reader, r.builder, &r.routines),
NewSchedulePodAction(r.reader),
Expand All @@ -170,33 +170,57 @@ func (r *ReconcileBuild) Reconcile(request reconcile.Request) (reconcile.Result,
NewErrorRecoveryAction(),
}

blog := rlog.ForBuild(instance)
for _, a := range buildActionPool {
var err error

target := instance.DeepCopy()
targetPhase := target.Status.Phase
targetLog := rlog.ForBuild(target)

for _, a := range actions {
a.InjectClient(r.client)
a.InjectLogger(blog)
if a.CanHandle(instance) {
blog.Infof("Invoking action %s", a.Name())
if err := a.Handle(ctx, instance); err != nil {
if k8serrors.IsConflict(err) {
blog.Error(err, "conflict")
return reconcile.Result{
Requeue: true,
}, nil
}
a.InjectLogger(targetLog)

if a.CanHandle(target) {
targetLog.Infof("Invoking action %s", a.Name())

phaseFrom := target.Status.Phase

target, err = a.Handle(ctx, target)
if err != nil {
return reconcile.Result{}, err
}
}
}

// Refresh the build and check the state
if err = r.client.Get(ctx, request.NamespacedName, instance); err != nil {
return reconcile.Result{}, err
if target != nil {
if err := r.client.Status().Update(ctx, target); err != nil {
if k8serrors.IsConflict(err) {
targetLog.Error(err, "conflict")
return reconcile.Result{
Requeue: true,
}, nil
}

return reconcile.Result{}, err
}

targetPhase = target.Status.Phase

if targetPhase != phaseFrom {
targetLog.Info(
"state transition",
"phase-from", phaseFrom,
"phase-to", target.Status.Phase,
)
}
}

// handle one action at time so the resource
// is always at its latest state
break
}
}

// Requeue scheduling build so that it re-enters the build working queue
if instance.Status.Phase == v1alpha1.BuildPhaseScheduling ||
instance.Status.Phase == v1alpha1.BuildPhaseFailed {
if targetPhase == v1alpha1.BuildPhaseScheduling || targetPhase == v1alpha1.BuildPhaseFailed {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

targetPhase may be empty if the schedule action returns nil, or possibly, when no action can handle the event, so it may be better to replace it with target.Status.Phase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return reconcile.Result{
RequeueAfter: 5 * time.Second,
}, nil
Expand Down
8 changes: 3 additions & 5 deletions pkg/controller/build/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ func (action *initializeAction) CanHandle(build *v1alpha1.Build) bool {
}

// Handle handles the builds
func (action *initializeAction) Handle(ctx context.Context, build *v1alpha1.Build) error {
target := build.DeepCopy()
target.Status.Phase = v1alpha1.BuildPhaseScheduling
action.L.Info("Build state transition", "phase", target.Status.Phase)
func (action *initializeAction) Handle(ctx context.Context, build *v1alpha1.Build) (*v1alpha1.Build, error) {
build.Status.Phase = v1alpha1.BuildPhaseScheduling

return action.client.Status().Update(ctx, target)
return build, nil
}
22 changes: 11 additions & 11 deletions pkg/controller/build/monitor_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,34 +49,34 @@ func (action *monitorPodAction) CanHandle(build *v1alpha1.Build) bool {
}

// Handle handles the builds
func (action *monitorPodAction) Handle(ctx context.Context, build *v1alpha1.Build) error {
target := build.DeepCopy()

func (action *monitorPodAction) Handle(ctx context.Context, build *v1alpha1.Build) (*v1alpha1.Build, error) {
// Get the build pod
pod := &corev1.Pod{}
err := action.client.Get(ctx, types.NamespacedName{Namespace: build.Namespace, Name: buildPodName(build.Spec.Meta)}, pod)
if err != nil {
if k8serrors.IsNotFound(err) {
// Let's reschedule the build
target.Status.Phase = v1alpha1.BuildPhaseScheduling
build.Status.Phase = v1alpha1.BuildPhaseScheduling
} else {
return err
return nil, err
}
}

var buildPhase v1alpha1.BuildPhase

switch pod.Status.Phase {
case corev1.PodSucceeded:
target.Status.Phase = v1alpha1.BuildPhaseSucceeded
buildPhase = v1alpha1.BuildPhaseSucceeded
case corev1.PodFailed:
target.Status.Phase = v1alpha1.BuildPhaseFailed
buildPhase = v1alpha1.BuildPhaseFailed
}

if target.Status.Phase == build.Status.Phase {
if build.Status.Phase == buildPhase {
// Status is already up-to-date
return nil
return nil, nil
}

action.L.Info("Build state transition", "phase", target.Status.Phase)
build.Status.Phase = buildPhase

return action.client.Status().Update(ctx, target)
return build, nil
}
12 changes: 4 additions & 8 deletions pkg/controller/build/monitor_routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,15 @@ func (action *monitorRoutineAction) CanHandle(build *v1alpha1.Build) bool {
}

// Handle handles the builds
func (action *monitorRoutineAction) Handle(ctx context.Context, build *v1alpha1.Build) error {
target := build.DeepCopy()

func (action *monitorRoutineAction) Handle(ctx context.Context, build *v1alpha1.Build) (*v1alpha1.Build, error) {
// Check the build routine
if _, ok := action.routines.Load(build.Name); !ok {
// and reschedule the build if it's missing. This can happen when the operator
// stops abruptly and restarts.
target.Status.Phase = v1alpha1.BuildPhaseScheduling

action.L.Info("Build state transition", "phase", target.Status.Phase)
build.Status.Phase = v1alpha1.BuildPhaseScheduling

return action.client.Status().Update(ctx, target)
return build, nil
}

return nil
return nil, nil
}
32 changes: 13 additions & 19 deletions pkg/controller/build/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ func (action *errorRecoveryAction) CanHandle(build *v1alpha1.Build) bool {
return build.Status.Phase == v1alpha1.BuildPhaseFailed
}

func (action *errorRecoveryAction) Handle(ctx context.Context, build *v1alpha1.Build) error {
func (action *errorRecoveryAction) Handle(ctx context.Context, build *v1alpha1.Build) (*v1alpha1.Build, error) {
// The integration platform must be initialized before handling the error recovery
if _, err := platform.GetCurrentPlatform(ctx, action.client, build.Namespace); err != nil {
action.L.Info("Waiting for an integration platform to be initialized")
return nil
return nil, nil
}

if build.Status.Failure == nil {
Expand All @@ -75,17 +75,12 @@ func (action *errorRecoveryAction) Handle(ctx context.Context, build *v1alpha1.B

err := action.client.Status().Update(ctx, build)
if err != nil {
return err
return nil, err
}

target := build.DeepCopy()

if build.Status.Failure.Recovery.Attempt >= build.Status.Failure.Recovery.AttemptMax {
target.Status.Phase = v1alpha1.BuildPhaseError

action.L.Info("Max recovery attempt reached, transition to error phase")

return action.client.Status().Update(ctx, target)
build.Status.Phase = v1alpha1.BuildPhaseError
return build, nil
}

lastAttempt := build.Status.Failure.Recovery.AttemptTime.Time
Expand All @@ -97,19 +92,18 @@ func (action *errorRecoveryAction) Handle(ctx context.Context, build *v1alpha1.B
elapsedMin := action.backOff.ForAttempt(float64(build.Status.Failure.Recovery.Attempt)).Seconds()

if elapsed < elapsedMin {
return nil
return nil, nil
}

target.Status = v1alpha1.BuildStatus{}
target.Status.Phase = ""
target.Status.Failure = build.Status.Failure
target.Status.Failure.Recovery.Attempt = build.Status.Failure.Recovery.Attempt + 1
target.Status.Failure.Recovery.AttemptTime = metav1.Now()
build.Status = v1alpha1.BuildStatus{}
build.Status.Phase = v1alpha1.BuildPhaseInitial
build.Status.Failure.Recovery.Attempt++
build.Status.Failure.Recovery.AttemptTime = metav1.Now()

action.L.Infof("Recovery attempt (%d/%d)",
target.Status.Failure.Recovery.Attempt,
target.Status.Failure.Recovery.AttemptMax,
build.Status.Failure.Recovery.Attempt,
build.Status.Failure.Recovery.AttemptMax,
)

return action.client.Status().Update(ctx, target)
return build, nil
}
22 changes: 10 additions & 12 deletions pkg/controller/build/schedule_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (action *schedulePodAction) CanHandle(build *v1alpha1.Build) bool {
}

// Handle handles the builds
func (action *schedulePodAction) Handle(ctx context.Context, build *v1alpha1.Build) error {
func (action *schedulePodAction) Handle(ctx context.Context, build *v1alpha1.Build) (*v1alpha1.Build, error) {
// Enter critical section
action.lock.Lock()
defer action.lock.Unlock()
Expand All @@ -72,7 +72,7 @@ func (action *schedulePodAction) Handle(ctx context.Context, build *v1alpha1.Bui
// atomically by write operations
err := action.reader.List(ctx, options, builds)
if err != nil {
return err
return nil, err
}

// Emulate a serialized working queue to only allow one build to run at a given time.
Expand All @@ -87,13 +87,13 @@ func (action *schedulePodAction) Handle(ctx context.Context, build *v1alpha1.Bui

if hasScheduledBuild {
// Let's requeue the build in case one is already running
return nil
return nil, nil
}

// Try to get operator image name before starting the build
operatorImage, err := platform.GetCurrentOperatorImage(ctx, action.client)
if err != nil {
return err
return nil, err
}

// Otherwise, let's create the build pod
Expand All @@ -103,29 +103,27 @@ func (action *schedulePodAction) Handle(ctx context.Context, build *v1alpha1.Bui

// Set the Build instance as the owner and controller
if err := controllerutil.SetControllerReference(build, pod, action.client.GetScheme()); err != nil {
return err
return nil, err
}

// Ensure service account is present
if err := action.ensureServiceAccount(ctx, pod); err != nil {
return errors.Wrap(err, "cannot ensure service account is present")
return nil, errors.Wrap(err, "cannot ensure service account is present")
}

err = action.client.Delete(ctx, pod)
if err != nil && !k8serrors.IsNotFound(err) {
return errors.Wrap(err, "cannot delete build pod")
return nil, errors.Wrap(err, "cannot delete build pod")
}

err = action.client.Create(ctx, pod)
if err != nil {
return errors.Wrap(err, "cannot create build pod")
return nil, errors.Wrap(err, "cannot create build pod")
}

target := build.DeepCopy()
target.Status.Phase = v1alpha1.BuildPhasePending
action.L.Info("Build state transition", "phase", target.Status.Phase)
build.Status.Phase = v1alpha1.BuildPhasePending

return action.client.Status().Update(ctx, target)
return build, nil
}

func (action *schedulePodAction) ensureServiceAccount(ctx context.Context, buildPod *corev1.Pod) error {
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/build/schedule_routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (action *scheduleRoutineAction) CanHandle(build *v1alpha1.Build) bool {
}

// Handle handles the builds
func (action *scheduleRoutineAction) Handle(ctx context.Context, build *v1alpha1.Build) error {
func (action *scheduleRoutineAction) Handle(ctx context.Context, build *v1alpha1.Build) (*v1alpha1.Build, error) {
// Enter critical section
action.lock.Lock()
defer action.lock.Unlock()
Expand All @@ -67,7 +67,7 @@ func (action *scheduleRoutineAction) Handle(ctx context.Context, build *v1alpha1
// atomically by write operations
err := action.reader.List(ctx, options, builds)
if err != nil {
return err
return nil, err
}

// Emulate a serialized working queue to only allow one build to run at a given time.
Expand All @@ -82,7 +82,7 @@ func (action *scheduleRoutineAction) Handle(ctx context.Context, build *v1alpha1

if hasScheduledBuild {
// Let's requeue the build in case one is already running
return nil
return nil, nil
}

// Transition the build to running state
Expand All @@ -91,14 +91,14 @@ func (action *scheduleRoutineAction) Handle(ctx context.Context, build *v1alpha1
action.L.Info("Build state transition", "phase", target.Status.Phase)
err = action.client.Status().Update(ctx, target)
if err != nil {
return err
return nil, err
}

// and run it asynchronously to avoid blocking the reconcile loop
action.routines.Store(build.Name, true)
go action.build(ctx, build)

return nil
return nil, nil
}

func (action *scheduleRoutineAction) build(ctx context.Context, build *v1alpha1.Build) {
Expand Down
Loading