Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for etcd client version 3 #2

Merged
merged 7 commits into from
Apr 23, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 0 additions & 33 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,48 +7,15 @@ import (
"io/ioutil"
"net"
"net/http"
"time"

etcd "github.com/coreos/etcd/client"
)

// Code taken from https://github.com/go-kit/kit/blob/master/sd/etcd/client.go

const defaultTTL = 3 * time.Second

// Client is a wrapper around the etcd client.
type Client interface {
// GetEntries queries the given prefix in etcd and returns a slice
// containing the values of all keys found, recursively, underneath that
// prefix.
GetEntries(prefix string) ([]string, error)

// WatchPrefix watches the given prefix in etcd for changes. When a change
// is detected, it will signal on the passed channel. Clients are expected
// to call GetEntries to update themselves with the latest set of complete
// values. WatchPrefix will always send an initial sentinel value on the
// channel after establishing the watch, to ensure that clients always
// receive the latest set of values. WatchPrefix will block until the
// context passed to the NewClient constructor is terminated.
WatchPrefix(prefix string, ch chan struct{})
}

type client struct {
keysAPI etcd.KeysAPI
ctx context.Context
}

// ClientOptions defines options for the etcd client. All values are optional.
// If any duration is not specified, a default of 3 seconds will be used.
type ClientOptions struct {
Cert string
Key string
CACert string
DialTimeout time.Duration
DialKeepAlive time.Duration
HeaderTimeoutPerRequest time.Duration
}

// NewClient returns Client with a connection to the named machines. It will
// return an error if a connection to the cluster cannot be made. The parameter
// machines needs to be a full URL with schemas. e.g. "http://localhost:2379"
Expand Down
110 changes: 110 additions & 0 deletions clientv3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package etcd

import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"time"

etcdv3 "github.com/coreos/etcd/clientv3"
)

type clientv3 struct {
client *etcdv3.Client
ctx context.Context
timeout time.Duration
}

// NewClient returns Client with a connection to the named machines. It will
// return an error if a connection to the cluster cannot be made. The parameter
// machines needs to be a full URL with schemas. e.g. "http://localhost:2379"
// will work, but "localhost:2379" will not.
func NewClientV3(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
if options.DialTimeout == 0 {
options.DialTimeout = defaultTTL
}
if options.DialKeepAlive == 0 {
options.DialKeepAlive = defaultTTL
}
if options.HeaderTimeoutPerRequest == 0 {
options.HeaderTimeoutPerRequest = defaultTTL
}

var tlsCfg *tls.Config
if options.Cert != "" && options.Key != "" {
tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key)
if err != nil {
return nil, err
}
tlsCfg = &tls.Config{
Certificates: []tls.Certificate{tlsCert},
}
if caCertCt, err := ioutil.ReadFile(options.CACert); err == nil {
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCertCt)
tlsCfg.RootCAs = caCertPool
}
}

ce, err := etcdv3.New(etcdv3.Config{
Endpoints: machines,
DialTimeout: options.DialTimeout,
DialKeepAliveTime: options.DialKeepAlive,
DialKeepAliveTimeout: options.HeaderTimeoutPerRequest,
TLS: tlsCfg,
})
if err != nil {
return nil, err
}

return &clientv3{
client: ce,
ctx: ctx,
timeout: options.HeaderTimeoutPerRequest,
}, nil
}

// GetEntries implements the etcd Client interface.
func (c *clientv3) GetEntries(key string) ([]string, error) {

if c.client == nil {
return nil, ErrNilClient
}

// set the timeout for this requisition
timeoutCtx, cancel := context.WithTimeout(c.ctx, c.timeout)
resp, err := c.client.Get(timeoutCtx, key, etcdv3.WithPrefix())
cancel()

if err != nil {
return nil, err
}

// Special case. Note that it's possible that len(resp.Node.Nodes) == 0 and
// resp.Node.Value is also empty, in which case the key is empty and we
// should not return any entries.
if len(resp.Kvs) == 0 || resp.Count != int64(len(resp.Kvs)) {
return nil, nil
}

entries := make([]string, resp.Count)
for i, ev := range resp.Kvs {
entries[i] = string(ev.Value[:])
}
return entries, nil
}

// WatchPrefix implements the etcd Client interface.
func (c *clientv3) WatchPrefix(prefix string, ch chan struct{}) {

if c.client == nil {
return
}
watch := c.client.Watch(c.ctx, prefix, etcdv3.WithPrefix())
// watch := c.keysAPI.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})
khvysofq marked this conversation as resolved.
Show resolved Hide resolved
ch <- struct{}{} // make sure caller invokes GetEntries
for _ := range watch {
ch <- struct{}{}
}
}
86 changes: 86 additions & 0 deletions clientv3_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package etcd

import (
"context"
"testing"
"time"
)

func TestNewClient_withDefaultsV3(t *testing.T) {
client, err := NewClientV3(
context.Background(),
[]string{"http://irrelevant:12345"},
ClientOptions{},
)
if err == nil {
t.Fatalf("unexpected error creating client: %v", err)
}
if client != nil {
t.Fatal("expected new Client, got nil")
}
}

// NewClient should fail when providing invalid or missing endpoints.
func TestOptionsV3(t *testing.T) {
a, err := NewClientV3(
context.Background(),
[]string{},
ClientOptions{
Cert: "",
Key: "",
CACert: "",
DialTimeout: 2 * time.Second,
DialKeepAlive: 2 * time.Second,
HeaderTimeoutPerRequest: 2 * time.Second,
},
)
if err == nil {
t.Errorf("expected error: %v", err)
}
if a != nil {
t.Fatalf("expected client to be nil on failure")
}

_, err = NewClientV3(
context.Background(),
[]string{"http://irrelevant:12345"},
ClientOptions{
Cert: "blank.crt",
Key: "blank.key",
CACert: "blank.CACert",
DialTimeout: 2 * time.Second,
DialKeepAlive: 2 * time.Second,
HeaderTimeoutPerRequest: 2 * time.Second,
},
)
if err == nil {
t.Errorf("expected error: %v", err)
}
}

// ---------------------------------------------------------------------------------------------------------------------

func newFakeClientV3(ctx context.Context) Client {
return &clientv3{
client: nil,
ctx: ctx,
timeout: 3 * time.Second,
}
}

func TestWatchPrefixV3(t *testing.T) {

cv3 := newFakeClientV3(context.Background())

ch := make(chan struct{})
cv3.WatchPrefix("prefix", ch)
}

func TestGetEntriesV3(t *testing.T) {
cv3 := newFakeClientV3(context.Background())

res, err := cv3.GetEntries("prefix")
if res != nil || err == nil {
t.Errorf("expected client error")
}
}
54 changes: 54 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,39 @@ import (
"github.com/devopsfaith/krakend/config"
)

// Code taken from https://github.com/go-kit/kit/blob/master/sd/etcd/client.go

const defaultTTL = 3 * time.Second

// Client is a wrapper around the etcd client.
type Client interface {
// GetEntries queries the given prefix in etcd and returns a slice
// containing the values of all keys found, recursively, underneath that
// prefix.
GetEntries(prefix string) ([]string, error)

// WatchPrefix watches the given prefix in etcd for changes. When a change
// is detected, it will signal on the passed channel. Clients are expected
// to call GetEntries to update themselves with the latest set of complete
// values. WatchPrefix will always send an initial sentinel value on the
// channel after establishing the watch, to ensure that clients always
// receive the latest set of values. WatchPrefix will block until the
// context passed to the NewClient constructor is terminated.
WatchPrefix(prefix string, ch chan struct{})
}

// ClientOptions defines options for the etcd client. All values are optional.
// If any duration is not specified, a default of 3 seconds will be used.
type ClientOptions struct {
Cert string
Key string
CACert string
DialTimeout time.Duration
DialKeepAlive time.Duration
DialKeepAliveTimeout time.Duration
HeaderTimeoutPerRequest time.Duration
}

// Namespace is the key to use to store and access the custom config data
const Namespace = "github.com/devopsfaith/krakend-etcd"

Expand All @@ -18,6 +51,8 @@ var (
ErrBadConfig = fmt.Errorf("unable to create the etcd client with the received config")
// ErrNoMachines is the error to be returned when the config has not defined one or more servers
ErrNoMachines = fmt.Errorf("unable to create the etcd client without a set of servers")
// ErrNilClient is the error to be nil client
ErrNilClient = fmt.Errorf("nil etcd client")
)

// New creates an etcd client with the config extracted from the extra config param
Expand All @@ -34,10 +69,29 @@ func New(ctx context.Context, e config.ExtraConfig) (Client, error) {
if err != nil {
return nil, err
}
version, err := parseVersion(tmp)
if err != nil {
return nil, err
}

if version == "v3" {
return NewClientV3(ctx, machines, parseOptions(tmp))
}
return NewClient(ctx, machines, parseOptions(tmp))
}

func parseVersion(cfg map[string]interface{}) (string, error) {
value, ok := cfg["client_version"]
if !ok {
return "v2", nil
}
result := value.(string)
if result != "v2" && result != "v3" {
khvysofq marked this conversation as resolved.
Show resolved Hide resolved
result = "v2"
}
return result, nil
}

func parseMachines(cfg map[string]interface{}) ([]string, error) {
result := []string{}
machines, ok := cfg["machines"]
Expand Down
34 changes: 30 additions & 4 deletions example/krakend.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,45 @@
"endpoints": [
{
"endpoint": "/github/{user}",
"method": "GET",
"output_encoding": "no-op",
"backend": [
{
"host": [
"github"
],
"url_pattern": "/",
"disable_host_sanitize": true,
"whitelist": [
"authorizations_url",
"code_search_url"
],
"sd": "etcd",

"sd": "etcd"
}
]
},
},
{
"endpoint": "/",
"method": "GET",
"extra_config": {},
"output_encoding": "no-op",
"concurrent_calls": 1,
"querystring_params":[
"*"
],
"backend": [
{
"url_pattern": "/",
"encoding": "no-op",
"extra_config": {},
"sd": "etcd",
"host": [
"github.com"
],
"disable_host_sanitize": true
}
]
},
{
"endpoint": "/combination/{id}",
"backend": [
Expand All @@ -30,6 +54,7 @@
"jsonplaceholder.typicode"
],
"url_pattern": "/posts?userId={id}",
"encoding": "no-op",
"is_collection": true,
"mapping": {
"collection": "posts"
Expand All @@ -53,7 +78,8 @@
],
"extra_config": {
"github.com/devopsfaith/krakend-etcd": {
"machines": [ "http://192.168.99.100:4001" ],
"machines": [ "http://192.168.110.111:2379" ],
"client_version": "v3",
"options": {
"dial_timeout": "5s",
"dial_keepalive": "30s",
Expand Down
Loading