Skip to content

Commit

Permalink
Update source example
Browse files Browse the repository at this point in the history
  • Loading branch information
mszostok committed Nov 29, 2022
1 parent 4bb45a8 commit 0da015a
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 26 deletions.
5 changes: 4 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ For faster development, you can also build and run Botkube outside K8s cluster.
export BOTKUBE_PLUGINS_CACHE__DIR="/tmp/plugins"
```

3. Each time you make a change to [source](cmd/source) or [executors](cmd/executor) plugins, run:
3. In other terminal window, run:

```bash
# rebuild plugins only for current GOOS and GOARCH
Expand All @@ -162,6 +162,9 @@ For faster development, you can also build and run Botkube outside K8s cluster.
./botkube
```

> **Note**
> Each time you make a change to the [source](cmd/source) or [executors](cmd/executor) plugins re-run the above command.

## Making A Change

- Before making any significant changes, please [open an issue](https://github.com/kubeshop/botkube/issues). Discussing your proposed changes ahead of time will make the contribution process smooth for everyone.
Expand Down
2 changes: 1 addition & 1 deletion cmd/executor/echo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Echo is the example Botkube executor used during [e2e tests](../../../test/e2e).

## Configuration parameters

The Echo configuration should be specified in YAML format. It accepts such parameters:
The configuration should be specified in YAML format. Such parameters are supported:

```yaml
changeResponseToUpperCase: true # default is 'false'.
Expand Down
2 changes: 1 addition & 1 deletion cmd/executor/echo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (EchoExecutor) Execute(_ context.Context, req *executor.ExecuteRequest) (*e
}

return &executor.ExecuteResponse{
Data: data + "v2",
Data: data,
}, nil
}

Expand Down
10 changes: 6 additions & 4 deletions cmd/source/cm-watcher/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Echo executor
# ConfigMap watcher source

Echo is the example Botkube executor used during [e2e tests](../../../test/e2e).
ConfigMap watcher source is the example Botkube source used during [e2e tests](../../../test/e2e).

## Configuration parameters

The Echo configuration should be specified in YAML format. It accepts such parameters:
The configuration should be specified in YAML format. Such parameters are supported:

```yaml
configMapName: cm-map-watcher # config map name to react to.
configMap:
name: cm-map-watcher # config map name to react to
namespace: botkube # config map namespace
```
98 changes: 79 additions & 19 deletions cmd/source/cm-watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,38 @@ package main

import (
"context"
"encoding/json"
"time"
"fmt"
"log"
"os"
"path/filepath"

"github.com/hashicorp/go-plugin"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"

watchtools "k8s.io/client-go/tools/watch"

"github.com/kubeshop/botkube/pkg/api/source"
)

const pluginName = "cm-watcher"

// Config holds executor configuration.
type Config struct {
ConfigMapName string
}
type (
Config struct {
ConfigMap Object
}
Object struct {
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
}
)

// CMWatcher implements Botkube source plugin.
type CMWatcher struct{}
Expand All @@ -24,29 +42,65 @@ type CMWatcher struct{}
func (CMWatcher) Stream(ctx context.Context) (source.StreamOutput, error) {
// TODO: in request we should receive the executor configuration.
cfg := Config{
ConfigMapName: "cm-watcher-trigger",
ConfigMap: Object{
Name: "cm-watcher-trigger",
Namespace: "botkube",
},
}

raw, err := json.Marshal(cfg)
if err != nil {
return source.StreamOutput{}, err
}
out := source.StreamOutput{
Output: make(chan []byte),
}

go func() {
for {
select {
case <-time.Tick(1 * time.Second):
out.Output <- raw
case <-ctx.Done():
go listenEvents(ctx, cfg.ConfigMap, out.Output)

return out, nil
}

func listenEvents(ctx context.Context, obj Object, sink chan<- []byte) {
home, err := os.UserHomeDir()
exitOnError(err)

config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(home, ".kube", "config"))
exitOnError(err)
clientset, err := kubernetes.NewForConfig(config)
exitOnError(err)

fieldSelector := fields.OneTermEqualSelector("metadata.name", obj.Name).String()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
return clientset.CoreV1().Pods(obj.Namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return clientset.CoreV1().Pods(obj.Namespace).Watch(ctx, options)
},
}

fmt.Println("starting informer")
_, informer, watcher, _ := watchtools.NewIndexerInformerWatcher(lw, &corev1.ConfigMap{})
defer watcher.Stop()

fmt.Println("waiting for cache sync")
cache.WaitForCacheSync(ctx.Done(), informer.HasSynced)

ch := watcher.ResultChan()
defer watcher.Stop()

for {
select {
case event, ok := <-ch:
fmt.Println("get event", event)
if !ok { // finished
return
}
cm := event.Object.(*corev1.ConfigMap)
sink <- []byte(cm.Name)
case <-ctx.Done(): // client closed streaming
return
}
}()

return out, nil
}
}

func main() {
Expand All @@ -56,3 +110,9 @@ func main() {
},
})
}

func exitOnError(err error) {
if err != nil {
log.Fatal(err)
}
}

0 comments on commit 0da015a

Please sign in to comment.