From d1a7f8a95b7ea2f1f9fd4b78a076623673740573 Mon Sep 17 00:00:00 2001 From: Mateusz Szostok Date: Wed, 23 Nov 2022 17:58:33 +0100 Subject: [PATCH] Add source plugin support --- .goreleaser.plugin.yaml | 12 + cmd/botkube/main.go | 12 +- cmd/source/cm-watcher/README.md | 11 + cmd/source/cm-watcher/main.go | 58 +++++ go.mod | 1 + go.sum | 2 + internal/plugin/{bindings.go => collector.go} | 38 ++- internal/plugin/index.go | 41 +++ internal/plugin/manager.go | 246 ++++++++++-------- internal/plugin/store.go | 28 ++ internal/source/source.go | 72 +++++ pkg/api/source/grpc_adapter.go | 144 ++++++++++ pkg/api/source/source.pb.go | 53 ++-- pkg/api/source/source_grpc.pb.go | 40 +-- pkg/config/config.go | 1 + pkg/config/config_test.go | 35 +++ .../TestLoadConfigSuccess/config-all.yaml | 10 +- .../TestLoadConfigSuccess/config.golden.yaml | 7 +- .../TestLoadConfigWithPlugins/config-all.yaml | 61 +++++ pkg/execute/plugin_executor.go | 22 +- proto/source.proto | 9 +- 21 files changed, 725 insertions(+), 178 deletions(-) create mode 100644 cmd/source/cm-watcher/README.md create mode 100644 cmd/source/cm-watcher/main.go rename internal/plugin/{bindings.go => collector.go} (57%) create mode 100644 internal/plugin/store.go create mode 100644 internal/source/source.go create mode 100644 pkg/api/source/grpc_adapter.go create mode 100644 pkg/config/testdata/TestLoadConfigWithPlugins/config-all.yaml diff --git a/.goreleaser.plugin.yaml b/.goreleaser.plugin.yaml index 029aec472f..a4695593f9 100644 --- a/.goreleaser.plugin.yaml +++ b/.goreleaser.plugin.yaml @@ -15,3 +15,15 @@ builds: goarch: - amd64 - arm64 + - id: cm-watcher + binary: source_cm-watcher_{{ .Os }}_{{ .Arch }} + no_unique_dist_dir: true + main: cmd/source/cm-watcher/main.go + env: + - CGO_ENABLED=0 + goos: + - linux + - darwin + goarch: + - amd64 + - arm64 diff --git a/cmd/botkube/main.go b/cmd/botkube/main.go index 3d65339303..07f629923a 100644 --- a/cmd/botkube/main.go +++ b/cmd/botkube/main.go @@ -30,6 +30,7 @@ import ( "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/internal/lifecycle" "github.com/kubeshop/botkube/internal/plugin" + "github.com/kubeshop/botkube/internal/source" "github.com/kubeshop/botkube/internal/storage" "github.com/kubeshop/botkube/pkg/action" "github.com/kubeshop/botkube/pkg/bot" @@ -99,8 +100,8 @@ func run() error { errGroup, ctx := errgroup.WithContext(ctx) - enabledPluginExecutors := plugin.GetAllEnabledAndUsedPluginExecutors(conf) - pluginManager := plugin.NewManager(logger, conf.Plugins, enabledPluginExecutors) + enabledPluginExecutors, enabledPluginSources := plugin.GetAllEnabledAndUsedPlugins(conf) + pluginManager := plugin.NewManager(logger, conf.Plugins, enabledPluginExecutors, enabledPluginSources) err = pluginManager.Start(ctx) if err != nil { @@ -321,6 +322,13 @@ func run() error { actionProvider := action.NewProvider(logger.WithField(componentLogFieldKey, "Action Provider"), conf.Actions, executorFactory) router.AddEnabledActionBindings(conf.Actions) + // TODO: temporary dispatcher for events + sourcePluginDispatcher := source.NewDispatcher(logger, notifiers, pluginManager, enabledPluginSources) + err = sourcePluginDispatcher.Start(ctx) + if err != nil { + return fmt.Errorf("while starting source plugin event dispatcher: %w", err) + } + // Create and start controller ctrl := controller.New( logger.WithField(componentLogFieldKey, "Controller"), diff --git a/cmd/source/cm-watcher/README.md b/cmd/source/cm-watcher/README.md new file mode 100644 index 0000000000..02b215bc7b --- /dev/null +++ b/cmd/source/cm-watcher/README.md @@ -0,0 +1,11 @@ +# Echo executor + +Echo is the example Botkube executor used during [e2e tests](../../../test/e2e). + +## Configuration parameters + +The Echo configuration should be specified in YAML format. It accepts such parameters: + +```yaml +changeResponseToUpperCase: true # default is 'false'. +``` diff --git a/cmd/source/cm-watcher/main.go b/cmd/source/cm-watcher/main.go new file mode 100644 index 0000000000..0d414a58e3 --- /dev/null +++ b/cmd/source/cm-watcher/main.go @@ -0,0 +1,58 @@ +package main + +import ( + "context" + "encoding/json" + "time" + + "github.com/hashicorp/go-plugin" + + "github.com/kubeshop/botkube/pkg/api/source" +) + +const pluginName = "cm-watcher" + +// Config holds executor configuration. +type Config struct { + ConfigMapName string +} + +// CMWatcher implements Botkube source plugin. +type CMWatcher struct{} + +// Stream returns a given command as response. +func (CMWatcher) Stream(ctx context.Context) (source.StreamOutput, error) { + // TODO: in request we should receive the executor configuration. + cfg := Config{ + ConfigMapName: "cm-watcher-trigger", + } + + raw, err := json.Marshal(cfg) + if err != nil { + return source.StreamOutput{}, err + } + out := source.StreamOutput{ + Output: make(chan []byte), + } + + go func() { + for { + select { + case <-time.Tick(1 * time.Second): + out.Output <- raw + case <-ctx.Done(): + return + } + } + }() + + return out, nil +} + +func main() { + source.Serve(map[string]plugin.Plugin{ + pluginName: &source.Plugin{ + Source: &CMWatcher{}, + }, + }) +} diff --git a/go.mod b/go.mod index adf515106b..62313fa815 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/go-plugin v1.4.3 + github.com/hashicorp/go-version v1.6.0 github.com/infracloudio/msbotbuilder-go v0.2.5 github.com/knadh/koanf v1.4.4 github.com/mattermost/mattermost-server/v5 v5.39.3 diff --git a/go.sum b/go.sum index 07e9f0bce0..487a86c1b5 100644 --- a/go.sum +++ b/go.sum @@ -881,6 +881,8 @@ github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.1.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= diff --git a/internal/plugin/bindings.go b/internal/plugin/collector.go similarity index 57% rename from internal/plugin/bindings.go rename to internal/plugin/collector.go index 23af3a2bc3..f577086460 100644 --- a/internal/plugin/bindings.go +++ b/internal/plugin/collector.go @@ -4,16 +4,23 @@ import ( "github.com/kubeshop/botkube/pkg/config" ) -// GetAllEnabledAndUsedPluginExecutors returns the list of all plugins executors that are both enabled and bind to at +// GetAllEnabledAndUsedPlugins returns the list of all plugins that are both enabled and bind to at // least one communicator that is also enabled. -func GetAllEnabledAndUsedPluginExecutors(cfg *config.Config) []string { - // Collect all used executor bindings - bindExecutors := map[string]struct{}{} +func GetAllEnabledAndUsedPlugins(cfg *config.Config) ([]string, []string) { + // Collect all used executor/sources + var ( + bindExecutors = map[string]struct{}{} + bindSources = map[string]struct{}{} + ) + collect := func(channels config.IdentifiableMap[config.ChannelBindingsByName]) { for _, bindings := range channels { for _, name := range bindings.Bindings.Executors { bindExecutors[name] = struct{}{} } + for _, name := range bindings.Bindings.Sources { + bindSources[name] = struct{}{} + } } } @@ -34,6 +41,9 @@ func GetAllEnabledAndUsedPluginExecutors(cfg *config.Config) []string { for _, name := range commGroupCfg.Teams.Bindings.Executors { bindExecutors[name] = struct{}{} } + for _, name := range commGroupCfg.Teams.Bindings.Sources { + bindSources[name] = struct{}{} + } } if commGroupCfg.Discord.Enabled { @@ -41,6 +51,9 @@ func GetAllEnabledAndUsedPluginExecutors(cfg *config.Config) []string { for _, name := range bindings.Bindings.Executors { bindExecutors[name] = struct{}{} } + for _, name := range bindings.Bindings.Sources { + bindSources[name] = struct{}{} + } } } } @@ -61,5 +74,20 @@ func GetAllEnabledAndUsedPluginExecutors(cfg *config.Config) []string { } } - return usedExecutorPlugins + // Collect all sources that are both enabled and bind to at least one communicator that is enabled. + var usedSourcePlugins []string + for groupName, groupItems := range cfg.Sources { + for name, source := range groupItems.Plugins { + if !source.Enabled { + continue + } + _, found := bindSources[groupName] + if !found { + continue + } + + usedSourcePlugins = append(usedSourcePlugins, name) + } + } + return usedExecutorPlugins, usedSourcePlugins } diff --git a/internal/plugin/index.go b/internal/plugin/index.go index 1ba5ac4ba9..d53f6b5abe 100644 --- a/internal/plugin/index.go +++ b/internal/plugin/index.go @@ -1,5 +1,12 @@ package plugin +import ( + "fmt" + "strings" + + semver "github.com/hashicorp/go-version" +) + // Type represents the plugin type. type Type string @@ -24,3 +31,37 @@ type ( Links []string } ) + +// byIndexEntryVersion implements sort.Interface based on the version field. +type byIndexEntryVersion []IndexEntry + +func (a byIndexEntryVersion) Len() int { return len(a) } +func (a byIndexEntryVersion) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byIndexEntryVersion) Less(i, j int) bool { + return semvVerGreater(a[i].Version, a[j].Version) +} + +// semvVerGreater returns true if A is greater than B. +func semvVerGreater(a, b string) bool { + a = strings.TrimPrefix(a, "v") + b = strings.TrimPrefix(b, "v") + + verA, aErr := semver.NewVersion(a) + verB, bErr := semver.NewVersion(b) + + return aErr == nil && bErr == nil && verA.GreaterThan(verB) +} + +// BuildPluginKey returns plugin key. +func BuildPluginKey(repo, plugin string) string { + return repo + "/" + plugin +} + +// DecomposePluginKey extract details from plugin key. +func DecomposePluginKey(key string) (string, string, error) { + repo, name, found := strings.Cut(key, "/") + if !found { + return "", "", fmt.Errorf("plugin %q doesn't follow required {repo_name}/{plugin_name} syntax", key) + } + return repo, name, nil +} diff --git a/internal/plugin/manager.go b/internal/plugin/manager.go index 296c0b7682..6e1bd769f8 100644 --- a/internal/plugin/manager.go +++ b/internal/plugin/manager.go @@ -10,6 +10,7 @@ import ( "os/exec" "path/filepath" "runtime" + "sort" "strings" "sync" "sync/atomic" @@ -20,6 +21,7 @@ import ( "github.com/kubeshop/botkube/pkg/api" "github.com/kubeshop/botkube/pkg/api/executor" + "github.com/kubeshop/botkube/pkg/api/source" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/multierror" ) @@ -28,75 +30,83 @@ import ( var ErrNotStartedPluginManager = errors.New("plugin manager is not started yet") const ( - executorPluginName = "executor" - executorBinaryPrefix = "executor_" - dirPerms = 0o775 - filePerms = 0o664 + executorPluginName = "executor" + sourcePluginName = "source" + dirPerms = 0o775 + binPerms = 0o755 + filePerms = 0o664 ) // pluginMap is the map of plugins we can dispense. // This map is used in order to identify a plugin called Dispense. // This map is globally available and must stay consistent in order for all the plugins to work. var pluginMap = map[string]plugin.Plugin{ - //"source": &source.Plugin{}, + sourcePluginName: &source.Plugin{}, executorPluginName: &executor.Plugin{}, } // Manager provides functionality for managing executor and source plugins. type Manager struct { - isStarted atomic.Bool - log logrus.FieldLogger - cfg config.Plugins - httpClient *http.Client - executors store - allEnabledExecutors []string -} + isStarted atomic.Bool + log logrus.FieldLogger + cfg config.Plugins + httpClient *http.Client -type ( - // store holds information about processed and started plugins. - store struct { - RepoIndex map[string]map[string]IndexEntry - EnabledPlugins map[string]enabledPlugin - } + executorsToEnable []string + executorsStore store[executor.Executor] - // enabledPlugin holds data about enabled plugin. - enabledPlugin struct { - Client executor.Executor - Cleanup func() - } -) + sourcesStore store[source.Source] + sourcesToEnable []string +} // NewManager returns a new Manager instance. -func NewManager(logger logrus.FieldLogger, cfg config.Plugins, executors []string) *Manager { +func NewManager(logger logrus.FieldLogger, cfg config.Plugins, executors, sources []string) *Manager { return &Manager{ - cfg: cfg, - httpClient: newHTTPClient(), - allEnabledExecutors: executors, - executors: store{ - RepoIndex: map[string]map[string]IndexEntry{}, - EnabledPlugins: map[string]enabledPlugin{}, - }, - log: logger.WithField("component", "Plugin Manager"), + cfg: cfg, + httpClient: newHTTPClient(), + executorsToEnable: executors, + executorsStore: newStore[executor.Executor](), + sourcesToEnable: sources, + sourcesStore: newStore[source.Source](), + log: logger.WithField("component", "Plugin Manager"), } } // Start downloads and starts all enabled plugins. func (m *Manager) Start(ctx context.Context) error { - if len(m.allEnabledExecutors) == 0 { + if len(m.executorsToEnable) == 0 && len(m.sourcesToEnable) == 0 { m.log.Info("No external plugins are enabled.") return nil } - m.log.WithField("enabledExecutors", strings.Join(m.allEnabledExecutors, ",")). - Info("Starting Plugin Manager for all enabled plugins") + m.log.WithFields(logrus.Fields{ + "enabledExecutors": strings.Join(m.executorsToEnable, ","), + "enabledSources": strings.Join(m.sourcesToEnable, ","), + }).Info("Starting Plugin Manager for all enabled plugins") if err := m.loadRepositoriesMetadata(ctx); err != nil { return err } - if err := m.loadAllEnabledPlugins(ctx); err != nil { + executorPlugins, err := m.loadPlugins(ctx, executorPluginName, m.executorsToEnable, m.executorsStore.Repository) + if err != nil { return err } + executorClients, err := createGRPCClients[executor.Executor](executorPlugins, executorPluginName) + if err != nil { + return fmt.Errorf("while creating executor plugins: %w", err) + } + m.executorsStore.EnabledPlugins = executorClients + + sourcesPlugins, err := m.loadPlugins(ctx, sourcePluginName, m.sourcesToEnable, m.sourcesStore.Repository) + if err != nil { + return err + } + sourcesClients, err := createGRPCClients[source.Source](sourcesPlugins, sourcePluginName) + if err != nil { + return fmt.Errorf("while creating source plugins: %w", err) + } + m.sourcesStore.EnabledPlugins = sourcesClients m.isStarted.Store(true) return nil @@ -108,7 +118,21 @@ func (m *Manager) GetExecutor(name string) (executor.Executor, error) { return nil, ErrNotStartedPluginManager } - client, found := m.executors.EnabledPlugins[name] + client, found := m.executorsStore.EnabledPlugins[name] + if !found || client.Client == nil { + return nil, fmt.Errorf("client for plugin %q not found", name) + } + + return client.Client, nil +} + +// GetSource returns the source client for a given plugin. +func (m *Manager) GetSource(name string) (source.Source, error) { + if !m.isStarted.Load() { + return nil, ErrNotStartedPluginManager + } + + client, found := m.sourcesStore.EnabledPlugins[name] if !found || client.Client == nil { return nil, fmt.Errorf("client for plugin %q not found", name) } @@ -120,7 +144,7 @@ func (m *Manager) GetExecutor(name string) (executor.Executor, error) { // This method blocks until all cleanup is finished. func (m *Manager) Shutdown() { var wg sync.WaitGroup - for _, p := range m.executors.EnabledPlugins { + for _, p := range m.executorsStore.EnabledPlugins { wg.Add(1) go func(close func()) { @@ -133,55 +157,54 @@ func (m *Manager) Shutdown() { wg.Wait() } -func (m *Manager) loadAllEnabledPlugins(ctx context.Context) error { - for _, name := range m.allEnabledExecutors { - repoName, pluginName, found := strings.Cut(name, "/") - if !found { - return fmt.Errorf("plugin %q doesn't follow required {repo_name}/{plugin_name} syntax", name) +func (m *Manager) loadPlugins(ctx context.Context, pluginType string, pluginsToEnable []string, repo map[string][]IndexEntry) (map[string]string, error) { + loadedPlugins := map[string]string{} + for _, pluginKey := range pluginsToEnable { + candidates, found := repo[pluginKey] + if !found || len(candidates) == 0 { + return nil, fmt.Errorf("not found %q plugin in any repository", pluginKey) } - binPath := filepath.Join(m.cfg.CacheDir, repoName, fmt.Sprintf("%s%s", executorBinaryPrefix, pluginName)) - - // FIXME: Find latest version: - not indexing as it's not hot path. - info := m.executors.RepoIndex[repoName][pluginName] + // TODO(version): check if version is defined in plugin: + // - if yes, use it. + // - if not, find the latest version in the repository. + pluginInfo := candidates[0] - if _, err := os.Stat(binPath); os.IsNotExist(err) { - err := m.downloadPlugin(ctx, binPath, info) - if err != nil { - return fmt.Errorf("while fetching plugin %q binary: %w", name, err) - } + repoName, pluginName, err := DecomposePluginKey(pluginKey) + if err != nil { + return nil, err } + binPath := filepath.Join(m.cfg.CacheDir, repoName, fmt.Sprintf("%s_%s_%s", pluginType, pluginInfo.Version, pluginName)) - m.log.WithFields(logrus.Fields{ - "repo": repoName, - "plugin": pluginName, - "version": info.Version, + log := m.log.WithFields(logrus.Fields{ + "plugin": pluginKey, + "version": pluginInfo.Version, "binPath": binPath, - }).Info("Registering executor plugin.") + }) - client, cleanup, err := m.createGRPCClient(binPath) - if err != nil { - if cleanup != nil { - cleanup() + if _, err := os.Stat(binPath); os.IsNotExist(err) { + log.Debug("Executor plugin not found locally") + err := m.downloadPlugin(ctx, binPath, pluginInfo) + if err != nil { + return nil, fmt.Errorf("while fetching plugin %q binary: %w", pluginKey, err) } - return fmt.Errorf("while creating gRPC client: %w", err) } - m.executors.EnabledPlugins[name] = enabledPlugin{ - Client: client, - Cleanup: cleanup, - } + loadedPlugins[pluginKey] = binPath + + log.Info("Executor plugin registered successfully.") } - return nil + return loadedPlugins, nil } func (m *Manager) loadRepositoriesMetadata(ctx context.Context) error { - for name, url := range m.cfg.Repositories { - path := filepath.Join(m.cfg.CacheDir, filepath.Clean(fmt.Sprintf("%s.yaml", name))) + // index repositories metadata + for repo, url := range m.cfg.Repositories { + path := filepath.Join(m.cfg.CacheDir, filepath.Clean(fmt.Sprintf("%s.yaml", repo))) if _, err := os.Stat(path); os.IsNotExist(err) { err := m.fetchIndex(ctx, path, url) if err != nil { - return fmt.Errorf("while fetching index for %q repository: %w", name, err) + return fmt.Errorf("while fetching index for %q repository: %w", repo, err) } } @@ -195,22 +218,24 @@ func (m *Manager) loadRepositoriesMetadata(ctx context.Context) error { } for _, entry := range index.Entries { + key := BuildPluginKey(repo, entry.Name) switch entry.Type { case TypeExecutor: - _, found := m.executors.RepoIndex[name][entry.Name] - if found { // FIXME: version semver compare - continue - } - if m.executors.RepoIndex[name] == nil { - m.executors.RepoIndex[name] = map[string]IndexEntry{} - } - m.executors.RepoIndex[name][entry.Name] = entry + m.executorsStore.Repository[key] = append(m.executorsStore.Repository[key], entry) case TypeSource: - // TODO: ... + m.sourcesStore.Repository[key] = append(m.sourcesStore.Repository[key], entry) } } } + // sort entries by version + for key := range m.executorsStore.Repository { + sort.Sort(byIndexEntryVersion(m.executorsStore.Repository[key])) + } + for key := range m.sourcesStore.Repository { + sort.Sort(byIndexEntryVersion(m.sourcesStore.Repository[key])) + } + return nil } @@ -247,34 +272,44 @@ func (m *Manager) fetchIndex(ctx context.Context, path, url string) error { return nil } -func (*Manager) createGRPCClient(path string) (executor.Executor, func(), error) { - cli := plugin.NewClient(&plugin.ClientConfig{ - Plugins: pluginMap, - Cmd: exec.Command(path), - AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, - HandshakeConfig: plugin.HandshakeConfig{ - ProtocolVersion: executor.ProtocolVersion, - MagicCookieKey: api.HandshakeConfig.MagicCookieKey, - MagicCookieValue: api.HandshakeConfig.MagicCookieValue, - }, - }) - - rpcClient, err := cli.Client() - if err != nil { - return nil, cli.Kill, err - } +func createGRPCClients[C any](bins map[string]string, dispenseType string) (map[string]enabledPlugins[C], error) { + out := map[string]enabledPlugins[C]{} + + for key, path := range bins { + cli := plugin.NewClient(&plugin.ClientConfig{ + Plugins: pluginMap, + //nolint:gosec // warns us about 'Subprocess launching with variable', but we are the one that created that variable. + Cmd: exec.Command(path), + AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, + HandshakeConfig: plugin.HandshakeConfig{ + ProtocolVersion: executor.ProtocolVersion, + MagicCookieKey: api.HandshakeConfig.MagicCookieKey, + MagicCookieValue: api.HandshakeConfig.MagicCookieValue, + }, + }) + + rpcClient, err := cli.Client() + if err != nil { + return nil, err + } - raw, err := rpcClient.Dispense(executorPluginName) - if err != nil { - return nil, cli.Kill, err - } + raw, err := rpcClient.Dispense(dispenseType) + if err != nil { + return nil, err + } - concreteCli, ok := raw.(executor.Executor) - if !ok { - return nil, cli.Kill, fmt.Errorf("registered client doesn't implemented executor interface") + concreteCli, ok := raw.(C) + if !ok { + cli.Kill() + return nil, fmt.Errorf("registered client doesn't implemented executor interface") + } + out[key] = enabledPlugins[C]{ + Client: concreteCli, + Cleanup: cli.Kill, + } } - return concreteCli, cli.Kill, nil + return out, nil } func (m *Manager) downloadPlugin(ctx context.Context, binPath string, info IndexEntry) error { @@ -318,8 +353,7 @@ func (m *Manager) downloadPlugin(ctx context.Context, binPath string, info Index return fmt.Errorf("incorrect status code: %d", res.StatusCode) } - //nolint:gosec // Informs as that expect file permissions to be 0600 or less, however we want to make this file executable. - file, err := os.OpenFile(filepath.Clean(binPath), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o755) + file, err := os.OpenFile(filepath.Clean(binPath), os.O_RDWR|os.O_CREATE|os.O_TRUNC, binPerms) if err != nil { return fmt.Errorf("while creating plugin file: %w", err) } diff --git a/internal/plugin/store.go b/internal/plugin/store.go new file mode 100644 index 0000000000..0003e4bdc1 --- /dev/null +++ b/internal/plugin/store.go @@ -0,0 +1,28 @@ +package plugin + +type ( + // store holds information about processed and started plugins. + store[T any] struct { + Repository storeRepository + EnabledPlugins storePlugins[T] + } + + // storeRepository holds known plugins metadata indexed by {repo}/{plugin_name} key. + // Additionally, all entries for a given key are sorted by Version field. + storeRepository map[string][]IndexEntry + + // storePlugins holds enabled plugins indexed by {repo}/{plugin_name} key. + storePlugins[T any] map[string]enabledPlugins[T] + + enabledPlugins[T any] struct { + Client T + Cleanup func() + } +) + +func newStore[T any]() store[T] { + return store[T]{ + Repository: map[string][]IndexEntry{}, + EnabledPlugins: map[string]enabledPlugins[T]{}, + } +} diff --git a/internal/source/source.go b/internal/source/source.go new file mode 100644 index 0000000000..e5df9b534d --- /dev/null +++ b/internal/source/source.go @@ -0,0 +1,72 @@ +package source + +import ( + "context" + "fmt" + + "github.com/sirupsen/logrus" + + "github.com/kubeshop/botkube/internal/plugin" + "github.com/kubeshop/botkube/pkg/bot/interactive" + "github.com/kubeshop/botkube/pkg/notifier" +) + +// Dispatcher watches for enabled sources events and send them to notifiers. +type Dispatcher struct { + log logrus.FieldLogger + notifiers []notifier.Notifier + manager *plugin.Manager + sources []string +} + +// NewDispatcher create a new Dispatcher instance. +func NewDispatcher(log logrus.FieldLogger, notifiers []notifier.Notifier, manager *plugin.Manager, sources []string) *Dispatcher { + return &Dispatcher{ + log: log, + notifiers: notifiers, + manager: manager, + sources: sources, + } +} + +// Start starts all sources and dispatch received events. +func (d *Dispatcher) Start(ctx context.Context) error { + for _, name := range d.sources { + sourceClient, err := d.manager.GetSource(name) + if err != nil { + return fmt.Errorf("while getting source client for %s: %w", name, err) + } + out, err := sourceClient.Stream(ctx) + if err != nil { + return fmt.Errorf("while opening stream for %s: %w", name, err) + } + + go func() { + for { + select { + case event := <-out.Output: + d.dispatch(ctx, event) + case <-ctx.Done(): + return + } + } + }() + } + + return nil +} + +func (d *Dispatcher) dispatch(ctx context.Context, event []byte) { + for _, n := range d.notifiers { + go func(n notifier.Notifier) { + err := n.SendMessageToAll(ctx, interactive.Message{ + Base: interactive.Base{ + Header: string(event), + }, + }) + if err != nil { + d.log.Errorf("while sending event: %s", err.Error()) + } + }(n) + } +} diff --git a/pkg/api/source/grpc_adapter.go b/pkg/api/source/grpc_adapter.go new file mode 100644 index 0000000000..3ab32058e1 --- /dev/null +++ b/pkg/api/source/grpc_adapter.go @@ -0,0 +1,144 @@ +package source + +import ( + "context" + "io" + "log" + + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/kubeshop/botkube/pkg/api" +) + +// Source defines the Botkube source plugin functionality. +type Source interface { + Stream(ctx context.Context) (StreamOutput, error) +} + +// StreamOutput contains the stream data. +type StreamOutput struct { + // Output represents the streamed events. It is from start of plugin execution. + Output chan []byte + // TODO: we should consider adding error feedback channel too. +} + +// ProtocolVersion is the version that must match between Botkube core +// and Botkube plugins. This should be bumped whenever a change happens in +// one or the other that makes it so that they can't safely communicate. +// This could be adding a new interface value, it could be how helper/schema computes diffs, etc. +// +// NOTE: In the future we can consider using VersionedPlugins. These can be used to negotiate +// a compatible version between client and server. If this is set, Handshake.ProtocolVersion is not required. +const ProtocolVersion = 1 + +var _ plugin.GRPCPlugin = &Plugin{} + +// Plugin This is the implementation of plugin.GRPCPlugin, so we can serve and consume different Botkube Sources. +type Plugin struct { + // The GRPC plugin must still implement the Plugin interface. + plugin.NetRPCUnsupportedPlugin + + // Source represent a concrete implementation that handles the business logic. + Source Source +} + +// GRPCServer registers plugin for serving with the given GRPCServer. +func (p *Plugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error { + RegisterSourceServer(s, &grpcServer{ + Source: p.Source, + }) + return nil +} + +// GRPCClient returns the interface implementation for the plugin that is serving via gRPC by GRPCServer. +func (p *Plugin) GRPCClient(_ context.Context, _ *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { + return &grpcClient{ + client: NewSourceClient(c), + }, nil +} + +type grpcClient struct { + client SourceClient +} + +func (p *grpcClient) Stream(ctx context.Context) (StreamOutput, error) { + stream, err := p.client.Stream(ctx, &emptypb.Empty{}) + if err != nil { + return StreamOutput{}, err + } + + out := StreamOutput{ + Output: make(chan []byte), + } + + go func() { + for { + // RecvMsg blocks until it receives a message into m or the stream is + // done. It returns io.EOF when the stream completes successfully. + feature, err := stream.Recv() + if err == io.EOF { + break + } + + // On any other error, the stream is aborted and the error contains the RPC + // status. + if err != nil { + log.Print(err) + // TODO: we should consider adding error feedback channel to StreamOutput. + return + } + out.Output <- feature.Output + } + }() + + return out, nil +} + +type grpcServer struct { + UnimplementedSourceServer + Source Source +} + +func (p *grpcServer) Stream(_ *emptypb.Empty, gstream Source_StreamServer) error { + ctx := gstream.Context() + + // It's up to the 'Stream' method to close the returned channels as it sends the data to it. + // We can only use 'ctx' to cancel streaming and release associated resources. + stream, err := p.Source.Stream(ctx) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): // client canceled stream, we can release this connection. + return ctx.Err() + case out, ok := <-stream.Output: + if !ok { + return nil // output closed, no more chunk logs + } + + err := gstream.Send(&StreamResponse{ + Output: out, + }) + if err != nil { + return err + } + } + } +} + +// Serve serves given plugins. +func Serve(p map[string]plugin.Plugin) { + plugin.Serve(&plugin.ServeConfig{ + Plugins: p, + HandshakeConfig: plugin.HandshakeConfig{ + ProtocolVersion: ProtocolVersion, + MagicCookieKey: api.HandshakeConfig.MagicCookieKey, + MagicCookieValue: api.HandshakeConfig.MagicCookieValue, + }, + GRPCServer: plugin.DefaultGRPCServer, + }) +} diff --git a/pkg/api/source/source.pb.go b/pkg/api/source/source.pb.go index 39ef9f179a..d22f227bf3 100644 --- a/pkg/api/source/source.pb.go +++ b/pkg/api/source/source.pb.go @@ -21,16 +21,17 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type ConsumeResponse struct { +type StreamResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` + // Output represents the streamed Source events. It is from start of Source execution. + Output []byte `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` } -func (x *ConsumeResponse) Reset() { - *x = ConsumeResponse{} +func (x *StreamResponse) Reset() { + *x = StreamResponse{} if protoimpl.UnsafeEnabled { mi := &file_source_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -38,13 +39,13 @@ func (x *ConsumeResponse) Reset() { } } -func (x *ConsumeResponse) String() string { +func (x *StreamResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ConsumeResponse) ProtoMessage() {} +func (*StreamResponse) ProtoMessage() {} -func (x *ConsumeResponse) ProtoReflect() protoreflect.Message { +func (x *StreamResponse) ProtoReflect() protoreflect.Message { mi := &file_source_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -56,16 +57,16 @@ func (x *ConsumeResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ConsumeResponse.ProtoReflect.Descriptor instead. -func (*ConsumeResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use StreamResponse.ProtoReflect.Descriptor instead. +func (*StreamResponse) Descriptor() ([]byte, []int) { return file_source_proto_rawDescGZIP(), []int{0} } -func (x *ConsumeResponse) GetData() string { +func (x *StreamResponse) GetOutput() []byte { if x != nil { - return x.Data + return x.Output } - return "" + return nil } var File_source_proto protoreflect.FileDescriptor @@ -74,15 +75,15 @@ var file_source_proto_rawDesc = []byte{ 0x0a, 0x0c, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x22, 0x25, 0x0a, 0x0f, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x48, 0x0a, 0x06, 0x53, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x12, 0x3e, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x12, - 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x30, 0x01, 0x42, 0x10, 0x5a, 0x0e, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, - 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x74, 0x6f, 0x22, 0x28, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x32, 0x46, 0x0a, + 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x3c, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x10, 0x5a, 0x0e, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -99,12 +100,12 @@ func file_source_proto_rawDescGZIP() []byte { var file_source_proto_msgTypes = make([]protoimpl.MessageInfo, 1) var file_source_proto_goTypes = []interface{}{ - (*ConsumeResponse)(nil), // 0: source.ConsumeResponse - (*emptypb.Empty)(nil), // 1: google.protobuf.Empty + (*StreamResponse)(nil), // 0: source.StreamResponse + (*emptypb.Empty)(nil), // 1: google.protobuf.Empty } var file_source_proto_depIdxs = []int32{ - 1, // 0: source.Source.Consume:input_type -> google.protobuf.Empty - 0, // 1: source.Source.Consume:output_type -> source.ConsumeResponse + 1, // 0: source.Source.Stream:input_type -> google.protobuf.Empty + 0, // 1: source.Source.Stream:output_type -> source.StreamResponse 1, // [1:2] is the sub-list for method output_type 0, // [0:1] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name @@ -119,7 +120,7 @@ func file_source_proto_init() { } if !protoimpl.UnsafeEnabled { file_source_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ConsumeResponse); i { + switch v := v.(*StreamResponse); i { case 0: return &v.state case 1: diff --git a/pkg/api/source/source_grpc.pb.go b/pkg/api/source/source_grpc.pb.go index fd2248806f..5f9efae78b 100644 --- a/pkg/api/source/source_grpc.pb.go +++ b/pkg/api/source/source_grpc.pb.go @@ -23,7 +23,7 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type SourceClient interface { - Consume(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Source_ConsumeClient, error) + Stream(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Source_StreamClient, error) } type sourceClient struct { @@ -34,12 +34,12 @@ func NewSourceClient(cc grpc.ClientConnInterface) SourceClient { return &sourceClient{cc} } -func (c *sourceClient) Consume(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Source_ConsumeClient, error) { - stream, err := c.cc.NewStream(ctx, &Source_ServiceDesc.Streams[0], "/source.Source/Consume", opts...) +func (c *sourceClient) Stream(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Source_StreamClient, error) { + stream, err := c.cc.NewStream(ctx, &Source_ServiceDesc.Streams[0], "/source.Source/Stream", opts...) if err != nil { return nil, err } - x := &sourceConsumeClient{stream} + x := &sourceStreamClient{stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -49,17 +49,17 @@ func (c *sourceClient) Consume(ctx context.Context, in *emptypb.Empty, opts ...g return x, nil } -type Source_ConsumeClient interface { - Recv() (*ConsumeResponse, error) +type Source_StreamClient interface { + Recv() (*StreamResponse, error) grpc.ClientStream } -type sourceConsumeClient struct { +type sourceStreamClient struct { grpc.ClientStream } -func (x *sourceConsumeClient) Recv() (*ConsumeResponse, error) { - m := new(ConsumeResponse) +func (x *sourceStreamClient) Recv() (*StreamResponse, error) { + m := new(StreamResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } @@ -70,7 +70,7 @@ func (x *sourceConsumeClient) Recv() (*ConsumeResponse, error) { // All implementations must embed UnimplementedSourceServer // for forward compatibility type SourceServer interface { - Consume(*emptypb.Empty, Source_ConsumeServer) error + Stream(*emptypb.Empty, Source_StreamServer) error mustEmbedUnimplementedSourceServer() } @@ -78,8 +78,8 @@ type SourceServer interface { type UnimplementedSourceServer struct { } -func (UnimplementedSourceServer) Consume(*emptypb.Empty, Source_ConsumeServer) error { - return status.Errorf(codes.Unimplemented, "method Consume not implemented") +func (UnimplementedSourceServer) Stream(*emptypb.Empty, Source_StreamServer) error { + return status.Errorf(codes.Unimplemented, "method Stream not implemented") } func (UnimplementedSourceServer) mustEmbedUnimplementedSourceServer() {} @@ -94,24 +94,24 @@ func RegisterSourceServer(s grpc.ServiceRegistrar, srv SourceServer) { s.RegisterService(&Source_ServiceDesc, srv) } -func _Source_Consume_Handler(srv interface{}, stream grpc.ServerStream) error { +func _Source_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(emptypb.Empty) if err := stream.RecvMsg(m); err != nil { return err } - return srv.(SourceServer).Consume(m, &sourceConsumeServer{stream}) + return srv.(SourceServer).Stream(m, &sourceStreamServer{stream}) } -type Source_ConsumeServer interface { - Send(*ConsumeResponse) error +type Source_StreamServer interface { + Send(*StreamResponse) error grpc.ServerStream } -type sourceConsumeServer struct { +type sourceStreamServer struct { grpc.ServerStream } -func (x *sourceConsumeServer) Send(m *ConsumeResponse) error { +func (x *sourceStreamServer) Send(m *StreamResponse) error { return x.ServerStream.SendMsg(m) } @@ -124,8 +124,8 @@ var Source_ServiceDesc = grpc.ServiceDesc{ Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { - StreamName: "Consume", - Handler: _Source_Consume_Handler, + StreamName: "Stream", + Handler: _Source_Stream_Handler, ServerStreams: true, }, }, diff --git a/pkg/config/config.go b/pkg/config/config.go index d8ccb23455..d0cef5495a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -198,6 +198,7 @@ type ActionBindings struct { type Sources struct { DisplayName string `yaml:"displayName"` Kubernetes KubernetesSource `yaml:"kubernetes"` + Plugins PluginsExecutors `koanf:",remain"` } // KubernetesSource contains configuration for Kubernetes sources. diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 0e6e0fa4b5..4a7e6aa375 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -48,6 +48,41 @@ func TestLoadConfigSuccess(t *testing.T) { golden.Assert(t, string(gotData), filepath.Join(t.Name(), "config.golden.yaml")) } +func TestLoadConfigWithPlugins(t *testing.T) { + // given + expSourcePlugin := config.PluginsExecutors{ + "botkube/keptn": { + Enabled: true, + Config: map[string]interface{}{ + "field": "value", + }, + }, + } + + expExecutorPlugin := config.PluginsExecutors{ + "botkube/echo": { + Enabled: true, + Config: map[string]interface{}{ + "changeResponseToUpperCase": true, + }, + }, + } + + // when + gotCfg, _, err := config.LoadWithDefaults(func() []string { + return []string{ + testdataFile(t, "config-all.yaml"), + } + }) + + //then + require.NoError(t, err) + require.NotNil(t, gotCfg) + + assert.Equal(t, expSourcePlugin, gotCfg.Sources["k8s-events"].Plugins) + assert.Equal(t, expExecutorPlugin, gotCfg.Executors["plugin-based"].Plugins) +} + func TestFromEnvOrFlag(t *testing.T) { var expConfigPaths = []string{ "configs/first.yaml", diff --git a/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml b/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml index e3ab1eb18c..bf10b4a83a 100644 --- a/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml +++ b/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml @@ -104,6 +104,8 @@ communications: # req 1 elm. sources: 'k8s-events': + displayName: "Plugins & Builtins" + kubernetes: recommendations: pod: @@ -115,7 +117,7 @@ sources: namespaces: include: - ".*" - exclude: [] + exclude: [ ] event: reason: ".*" message: "^Error .*" @@ -202,6 +204,12 @@ sources: fields: - spec.template.spec.containers[*].image - status.readyReplicas + + botkube/keptn: + enabled: true + config: + field: value + filters: kubernetes: objectAnnotationChecker: true diff --git a/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml b/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml index bf1b6ebdfd..6ad30aa772 100644 --- a/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml +++ b/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml @@ -10,7 +10,7 @@ actions: - kubectl-read-only sources: k8s-events: - displayName: "" + displayName: Plugins & Builtins kubernetes: recommendations: ingress: @@ -271,6 +271,11 @@ sources: my-annotation: "true" labels: my-label: "true" + plugins: + botkube/keptn: + enabled: true + config: + field: value executors: kubectl-read-only: kubectl: diff --git a/pkg/config/testdata/TestLoadConfigWithPlugins/config-all.yaml b/pkg/config/testdata/TestLoadConfigWithPlugins/config-all.yaml new file mode 100644 index 0000000000..978caf4118 --- /dev/null +++ b/pkg/config/testdata/TestLoadConfigWithPlugins/config-all.yaml @@ -0,0 +1,61 @@ +communications: # req 1 elm. + 'default-workspace': + slack: + enabled: false + channels: + 'alias': + name: 'SLACK_CHANNEL' + bindings: + executors: + - kubectl-read-only + sources: + - k8s-events + token: 'SLACK_API_TOKEN' + notification: + type: short + +sources: + 'k8s-events': + displayName: "Plugins & Builtins" + + kubernetes: + events: + - create + - delete + - error + namespaces: + include: [ ".*" ] + resources: + - name: v1/pods + + botkube/keptn: + enabled: true + config: + field: value + + +executors: + 'kubectl-read-only': + # Kubectl executor configs + kubectl: + namespaces: + include: [ ".*" ] + exclude: [ "foo", "bar", "test-*-ns" ] + + # Set true to enable kubectl commands execution + enabled: false + # List of allowed commands + commands: + # method which are allowed + verbs: [ "api-resources", "api-versions", "cluster-info", "describe", "diff", "explain", "get", "logs", "top", "auth" ] + # resource configuration which is allowed + resources: [ "deployments", "pods" , "namespaces", "daemonsets", "statefulsets", "storageclasses", "nodes" ] + # set Namespace to execute botkube kubectl commands by default + defaultNamespace: default + # Set true to enable commands execution from configured channel only + restrictAccess: false + 'plugin-based': + botkube/echo: # / is syntax for plugin based executors + enabled: true + config: + changeResponseToUpperCase: true diff --git a/pkg/execute/plugin_executor.go b/pkg/execute/plugin_executor.go index 84ea07936a..f294f56e0e 100644 --- a/pkg/execute/plugin_executor.go +++ b/pkg/execute/plugin_executor.go @@ -3,7 +3,6 @@ package execute import ( "context" "fmt" - "strings" "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" @@ -57,10 +56,7 @@ func (e *PluginExecutor) Execute(ctx context.Context, bindings []string, args [] "command": command, }).Debugf("Handling plugin command...") - var ( - cmdName = args[0] - ) - + cmdName := args[0] plugins, fullPluginName := e.getEnabledPlugins(bindings, cmdName) configs, err := e.collectConfigs(plugins) @@ -73,7 +69,7 @@ func (e *PluginExecutor) Execute(ctx context.Context, bindings []string, args [] return "", fmt.Errorf("while getting concrete plugin client: %w", err) } - // TODO: Execute RPC with all data: + // TODO(configure plugin): Execute RPC with all data: // - command // - all configuration but in proper order (so it can be merged properly) _ = configs @@ -110,24 +106,24 @@ func (e *PluginExecutor) getEnabledPlugins(bindings []string, cmdName string) ([ fullPluginName string ) - for _, name := range bindings { - executors, found := e.cfg.Executors[name] + for _, bindingName := range bindings { + bindExecutors, found := e.cfg.Executors[bindingName] if !found { continue } - for key, executor := range executors.Plugins { - if !executor.Enabled { + for pluginKey, pluginDetails := range bindExecutors.Plugins { + if !pluginDetails.Enabled { continue } - _, pluginName, _ := strings.Cut(key, "/") // FIXME: maybe add a shared method/func to manager/plugin pkg? + _, pluginName, _ := plugin.DecomposePluginKey(pluginKey) if pluginName != cmdName { continue } - fullPluginName = key - out = append(out, executor) + fullPluginName = pluginKey + out = append(out, pluginDetails) } } diff --git a/proto/source.proto b/proto/source.proto index cf1efb3f49..52c5682108 100644 --- a/proto/source.proto +++ b/proto/source.proto @@ -6,10 +6,11 @@ option go_package = "pkg/api/source"; package source; -service Source { - rpc Consume(google.protobuf.Empty) returns (stream ConsumeResponse) {} +message StreamResponse { + // Output represents the streamed Source events. It is from start of Source execution. + bytes output = 1; } -message ConsumeResponse { - string data = 1; +service Source { + rpc Stream(google.protobuf.Empty) returns (stream StreamResponse) {} }