Skip to content

Commit

Permalink
Add retries to status reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
pkosiec committed Mar 3, 2023
1 parent a279fdf commit 5960fb8
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 70 deletions.
6 changes: 3 additions & 3 deletions cmd/botkube/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func run(ctx context.Context) error {
logger.Warnf("Configuration validation warnings: %v", confDetails.ValidateWarnings.Error())
}

statusReporter := status.NewStatusReporter(remoteCfgSyncEnabled, logger, gqlClient, cfgVersion)
statusReporter := status.NewStatusReporter(remoteCfgSyncEnabled, logger, gqlClient, deployClient, cfgVersion)
auditReporter := audit.NewAuditReporter(remoteCfgSyncEnabled, logger, gqlClient)

// Set up analytics reporter
Expand Down Expand Up @@ -376,7 +376,7 @@ func run(ctx context.Context) error {
statusReporter,
)

if _, err := statusReporter.ReportDeploymentStartup(ctx); err != nil {
if err := statusReporter.ReportDeploymentStartup(ctx); err != nil {
return reportFatalError("while reporting botkube startup", err)
}

Expand Down Expand Up @@ -480,7 +480,7 @@ func reportFatalErrFn(logger logrus.FieldLogger, reporter analytics.Reporter, st
logger.Errorf("while reporting fatal error: %s", err.Error())
}

if _, err := status.ReportDeploymentFailed(ctxTimeout); err != nil {
if err := status.ReportDeploymentFailed(ctxTimeout); err != nil {
logger.Errorf("while reporting deployment failure: %s", err.Error())
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kubeshop/botkube
require (
github.com/MakeNowJust/heredoc v1.0.0
github.com/alexflint/go-arg v1.4.3
github.com/avast/retry-go v3.0.0+incompatible
github.com/aws/aws-sdk-go v1.44.122
github.com/bwmarrin/discordgo v0.25.0
github.com/dustin/go-humanize v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/avct/uasurfer v0.0.0-20191028135549-26b5daa857f1/go.mod h1:noBAuukeYOXa0aXGqxr24tADqkwDO2KRD15FsuaZ5a8=
github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/aws/aws-sdk-go v1.17.7/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
Expand Down
25 changes: 21 additions & 4 deletions internal/config/gql_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

// DeploymentClient defines GraphQL client.
type DeploymentClient interface {
GetDeployment(ctx context.Context) (Deployment, error)
GetConfigWithResourceVersion(ctx context.Context) (Deployment, error)
}

// Gql defines GraphQL client data structure.
Expand All @@ -31,8 +31,8 @@ type Deployment struct {
YAMLConfig string
}

// GetDeployment retrieves deployment by id.
func (g *Gql) GetDeployment(ctx context.Context) (Deployment, error) {
// GetConfigWithResourceVersion retrieves deployment by id.
func (g *Gql) GetConfigWithResourceVersion(ctx context.Context) (Deployment, error) {
var query struct {
Deployment Deployment `graphql:"deployment(id: $id)"`
}
Expand All @@ -41,7 +41,24 @@ func (g *Gql) GetDeployment(ctx context.Context) (Deployment, error) {
}
err := g.client.Cli.Query(ctx, &query, variables)
if err != nil {
return Deployment{}, fmt.Errorf("while querying deployment details for %q: %w", g.deploymentID, err)
return Deployment{}, fmt.Errorf("while getting config with resource version for %q: %w", g.deploymentID, err)
}
return query.Deployment, nil
}

// GetResourceVersion queries the deployment identified by g.deploymentID and
// returns only its ResourceVersion field.
// When the query fails it returns 0 together with the error wrapped with the deployment ID.
func (g *Gql) GetResourceVersion(ctx context.Context) (int, error) {
	var resp struct {
		Deployment struct {
			ResourceVersion int
		} `graphql:"deployment(id: $id)"`
	}
	vars := map[string]interface{}{"id": graphql.ID(g.deploymentID)}

	if err := g.client.Cli.Query(ctx, &resp, vars); err != nil {
		return 0, fmt.Errorf("while querying deployment details for %q: %w", g.deploymentID, err)
	}

	return resp.Deployment.ResourceVersion, nil
}
2 changes: 1 addition & 1 deletion internal/config/gql_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestGql_GetDeployment(t *testing.T) {
graphql.WithDeploymentID("my-id"),
)
g := NewDeploymentClient(gqlClient)
deployment, err := g.GetDeployment(context.Background())
deployment, err := g.GetConfigWithResourceVersion(context.Background())
assert.NoError(t, err)
assert.NotNil(t, deployment.YAMLConfig)
}
2 changes: 1 addition & 1 deletion internal/config/gql_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func NewGqlProvider(dc DeploymentClient) *GqlProvider {

// Configs returns list of config files
func (g *GqlProvider) Configs(ctx context.Context) (config.YAMLFiles, int, error) {
deployment, err := g.client.GetDeployment(ctx)
deployment, err := g.client.GetConfigWithResourceVersion(ctx)
if err != nil {
return nil, 0, errors.Wrapf(err, "while getting deployment")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/config/gql_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ DeploymentClient = &fakeGqlClient{}
type fakeGqlClient struct {
}

func (f *fakeGqlClient) GetDeployment(context.Context) (Deployment, error) {
func (f *fakeGqlClient) GetConfigWithResourceVersion(context.Context) (Deployment, error) {
return Deployment{
ResourceVersion: 3,
YAMLConfig: "communications:\n default-group:\n socketSlack:\n appToken: xapp-1-A047D1ZJ03B-4262138376928\n botToken: xoxb-3933899240838\n channels:\n botkube-demo:\n bindings:\n executors:\n - kubectl-read-only\n sources:\n - kubernetes-info\n name: botkube-demo\n notification:\n disabled: false\n enabled: true\nexecutors:\n kubectl-read-only:\n kubectl:\n commands:\n resources:\n - deployments\n - pods\n - namespaces\n - daemonsets\n - statefulsets\n - storageclasses\n - nodes\n verbs:\n - api-resources\n - api-versions\n - cluster-info\n - describe\n - diff\n - explain\n - get\n - logs\n - top\n - auth\n defaultNamespace: default\n enabled: true\n namespaces:\n include:\n - .*\n restrictAccess: false\nsettings:\n clusterName: qa\nsources:\n kubernetes-info:\n displayName: Kubernetes Information\n kubernetes:\n recommendations:\n ingress:\n backendServiceValid: true\n tlsSecretValid: true\n pod:\n labelsSet: true\n noLatestImageTag: true\n",
Expand Down
2 changes: 1 addition & 1 deletion internal/config/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (u *GraphQLConfigUpdater) Do(ctx context.Context) error {
}

func (u *GraphQLConfigUpdater) queryConfig(ctx context.Context) (config.Config, int, error) {
deploy, err := u.deployCli.GetDeployment(ctx)
deploy, err := u.deployCli.GetConfigWithResourceVersion(ctx)
if err != nil {
return config.Config{}, 0, fmt.Errorf("while getting deployment: %w", err)
}
Expand Down
150 changes: 105 additions & 45 deletions internal/status/gql_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package status

import (
"context"
"github.com/avast/retry-go"
"github.com/pkg/errors"
"sync"
"time"

"github.com/hasura/go-graphql-client"
"github.com/sirupsen/logrus"
Expand All @@ -12,83 +15,140 @@ import (

var _ StatusReporter = (*GraphQLStatusReporter)(nil)

// ResVerClient defines client for getting resource version.
// GraphQLStatusReporter calls it between retry attempts to refresh the
// resource version it reports with.
type ResVerClient interface {
// GetResourceVersion returns the current resource version, or an error when it cannot be obtained.
GetResourceVersion(ctx context.Context) (int, error)
}

// GraphQLStatusReporter reports deployment startup, shutdown and failure
// through GraphQL mutations.
type GraphQLStatusReporter struct {
// log is the logger used for debug and error messages while reporting.
log logrus.FieldLogger
// gql is the GraphQL client that executes the mutations; it also provides the deployment ID.
gql *gql.Gql
// resVerClient fetches the latest resource version when a report is retried.
resVerClient ResVerClient
// resourceVersion is the version sent with each report; access it only while holding resVerMutex.
resourceVersion int
// resVerMutex guards resourceVersion.
resVerMutex sync.RWMutex
}

func newGraphQLStatusReporter(logger logrus.FieldLogger, client *gql.Gql, cfgVersion int) *GraphQLStatusReporter {
func newGraphQLStatusReporter(logger logrus.FieldLogger, client *gql.Gql, resVerClient ResVerClient, cfgVersion int) *GraphQLStatusReporter {
return &GraphQLStatusReporter{
log: logger,
gql: client,
resVerClient: resVerClient,
resourceVersion: cfgVersion,
}
}

func (r *GraphQLStatusReporter) ReportDeploymentStartup(ctx context.Context) (bool, error) {
r.log.WithFields(logrus.Fields{
func (r *GraphQLStatusReporter) ReportDeploymentStartup(ctx context.Context) error {
logger := r.log.WithFields(logrus.Fields{
"deploymentID": r.gql.DeploymentID,
"resourceVersion": r.getResourceVersion(),
}).Debugf("Reporting deployment startup...")
var mutation struct {
Success bool `graphql:"reportDeploymentStartup(id: $id, resourceVersion: $resourceVersion)"`
}
variables := map[string]interface{}{
"id": graphql.ID(r.gql.DeploymentID),
"resourceVersion": r.getResourceVersion(),
}
if err := r.gql.Cli.Mutate(ctx, &mutation, variables); err != nil {
return false, err
"type": "startup",
})
logger.Debug("Reporting...")

err := r.withRetry(ctx, logger, func() error {
var mutation struct {
Success bool `graphql:"reportDeploymentStartup(id: $id, resourceVersion: $resourceVersion)"`
}
variables := map[string]interface{}{
"id": graphql.ID(r.gql.DeploymentID),
"resourceVersion": r.getResourceVersion(),
}
err := r.gql.Cli.Mutate(ctx, &mutation, variables)
return err
})
if err != nil {
return errors.Wrap(err, "while reporting deployment startup")
}
return mutation.Success, nil

return nil
}

func (r *GraphQLStatusReporter) ReportDeploymentShutdown(ctx context.Context) (bool, error) {
r.log.WithFields(logrus.Fields{

func (r *GraphQLStatusReporter) ReportDeploymentShutdown(ctx context.Context) error {
logger := r.log.WithFields(logrus.Fields{
"deploymentID": r.gql.DeploymentID,
"resourceVersion": r.getResourceVersion(),
}).Debugf("Reporting deployment shutdown...")
var mutation struct {
Success bool `graphql:"reportDeploymentShutdown(id: $id, resourceVersion: $resourceVersion)"`
}
variables := map[string]interface{}{
"id": graphql.ID(r.gql.DeploymentID),
"resourceVersion": r.getResourceVersion(),
}
if err := r.gql.Cli.Mutate(ctx, &mutation, variables); err != nil {
return false, err
"type": "shutdown",
})
logger.Debug("Reporting...")

err := r.withRetry(ctx, logger, func() error {
var mutation struct {
Success bool `graphql:"reportDeploymentShutdown(id: $id, resourceVersion: $resourceVersion)"`
}
variables := map[string]interface{}{
"id": graphql.ID(r.gql.DeploymentID),
"resourceVersion": r.getResourceVersion(),
}
return r.gql.Cli.Mutate(ctx, &mutation, variables)
})
if err != nil {
return errors.Wrap(err, "while reporting deployment shutdown")
}
return mutation.Success, nil

return nil
}

func (r *GraphQLStatusReporter) ReportDeploymentFailed(ctx context.Context) (bool, error) {
r.log.WithFields(logrus.Fields{
func (r *GraphQLStatusReporter) ReportDeploymentFailed(ctx context.Context) error {
logger := r.log.WithFields(logrus.Fields{
"deploymentID": r.gql.DeploymentID,
"resourceVersion": r.getResourceVersion(),
}).Debugf("Reporting deployment failure...")
var mutation struct {
Success bool `graphql:"reportDeploymentFailed(id: $id, resourceVersion: $resourceVersion)"`
}
variables := map[string]interface{}{
"id": graphql.ID(r.gql.DeploymentID),
"resourceVersion": r.getResourceVersion(),
}
if err := r.gql.Cli.Mutate(ctx, &mutation, variables); err != nil {
return false, err
"type": "failure",
})
logger.Debug("Reporting...")

err := r.withRetry(ctx, logger, func() error {
var mutation struct {
Success bool `graphql:"reportDeploymentFailed(id: $id, resourceVersion: $resourceVersion)"`
}
variables := map[string]interface{}{
"id": graphql.ID(r.gql.DeploymentID),
"resourceVersion": r.getResourceVersion(),
}
return r.gql.Cli.Mutate(ctx, &mutation, variables)
})
if err != nil {
return errors.Wrap(err, "while reporting deployment failure")
}
return mutation.Success, nil
}

func (r *GraphQLStatusReporter) getResourceVersion() int {
r.resVerMutex.RLock()
defer r.resVerMutex.RUnlock()
return r.resourceVersion
return nil
}

// SetResourceVersion stores resourceVersion as the version used by subsequent
// reports. The write is performed under the write lock of resVerMutex.
func (r *GraphQLStatusReporter) SetResourceVersion(resourceVersion int) {
	r.resVerMutex.Lock()
	r.resourceVersion = resourceVersion
	r.resVerMutex.Unlock()
}

// Retry settings used by withRetry.
const (
// retries is passed to retry.Attempts and is printed as the attempt total in the retry log message.
retries = 3
// delay is passed to retry.Delay.
delay = 200 * time.Millisecond
)
// withRetry runs fn through retry.Do, configured with `retries` attempts, the
// `delay` setting, and the given context.
//
// Whenever the retry callback fires, the latest resource version is fetched via
// resVerClient and stored with SetResourceVersion so that a following attempt can
// use it. If that fetch fails, the error is logged and the previously stored
// resource version is kept; it must not be overwritten with the zero value that
// accompanies the error.
//
// It returns nil when fn eventually succeeds; otherwise the last error reported by
// retry.Do, wrapped with "while retrying".
func (r *GraphQLStatusReporter) withRetry(ctx context.Context, logger logrus.FieldLogger, fn func() error) error {
	err := retry.Do(
		fn,
		retry.OnRetry(func(n uint, err error) {
			logger.Debugf("Retrying (attempt no %d/%d): %s.\nFetching latest resource version...", n+1, retries, err)

			resVer, fetchErr := r.resVerClient.GetResourceVersion(ctx)
			if fetchErr != nil {
				logger.Errorf("Error while fetching resource version: %s", fetchErr)
				// Keep the last known resource version; resVer is meaningless on error.
				return
			}
			r.SetResourceVersion(resVer)
		}),
		retry.Delay(delay),
		retry.Attempts(retries),
		retry.LastErrorOnly(true),
		retry.Context(ctx),
	)
	if err != nil {
		return errors.Wrap(err, "while retrying")
	}

	return nil
}

// getResourceVersion returns the currently stored resource version, read under
// the read lock of resVerMutex.
func (r *GraphQLStatusReporter) getResourceVersion() int {
	r.resVerMutex.RLock()
	current := r.resourceVersion
	r.resVerMutex.RUnlock()
	return current
}
12 changes: 6 additions & 6 deletions internal/status/noop_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ type NoopStatusReporter struct{}
func newNoopStatusReporter() *NoopStatusReporter {
return &NoopStatusReporter{}
}
func (r *NoopStatusReporter) ReportDeploymentStartup(context.Context) (bool, error) {
return true, nil
func (r *NoopStatusReporter) ReportDeploymentStartup(context.Context) error {
return nil
}

func (r *NoopStatusReporter) ReportDeploymentShutdown(context.Context) (bool, error) {
return true, nil
func (r *NoopStatusReporter) ReportDeploymentShutdown(context.Context) error {
return nil
}

func (r *NoopStatusReporter) ReportDeploymentFailed(context.Context) (bool, error) {
return true, nil
func (r *NoopStatusReporter) ReportDeploymentFailed(context.Context) error {
return nil
}

func (r *NoopStatusReporter) SetResourceVersion(int) {}
15 changes: 10 additions & 5 deletions internal/status/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,20 @@ import (
)

type StatusReporter interface {
ReportDeploymentStartup(ctx context.Context) (bool, error)
ReportDeploymentShutdown(ctx context.Context) (bool, error)
ReportDeploymentFailed(ctx context.Context) (bool, error)
ReportDeploymentStartup(ctx context.Context) error
ReportDeploymentShutdown(ctx context.Context) error
ReportDeploymentFailed(ctx context.Context) error
SetResourceVersion(resourceVersion int)
}

func NewStatusReporter(remoteCfgEnabled bool, logger logrus.FieldLogger, gql *graphql.Gql, cfgVersion int) StatusReporter {
func NewStatusReporter(remoteCfgEnabled bool, logger logrus.FieldLogger, gql *graphql.Gql, resVerClient ResVerClient, cfgVersion int) StatusReporter {
if remoteCfgEnabled {
return newGraphQLStatusReporter(logger.WithField("component", "GraphQLStatusReporter"), gql, cfgVersion)
return newGraphQLStatusReporter(
logger.WithField("component", "GraphQLStatusReporter"),
gql,
resVerClient,
cfgVersion,
)
}

return newNoopStatusReporter()
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c *Controller) Start(ctx context.Context) error {
// use separate ctx as parent ctx is already cancelled
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
if _, err := c.statusReporter.ReportDeploymentShutdown(ctxTimeout); err != nil {
if err := c.statusReporter.ReportDeploymentShutdown(ctxTimeout); err != nil {
return fmt.Errorf("while reporting botkube shutdown: %w", err)
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,12 @@ func (c *UpgradeChecker) notifyAboutUpgradeIfShould(ctx context.Context) (bool,
return false, fmt.Errorf("while getting latest release from GitHub: %w", err)
}

c.log.Debugf("latest release info: %+v", release)
if release.TagName == nil {
if release == nil || release.TagName == nil {
return false, errors.New("release tag is empty")
}

c.log.Debugf("latest release tag: %s", *release.TagName)

// Send notification if newer version available
if version.Short() == *release.TagName {
// no new version, finish
Expand Down

0 comments on commit 5960fb8

Please sign in to comment.