Skip to content

Commit

Permalink
Move nopImage from pkg/…/sidecars package to cmd/controller
Browse files Browse the repository at this point in the history
This is part of a set of changes to make sure we don't define CLI
flags in our `pkg/…` packages… so that importing packages from there
do not pollute the cli flags.

Signed-off-by: Vincent Demeester <[email protected]>
  • Loading branch information
vdemeester authored and tekton-robot committed Oct 2, 2019
1 parent b338e40 commit dfb2eca
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 134 deletions.
13 changes: 11 additions & 2 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package main

import (
"flag"

"knative.dev/pkg/injection/sharedmain"

"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun"
Expand All @@ -28,9 +30,16 @@ const (
ControllerLogKey = "controller"
)

var (
nopImage = flag.String("nop-image", "override-with-nop:latest", "The container image used to kill sidecars")
)

func main() {
images := map[string]string{
"nopImage": *nopImage,
}
sharedmain.Main(ControllerLogKey,
taskrun.NewController,
pipelinerun.NewController,
taskrun.NewController(images),
pipelinerun.NewController(images),
)
}
99 changes: 49 additions & 50 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,61 +42,60 @@ const (
resyncPeriod = 10 * time.Hour
)

func NewController(
ctx context.Context,
cmw configmap.Watcher,
) *controller.Impl {
logger := logging.FromContext(ctx)
kubeclientset := kubeclient.Get(ctx)
pipelineclientset := pipelineclient.Get(ctx)
taskRunInformer := taskruninformer.Get(ctx)
taskInformer := taskinformer.Get(ctx)
clusterTaskInformer := clustertaskinformer.Get(ctx)
pipelineRunInformer := pipelineruninformer.Get(ctx)
pipelineInformer := pipelineinformer.Get(ctx)
resourceInformer := resourceinformer.Get(ctx)
conditionInformer := conditioninformer.Get(ctx)
timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger)
func NewController(images map[string]string) func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
kubeclientset := kubeclient.Get(ctx)
pipelineclientset := pipelineclient.Get(ctx)
taskRunInformer := taskruninformer.Get(ctx)
taskInformer := taskinformer.Get(ctx)
clusterTaskInformer := clustertaskinformer.Get(ctx)
pipelineRunInformer := pipelineruninformer.Get(ctx)
pipelineInformer := pipelineinformer.Get(ctx)
resourceInformer := resourceinformer.Get(ctx)
conditionInformer := conditioninformer.Get(ctx)
timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger)

opt := reconciler.Options{
KubeClientSet: kubeclientset,
PipelineClientSet: pipelineclientset,
ConfigMapWatcher: cmw,
ResyncPeriod: resyncPeriod,
Logger: logger,
}
opt := reconciler.Options{
KubeClientSet: kubeclientset,
PipelineClientSet: pipelineclientset,
ConfigMapWatcher: cmw,
ResyncPeriod: resyncPeriod,
Logger: logger,
}

c := &Reconciler{
Base: reconciler.NewBase(opt, pipelineRunAgentName),
pipelineRunLister: pipelineRunInformer.Lister(),
pipelineLister: pipelineInformer.Lister(),
taskLister: taskInformer.Lister(),
clusterTaskLister: clusterTaskInformer.Lister(),
taskRunLister: taskRunInformer.Lister(),
resourceLister: resourceInformer.Lister(),
conditionLister: conditionInformer.Lister(),
timeoutHandler: timeoutHandler,
}
impl := controller.NewImpl(c, c.Logger, pipelineRunControllerName)
c := &Reconciler{
Base: reconciler.NewBase(opt, pipelineRunAgentName, images),
pipelineRunLister: pipelineRunInformer.Lister(),
pipelineLister: pipelineInformer.Lister(),
taskLister: taskInformer.Lister(),
clusterTaskLister: clusterTaskInformer.Lister(),
taskRunLister: taskRunInformer.Lister(),
resourceLister: resourceInformer.Lister(),
conditionLister: conditionInformer.Lister(),
timeoutHandler: timeoutHandler,
}
impl := controller.NewImpl(c, c.Logger, pipelineRunControllerName)

timeoutHandler.SetPipelineRunCallbackFunc(impl.Enqueue)
timeoutHandler.CheckTimeouts(kubeclientset, pipelineclientset)
timeoutHandler.SetPipelineRunCallbackFunc(impl.Enqueue)
timeoutHandler.CheckTimeouts(kubeclientset, pipelineclientset)

c.Logger.Info("Setting up event handlers")
pipelineRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: impl.Enqueue,
UpdateFunc: controller.PassNew(impl.Enqueue),
DeleteFunc: impl.Enqueue,
})
c.Logger.Info("Setting up event handlers")
pipelineRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: impl.Enqueue,
UpdateFunc: controller.PassNew(impl.Enqueue),
DeleteFunc: impl.Enqueue,
})

c.tracker = tracker.New(impl.EnqueueKey, 30*time.Minute)
taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: controller.PassNew(impl.EnqueueControllerOf),
})
c.tracker = tracker.New(impl.EnqueueKey, 30*time.Minute)
taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: controller.PassNew(impl.EnqueueControllerOf),
})

c.Logger.Info("Setting up ConfigMap receivers")
c.configStore = config.NewStore(c.Logger.Named("config-store"))
c.configStore.WatchConfigs(opt.ConfigMapWatcher)
c.Logger.Info("Setting up ConfigMap receivers")
c.configStore = config.NewStore(c.Logger.Named("config-store"))
c.configStore.WatchConfigs(opt.ConfigMapWatcher)

return impl
return impl
}
}
10 changes: 5 additions & 5 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ import (

var (
ignoreLastTransitionTime = cmpopts.IgnoreTypes(apis.Condition{}.LastTransitionTime.Inner.Time)
images = map[string]string{
"nopImage": "override-with-nop:latest",
}
)

func getRunName(pr *v1alpha1.PipelineRun) string {
Expand All @@ -58,11 +61,8 @@ func getPipelineRunController(t *testing.T, d test.Data) (test.TestAssets, func(
configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace())
ctx, cancel := context.WithCancel(ctx)
return test.TestAssets{
Controller: NewController(
ctx,
configMapWatcher,
),
Clients: c,
Controller: NewController(images)(ctx, configMapWatcher),
Clients: c,
}, cancel
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@ type Base struct {
// performance benefits, raw logger also preserves type-safety at
// the expense of slightly greater verbosity.
Logger *zap.SugaredLogger

// Images contains images to use for certain internal container
Images map[string]string
}

// NewBase instantiates a new instance of Base implementing
// the common & boilerplate code between our reconcilers.
func NewBase(opt Options, controllerAgentName string) *Base {
func NewBase(opt Options, controllerAgentName string, images map[string]string) *Base {
// Enrich the logs with controller name
logger := opt.Logger.Named(controllerAgentName).With(zap.String(logkey.ControllerType, controllerAgentName))

Expand All @@ -110,6 +113,7 @@ func NewBase(opt Options, controllerAgentName string) *Base {
ConfigMapWatcher: opt.ConfigMapWatcher,
Recorder: recorder,
Logger: logger,
Images: images,
}

return base
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestRecorderOptions(t *testing.T) {
Logger: zap.New(observer).Sugar(),
KubeClientSet: c.Kube,
PipelineClientSet: c.Pipeline,
}, "test")
}, "test", map[string]string{})

if strings.Compare(reflect.TypeOf(b.Recorder).String(), "*record.recorderImpl") != 0 {
t.Errorf("Expected recorder type '*record.recorderImpl' but actual type is: %s", reflect.TypeOf(b.Recorder).String())
Expand All @@ -81,7 +81,7 @@ func TestRecorderOptions(t *testing.T) {
KubeClientSet: c.Kube,
PipelineClientSet: c.Pipeline,
Recorder: fr,
}, "test")
}, "test", map[string]string{})

if strings.Compare(reflect.TypeOf(b.Recorder).String(), "*record.FakeRecorder") != 0 {
t.Errorf("Expected recorder type '*record.FakeRecorder' but actual type is: %s", reflect.TypeOf(b.Recorder).String())
Expand Down
113 changes: 56 additions & 57 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,62 +42,61 @@ const (
resyncPeriod = 10 * time.Hour
)

func NewController(
ctx context.Context,
cmw configmap.Watcher,
) *controller.Impl {
logger := logging.FromContext(ctx)
kubeclientset := kubeclient.Get(ctx)
pipelineclientset := pipelineclient.Get(ctx)
taskRunInformer := taskruninformer.Get(ctx)
taskInformer := taskinformer.Get(ctx)
clusterTaskInformer := clustertaskinformer.Get(ctx)
podInformer := podinformer.Get(ctx)
resourceInformer := resourceinformer.Get(ctx)
timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger)

opt := reconciler.Options{
KubeClientSet: kubeclientset,
PipelineClientSet: pipelineclientset,
ConfigMapWatcher: cmw,
ResyncPeriod: resyncPeriod,
Logger: logger,
func NewController(images map[string]string) func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
kubeclientset := kubeclient.Get(ctx)
pipelineclientset := pipelineclient.Get(ctx)
taskRunInformer := taskruninformer.Get(ctx)
taskInformer := taskinformer.Get(ctx)
clusterTaskInformer := clustertaskinformer.Get(ctx)
podInformer := podinformer.Get(ctx)
resourceInformer := resourceinformer.Get(ctx)
timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger)

opt := reconciler.Options{
KubeClientSet: kubeclientset,
PipelineClientSet: pipelineclientset,
ConfigMapWatcher: cmw,
ResyncPeriod: resyncPeriod,
Logger: logger,
}

c := &Reconciler{
Base: reconciler.NewBase(opt, taskRunAgentName, images),
taskRunLister: taskRunInformer.Lister(),
taskLister: taskInformer.Lister(),
clusterTaskLister: clusterTaskInformer.Lister(),
resourceLister: resourceInformer.Lister(),
timeoutHandler: timeoutHandler,
cloudEventClient: cloudeventclient.Get(ctx),
}
impl := controller.NewImpl(c, c.Logger, taskRunControllerName)

timeoutHandler.SetTaskRunCallbackFunc(impl.Enqueue)
timeoutHandler.CheckTimeouts(kubeclientset, pipelineclientset)

c.Logger.Info("Setting up event handlers")
taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: impl.Enqueue,
UpdateFunc: controller.PassNew(impl.Enqueue),
})

c.tracker = tracker.New(impl.EnqueueKey, controller.GetTrackerLease(ctx))

podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.Filter(v1alpha1.SchemeGroupVersion.WithKind("TaskRun")),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

// FIXME(vdemeester) it was never set
//entrypoint cache will be initialized by controller if not provided
c.Logger.Info("Setting up Entrypoint cache")
c.cache = nil
if c.cache == nil {
c.cache, _ = entrypoint.NewCache()
}

return impl
}

c := &Reconciler{
Base: reconciler.NewBase(opt, taskRunAgentName),
taskRunLister: taskRunInformer.Lister(),
taskLister: taskInformer.Lister(),
clusterTaskLister: clusterTaskInformer.Lister(),
resourceLister: resourceInformer.Lister(),
timeoutHandler: timeoutHandler,
cloudEventClient: cloudeventclient.Get(ctx),
}
impl := controller.NewImpl(c, c.Logger, taskRunControllerName)

timeoutHandler.SetTaskRunCallbackFunc(impl.Enqueue)
timeoutHandler.CheckTimeouts(kubeclientset, pipelineclientset)

c.Logger.Info("Setting up event handlers")
taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: impl.Enqueue,
UpdateFunc: controller.PassNew(impl.Enqueue),
})

c.tracker = tracker.New(impl.EnqueueKey, controller.GetTrackerLease(ctx))

podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.Filter(v1alpha1.SchemeGroupVersion.WithKind("TaskRun")),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

// FIXME(vdemeester) it was never set
//entrypoint cache will be initialized by controller if not provided
c.Logger.Info("Setting up Entrypoint cache")
c.cache = nil
if c.cache == nil {
c.cache, _ = entrypoint.NewCache()
}

return impl
}
12 changes: 3 additions & 9 deletions pkg/reconciler/taskrun/sidecars/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,10 @@ limitations under the License.
package sidecars

import (
"flag"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
nopImage = flag.String("nop-image", "override-with-nop:latest", "The container image used to kill sidecars")
)

type GetPod func(string, metav1.GetOptions) (*corev1.Pod, error)
type UpdatePod func(*corev1.Pod) (*corev1.Pod, error)

Expand All @@ -39,15 +33,15 @@ type UpdatePod func(*corev1.Pod) (*corev1.Pod, error)
// image, which in turn quickly exits. If the sidecar defines a command then
// it will exit with a non-zero status. When we check for TaskRun success we
// have to check for the containers we care about - not the final Pod status.
func Stop(pod *corev1.Pod, updatePod UpdatePod) error {
func Stop(pod *corev1.Pod, nopImage string, updatePod UpdatePod) error {
updated := false
if pod.Status.Phase == corev1.PodRunning {
for _, s := range pod.Status.ContainerStatuses {
if s.State.Running != nil {
for j, c := range pod.Spec.Containers {
if c.Name == s.Name && c.Image != *nopImage {
if c.Name == s.Name && c.Image != nopImage {
updated = true
pod.Spec.Containers[j].Image = *nopImage
pod.Spec.Containers[j].Image = nopImage
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/reconciler/taskrun/sidecars/stop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
nopImage = "nopImage"
)

// TestStop exercises the Stop() method of the sidecars package.
// A sidecar is killed by having its container image changed to that of the nop image.
// This test therefore runs through a series of pod and sidecar configurations,
Expand All @@ -47,7 +51,7 @@ func TestStop(t *testing.T) {
sidecarState: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(time.Now())},
},
expectedImage: *nopImage,
expectedImage: nopImage,
}, {
description: "a pending pod should not have its sidecars stopped",
podPhase: corev1.PodPending,
Expand Down Expand Up @@ -106,7 +110,7 @@ func TestStop(t *testing.T) {
},
}
updatePod := func(p *corev1.Pod) (*corev1.Pod, error) { return nil, nil }
if err := Stop(pod, updatePod); err != nil {
if err := Stop(pod, nopImage, updatePod); err != nil {
t.Errorf("error stopping sidecar: %v", err)
}
sidecarIdx := len(pod.Spec.Containers) - 1
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
c.timeoutHandler.Release(tr)
pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{})
if err == nil {
err = sidecars.Stop(pod, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Update)
err = sidecars.Stop(pod, c.Images["nopImage"], c.KubeClientSet.CoreV1().Pods(tr.Namespace).Update)
} else if errors.IsNotFound(err) {
return merr.ErrorOrNil()
}
Expand Down
Loading

0 comments on commit dfb2eca

Please sign in to comment.