Skip to content

Commit

Permalink
Merge pull request #2 from khvysofq/master
Browse files Browse the repository at this point in the history
Add support for etcd client version 3
  • Loading branch information
kpacha authored Apr 23, 2019
2 parents bd64594 + 5987743 commit 4418916
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 41 deletions.
33 changes: 0 additions & 33 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,48 +7,15 @@ import (
"io/ioutil"
"net"
"net/http"
"time"

etcd "github.com/coreos/etcd/client"
)

// Code taken from https://github.com/go-kit/kit/blob/master/sd/etcd/client.go

const defaultTTL = 3 * time.Second

// Client is a wrapper around the etcd client.
type Client interface {
// GetEntries queries the given prefix in etcd and returns a slice
// containing the values of all keys found, recursively, underneath that
// prefix.
GetEntries(prefix string) ([]string, error)

// WatchPrefix watches the given prefix in etcd for changes. When a change
// is detected, it will signal on the passed channel. Clients are expected
// to call GetEntries to update themselves with the latest set of complete
// values. WatchPrefix will always send an initial sentinel value on the
// channel after establishing the watch, to ensure that clients always
// receive the latest set of values. WatchPrefix will block until the
// context passed to the NewClient constructor is terminated.
WatchPrefix(prefix string, ch chan struct{})
}

type client struct {
keysAPI etcd.KeysAPI
ctx context.Context
}

// ClientOptions defines options for the etcd client. All values are optional.
// If any duration is not specified, a default of 3 seconds will be used.
type ClientOptions struct {
Cert string
Key string
CACert string
DialTimeout time.Duration
DialKeepAlive time.Duration
HeaderTimeoutPerRequest time.Duration
}

// NewClient returns Client with a connection to the named machines. It will
// return an error if a connection to the cluster cannot be made. The parameter
// machines needs to be a full URL with schemas. e.g. "http://localhost:2379"
Expand Down
109 changes: 109 additions & 0 deletions clientv3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package etcd

import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"time"

etcdv3 "github.com/coreos/etcd/clientv3"
)

type clientv3 struct {
client *etcdv3.Client
ctx context.Context
timeout time.Duration
}

// NewClient returns Client with a connection to the named machines. It will
// return an error if a connection to the cluster cannot be made. The parameter
// machines needs to be a full URL with schemas. e.g. "http://localhost:2379"
// will work, but "localhost:2379" will not.
func NewClientV3(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
if options.DialTimeout == 0 {
options.DialTimeout = defaultTTL
}
if options.DialKeepAlive == 0 {
options.DialKeepAlive = defaultTTL
}
if options.HeaderTimeoutPerRequest == 0 {
options.HeaderTimeoutPerRequest = defaultTTL
}

var tlsCfg *tls.Config
if options.Cert != "" && options.Key != "" {
tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key)
if err != nil {
return nil, err
}
tlsCfg = &tls.Config{
Certificates: []tls.Certificate{tlsCert},
}
if caCertCt, err := ioutil.ReadFile(options.CACert); err == nil {
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCertCt)
tlsCfg.RootCAs = caCertPool
}
}

ce, err := etcdv3.New(etcdv3.Config{
Endpoints: machines,
DialTimeout: options.DialTimeout,
DialKeepAliveTime: options.DialKeepAlive,
DialKeepAliveTimeout: options.HeaderTimeoutPerRequest,
TLS: tlsCfg,
})
if err != nil {
return nil, err
}

return &clientv3{
client: ce,
ctx: ctx,
timeout: options.HeaderTimeoutPerRequest,
}, nil
}

// GetEntries implements the etcd Client interface.
func (c *clientv3) GetEntries(key string) ([]string, error) {

if c.client == nil {
return nil, ErrNilClient
}

// set the timeout for this requisition
timeoutCtx, cancel := context.WithTimeout(c.ctx, c.timeout)
resp, err := c.client.Get(timeoutCtx, key, etcdv3.WithPrefix())
cancel()

if err != nil {
return nil, err
}

// Special case. Note that it's possible that len(resp.Node.Nodes) == 0 and
// resp.Node.Value is also empty, in which case the key is empty and we
// should not return any entries.
if len(resp.Kvs) == 0 || resp.Count != int64(len(resp.Kvs)) {
return nil, nil
}

entries := make([]string, resp.Count)
for i, ev := range resp.Kvs {
entries[i] = string(ev.Value[:])
}
return entries, nil
}

// WatchPrefix implements the etcd Client interface.
func (c *clientv3) WatchPrefix(prefix string, ch chan struct{}) {

if c.client == nil {
return
}
watch := c.client.Watch(c.ctx, prefix, etcdv3.WithPrefix())
ch <- struct{}{} // make sure caller invokes GetEntries
for _ := range watch {
ch <- struct{}{}
}
}
86 changes: 86 additions & 0 deletions clientv3_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package etcd

import (
"context"
"testing"
"time"
)

func TestNewClient_withDefaultsV3(t *testing.T) {
client, err := NewClientV3(
context.Background(),
[]string{"http://irrelevant:12345"},
ClientOptions{},
)
if err == nil {
t.Fatalf("unexpected error creating client: %v", err)
}
if client != nil {
t.Fatal("expected new Client, got nil")
}
}

// NewClient should fail when providing invalid or missing endpoints.
func TestOptionsV3(t *testing.T) {
a, err := NewClientV3(
context.Background(),
[]string{},
ClientOptions{
Cert: "",
Key: "",
CACert: "",
DialTimeout: 2 * time.Second,
DialKeepAlive: 2 * time.Second,
HeaderTimeoutPerRequest: 2 * time.Second,
},
)
if err == nil {
t.Errorf("expected error: %v", err)
}
if a != nil {
t.Fatalf("expected client to be nil on failure")
}

_, err = NewClientV3(
context.Background(),
[]string{"http://irrelevant:12345"},
ClientOptions{
Cert: "blank.crt",
Key: "blank.key",
CACert: "blank.CACert",
DialTimeout: 2 * time.Second,
DialKeepAlive: 2 * time.Second,
HeaderTimeoutPerRequest: 2 * time.Second,
},
)
if err == nil {
t.Errorf("expected error: %v", err)
}
}

// ---------------------------------------------------------------------------------------------------------------------

func newFakeClientV3(ctx context.Context) Client {
return &clientv3{
client: nil,
ctx: ctx,
timeout: 3 * time.Second,
}
}

func TestWatchPrefixV3(t *testing.T) {

cv3 := newFakeClientV3(context.Background())

ch := make(chan struct{})
cv3.WatchPrefix("prefix", ch)
}

func TestGetEntriesV3(t *testing.T) {
cv3 := newFakeClientV3(context.Background())

res, err := cv3.GetEntries("prefix")
if res != nil || err == nil {
t.Errorf("expected client error")
}
}
54 changes: 54 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,39 @@ import (
"github.com/devopsfaith/krakend/config"
)

// Code taken from https://github.com/go-kit/kit/blob/master/sd/etcd/client.go

const defaultTTL = 3 * time.Second

// Client is a wrapper around the etcd client.
type Client interface {
// GetEntries queries the given prefix in etcd and returns a slice
// containing the values of all keys found, recursively, underneath that
// prefix.
GetEntries(prefix string) ([]string, error)

// WatchPrefix watches the given prefix in etcd for changes. When a change
// is detected, it will signal on the passed channel. Clients are expected
// to call GetEntries to update themselves with the latest set of complete
// values. WatchPrefix will always send an initial sentinel value on the
// channel after establishing the watch, to ensure that clients always
// receive the latest set of values. WatchPrefix will block until the
// context passed to the NewClient constructor is terminated.
WatchPrefix(prefix string, ch chan struct{})
}

// ClientOptions defines options for the etcd client. All values are optional.
// If any duration is not specified, a default of 3 seconds will be used.
type ClientOptions struct {
Cert string
Key string
CACert string
DialTimeout time.Duration
DialKeepAlive time.Duration
DialKeepAliveTimeout time.Duration
HeaderTimeoutPerRequest time.Duration
}

// Namespace is the key to use to store and access the custom config data
const Namespace = "github.com/devopsfaith/krakend-etcd"

Expand All @@ -18,6 +51,8 @@ var (
ErrBadConfig = fmt.Errorf("unable to create the etcd client with the received config")
// ErrNoMachines is the error to be returned when the config has not defined one or more servers
ErrNoMachines = fmt.Errorf("unable to create the etcd client without a set of servers")
// ErrNilClient is the error to be nil client
ErrNilClient = fmt.Errorf("nil etcd client")
)

// New creates an etcd client with the config extracted from the extra config param
Expand All @@ -34,10 +69,29 @@ func New(ctx context.Context, e config.ExtraConfig) (Client, error) {
if err != nil {
return nil, err
}
version, err := parseVersion(tmp)
if err != nil {
return nil, err
}

if version == "v3" {
return NewClientV3(ctx, machines, parseOptions(tmp))
}
return NewClient(ctx, machines, parseOptions(tmp))
}

func parseVersion(cfg map[string]interface{}) (string, error) {
value, ok := cfg["client_version"]
if !ok {
return "v2", nil
}
result, ok := value.(string)
if !ok || (result != "v2" && result != "v3") {
result = "v2"
}
return result, nil
}

func parseMachines(cfg map[string]interface{}) ([]string, error) {
result := []string{}
machines, ok := cfg["machines"]
Expand Down
34 changes: 30 additions & 4 deletions example/krakend.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,45 @@
"endpoints": [
{
"endpoint": "/github/{user}",
"method": "GET",
"output_encoding": "no-op",
"backend": [
{
"host": [
"github"
],
"url_pattern": "/",
"disable_host_sanitize": true,
"whitelist": [
"authorizations_url",
"code_search_url"
],
"sd": "etcd",

"sd": "etcd"
}
]
},
},
{
"endpoint": "/",
"method": "GET",
"extra_config": {},
"output_encoding": "no-op",
"concurrent_calls": 1,
"querystring_params":[
"*"
],
"backend": [
{
"url_pattern": "/",
"encoding": "no-op",
"extra_config": {},
"sd": "etcd",
"host": [
"github.com"
],
"disable_host_sanitize": true
}
]
},
{
"endpoint": "/combination/{id}",
"backend": [
Expand All @@ -30,6 +54,7 @@
"jsonplaceholder.typicode"
],
"url_pattern": "/posts?userId={id}",
"encoding": "no-op",
"is_collection": true,
"mapping": {
"collection": "posts"
Expand All @@ -53,7 +78,8 @@
],
"extra_config": {
"github.com/devopsfaith/krakend-etcd": {
"machines": [ "http://192.168.99.100:4001" ],
"machines": [ "http://192.168.110.111:2379" ],
"client_version": "v3",
"options": {
"dial_timeout": "5s",
"dial_keepalive": "30s",
Expand Down
Loading

0 comments on commit 4418916

Please sign in to comment.