Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release candidate No. 2 #1097

Merged
merged 3 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/botkube/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ func run(ctx context.Context) error {
errGroup, ctx := errgroup.WithContext(ctx)
defer func() {
err := errGroup.Wait()
if err != nil {
wrappedErr := reportFatalError("while waiting for goroutines to finish gracefully", err)
logger.Errorf(wrappedErr.Error())
wrappedErr := reportFatalError("while waiting for goroutines to finish gracefully", err)
if wrappedErr != nil {
logger.Error(wrappedErr.Error())
}
}()

Expand Down
2 changes: 1 addition & 1 deletion helm/botkube/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ Controller for the Botkube Slack app which helps you monitor your Kubernetes clu
| [configWatcher.initialSyncTimeout](./values.yaml#L960) | int | `0` | Timeout for the initial Config Watcher sync. If set to 0, waiting for Config Watcher sync will be skipped. In a result, configuration changes may not reload Botkube app during the first few seconds after Botkube startup. |
| [configWatcher.image.registry](./values.yaml#L963) | string | `"ghcr.io"` | Config watcher image registry. |
| [configWatcher.image.repository](./values.yaml#L965) | string | `"kubeshop/k8s-sidecar"` | Config watcher image repository. |
| [configWatcher.image.tag](./values.yaml#L967) | string | `"ignore-initial-events"` | Config watcher image tag. |
| [configWatcher.image.tag](./values.yaml#L967) | string | `"in-cluster-config"` | Config watcher image tag. |
| [configWatcher.image.pullPolicy](./values.yaml#L969) | string | `"IfNotPresent"` | Config watcher image pull policy. |
| [plugins](./values.yaml#L972) | object | `{"cacheDir":"/tmp","repositories":{"botkube":{"url":"https://storage.googleapis.com/botkube-plugins-latest/plugins-index.yaml"}}}` | Configuration for Botkube executors and sources plugins. |
| [plugins.cacheDir](./values.yaml#L974) | string | `"/tmp"` | Directory, where downloaded plugins are cached. |
Expand Down
5 changes: 5 additions & 0 deletions helm/botkube/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,14 @@ spec:
value: "POST"
- name: IGNORE_ALREADY_PROCESSED
value: "true"
- name: BOTKUBE_SETTINGS_SA__CREDENTIALS__PATH__PREFIX
value: /var/run/{{ $uuid }}/secrets/kubernetes.io/serviceaccount/{{ $uuid }}
volumeMounts:
- name: cfg-watcher-tmp
mountPath: {{ .Values.configWatcher.tmpDir }}
- name: {{ $uuid }}
mountPath: /var/run/{{ $uuid }}/secrets/kubernetes.io/serviceaccount
readOnly: true
{{- end }}
volumes:
- name: cache-volume
Expand Down
2 changes: 1 addition & 1 deletion helm/botkube/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ configWatcher:
# -- Config watcher image repository.
repository: kubeshop/k8s-sidecar # kiwigrid/k8s-sidecar:1.19.5 - see https://github.com/kubeshop/k8s-sidecar/pull/1
# -- Config watcher image tag.
tag: ignore-initial-events
tag: in-cluster-config
# -- Config watcher image pull policy.
pullPolicy: IfNotPresent

Expand Down
2 changes: 1 addition & 1 deletion internal/executor/kubectl/builder/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type (

// KubectlRunner provides an option to run a given kubectl command.
KubectlRunner interface {
RunKubectlCommand(ctx context.Context, kubeConfigPath, defaultNamespace, cmd string) (string, error)
RunKubectlCommand(ctx context.Context, defaultNamespace, cmd string) (string, error)
}

// CommandGuard is an interface that allows to check if a given command is allowed to be executed.
Expand Down
3 changes: 1 addition & 2 deletions internal/executor/kubectl/builder/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"os"
"strings"

"github.com/google/uuid"
Expand Down Expand Up @@ -286,7 +285,7 @@ func (e *Kubectl) tryToGetResourceNamesSelect(ctx context.Context, state stateDe
}
e.log.Infof("Run cmd %q", cmd)

out, err := e.kcRunner.RunKubectlCommand(ctx, os.Getenv("KUBECONFIG"), e.defaultNamespace, cmd)
out, err := e.kcRunner.RunKubectlCommand(ctx, e.defaultNamespace, cmd)
if err != nil {
e.log.WithField("error", err.Error()).Error("Cannot fetch resource names. Returning empty resource name dropdown.")
return EmptyResourceNameDropdown()
Expand Down
4 changes: 2 additions & 2 deletions internal/executor/kubectl/builder/kubectl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ type fakeKcExecutor struct {
defaultNamespace string
}

func (r *fakeKcExecutor) RunKubectlCommand(_ context.Context, kubeConfigPath, defaultNamespace, cmd string) (string, error) {
func (r *fakeKcExecutor) RunKubectlCommand(_ context.Context, defaultNamespace, cmd string) (string, error) {
r.defaultNamespace = defaultNamespace
r.command = cmd

Expand All @@ -552,7 +552,7 @@ func (r *fakeKcExecutor) RunKubectlCommand(_ context.Context, kubeConfigPath, de

type fakeErrorKcExecutor struct{}

func (r *fakeErrorKcExecutor) RunKubectlCommand(context.Context, string, string, string) (string, error) {
func (r *fakeErrorKcExecutor) RunKubectlCommand(context.Context, string, string) (string, error) {
return "", errors.New("fake error")
}

Expand Down
5 changes: 3 additions & 2 deletions internal/executor/kubectl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,14 @@ func (e *Executor) Execute(ctx context.Context, in executor.ExecuteInput) (execu
}
}()

scopedKubectlRunner := NewKubeconfigScopedRunner(e.kcRunner, kubeConfigPath)
if builder.ShouldHandle(cmd) {
guard, k8sCli, err := getBuilderDependencies(log, kubeConfigPath)
if err != nil {
return executor.ExecuteOutput{}, fmt.Errorf("while creating builder dependecies: %w", err)
}

kcBuilder := builder.NewKubectl(e.kcRunner, cfg.InteractiveBuilder, log, guard, cfg.DefaultNamespace, k8sCli.CoreV1().Namespaces(), accessreview.NewK8sAuth(k8sCli.AuthorizationV1()))
kcBuilder := builder.NewKubectl(scopedKubectlRunner, cfg.InteractiveBuilder, log, guard, cfg.DefaultNamespace, k8sCli.CoreV1().Namespaces(), accessreview.NewK8sAuth(k8sCli.AuthorizationV1()))
msg, err := kcBuilder.Handle(ctx, cmd, in.Context.IsInteractivitySupported, in.Context.SlackState)
if err != nil {
return executor.ExecuteOutput{}, fmt.Errorf("while running command builder: %w", err)
Expand All @@ -121,7 +122,7 @@ func (e *Executor) Execute(ctx context.Context, in executor.ExecuteInput) (execu
}, nil
}

out, err := e.kcRunner.RunKubectlCommand(ctx, kubeConfigPath, cfg.DefaultNamespace, cmd)
out, err := scopedKubectlRunner.RunKubectlCommand(ctx, cfg.DefaultNamespace, cmd)
if err != nil {
return executor.ExecuteOutput{}, err
}
Expand Down
21 changes: 21 additions & 0 deletions internal/executor/kubectl/kc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubectl

import (
"context"
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -81,3 +82,23 @@ func isNamespaceFlagSet(cmd string) (bool, error) {
}
return isAllNs || isNs != "", nil
}

// KubeconfigScopedRunner is a runner that executes kubectl commands using a specific kubeconfig file.
type KubeconfigScopedRunner struct {
underlying kcRunner
kubeconfigPath string
}

// NewKubeconfigScopedRunner creates a new instance of KubeconfigScopedRunner.
func NewKubeconfigScopedRunner(underlying kcRunner, kubeconfigPath string) *KubeconfigScopedRunner {
return &KubeconfigScopedRunner{underlying: underlying, kubeconfigPath: kubeconfigPath}
}

// RunKubectlCommand runs a kubectl CLI command scoped to configured kubeconfig.
func (k *KubeconfigScopedRunner) RunKubectlCommand(ctx context.Context, defaultNamespace, cmd string) (string, error) {
if k.kubeconfigPath == "" {
return "", errors.New("kubeconfig is missing")
}

return k.underlying.RunKubectlCommand(ctx, k.kubeconfigPath, defaultNamespace, cmd)
}
46 changes: 44 additions & 2 deletions pkg/bot/cloudslack.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"regexp"
"strings"
"sync"
"time"

"github.com/avast/retry-go/v4"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/slack-go/slack"
Expand All @@ -32,8 +34,11 @@ import (
)

const (
APIKeyContextKey = "X-Api-Key" // #nosec
DeploymentIDContextKey = "X-Deployment-Id" // #nosec
APIKeyContextKey = "X-Api-Key" // #nosec
DeploymentIDContextKey = "X-Deployment-Id" // #nosec
retryDelay = time.Second
maxRetries = 25
successIntervalDuration = 3 * time.Minute
)

var _ Bot = &CloudSlack{}
Expand Down Expand Up @@ -102,6 +107,43 @@ func NewCloudSlack(log logrus.FieldLogger,
}

func (b *CloudSlack) Start(ctx context.Context) error {
return withRetries(ctx, b.log, maxRetries, func() error {
return b.start(ctx)
})
}

func withRetries(ctx context.Context, log logrus.FieldLogger, maxRetries int, fn func() error) error {
failuresNo := 0
return retry.Do(
func() error {
tn := time.Now()
err := fn()
if err != nil {
if time.Since(tn) > successIntervalDuration {
// if the last run was long enough, we treat is as success, so we reset failures
failuresNo = 0
}

if failuresNo >= maxRetries {
log.Debugf("Reached max number of %d retries: %s", maxRetries, err)
return retry.Unrecoverable(err)
}
failuresNo++
return err
}
return nil
},
retry.OnRetry(func(n uint, err error) {
log.Warnf("Retrying Cloud Slack startup (attempt no %d): %s", n, err)
}),
retry.Delay(retryDelay),
retry.Attempts(0), // infinite, we cancel that by our own
retry.LastErrorOnly(true),
retry.Context(ctx),
)
}

func (b *CloudSlack) start(ctx context.Context) error {
creds := grpc.WithTransportCredentials(insecure.NewCredentials())
opts := []grpc.DialOption{creds,
grpc.WithStreamInterceptor(b.addStreamingClientCredentials()),
Expand Down
120 changes: 120 additions & 0 deletions pkg/bot/cloudslack_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package bot

import (
"context"
"errors"
"testing"
"time"

"github.com/avast/retry-go/v4"
"github.com/stretchr/testify/assert"

"github.com/kubeshop/botkube/internal/loggerx"
)

func TestWithRetriesFunc(t *testing.T) {
t.Run("Stop immediately on Unrecoverable error", func(t *testing.T) {
// given
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fixErr := errors.New("some random error")

// when
retriesFinalError := make(chan error, 1)
go func() {
retriesFinalError <- withRetries(ctx, loggerx.NewNoop(), 5, func() error {
return retry.Unrecoverable(fixErr)
})
close(retriesFinalError)
}()

// then
awaitExpectations(t, 500*time.Millisecond, func() {
gotErr := <-retriesFinalError
assert.ErrorIs(t, gotErr, fixErr)
})
})

t.Run("Continues retries on recoverable errors", func(t *testing.T) {
// given
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fixErr := errors.New("some random error")

// when
retriesFinalError := make(chan error, 1)
go func() {
retriesFinalError <- withRetries(ctx, loggerx.NewNoop(), 5, func() error {
return fixErr
})
close(retriesFinalError)
}()

// then
assert.Never(t, func() bool {
<-retriesFinalError
return true
}, time.Second, 10*time.Millisecond)
})

t.Run("Respected max retries", func(t *testing.T) {
// given
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fixErr := errors.New("some random error")

// when
retriesFinalError := make(chan error, 1)
go func() {
retriesFinalError <- withRetries(ctx, loggerx.NewNoop(), 0, func() error {
return fixErr
})
close(retriesFinalError)
}()

// then
awaitExpectations(t, time.Second, func() {
gotErr := <-retriesFinalError
assert.ErrorIs(t, gotErr, fixErr)
})
})

t.Run("Respected canceled context", func(t *testing.T) {
// given
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
fixErr := errors.New("some random error")

// when
retriesFinalError := make(chan error, 1)
go func() {
retriesFinalError <- withRetries(canceledCtx, loggerx.NewNoop(), 5, func() error {
return retry.Unrecoverable(fixErr)
})
close(retriesFinalError)
}()

// then
awaitExpectations(t, time.Second, func() {
gotErr := <-retriesFinalError
assert.ErrorIs(t, gotErr, canceledCtx.Err())
})
})
}

func awaitExpectations(t *testing.T, dur time.Duration, assertion func()) {
t.Helper()

finished := make(chan struct{})
go func() {
assertion()
close(finished)
}()

select {
case <-time.After(dur):
t.Fatalf("expected function was not fulfilled within given %s duration", dur)
case <-finished:
return
}
}