diff --git a/libbeat/common/cli/cli.go b/libbeat/common/cli/cli.go index 54e0ac8704ad..6785ea3de656 100644 --- a/libbeat/common/cli/cli.go +++ b/libbeat/common/cli/cli.go @@ -45,3 +45,14 @@ func RunWith( } } } + +// GetEnvOr return the value of the environment variable if the value is set, if its not set it will +// return the default value. +// +// Note: if the value is set but it is an empty string we will return the empty string. +func GetEnvOr(name, def string) string { + if env, ok := os.LookupEnv(name); ok { + return env + } + return def +} diff --git a/x-pack/beatless/beatless b/x-pack/beatless/beatless new file mode 100755 index 000000000000..00e8f46d7009 Binary files /dev/null and b/x-pack/beatless/beatless differ diff --git a/x-pack/beatless/data/meta.json b/x-pack/beatless/data/meta.json new file mode 100644 index 000000000000..fe6c915da01a --- /dev/null +++ b/x-pack/beatless/data/meta.json @@ -0,0 +1 @@ +{"uuid":"fd4356a8-16c9-4f5b-9261-e370617be071"} diff --git a/x-pack/beatless/licenser/1 b/x-pack/beatless/licenser/1 new file mode 100644 index 000000000000..b0741cc9d190 --- /dev/null +++ b/x-pack/beatless/licenser/1 @@ -0,0 +1,43 @@ +{ + "build": { + "hash": "595516e", + "date": "2018-08-17T23:22:27.102119Z" + }, + "license": { + "uid": "936183d8-f48c-4a3f-959a-a52aa2563279", + "type": "trial", + "mode": "trial", + "status": "active", + "expiry_date_in_millis": 1538060781728 + }, + "features": { + "graph": { + "available": false, + "enabled": true + }, + "logstash": { + "available": false, + "enabled": true + }, + "ml": { + "available": false, + "enabled": true + }, + "monitoring": { + "available": true, + "enabled": true + }, + "rollup": { + "available": true, + "enabled": true + }, + "security": { + "available": false, + "enabled": true + }, + "watcher": { + "available": false, + "enabled": true + } + } +} diff --git a/x-pack/beatless/licenser/callback_watcher.go b/x-pack/beatless/licenser/callback_watcher.go new file mode 100644 index 000000000000..054ad6b22aae --- /dev/null +++ b/x-pack/beatless/licenser/callback_watcher.go @@ -0,0 +1,29 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package licenser + +// CallbackWatcher defines an addhoc listener for events generated by the manager. +type CallbackWatcher struct { + New func(License) + Stopped func() +} + +// OnNewLicense is called when a new license is set in the manager. +func (cb *CallbackWatcher) OnNewLicense(license License) { + if cb.New == nil { + return + } + cb.New(license) +} + +// OnManagerStopped is called when the manager is stopped, watcher are expected to terminates any +// features that depends on a specific license. +func (cb *CallbackWatcher) OnManagerStopped() { + if cb.Stopped == nil { + return + } + + cb.Stopped() +} diff --git a/x-pack/beatless/licenser/callback_watcher_test.go b/x-pack/beatless/licenser/callback_watcher_test.go new file mode 100644 index 000000000000..4307f7abd9da --- /dev/null +++ b/x-pack/beatless/licenser/callback_watcher_test.go @@ -0,0 +1,30 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package licenser + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCallbackWatcher(t *testing.T) { + t.Run("when no callback is set do not execute anything", func(t *testing.T) { + w := &CallbackWatcher{} + w.OnNewLicense(License{}) + w.OnManagerStopped() + }) + + t.Run("proxy call to callback function", func(t *testing.T) { + c := 0 + w := &CallbackWatcher{ + New: func(license License) { c++ }, + Stopped: func() { c++ }, + } + w.OnNewLicense(License{}) + w.OnManagerStopped() + assert.Equal(t, 2, c) + }) +} diff --git a/x-pack/beatless/licenser/data/x-pack-trial-6.4.0.json b/x-pack/beatless/licenser/data/x-pack-trial-6.4.0.json new file mode 100644 index 000000000000..b0741cc9d190 --- /dev/null +++ b/x-pack/beatless/licenser/data/x-pack-trial-6.4.0.json @@ -0,0 +1,43 @@ +{ + "build": { + "hash": "595516e", + "date": "2018-08-17T23:22:27.102119Z" + }, + "license": { + "uid": "936183d8-f48c-4a3f-959a-a52aa2563279", + "type": "trial", + "mode": "trial", + "status": "active", + "expiry_date_in_millis": 1538060781728 + }, + "features": { + "graph": { + "available": false, + "enabled": true + }, + "logstash": { + "available": false, + "enabled": true + }, + "ml": { + "available": false, + "enabled": true + }, + "monitoring": { + "available": true, + "enabled": true + }, + "rollup": { + "available": true, + "enabled": true + }, + "security": { + "available": false, + "enabled": true + }, + "watcher": { + "available": false, + "enabled": true + } + } +} diff --git a/x-pack/beatless/licenser/data/xpack-6.4.0.json b/x-pack/beatless/licenser/data/xpack-6.4.0.json new file mode 100644 index 000000000000..a527dec82f35 --- /dev/null +++ b/x-pack/beatless/licenser/data/xpack-6.4.0.json @@ -0,0 +1,42 @@ +{ + "build": { + "hash": "053779d", + "date": "2018-07-20T05:25:16.206115Z" + }, + "license": { + "uid": "936183d8-f48c-4a3f-959a-a52aa2563279", + "type": "platinum", + "mode": "platinum", + "status": "active" + }, + "features": { + "graph": { + "available": false, + "enabled": true + }, + "logstash": { + "available": false, + "enabled": true + }, + "ml": { + "available": false, + "enabled": true + }, + "monitoring": { + "available": true, + "enabled": true + }, + "rollup": { + "available": true, + "enabled": true + }, + "security": { + "available": false, + "enabled": true + }, + "watcher": { + "available": false, + "enabled": true + } + } +} diff --git a/x-pack/beatless/licenser/elastic_fetcher.go b/x-pack/beatless/licenser/elastic_fetcher.go new file mode 100644 index 000000000000..47087309885e --- /dev/null +++ b/x-pack/beatless/licenser/elastic_fetcher.go @@ -0,0 +1,147 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package licenser + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs/elasticsearch" +) + +const xPackURL = "/_xpack" + +// params defaults query parameters to send to the '_xpack' endpoint by default we only need +// machine parseable data. +var params = map[string]string{ + "human": "false", +} + +var stateLookup = map[string]State{ + "inactive": Inactive, + "active": Active, +} + +var licenseLookup = map[string]LicenseType{ + "oss": OSS, + "trial": Trial, + "basic": Basic, + "gold": Gold, + "platinum": Platinum, +} + +// UnmarshalJSON takes a bytes array and convert it to the appropriate license type. +func (t *LicenseType) UnmarshalJSON(b []byte) error { + if len(b) <= 2 { + return fmt.Errorf("invalid string for license type, received: '%s'", string(b)) + } + s := string(b[1 : len(b)-1]) + if license, ok := licenseLookup[s]; ok { + *t = license + return nil + } + + return fmt.Errorf("unknown license type, received: '%s'", s) +} + +// UnmarshalJSON takes a bytes array and convert it to the appropriate state. +func (st *State) UnmarshalJSON(b []byte) error { + // we are only interested in the content between the quotes. + if len(b) <= 2 { + return fmt.Errorf("invalid string for state, received: '%s'", string(b)) + } + + s := string(b[1 : len(b)-1]) + if state, ok := stateLookup[s]; ok { + *st = state + return nil + } + return fmt.Errorf("unknown state, received: '%s'", s) +} + +// UnmarshalJSON takes a bytes array and transform the int64 to a golang time. +func (et *expiryTime) UnmarshalJSON(b []byte) error { + if len(b) < 0 { + return fmt.Errorf("invalid value for expiry time, received: '%s'", string(b)) + } + + ts, err := strconv.Atoi(string(b)) + if err != nil { + return errors.Wrap(err, "could not parse value for expiry time") + } + + *et = expiryTime(time.Unix(0, int64(time.Millisecond)*int64(ts)).UTC()) + return nil +} + +// ElasticFetcher wraps an elasticsearch clients to retrieve licensing information +// on a specific cluster. +type ElasticFetcher struct { + client *elasticsearch.Client + log *logp.Logger +} + +// NewElasticFetcher creates a new Elastic Fetcher +func NewElasticFetcher(client *elasticsearch.Client) *ElasticFetcher { + return &ElasticFetcher{client: client, log: logp.NewLogger("elasticfetcher")} +} + +// Fetch retrieves the license information from an Elasticsearch Client, it will call the `_xpack` +// end point and will return a parsed license. If the `_xpack` endpoint is unreacheable we will +// return the OSS License otherwise we return an error. +func (f *ElasticFetcher) Fetch() (*License, error) { + status, body, err := f.client.Request("GET", xPackURL, "", params, nil) + // When we are running an OSS release of elasticsearch the _xpack endpoint will return a 405, + // "Method Not Allowed", so we return the default OSS license. + if status == http.StatusMethodNotAllowed { + f.log.Debug("received 'Method Not allowed' (405) response from server, fallback to OSS license") + return OSSLicense, nil + } + + if status == http.StatusUnauthorized { + return nil, errors.New("Unauthorized access, could not connect to the xpack endpoint, verify your credentials") + } + + if status != http.StatusOK { + return nil, fmt.Errorf("could not retrieve license information, response code: %d", status) + } + + if err != nil { + return nil, errors.Wrap(err, "could not retrieve the license information from the cluster") + } + + license, err := f.parseJSON(body) + if err != nil { + f.log.Debugw("invalid response from server", "body", string(body)) + return nil, errors.Wrap(err, "could not extract license information from the server response") + } + + return license, nil +} + +// Xpack Response, temporary struct to merge the features into the license struct. +type xpackResponse struct { + License License `json:"license"` + Features features `json:"features"` +} + +func (f *ElasticFetcher) parseJSON(b []byte) (*License, error) { + info := &xpackResponse{} + + if err := json.Unmarshal(b, info); err != nil { + return nil, err + } + + license := info.License + license.Features = info.Features + + return &license, nil +} diff --git a/x-pack/beatless/licenser/elastic_fetcher_integration_test.go b/x-pack/beatless/licenser/elastic_fetcher_integration_test.go new file mode 100644 index 000000000000..b8def938cc30 --- /dev/null +++ b/x-pack/beatless/licenser/elastic_fetcher_integration_test.go @@ -0,0 +1,63 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build integration + +package licenser + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common/cli" + "github.com/elastic/beats/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/libbeat/outputs/outil" +) + +const ( + elasticsearchHost = "localhost" + elasticsearchPort = "9200" +) + +func getTestClient() *elasticsearch.Client { + host := "http://" + cli.GetEnvOr("ES_HOST", elasticsearchHost) + ":" + cli.GetEnvOr("ES_POST", elasticsearchPort) + client, err := elasticsearch.NewClient(elasticsearch.ClientSettings{ + URL: host, + Index: outil.MakeSelector(), + Username: cli.GetEnvOr("ES_USER", ""), + Password: cli.GetEnvOr("ES_PASS", ""), + Timeout: 60 * time.Second, + CompressionLevel: 3, + }, nil) + + if err != nil { + panic(err) + } + return client +} + +// Sanity check for schema change on the HTTP response from a live Elasticsearch instance. +func TestElasticsearch(t *testing.T) { + f := NewElasticFetcher(getTestClient()) + license, err := f.Fetch() + if !assert.NoError(t, err) { + return + } + + assert.NotNil(t, license.Get()) + assert.NotNil(t, license.Type) + assert.Equal(t, Active, license.Status) + + assert.NotEmpty(t, license.UUID) + + assert.NotNil(t, license.Features.Graph) + assert.NotNil(t, license.Features.Logstash) + assert.NotNil(t, license.Features.ML) + assert.NotNil(t, license.Features.Monitoring) + assert.NotNil(t, license.Features.Rollup) + assert.NotNil(t, license.Features.Security) + assert.NotNil(t, license.Features.Watcher) +} diff --git a/x-pack/beatless/licenser/elastic_fetcher_test.go b/x-pack/beatless/licenser/elastic_fetcher_test.go new file mode 100644 index 000000000000..d1041573a246 --- /dev/null +++ b/x-pack/beatless/licenser/elastic_fetcher_test.go @@ -0,0 +1,179 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package licenser + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + "time" + + uuid "github.com/satori/go.uuid" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/outputs/elasticsearch" +) + +func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Server, *elasticsearch.Client) { + mux := http.NewServeMux() + mux.Handle("/_xpack/", http.HandlerFunc(handler)) + + server := httptest.NewServer(mux) + + client, err := elasticsearch.NewClient(elasticsearch.ClientSettings{URL: server.URL}, nil) + if err != nil { + t.Fatalf("could not create the elasticsearch client, error: %s", err) + } + + return server, client +} + +func TestParseJSON(t *testing.T) { + t.Run("OSS release of Elasticsearch", func(t *testing.T) { + h := func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "Method Not Allowed", 405) + } + s, c := newServerClientPair(t, h) + defer s.Close() + defer c.Close() + + fetcher := NewElasticFetcher(c) + oss, err := fetcher.Fetch() + if assert.NoError(t, err) { + return + } + + assert.Equal(t, OSSLicense, oss) + }) + + t.Run("malformed JSON", func(t *testing.T) { + h := func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("hello bad JSON")) + } + s, c := newServerClientPair(t, h) + defer s.Close() + defer c.Close() + + fetcher := NewElasticFetcher(c) + _, err := fetcher.Fetch() + assert.Error(t, err) + }) + + t.Run("401 response", func(t *testing.T) { + h := func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "Unauthorized", 401) + } + s, c := newServerClientPair(t, h) + defer s.Close() + defer c.Close() + + fetcher := NewElasticFetcher(c) + _, err := fetcher.Fetch() + assert.Equal(t, err.Error(), "Unauthorized access, could not connect to the xpack endpoint, verify your credentials") + }) + + t.Run("any error from the server", func(t *testing.T) { + h := func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "Not found", 404) + } + s, c := newServerClientPair(t, h) + defer s.Close() + defer c.Close() + + fetcher := NewElasticFetcher(c) + _, err := fetcher.Fetch() + assert.Error(t, err) + }) + + t.Run("200 response", func(t *testing.T) { + filepath.Walk("data/", func(path string, i os.FileInfo, err error) error { + if i.IsDir() { + return nil + } + + t.Run(path, func(t *testing.T) { + h := func(w http.ResponseWriter, r *http.Request) { + json, err := ioutil.ReadFile(path) + if err != nil { + t.Fatal("could not read JSON") + } + w.Write(json) + } + + s, c := newServerClientPair(t, h) + defer s.Close() + defer c.Close() + + fetcher := NewElasticFetcher(c) + license, err := fetcher.Fetch() + if !assert.NoError(t, err) { + return + } + + id, _ := uuid.FromString("936183d8-f48c-4a3f-959a-a52aa2563279") + assert.Equal(t, id, license.UUID) + + assert.NotNil(t, license.Type) + assert.NotNil(t, license.Mode) + assert.NotNil(t, license.Status) + + assert.False(t, license.Features.Graph.Available) + assert.True(t, license.Features.Graph.Enabled) + + assert.False(t, license.Features.Logstash.Available) + assert.True(t, license.Features.Logstash.Enabled) + + assert.False(t, license.Features.ML.Available) + assert.True(t, license.Features.ML.Enabled) + + assert.True(t, license.Features.Monitoring.Available) + assert.True(t, license.Features.Monitoring.Enabled) + + assert.True(t, license.Features.Rollup.Available) + assert.True(t, license.Features.Rollup.Enabled) + + assert.False(t, license.Features.Security.Available) + assert.True(t, license.Features.Security.Enabled) + + assert.False(t, license.Features.Watcher.Available) + assert.True(t, license.Features.Watcher.Enabled) + }) + + return nil + }) + }) + + t.Run("parse milliseconds", func(t *testing.T) { + t.Run("invalid", func(t *testing.T) { + b := []byte("{ \"v\": \"\"}") + ts := struct { + V expiryTime `json:"v"` + }{} + + err := json.Unmarshal(b, &ts) + assert.Error(t, err) + }) + + t.Run("valid", func(t *testing.T) { + b := []byte("{ \"v\": 1538060781728 }") + ts := struct { + V expiryTime `json:"v"` + }{} + + err := json.Unmarshal(b, &ts) + if !assert.NoError(t, err) { + return + } + + // 2018-09-27 15:06:21.728 +0000 UTC + d := time.Date(2018, 9, 27, 15, 6, 21, 728000000, time.UTC).Sub((time.Time(ts.V))) + assert.Equal(t, time.Duration(0), d) + }) + }) +} diff --git a/x-pack/beatless/licenser/license.go b/x-pack/beatless/licenser/license.go new file mode 100644 index 000000000000..7ddabb0bcb09 --- /dev/null +++ b/x-pack/beatless/licenser/license.go @@ -0,0 +1,119 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package licenser + +import ( + "time" + + uuid "github.com/satori/go.uuid" +) + +// License represents the license of this beat, the license is fetched and returned from +// the elasticsearch cluster. +// +// The x-pack endpoint returns the following JSON response. +// +// "license": { +// "uid": "936183d8-f48c-4a3f-959a-a52aa2563279", +// "type": "platinum", +// "mode": "platinum", +// "status": "active" +// }, +// +// Definition: +// type is the installed license. +// mode is the license in operation. (effective license) +// status is the type installed is active or not. +type License struct { + UUID uuid.UUID `json:"uid"` + Type LicenseType `json:"type"` + Mode LicenseType `json:"mode"` + Status State `json:"status"` + Features features `json:"features"` + TrialExpiry expiryTime `json:"expiry_date_in_millis,omitempty"` +} + +// Features defines the list of features exposed by the elasticsearch cluster. +type features struct { + Graph graph `json:"graph"` + Logstash logstash `json:"logstash"` + ML ml `json:"ml"` + Monitoring monitoring `json:"monitoring"` + Rollup rollup `json:"rollup"` + Security security `json:"security"` + Watcher watcher `json:"watcher"` +} + +type expiryTime time.Time + +// Base define the field common for every feature. +type Base struct { + Enabled bool `json:"enabled"` + Available bool `json:"available"` +} + +// Defines all the avaiables features +type graph struct{ *Base } +type logstash struct{ *Base } +type ml struct{ *Base } +type monitoring struct{ *Base } +type rollup struct{ *Base } +type security struct{ *Base } +type watcher struct{ *Base } + +// Get return the current license +func (l *License) Get() LicenseType { + return l.Mode +} + +// Cover returns true if the provided license is included in the range of license. +// +// Basic -> match basic, gold and platinum +// gold -> match gold and platinum +// platinum -> match platinum only +func (l *License) Cover(license LicenseType) bool { + if l.Mode >= license { + return true + } + return false +} + +// Is returns true if the provided license is an exact match. +func (l *License) Is(license LicenseType) bool { + return l.Mode == license +} + +// IsActive returns true if the current license from the server is active. +func (l *License) IsActive() bool { + return l.Status == Active +} + +// IsTrial returns true if the remote cluster is in trial mode. +func (l *License) IsTrial() bool { + return l.Mode == Trial +} + +// IsTrialExpired returns false if the we are not in trial mode and when we are in trial mode +// we check for the expiry data. +func (l *License) IsTrialExpired() bool { + if !l.IsTrial() { + return false + } + + if time.Time(l.TrialExpiry).Sub(time.Now()) > 0 { + return false + } + + return true +} + +// EqualTo returns true if the two license are the same, we compare license to reduce the number +// message send to the watchers. +func (l *License) EqualTo(other *License) bool { + return l.UUID == other.UUID && + l.Type == other.Type && + l.Mode == other.Mode && + l.Status == other.Status +} diff --git a/x-pack/beatless/licenser/license_test.go b/x-pack/beatless/licenser/license_test.go new file mode 100644 index 000000000000..174d762851b8 --- /dev/null +++ b/x-pack/beatless/licenser/license_test.go @@ -0,0 +1,197 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package licenser + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestLicenseGet(t *testing.T) { + tests := []struct { + name string + t LicenseType + }{ + { + name: "Basic", + t: Basic, + }, + { + name: "Platinum", + t: Platinum, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + l := License{Mode: test.t} + assert.Equal(t, test.t, l.Get()) + }) + } +} + +func TestLicenseIs(t *testing.T) { + tests := []struct { + name string + t LicenseType + query LicenseType + expected bool + }{ + { + name: "Basic and asking for Basic", + t: Basic, + query: Basic, + expected: true, + }, + { + name: "Platinum and asking for Basic", + t: Platinum, + query: Basic, + expected: true, + }, + { + name: "Basic and asking for Platinum", + t: Basic, + query: Platinum, + expected: false, + }, + { + name: "Gold and asking for Gold", + t: Gold, + query: Gold, + expected: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + l := License{Mode: test.t} + assert.Equal(t, test.expected, l.Cover(test.query)) + }) + } +} + +func TestLicenseIsStrict(t *testing.T) { + tests := []struct { + name string + t LicenseType + query LicenseType + expected bool + }{ + { + name: "Basic and asking for Basic", + t: Basic, + query: Basic, + expected: true, + }, + { + name: "Platinum and asking for Basic", + t: Platinum, + query: Basic, + expected: false, + }, + { + name: "Basic and asking for Platinum", + t: Basic, + query: Platinum, + expected: false, + }, + { + name: "Gold and asking for Gold", + t: Gold, + query: Gold, + expected: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + l := License{Mode: test.t} + assert.Equal(t, test.expected, l.Is(test.query)) + }) + } +} + +func TestIsActive(t *testing.T) { + tests := []struct { + name string + l License + expected bool + }{ + { + name: "active", + l: License{Status: Active}, + expected: true, + }, + { + name: "inactive", + l: License{Status: Inactive}, + expected: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expected, test.l.IsActive()) + }) + } +} + +func TestIsTrial(t *testing.T) { + tests := []struct { + name string + l License + expected bool + }{ + { + name: "is a trial license", + l: License{Mode: Trial}, + expected: true, + }, + { + name: "is not a trial license", + l: License{Mode: Basic}, + expected: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expected, test.l.IsTrial()) + }) + } +} + +func TestIsTrialExpired(t *testing.T) { + tests := []struct { + name string + l License + expected bool + }{ + { + name: "trial is expired", + l: License{Mode: Trial, TrialExpiry: expiryTime(time.Now().Add(-2 * time.Hour))}, + expected: true, + }, + { + name: "trial is not expired", + l: License{Mode: Trial, TrialExpiry: expiryTime(time.Now().Add(2 * time.Minute))}, + expected: false, + }, + { + name: "license is not on trial", + l: License{Mode: Basic, TrialExpiry: expiryTime(time.Now().Add(2 * time.Minute))}, + expected: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expected, test.l.IsTrialExpired()) + }) + } +} diff --git a/x-pack/beatless/licenser/licensetype_string.go b/x-pack/beatless/licenser/licensetype_string.go new file mode 100644 index 000000000000..400e425c6c17 --- /dev/null +++ b/x-pack/beatless/licenser/licensetype_string.go @@ -0,0 +1,20 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Code generated by "stringer -type=LicenseType -linecomment=true"; DO NOT EDIT. + +package licenser + +import "strconv" + +const _LicenseType_name = "Open sourceTrialBasicGoldPlatinum" + +var _LicenseType_index = [...]uint8{0, 11, 16, 21, 25, 33} + +func (i LicenseType) String() string { + if i < 0 || i >= LicenseType(len(_LicenseType_index)-1) { + return "LicenseType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _LicenseType_name[_LicenseType_index[i]:_LicenseType_index[i+1]] +} diff --git a/x-pack/beatless/licenser/manager.go b/x-pack/beatless/licenser/manager.go new file mode 100644 index 000000000000..f94f4cf5d069 --- /dev/null +++ b/x-pack/beatless/licenser/manager.go @@ -0,0 +1,311 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package licenser + +import ( + "errors" + "math/rand" + "sync" + "time" + + uuid "github.com/satori/go.uuid" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs/elasticsearch" +) + +// OSSLicense default license to use. +var ( + OSSLicense = &License{ + UUID: uuid.NewV4(), + Type: OSS, + Mode: OSS, + Status: Active, + Features: features{ + Graph: graph{}, + Logstash: logstash{}, + ML: ml{}, + Monitoring: monitoring{}, + Rollup: rollup{}, + Security: security{}, + Watcher: watcher{}, + }, + } +) + +// Watcher allows a type to receive a new event when a new license is received. +type Watcher interface { + OnNewLicense(license License) + OnManagerStopped() +} + +// Fetcher interface implements the mechanism to retrieve a License. Currently we only +// support license coming from the '/_xpack' rest api. +type Fetcher interface { + Fetch() (*License, error) +} + +// Errors returned by the manager. +var ( + ErrWatcherAlreadyExist = errors.New("watcher already exist") + ErrWatcherDoesntExist = errors.New("watcher doesn't exist") + + ErrManagerStopped = errors.New("license manager is stopped") + ErrNoLicenseFound = errors.New("no license found") +) + +// Backoff values when the remote cluster is not responding. +var ( + maxBackoff = time.Duration(60) + initBackoff = time.Duration(5) + jitterCap = 1000 // 1000 milliseconds +) + +// Manager keeps tracks of license management, it uses a fetcher usually the ElasticFetcher to +// retrieve a licence from a specific cluster. +// +// Starting the manager will start a go routine to periodically query the license fetcher. +// if an error occur on the fetcher we will retry until we successfully +// receive a new license. During that period we start a grace counter, we assume the license is +// still valid during the grace period, when this period expire we will keep retrying but the previous +// license will be invalidated and we will fallback to the OSS license. +// +// Retrieving the current license: +// - Call the `Get()` on the manager instance. +// - Or register a `Watcher` with the manager to receive the new license and acts on it, you will +// also receive an event when the Manager is stopped. +// +// +// Notes: +// - When the manager is started no license is set by default. +// - When a license is invalidated, we fallback to the OSS License and the watchers get notified. +// - Adding a watcher will automatically send the current license to the newly added watcher if +// available. +type Manager struct { + done chan struct{} + sync.RWMutex + wg sync.WaitGroup + fetcher Fetcher + duration time.Duration + gracePeriod time.Duration + license *License + watchers map[Watcher]Watcher + log *logp.Logger +} + +// New takes an elasticsearch client and wraps it into a fetcher, the fetch will handle the JSON +// and response code from the cluster. +func New(client *elasticsearch.Client, duration time.Duration, gracePeriod time.Duration) *Manager { + fetcher := NewElasticFetcher(client) + return NewWithFetcher(fetcher, duration, gracePeriod) +} + +// NewWithFetcher takes a fetcher and return a license manager. +func NewWithFetcher(fetcher Fetcher, duration time.Duration, gracePeriod time.Duration) *Manager { + m := &Manager{ + fetcher: fetcher, + duration: duration, + log: logp.NewLogger("license-manager"), + done: make(chan struct{}), + gracePeriod: gracePeriod, + watchers: make(map[Watcher]Watcher), + } + + return m +} + +// AddWatcher register a new watcher to receive events when the license is retrieved or when the manager +// is closed. +func (m *Manager) AddWatcher(watcher Watcher) error { + m.Lock() + defer m.Unlock() + + if _, ok := m.watchers[watcher]; ok { + return ErrWatcherAlreadyExist + } + + m.watchers[watcher] = watcher + + // when we register a new watchers send the current license unless we did not retrieve it. + if m.license != nil { + watcher.OnNewLicense(*m.license) + } + return nil +} + +// RemoveWatcher removes the watcher if it exist or return an error. +func (m *Manager) RemoveWatcher(watcher Watcher) error { + m.Lock() + defer m.Unlock() + if _, ok := m.watchers[watcher]; ok { + delete(m.watchers, watcher) + return nil + } + return ErrWatcherDoesntExist +} + +// Get return the current active license, it can return an error if the manager is stopped or when +// there is no license in the manager, Instead of querying the Manager it is easier to register a +// watcher to listen to license change. +func (m *Manager) Get() (*License, error) { + m.Lock() + defer m.Unlock() + + select { + case <-m.done: + return nil, ErrManagerStopped + default: + if m.license == nil { + return nil, ErrNoLicenseFound + } + return m.license, nil + } +} + +// Start starts the License manager, the manager will start a go routine to periodically +// retrieve the license from the fetcher. +func (m *Manager) Start() { + // First update should be in sync at startup to ensure a + // consistent state. + m.log.Info("license manager started, no license found.") + m.wg.Add(1) + go m.worker() +} + +// Stop terminates the license manager, the go routine will be stopped and the cached license will +// be removed and no more checks can be done on the manager. +func (m *Manager) Stop() { + select { + case <-m.done: + m.log.Error("license manager already stopped") + default: + } + + defer m.log.Info("license manager stopped") + defer m.notify(func(w Watcher) { + w.OnManagerStopped() + }) + + // stop the periodic check license and wait for it to complete + close(m.done) + m.wg.Wait() + + // invalidate current license + m.Lock() + defer m.Unlock() + m.license = nil +} + +func (m *Manager) notify(op func(Watcher)) { + m.RLock() + defer m.RUnlock() + + if len(m.watchers) == 0 { + m.log.Debugf("no watchers configured") + return + } + + m.log.Debugf("notifying %d watchers", len(m.watchers)) + for _, w := range m.watchers { + op(w) + } +} + +func (m *Manager) worker() { + defer m.wg.Done() + m.log.Debug("starting periodic license check") + defer m.log.Debug("periodic license check is stopped") + + jitter := rand.Intn(jitterCap) + + // Add some jitter to space requests from a large fleet of beats. + select { + case <-time.After(time.Duration(jitter) * time.Millisecond): + } + + // eager initial check. + m.update() + + // periodically checks license. + for { + select { + case <-m.done: + return + case <-time.After(m.duration): + m.log.Debug("license is too old, updating, grace period: %s", m.gracePeriod) + m.update() + } + } +} + +func (m *Manager) update() { + backoff := common.NewBackoff(m.done, initBackoff, maxBackoff) + startedAt := time.Now() + for { + select { + case <-m.done: + return + default: + license, err := m.fetcher.Fetch() + if err != nil { + m.log.Info("cannot retrieve license, retrying later, error: %s", err) + + // check if the license is still in the grace period. + // permit some operations if the license could not be checked + // right away. This is to smooth any networks problems. + if grace := time.Now().Sub(startedAt); grace > m.gracePeriod { + m.log.Info("grace period expired, invalidating license") + m.invalidate() + } else { + m.log.Debugf("license is too old, grace time remaining: %s", m.gracePeriod-grace) + } + + backoff.Wait() + continue + } + + // we have a valid license, notify watchers and sleep until next check. + m.log.Info( + "valid license retrieved, license mode: %s, type: %s, status: %s", + license.Get(), + license.Type, + license.Status, + ) + m.saveAndNotify(license) + return + } + } +} + +func (m *Manager) saveAndNotify(license *License) { + if !m.save(license) { + return + } + + l := *license + m.notify(func(w Watcher) { + w.OnNewLicense(l) + }) +} + +func (m *Manager) save(license *License) bool { + m.Lock() + defer m.Unlock() + + // License didn't change no need to notify watchers. + if m.license != nil && m.license.EqualTo(license) { + return false + } + defer m.log.Debug("license information updated") + + m.license = license + return true +} + +func (m *Manager) invalidate() { + defer m.log.Debug("invalidate cached license, fallback to OSS") + m.saveAndNotify(OSSLicense) +} diff --git a/x-pack/beatless/licenser/manager_test.go b/x-pack/beatless/licenser/manager_test.go new file mode 100644 index 000000000000..ac6b2b19598d --- /dev/null +++ b/x-pack/beatless/licenser/manager_test.go @@ -0,0 +1,285 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package licenser + +import ( + "errors" + "sync" + "testing" + "time" + + uuid "github.com/satori/go.uuid" + "github.com/stretchr/testify/assert" +) + +type message struct { + license *License + err error +} + +type mockFetcher struct { + sync.Mutex + bus chan message + last *message +} + +func newMockFetcher() *mockFetcher { + return &mockFetcher{bus: make(chan message, 1)} +} + +func (m *mockFetcher) Fetch() (*License, error) { + m.Lock() + defer m.Unlock() + for { + select { + case message := <-m.bus: + m.last = &message + + // assume other calls to receive the same value, + // until we change it. + return message.license, message.err + default: + if m.last != nil { + return m.last.license, m.last.err + } + continue + } + } +} + +func (m *mockFetcher) Insert(license *License, err error) { + m.bus <- message{license: license, err: err} +} + +func (m *mockFetcher) Close() { + close(m.bus) +} + +func TestRetrieveLicense(t *testing.T) { + i := &License{ + UUID: uuid.NewV4(), + Type: Basic, + Mode: Basic, + Status: Active, + } + mock := newMockFetcher() + mock.Insert(i, nil) + defer mock.Close() + + t.Run("return an error if the manager is stopped", func(t *testing.T) { + m := NewWithFetcher(mock, time.Duration(2*time.Second), time.Duration(1*time.Second)) + m.Start() + m.Stop() + + _, err := m.Get() + + assert.Error(t, ErrManagerStopped, err) + }) + + t.Run("at startup when no license is retrieved return an error", func(t *testing.T) { + mck := newMockFetcher() + mck.Insert(nil, errors.New("not found")) + defer mck.Close() + + m := NewWithFetcher(mck, time.Duration(2*time.Second), time.Duration(1*time.Second)) + m.Start() + defer m.Stop() + _, err := m.Get() + + assert.Error(t, ErrNoLicenseFound, err) + }) + + t.Run("at startup", func(t *testing.T) { + m := NewWithFetcher(mock, time.Duration(2*time.Second), time.Duration(1*time.Second)) + m.Start() + defer m.Stop() + + // Lets us find the first license. + time.Sleep(1 * time.Second) + _, err := m.Get() + + assert.NoError(t, err) + }) + + t.Run("periodically", func(t *testing.T) { + period := time.Duration(1) + m := NewWithFetcher(mock, period, time.Duration(5*time.Second)) + + m.Start() + defer m.Stop() + + // Lets us find the first license. + time.Sleep(1 * time.Second) + + l, err := m.Get() + if !assert.NoError(t, err) { + return + } + if !assert.True(t, l.Is(Basic)) { + return + } + + i := &License{ + UUID: uuid.NewV4(), + Type: Platinum, + Mode: Platinum, + Status: Active, + } + mock.Insert(i, nil) + + select { + case <-time.After(time.Duration(1 * time.Second)): + l, err := m.Get() + if !assert.NoError(t, err) { + return + } + assert.True(t, l.Is(Platinum)) + } + }) +} + +func TestWatcher(t *testing.T) { + i := &License{ + UUID: uuid.NewV4(), + Type: Basic, + Mode: Basic, + Status: Active, + } + + t.Run("watcher must be uniquely registered", func(t *testing.T) { + mock := newMockFetcher() + mock.Insert(i, nil) + defer mock.Close() + + m := NewWithFetcher(mock, time.Duration(2*time.Second), time.Duration(1*time.Second)) + + m.Start() + defer m.Stop() + + w := CallbackWatcher{New: func(license License) {}} + + err := m.AddWatcher(&w) + if assert.NoError(t, err) { + return + } + defer m.RemoveWatcher(&w) + + err = m.AddWatcher(&w) + assert.Error(t, ErrWatcherAlreadyExist, err) + }) + + t.Run("cannot remove non existing watcher", func(t *testing.T) { + mock := newMockFetcher() + mock.Insert(i, nil) + defer mock.Close() + + m := NewWithFetcher(mock, time.Duration(2*time.Second), time.Duration(1*time.Second)) + + m.Start() + defer m.Stop() + + w := CallbackWatcher{New: func(license License) {}} + + err := m.RemoveWatcher(&w) + + assert.Error(t, ErrWatcherDoesntExist, err) + }) + + t.Run("adding a watcher trigger a a new license callback", func(t *testing.T) { + mock := newMockFetcher() + mock.Insert(i, nil) + defer mock.Close() + + m := NewWithFetcher(mock, time.Duration(2*time.Second), time.Duration(1*time.Second)) + + m.Start() + defer m.Stop() + + chanLicense := make(chan License) + defer close(chanLicense) + + w := CallbackWatcher{ + New: func(license License) { + chanLicense <- license + }, + } + + m.AddWatcher(&w) + defer m.RemoveWatcher(&w) + + select { + case license := <-chanLicense: + assert.Equal(t, Basic, license.Get()) + } + }) + + t.Run("periodically trigger a new license callback when the license change", func(t *testing.T) { + mock := newMockFetcher() + mock.Insert(i, nil) + defer mock.Close() + + m := NewWithFetcher(mock, time.Duration(1), time.Duration(1*time.Second)) + + m.Start() + defer m.Stop() + + chanLicense := make(chan License) + defer close(chanLicense) + + w := CallbackWatcher{ + New: func(license License) { + chanLicense <- license + }, + } + + m.AddWatcher(&w) + defer m.RemoveWatcher(&w) + + c := 0 + for { + select { + case license := <-chanLicense: + if c == 0 { + assert.Equal(t, Basic, license.Get()) + mock.Insert(&License{ + UUID: uuid.NewV4(), + Type: Platinum, + Mode: Platinum, + Status: Active, + }, nil) + c++ + continue + } + assert.Equal(t, Platinum, license.Get()) + return + } + } + }) + + t.Run("trigger OnManagerStopped when the manager is stopped", func(t *testing.T) { + mock := newMockFetcher() + mock.Insert(i, nil) + defer mock.Close() + + m := NewWithFetcher(mock, time.Duration(1), time.Duration(1*time.Second)) + m.Start() + + var wg sync.WaitGroup + + wg.Add(1) + w := CallbackWatcher{ + Stopped: func() { + wg.Done() + }, + } + + m.AddWatcher(&w) + defer m.RemoveWatcher(&w) + + m.Stop() + + wg.Wait() + }) +} diff --git a/x-pack/beatless/licenser/state_string.go b/x-pack/beatless/licenser/state_string.go new file mode 100644 index 000000000000..eb3144c8dc74 --- /dev/null +++ b/x-pack/beatless/licenser/state_string.go @@ -0,0 +1,20 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Code generated by "stringer -type=State"; DO NOT EDIT. + +package licenser + +import "strconv" + +const _State_name = "InactiveActive" + +var _State_index = [...]uint8{0, 8, 14} + +func (i State) String() string { + if i < 0 || i >= State(len(_State_index)-1) { + return "State(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _State_name[_State_index[i]:_State_index[i+1]] +} diff --git a/x-pack/beatless/licenser/types.go b/x-pack/beatless/licenser/types.go new file mode 100644 index 000000000000..0f76b04096f2 --- /dev/null +++ b/x-pack/beatless/licenser/types.go @@ -0,0 +1,26 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package licenser + +// LicenseType defines what kind of license is currently available. +type LicenseType int + +//go:generate stringer -type=LicenseType -linecomment=true +const ( + OSS LicenseType = iota // Open source + Trial // Trial + Basic // Basic + Gold // Gold + Platinum // Platinum +) + +// State of the license can be active or inactive. +type State int + +//go:generate stringer -type=State +const ( + Inactive State = iota + Active +)