Fixing the jobrunner #42

Merged
2 changes: 1 addition & 1 deletion pkg/k8s/addressresolver_test.go
@@ -39,7 +39,7 @@ func TestResolveAddress(t *testing.T) {

func passingService(t *testing.T) resolveAddressTestCase {
t.Helper()
clients := &tests.FakeClients{Objects: []runtime.Object{
clients := &tests.FakeClients{TB: t, Objects: []runtime.Object{
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "hello",
3 changes: 3 additions & 0 deletions pkg/k8s/errors.go
@@ -17,4 +17,7 @@ var (

// ErrUnexcpected if something unexpected actually has happened.
ErrUnexcpected = errors.New("something unexpected actually has happened")

// ErrICSenderJobFailed if the ICS job runner has failed.
ErrICSenderJobFailed = errors.New("the ICS job runner has failed")
)
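
Because the runner wraps every failure with %w (see jobrunner.go below), callers can match on the new sentinel with errors.Is. A minimal caller-side sketch; the package name caller and the helper IsJobRunnerFailure are hypothetical, not part of this change:

package caller

import (
	"errors"

	"knative.dev/kn-plugin-event/pkg/k8s"
)

// IsJobRunnerFailure reports whether err originated from the ICS job runner;
// errors.Is sees through the fmt.Errorf("%w: ...") wrapping used by the runner.
func IsJobRunnerFailure(err error) bool {
	return errors.Is(err, k8s.ErrICSenderJobFailed)
}
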
90 changes: 55 additions & 35 deletions pkg/k8s/jobrunner.go
@@ -2,12 +2,10 @@ package k8s

import (
"fmt"
"time"

batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/apimachinery/pkg/watch"
)

// JobRunner will launch a Job and monitor it for completion.
@@ -27,50 +25,72 @@ type jobRunner struct {
}

func (j *jobRunner) Run(job *batchv1.Job) error {
ctx := j.kube.Context()
jobs := j.kube.Typed().BatchV1().Jobs(job.Namespace)
_, err := jobs.Create(j.kube.Context(), job, metav1.CreateOptions{})
_, err := jobs.Create(ctx, job, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("%w: %v", ErrUnexcpected, err)
return fmt.Errorf("%w: %v", ErrICSenderJobFailed, err)
}
factory := kubeinformers.NewSharedInformerFactoryWithOptions(
j.kube.Typed(),
time.Minute,
kubeinformers.WithNamespace(job.Namespace),
kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fmt.Sprintf("metadata.name=%s", job.Name)
}),
)
// FIXME: This function does not wait properly for the end of the Job
stop := make(chan struct{})
jobsInformer := factory.Batch().V1().Jobs().Informer()
jobsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
close(stop)
},
err = j.watchJob(job, func(job *batchv1.Job) (bool, error) {
if job.Status.CompletionTime == nil && job.Status.Failed == 0 {
return false, nil
}
// We should be done if we reach here.
if job.Status.Succeeded < 1 {
return false, fmt.Errorf("%w: %s", ErrICSenderJobFailed,
"expected to have successful job")
}
return true, nil
})
go factory.Start(stop)
waitOnStop(stop)
updated, err := jobs.Get(j.kube.Context(), job.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("%w: %v", ErrUnexcpected, err)
return fmt.Errorf("%w: %v", ErrICSenderJobFailed, err)
}
if updated.Status.Succeeded < 1 {
return fmt.Errorf("%w: %s", ErrUnexcpected, "expected to have successful job")
return j.deleteJob(job)
}

func (j *jobRunner) deleteJob(job *batchv1.Job) error {
ctx := j.kube.Context()
jobs := j.kube.Typed().BatchV1().Jobs(job.GetNamespace())
err := jobs.Delete(ctx, job.GetName(), metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("%w: %v", ErrICSenderJobFailed, err)
}
err = jobs.Delete(j.kube.Context(), job.Name, metav1.DeleteOptions{})
pods := j.kube.Typed().CoreV1().Pods(job.GetNamespace())
err = pods.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{
LabelSelector: fmt.Sprintf("job-name=%s", job.GetName()),
})
if err != nil {
return fmt.Errorf("%w: %v", ErrUnexcpected, err)
return fmt.Errorf("%w: %v", ErrICSenderJobFailed, err)
}
return nil
}

func waitOnStop(stop chan struct{}) {
for {
select {
case <-stop:
return
default:
time.Sleep(time.Second)
func (j *jobRunner) watchJob(meta metav1.Object, changeFn func(job *batchv1.Job) (bool, error)) error {
ctx := j.kube.Context()
jobs := j.kube.Typed().BatchV1().Jobs(meta.GetNamespace())
watcher, err := jobs.Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", meta.GetName()),
})
if err != nil {
return fmt.Errorf("%w: %v", ErrICSenderJobFailed, err)
}
Contributor:
Would defer watcher.Stop() be helpful around here to clean up resources?

Contributor (@dsimansk, Jul 21, 2021):
Also, if you check RetryWatcher from the client-go tools (link below), it tries to restart when the ResultChan is closed. That's likely a rare case, but we've seen reports in client-go issues of ResultChan being closed unexpectedly, yet with a recoverable error.

Finally, I wonder if you couldn't reuse something like the existing RetryWatcher code directly here.

https://github.com/kubernetes/client-go/blob/master/tools/watch/retrywatcher.go#L242-L275

Contributor Author:
I'm thinking that might not be possible in this exact case because of:

https://github.com/kubernetes/client-go/blob/ac207faedfb64acd5b99a2fb309b7044918b4dda/tools/watch/retrywatcher.go#L68-L71

WDYT?

Contributor:
Ahh, I recall that part now. Sure, let's keep your watching implementation.

Do you think it's worth addressing at least a bit of retry logic for when ResultChan is closed? Of course, it can be done as a future enhancement/hardening.

Contributor Author:
I think it's a good idea for an enhancement in a future PR. Would you mind opening an issue?

Contributor:
Yep, I'll do that first thing tomorrow.

Contributor Author:
#45 tracks the improvement. I'm pretty convinced that #68 is impacted by this.

defer watcher.Stop()
for result := range watcher.ResultChan() {
if result.Type == watch.Added || result.Type == watch.Modified {
job, ok := result.Object.(*batchv1.Job)
if !ok {
return fmt.Errorf("%w: %s: %T", ErrICSenderJobFailed,
"expected to watch batchv1.Job, got", result.Object)
}
var brk bool
brk, err = changeFn(job)
if err != nil {
return fmt.Errorf("%w: %v", ErrICSenderJobFailed, err)
}
if brk {
watcher.Stop()
}
}
}
return nil
}
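
Following up on the RetryWatcher discussion in the review thread above, here is a minimal sketch (not part of this PR) of what reusing client-go's RetryWatcher could look like. It assumes the resourceVersion returned by the Create call is acceptable as the initial RV that NewRetryWatcher requires, which is exactly the constraint the author points to; watchJobWithRetry is a hypothetical name, while jobRunner and ErrICSenderJobFailed are the ones defined in this package.

package k8s

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// watchJobWithRetry is a hypothetical variant of watchJob built on client-go's
// RetryWatcher, which re-establishes the watch when the ResultChan closes with
// a recoverable error.
func (j *jobRunner) watchJobWithRetry(created *batchv1.Job, changeFn func(job *batchv1.Job) (bool, error)) error {
	ctx := j.kube.Context()
	jobs := j.kube.Typed().BatchV1().Jobs(created.GetNamespace())
	lw := &cache.ListWatch{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fmt.Sprintf("metadata.name=%s", created.GetName())
			return jobs.Watch(ctx, options)
		},
	}
	// NewRetryWatcher rejects "" and "0" as the initial resource version (the
	// constraint referenced in the review thread above), so the resourceVersion
	// of the just-created Job is used here.
	watcher, err := watchtools.NewRetryWatcher(created.GetResourceVersion(), lw)
	if err != nil {
		return fmt.Errorf("%w: %v", ErrICSenderJobFailed, err)
	}
	defer watcher.Stop()
	for result := range watcher.ResultChan() {
		if result.Type != watch.Added && result.Type != watch.Modified {
			continue
		}
		job, ok := result.Object.(*batchv1.Job)
		if !ok {
			return fmt.Errorf("%w: %s: %T", ErrICSenderJobFailed,
				"expected to watch batchv1.Job, got", result.Object)
		}
		done, err := changeFn(job)
		if err != nil {
			return fmt.Errorf("%w: %v", ErrICSenderJobFailed, err)
		}
		if done {
			return nil
		}
	}
	return nil
}

Compared with the plain watcher in this PR, such a retry variant would also cover the unexpected ResultChan closures mentioned in the thread, which is the hardening tracked in #45.
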
73 changes: 73 additions & 0 deletions pkg/k8s/jobrunner_test.go
@@ -0,0 +1,73 @@
package k8s_test

import (
"sync"
"testing"

"gotest.tools/v3/assert"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"knative.dev/kn-plugin-event/pkg/k8s"
"knative.dev/kn-plugin-event/pkg/tests"
)

func TestJobRunnerRun(t *testing.T) {
clients := &tests.FakeClients{TB: t, Objects: make([]runtime.Object, 0)}
runner := k8s.CreateJobRunner(clients)
job := examplePiJob()
jobs := clients.Typed().BatchV1().Jobs(job.Namespace)
ctx := clients.Context()
watcher, err := jobs.Watch(ctx, metav1.ListOptions{})
assert.NilError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
assert.NilError(t, runner.Run(&job))
}()
<-watcher.ResultChan()
watcher.Stop()
sucJob := jobSuccess(job)
_, err = jobs.Update(ctx, &sucJob, metav1.UpdateOptions{})
assert.NilError(t, err)
wg.Wait()
}

func jobSuccess(job batchv1.Job) batchv1.Job {
now := metav1.Now()
job.Status.Succeeded = 1
job.Status.Active = 0
job.Status.Failed = 0
job.Status.CompletionTime = &now
job.Status.StartTime = &now
job.Status.Conditions = []batchv1.JobCondition{{
Type: batchv1.JobComplete,
Status: corev1.ConditionTrue,
LastProbeTime: now,
LastTransitionTime: now,
Reason: "done",
Message: "success",
}}
return job
}

func examplePiJob() batchv1.Job {
return batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "demo",
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Image: "docker.io/library/perl",
Command: []string{"perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"},
}},
},
},
},
}
}