Skip to content

Commit

Permalink
Initial commit for ksql operator
Browse files Browse the repository at this point in the history
  • Loading branch information
mgazza committed Dec 11, 2020
1 parent f00e796 commit 532cbe4
Show file tree
Hide file tree
Showing 51 changed files with 3,886 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
vendor
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# ksql operator
A kubernetes operator for ksql tables streams and running queries

# Manifests
This directory contains kubernetes resources used by this deployment

# args
| arg | default | comments |
|------------|----------------|---------------------------------------------------------------------------------------------------------------|
| kubeConfig | | Path to a kubeConfig. Only required if out-of-cluster. |
| master | | The address of the Kubernetes API server. Overrides any value in kubeConfig. Only required if out-of-cluster. |
| baseURL | $KSQL_URL | The Base URL of the ksql rest api. |
| username | $KSQL_USERNAME | The Username to use with the ksql rest api. |
| password | $KSQL_PASSWORD | The Password to use with the ksql rest api. |

# env
| env | default | comments |
|---------------|----------------------|----------------------------------------------|
| KSQL_URL | http://ksqldb | The Base URL of the ksql rest api. |
| KSQL_USERNAME | | The Username to use with the ksql rest api. |
| KSQL_PASSWORD | | The Password to use with the ksql rest api. |

# Build
This project is continuously integrated by github and produces a docker image
```bash
docker pull ghcr.io/mgazza/ksql_operator:latest
```

# Generated resources
This project uses a few generated resources.
To regenerate the generated code issue the following command.
```bash
./hack/update-codegen.sh
```
This project uses go mod
you may need to execute `go mod vendor` before the above will work.
43 changes: 43 additions & 0 deletions cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import "sync"

type safeCache struct {
lock sync.Mutex
store map[string]interface{}
}

func NewSafeCache() *safeCache {
return &safeCache{
store: map[string]interface{}{},
}
}

// Sync calls the cb function passing the current object in the store for that key in a thread safe manor
// if the cb errors it returns that error
// Sync then stores the result of cb back in the store for that key
func (c *safeCache) Sync(key string, cb func(obj interface{}) (interface{}, error)) error {
c.lock.Lock()
defer c.lock.Unlock()
item := c.store[key]
cbi, err := cb(item)
if err != nil {
return err
}
if cbi != item && cbi != nil {
c.store[key] = cbi
}
return nil
}

func (c *safeCache) Get(key string) interface{} {
c.lock.Lock()
defer c.lock.Unlock()
return c.store[key]
}

func (c *safeCache) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.store, key)
}
Loading

0 comments on commit 532cbe4

Please sign in to comment.