diff --git a/client.go b/client.go index f6ef686..01a513b 100644 --- a/client.go +++ b/client.go @@ -7,48 +7,15 @@ import ( "io/ioutil" "net" "net/http" - "time" etcd "github.com/coreos/etcd/client" ) -// Code taken from https://github.com/go-kit/kit/blob/master/sd/etcd/client.go - -const defaultTTL = 3 * time.Second - -// Client is a wrapper around the etcd client. -type Client interface { - // GetEntries queries the given prefix in etcd and returns a slice - // containing the values of all keys found, recursively, underneath that - // prefix. - GetEntries(prefix string) ([]string, error) - - // WatchPrefix watches the given prefix in etcd for changes. When a change - // is detected, it will signal on the passed channel. Clients are expected - // to call GetEntries to update themselves with the latest set of complete - // values. WatchPrefix will always send an initial sentinel value on the - // channel after establishing the watch, to ensure that clients always - // receive the latest set of values. WatchPrefix will block until the - // context passed to the NewClient constructor is terminated. - WatchPrefix(prefix string, ch chan struct{}) -} - type client struct { keysAPI etcd.KeysAPI ctx context.Context } -// ClientOptions defines options for the etcd client. All values are optional. -// If any duration is not specified, a default of 3 seconds will be used. -type ClientOptions struct { - Cert string - Key string - CACert string - DialTimeout time.Duration - DialKeepAlive time.Duration - HeaderTimeoutPerRequest time.Duration -} - // NewClient returns Client with a connection to the named machines. It will // return an error if a connection to the cluster cannot be made. The parameter // machines needs to be a full URL with schemas. e.g. "http://localhost:2379" diff --git a/clientv3.go b/clientv3.go new file mode 100644 index 0000000..c21bb60 --- /dev/null +++ b/clientv3.go @@ -0,0 +1,109 @@ +package etcd + +import ( + "context" + "crypto/tls" + "crypto/x509" + "io/ioutil" + "time" + + etcdv3 "github.com/coreos/etcd/clientv3" +) + +type clientv3 struct { + client *etcdv3.Client + ctx context.Context + timeout time.Duration +} + +// NewClient returns Client with a connection to the named machines. It will +// return an error if a connection to the cluster cannot be made. The parameter +// machines needs to be a full URL with schemas. e.g. "http://localhost:2379" +// will work, but "localhost:2379" will not. +func NewClientV3(ctx context.Context, machines []string, options ClientOptions) (Client, error) { + if options.DialTimeout == 0 { + options.DialTimeout = defaultTTL + } + if options.DialKeepAlive == 0 { + options.DialKeepAlive = defaultTTL + } + if options.HeaderTimeoutPerRequest == 0 { + options.HeaderTimeoutPerRequest = defaultTTL + } + + var tlsCfg *tls.Config + if options.Cert != "" && options.Key != "" { + tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key) + if err != nil { + return nil, err + } + tlsCfg = &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + } + if caCertCt, err := ioutil.ReadFile(options.CACert); err == nil { + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCertCt) + tlsCfg.RootCAs = caCertPool + } + } + + ce, err := etcdv3.New(etcdv3.Config{ + Endpoints: machines, + DialTimeout: options.DialTimeout, + DialKeepAliveTime: options.DialKeepAlive, + DialKeepAliveTimeout: options.HeaderTimeoutPerRequest, + TLS: tlsCfg, + }) + if err != nil { + return nil, err + } + + return &clientv3{ + client: ce, + ctx: ctx, + timeout: options.HeaderTimeoutPerRequest, + }, nil +} + +// GetEntries implements the etcd Client interface. +func (c *clientv3) GetEntries(key string) ([]string, error) { + + if c.client == nil { + return nil, ErrNilClient + } + + // set the timeout for this requisition + timeoutCtx, cancel := context.WithTimeout(c.ctx, c.timeout) + resp, err := c.client.Get(timeoutCtx, key, etcdv3.WithPrefix()) + cancel() + + if err != nil { + return nil, err + } + + // Special case. Note that it's possible that len(resp.Node.Nodes) == 0 and + // resp.Node.Value is also empty, in which case the key is empty and we + // should not return any entries. + if len(resp.Kvs) == 0 || resp.Count != int64(len(resp.Kvs)) { + return nil, nil + } + + entries := make([]string, resp.Count) + for i, ev := range resp.Kvs { + entries[i] = string(ev.Value[:]) + } + return entries, nil +} + +// WatchPrefix implements the etcd Client interface. +func (c *clientv3) WatchPrefix(prefix string, ch chan struct{}) { + + if c.client == nil { + return + } + watch := c.client.Watch(c.ctx, prefix, etcdv3.WithPrefix()) + ch <- struct{}{} // make sure caller invokes GetEntries + for _ := range watch { + ch <- struct{}{} + } +} diff --git a/clientv3_test.go b/clientv3_test.go new file mode 100644 index 0000000..1cf583b --- /dev/null +++ b/clientv3_test.go @@ -0,0 +1,86 @@ +package etcd + +import ( + "context" + "testing" + "time" +) + +func TestNewClient_withDefaultsV3(t *testing.T) { + client, err := NewClientV3( + context.Background(), + []string{"http://irrelevant:12345"}, + ClientOptions{}, + ) + if err == nil { + t.Fatalf("unexpected error creating client: %v", err) + } + if client != nil { + t.Fatal("expected new Client, got nil") + } +} + +// NewClient should fail when providing invalid or missing endpoints. +func TestOptionsV3(t *testing.T) { + a, err := NewClientV3( + context.Background(), + []string{}, + ClientOptions{ + Cert: "", + Key: "", + CACert: "", + DialTimeout: 2 * time.Second, + DialKeepAlive: 2 * time.Second, + HeaderTimeoutPerRequest: 2 * time.Second, + }, + ) + if err == nil { + t.Errorf("expected error: %v", err) + } + if a != nil { + t.Fatalf("expected client to be nil on failure") + } + + _, err = NewClientV3( + context.Background(), + []string{"http://irrelevant:12345"}, + ClientOptions{ + Cert: "blank.crt", + Key: "blank.key", + CACert: "blank.CACert", + DialTimeout: 2 * time.Second, + DialKeepAlive: 2 * time.Second, + HeaderTimeoutPerRequest: 2 * time.Second, + }, + ) + if err == nil { + t.Errorf("expected error: %v", err) + } +} + +// --------------------------------------------------------------------------------------------------------------------- + +func newFakeClientV3(ctx context.Context) Client { + return &clientv3{ + client: nil, + ctx: ctx, + timeout: 3 * time.Second, + } +} + +func TestWatchPrefixV3(t *testing.T) { + + cv3 := newFakeClientV3(context.Background()) + + ch := make(chan struct{}) + cv3.WatchPrefix("prefix", ch) +} + +func TestGetEntriesV3(t *testing.T) { + cv3 := newFakeClientV3(context.Background()) + + res, err := cv3.GetEntries("prefix") + if res != nil || err == nil { + t.Errorf("expected client error") + } +} diff --git a/config.go b/config.go index 99f2ed5..f9ded8c 100644 --- a/config.go +++ b/config.go @@ -8,6 +8,39 @@ import ( "github.com/devopsfaith/krakend/config" ) +// Code taken from https://github.com/go-kit/kit/blob/master/sd/etcd/client.go + +const defaultTTL = 3 * time.Second + +// Client is a wrapper around the etcd client. +type Client interface { + // GetEntries queries the given prefix in etcd and returns a slice + // containing the values of all keys found, recursively, underneath that + // prefix. + GetEntries(prefix string) ([]string, error) + + // WatchPrefix watches the given prefix in etcd for changes. When a change + // is detected, it will signal on the passed channel. Clients are expected + // to call GetEntries to update themselves with the latest set of complete + // values. WatchPrefix will always send an initial sentinel value on the + // channel after establishing the watch, to ensure that clients always + // receive the latest set of values. WatchPrefix will block until the + // context passed to the NewClient constructor is terminated. + WatchPrefix(prefix string, ch chan struct{}) +} + +// ClientOptions defines options for the etcd client. All values are optional. +// If any duration is not specified, a default of 3 seconds will be used. +type ClientOptions struct { + Cert string + Key string + CACert string + DialTimeout time.Duration + DialKeepAlive time.Duration + DialKeepAliveTimeout time.Duration + HeaderTimeoutPerRequest time.Duration +} + // Namespace is the key to use to store and access the custom config data const Namespace = "github.com/devopsfaith/krakend-etcd" @@ -18,6 +51,8 @@ var ( ErrBadConfig = fmt.Errorf("unable to create the etcd client with the received config") // ErrNoMachines is the error to be returned when the config has not defined one or more servers ErrNoMachines = fmt.Errorf("unable to create the etcd client without a set of servers") + // ErrNilClient is the error to be nil client + ErrNilClient = fmt.Errorf("nil etcd client") ) // New creates an etcd client with the config extracted from the extra config param @@ -34,10 +69,29 @@ func New(ctx context.Context, e config.ExtraConfig) (Client, error) { if err != nil { return nil, err } + version, err := parseVersion(tmp) + if err != nil { + return nil, err + } + if version == "v3" { + return NewClientV3(ctx, machines, parseOptions(tmp)) + } return NewClient(ctx, machines, parseOptions(tmp)) } +func parseVersion(cfg map[string]interface{}) (string, error) { + value, ok := cfg["client_version"] + if !ok { + return "v2", nil + } + result, ok := value.(string) + if !ok || (result != "v2" && result != "v3") { + result = "v2" + } + return result, nil +} + func parseMachines(cfg map[string]interface{}) ([]string, error) { result := []string{} machines, ok := cfg["machines"] diff --git a/example/krakend.json b/example/krakend.json index 58c7ade..3af1253 100644 --- a/example/krakend.json +++ b/example/krakend.json @@ -7,21 +7,45 @@ "endpoints": [ { "endpoint": "/github/{user}", + "method": "GET", + "output_encoding": "no-op", "backend": [ { "host": [ "github" ], "url_pattern": "/", + "disable_host_sanitize": true, "whitelist": [ "authorizations_url", "code_search_url" ], - "sd": "etcd", - + "sd": "etcd" } ] - }, + }, + { + "endpoint": "/", + "method": "GET", + "extra_config": {}, + "output_encoding": "no-op", + "concurrent_calls": 1, + "querystring_params":[ + "*" + ], + "backend": [ + { + "url_pattern": "/", + "encoding": "no-op", + "extra_config": {}, + "sd": "etcd", + "host": [ + "github.com" + ], + "disable_host_sanitize": true + } + ] + }, { "endpoint": "/combination/{id}", "backend": [ @@ -30,6 +54,7 @@ "jsonplaceholder.typicode" ], "url_pattern": "/posts?userId={id}", + "encoding": "no-op", "is_collection": true, "mapping": { "collection": "posts" @@ -53,7 +78,8 @@ ], "extra_config": { "github.com/devopsfaith/krakend-etcd": { - "machines": [ "http://192.168.99.100:4001" ], + "machines": [ "http://192.168.110.111:2379" ], + "client_version": "v3", "options": { "dial_timeout": "5s", "dial_keepalive": "30s", diff --git a/example/main.go b/example/main.go index 2ca055b..11088e6 100644 --- a/example/main.go +++ b/example/main.go @@ -6,14 +6,13 @@ import ( "log" "os" - "github.com/gin-gonic/gin" - "github.com/devopsfaith/krakend-etcd" - "github.com/devopsfaith/krakend-viper" "github.com/devopsfaith/krakend/config" "github.com/devopsfaith/krakend/logging" "github.com/devopsfaith/krakend/proxy" + "github.com/devopsfaith/krakend/router" krakendgin "github.com/devopsfaith/krakend/router/gin" + "github.com/gin-gonic/gin" ) func main() { @@ -23,7 +22,7 @@ func main() { configFile := flag.String("c", "/etc/krakend/configuration.json", "Path to the configuration filename") flag.Parse() - parser := viper.New() + parser := config.NewParser() serviceConfig, err := parser.Parse(*configFile) if err != nil { log.Fatal("ERROR:", err.Error()) @@ -54,6 +53,7 @@ func main() { logger, proxy.DefaultFactoryWithSubscriber(logger, etcd.SubscriberFactory(ctx, etcdClient)), }, + RunServer: router.RunServer, }) routerFactory.NewWithContext(ctx).Run(serviceConfig)