Skip to content

Commit

Permalink
Add validation that kubeconfig is specified (#1063)
Browse files Browse the repository at this point in the history
  • Loading branch information
mszostok authored May 12, 2023
1 parent e601056 commit bf1d111
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 7 deletions.
4 changes: 4 additions & 0 deletions cmd/executor/gh/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (*GHExecutor) Metadata(context.Context) (api.MetadataOutput, error) {

// Execute returns a given command as a response.
func (e *GHExecutor) Execute(ctx context.Context, in executor.ExecuteInput) (executor.ExecuteOutput, error) {
if err := pluginx.ValidateKubeConfigProvided(pluginName, in.Context.KubeConfig); err != nil {
return executor.ExecuteOutput{}, err
}

var cfg Config
err := pluginx.MergeExecutorConfigs(in.Configs, &cfg)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/executor/helm/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func (e *Executor) Metadata(context.Context) (api.MetadataOutput, error) {
// - history
// - get [all|manifest|hooks|notes]
func (e *Executor) Execute(ctx context.Context, in executor.ExecuteInput) (executor.ExecuteOutput, error) {
if err := pluginx.ValidateKubeConfigProvided(PluginName, in.Context.KubeConfig); err != nil {
return executor.ExecuteOutput{}, err
}

cfg, err := MergeConfigs(in.Configs)
if err != nil {
return executor.ExecuteOutput{}, fmt.Errorf("while merging input configs: %w", err)
Expand Down
3 changes: 3 additions & 0 deletions internal/executor/helm/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ func TestExecutorConfigMergingErrors(t *testing.T) {
// when
_, err := hExec.Execute(context.Background(), executor.ExecuteInput{
Command: "helm install",
Context: executor.ExecuteInputContext{
KubeConfig: []byte("fake config"),
},
Configs: []*executor.Config{
{
RawYAML: mustYAMLMarshal(t, configA),
Expand Down
4 changes: 4 additions & 0 deletions internal/executor/kubectl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (e *Executor) Metadata(context.Context) (api.MetadataOutput, error) {

// Execute returns a given command as response.
func (e *Executor) Execute(ctx context.Context, in executor.ExecuteInput) (executor.ExecuteOutput, error) {
if err := pluginx.ValidateKubeConfigProvided(PluginName, in.Context.KubeConfig); err != nil {
return executor.ExecuteOutput{}, err
}

cfg, err := MergeConfigs(in.Configs)
if err != nil {
return executor.ExecuteOutput{}, fmt.Errorf("while merging input configs: %w", err)
Expand Down
12 changes: 9 additions & 3 deletions internal/source/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,22 @@ func (d *Dispatcher) Dispatch(dispatch PluginDispatch) error {
},
})
if err != nil {
return fmt.Errorf("while opening stream for %s: %w", dispatch.pluginName, err)
return fmt.Errorf(`while opening stream for "%s.%s" source: %w`, dispatch.sourceName, dispatch.pluginName, err)
}

go func() {
for {
select {
case event := <-out.Output:
case event, ok := <-out.Output:
if !ok {
return
}
log.WithField("event", string(event)).Debug("Dispatching received event...")
d.dispatch(ctx, event, dispatch)
case msg := <-out.Event:
case msg, ok := <-out.Event:
if !ok {
return
}
log.WithField("message", msg).Debug("Dispatching received message...")
d.dispatchMsg(ctx, msg, dispatch)
case <-ctx.Done():
Expand Down
5 changes: 5 additions & 0 deletions internal/source/kubernetes/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/kubeshop/botkube/pkg/api"
"github.com/kubeshop/botkube/pkg/api/source"
pkgConfig "github.com/kubeshop/botkube/pkg/config"
"github.com/kubeshop/botkube/pkg/pluginx"
)

const (
Expand Down Expand Up @@ -64,6 +65,10 @@ func NewSource(version string) *Source {

// Stream streams Kubernetes events
func (*Source) Stream(ctx context.Context, input source.StreamInput) (source.StreamOutput, error) {
if err := pluginx.ValidateKubeConfigProvided(PluginName, input.Context.KubeConfig); err != nil {
return source.StreamOutput{}, err
}

cfg, err := config.MergeConfigs(input.Configs)
if err != nil {
return source.StreamOutput{}, fmt.Errorf("while merging input configs: %w", err)
Expand Down
11 changes: 8 additions & 3 deletions pkg/api/source/grpc_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"encoding/json"
"fmt"
"io"
"log"

"github.com/hashicorp/go-plugin"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/kubeshop/botkube/pkg/api"
Expand Down Expand Up @@ -114,11 +115,13 @@ func (p *Plugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error {
func (p *Plugin) GRPCClient(_ context.Context, _ *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &grpcClient{
client: NewSourceClient(c),
logger: NewLogger(),
}, nil
}

type grpcClient struct {
client SourceClient
logger logrus.FieldLogger
}

func (p *grpcClient) Stream(ctx context.Context, in StreamInput) (StreamOutput, error) {
Expand Down Expand Up @@ -152,20 +155,22 @@ func (p *grpcClient) Stream(ctx context.Context, in StreamInput) (StreamOutput,
// On any other error, the stream is aborted and the error contains the RPC
// status.
if err != nil {
log.Print(err)
p.logger.Errorf("canceling streaming: %s", status.Convert(err).Message())
// TODO: we should consider adding error feedback channel to StreamOutput.
return
}
var event Event
if len(feature.Event) != 0 && string(feature.Event) != "" {
if err := json.Unmarshal(feature.Event, &event); err != nil {
log.Printf("while unmarshalling message from JSON: %s", err.Error())
p.logger.Errorf("canceling streaming: cannot unmarshal JSON message: %s", err.Error())
return
}
}
out.Output <- feature.Output
out.Event <- event
}
close(out.Output)
close(out.Event)
}()

return out, nil
Expand Down
15 changes: 15 additions & 0 deletions pkg/api/source/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package source

import (
"os"

"github.com/sirupsen/logrus"
)

// NewLogger returns a new logger used internally. We should replace it in the near future, as we shouldn't be so opinionated.
func NewLogger() logrus.FieldLogger {
logger := logrus.New()
logger.SetOutput(os.Stdout)
logger.SetLevel(logrus.InfoLevel)
return logger
}
10 changes: 9 additions & 1 deletion pkg/pluginx/kubeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/pkg/errors"
)

func PersistKubeConfig(ctx context.Context, kc []byte) (string, func(context.Context) error, error) {
func PersistKubeConfig(_ context.Context, kc []byte) (string, func(context.Context) error, error) {
if len(kc) == 0 {
return "", nil, fmt.Errorf("received empty kube config")
}
Expand All @@ -34,3 +34,11 @@ func PersistKubeConfig(ctx context.Context, kc []byte) (string, func(context.Con

return abs, deleteFn, nil
}

// ValidateKubeConfigProvided returns an error if a given kubeconfig is empty or nil.
func ValidateKubeConfigProvided(pluginName string, kubeconfig []byte) error {
if len(kubeconfig) != 0 {
return nil
}
return fmt.Errorf("The kubeconfig data is missing. Please make sure that you have specified a valid RBAC configuration for %q plugin. Learn more at https://docs.botkube.io/configuration/rbac.", pluginName)
}

0 comments on commit bf1d111

Please sign in to comment.