From c23a0b4434fceff9a16e621c17f389e6240a1a4b Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 27 Dec 2018 16:34:04 -0800 Subject: [PATCH] [6.x] Changing version fields/vars datatypes from string to common.Version (#9813) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Partial backport of #9056 — only the code changes related to changing the datatype of version fields/variables from `string` to `common.Version`. --- filebeat/beater/filebeat.go | 21 +++---- filebeat/fileset/fileset.go | 16 ++--- filebeat/fileset/fileset_test.go | 53 +++++++++------- filebeat/fileset/modules_integration_test.go | 9 +-- filebeat/fileset/pipelines.go | 3 +- libbeat/cmd/export/template.go | 11 +++- libbeat/cmd/instance/ilm.go | 11 ++-- libbeat/common/version.go | 15 +++++ libbeat/dashboards/dashboards.go | 60 +++---------------- libbeat/dashboards/es_loader.go | 8 ++- libbeat/dashboards/es_loader_test.go | 10 ++-- libbeat/dashboards/export.go | 9 +-- libbeat/dashboards/kibana_loader.go | 5 +- libbeat/kibana/client.go | 30 ++++++---- libbeat/ml-importer/importer.go | 13 ++-- .../elasticsearch/api_integration_test.go | 5 +- libbeat/outputs/elasticsearch/client.go | 22 +++++-- .../elasticsearch/client_integration_test.go | 9 ++- libbeat/template/load.go | 5 +- libbeat/template/template.go | 17 ++---- libbeat/template/template_test.go | 9 ++- 21 files changed, 166 insertions(+), 175 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index f057cf5c1ca..edda8eb647e 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -234,12 +234,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err return errors.Errorf("Error creating Kibana client: %v", err) } - kibanaVersion, err := common.NewVersion(kibanaClient.GetVersion()) - if err != nil { - return errors.Errorf("Error checking Kibana version: %v", err) - } - - if err := setupMLBasedOnVersion(fb.moduleRegistry, esClient, kibanaClient, kibanaVersion); err != nil { + if err := setupMLBasedOnVersion(fb.moduleRegistry, esClient, kibanaClient); err != nil { errs = append(errs, err) } @@ -265,7 +260,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err continue } - if err := setupMLBasedOnVersion(set, esClient, kibanaClient, kibanaVersion); err != nil { + if err := setupMLBasedOnVersion(set, esClient, kibanaClient); err != nil { errs = append(errs, err) } @@ -275,18 +270,16 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err return errs.Err() } -func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client, kibanaVersion *common.Version) error { - if isElasticsearchLoads(kibanaVersion) { +func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client) error { + if isElasticsearchLoads(kibanaClient.GetVersion()) { return reg.LoadML(esClient) } return reg.SetupML(esClient, kibanaClient) } -func isElasticsearchLoads(kibanaVersion *common.Version) bool { - if kibanaVersion.Major < 6 || kibanaVersion.Major == 6 && kibanaVersion.Minor < 1 { - return true - } - return false +func isElasticsearchLoads(kibanaVersion common.Version) bool { + return kibanaVersion.Major < 6 || + (kibanaVersion.Major == 6 && kibanaVersion.Minor < 1) } // Run allows the beater to be run as a beat. diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index 49c2b2a542e..a57a40f6752 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -25,6 +25,7 @@ package fileset import ( "bytes" "encoding/json" + "errors" "fmt" "io/ioutil" "os" @@ -192,15 +193,14 @@ func (fs *Fileset) evaluateVars() (map[string]interface{}, error) { // turnOffElasticsearchVars re-evaluates the variables that have `min_elasticsearch_version` // set. -func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion string) (map[string]interface{}, error) { +func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion common.Version) (map[string]interface{}, error) { retVars := map[string]interface{}{} for key, val := range vars { retVars[key] = val } - haveVersion, err := common.NewVersion(esVersion) - if err != nil { - return vars, fmt.Errorf("Error parsing version %s: %v", esVersion, err) + if !esVersion.IsValid() { + return vars, errors.New("Unknown Elasticsearch version") } for _, vals := range fs.manifest.Vars { @@ -217,11 +217,11 @@ func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersi return vars, fmt.Errorf("Error parsing version %s: %v", minESVersion["version"].(string), err) } - logp.Debug("fileset", "Comparing ES version %s with requirement of %s", haveVersion, minVersion) + logp.Debug("fileset", "Comparing ES version %s with requirement of %s", esVersion.String(), minVersion) - if haveVersion.LessThan(minVersion) { + if esVersion.LessThan(minVersion) { retVars[name] = minESVersion["value"] - logp.Info("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], haveVersion) + logp.Info("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], esVersion.String()) } } } @@ -358,7 +358,7 @@ func (fs *Fileset) getPipelineID(beatVersion string) (string, error) { } // GetPipeline returns the JSON content of the Ingest Node pipeline that parses the logs. -func (fs *Fileset) GetPipeline(esVersion string) (pipelineID string, content map[string]interface{}, err error) { +func (fs *Fileset) GetPipeline(esVersion common.Version) (pipelineID string, content map[string]interface{}, err error) { path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false) if err != nil { return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 93456b2ee3b..8435532efa7 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -27,7 +27,9 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) @@ -213,7 +215,8 @@ func TestGetPipelineNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") assert.NoError(t, fs.Read("5.2.0")) - pipelineID, content, err := fs.GetPipeline("5.2.0") + version := common.MustNewVersion("5.2.0") + pipelineID, content, err := fs.GetPipeline(*version) assert.NoError(t, err) assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipelineID) assert.Contains(t, content, "description") @@ -234,27 +237,31 @@ func TestGetPipelineConvertTS(t *testing.T) { assert.NoError(t, err) assert.NoError(t, fs.Read("6.1.0")) - // ES 6.0.0 should not have beat.timezone referenced - pipelineID, content, err := fs.GetPipeline("6.0.0") - assert.NoError(t, err) - assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID) - marshaled, err := json.Marshal(content) - assert.NoError(t, err) - assert.NotContains(t, string(marshaled), "beat.timezone") - - // ES 6.1.0 should have beat.timezone referenced - pipelineID, content, err = fs.GetPipeline("6.1.0") - assert.NoError(t, err) - assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID) - marshaled, err = json.Marshal(content) - assert.NoError(t, err) - assert.Contains(t, string(marshaled), "beat.timezone") + cases := map[string]struct { + Beat string + Timezone bool + }{ + "6.0.0": {Timezone: false}, + "6.1.0": {Timezone: true}, + "6.2.0": {Timezone: true}, + } - // ES 6.2.0 should have beat.timezone referenced - pipelineID, content, err = fs.GetPipeline("6.2.0") - assert.NoError(t, err) - assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID) - marshaled, err = json.Marshal(content) - assert.NoError(t, err) - assert.Contains(t, string(marshaled), "beat.timezone") + for esVersion, cfg := range cases { + pipelineName := "filebeat-6.1.0-system-syslog-pipeline" + + t.Run(fmt.Sprintf("es=%v", esVersion), func(t *testing.T) { + ver := common.MustNewVersion(esVersion) + pipelineID, content, err := fs.GetPipeline(*ver) + require.NoError(t, err) + assert.Equal(t, pipelineName, pipelineID) + + marshaled, err := json.Marshal(content) + require.NoError(t, err) + if cfg.Timezone { + assert.Contains(t, string(marshaled), "beat.timezone") + } else { + assert.NotContains(t, string(marshaled), "beat.timezone") + } + }) + } } diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index dbf786053a8..45baabdc354 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -22,7 +22,6 @@ package fileset import ( "encoding/json" "path/filepath" - "strconv" "testing" "github.com/stretchr/testify/assert" @@ -142,11 +141,5 @@ func TestAvailableProcessors(t *testing.T) { func hasIngest(client *elasticsearch.Client) bool { v := client.GetVersion() - majorVersion := string(v[0]) - version, err := strconv.Atoi(majorVersion) - if err != nil { - return true - } - - return version >= 5 + return v.Major >= 5 } diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index b67e9fce487..d7f63bdabad 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -22,6 +22,7 @@ import ( "fmt" "strings" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) @@ -33,7 +34,7 @@ type PipelineLoaderFactory func() (PipelineLoader, error) type PipelineLoader interface { LoadJSON(path string, json map[string]interface{}) ([]byte, error) Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) - GetVersion() string + GetVersion() common.Version } // LoadPipelines loads the pipelines for each configured fileset. diff --git a/libbeat/cmd/export/template.go b/libbeat/cmd/export/template.go index bf3cbbb9b27..d56330a62ff 100644 --- a/libbeat/cmd/export/template.go +++ b/libbeat/cmd/export/template.go @@ -57,7 +57,16 @@ func GenTemplateConfigCmd(settings instance.Settings, name, idxPrefix, beatVersi } } - tmpl, err := template.New(b.Info.Version, index, version, cfg) + if version == "" { + version = b.Info.Version + } + + esVersion, err := common.NewVersion(version) + if err != nil { + fmt.Fprintf(os.Stderr, "Invalid Elasticsearch version: %s\n", err) + } + + tmpl, err := template.New(b.Info.Version, index, *esVersion, cfg) if err != nil { fmt.Fprintf(os.Stderr, "Error generating template: %+v", err) os.Exit(1) diff --git a/libbeat/cmd/instance/ilm.go b/libbeat/cmd/instance/ilm.go index 7f6304419e0..473999e1407 100644 --- a/libbeat/cmd/instance/ilm.go +++ b/libbeat/cmd/instance/ilm.go @@ -154,10 +154,9 @@ func loadConfigWithDefaults(config *ilmConfig, b *Beat) { } func checkElasticsearchVersionIlm(client *elasticsearch.Client) error { - esVer := client.GetVersion() - esV, err := common.NewVersion(esVer) - if err != nil { - return err + esVersion := client.GetVersion() + if !esVersion.IsValid() { + return errors.New("Unknown Elasticsearch version") } requiredVersion, err := common.NewVersion("6.6.0") @@ -165,8 +164,8 @@ func checkElasticsearchVersionIlm(client *elasticsearch.Client) error { return err } - if esV.LessThan(requiredVersion) { - return fmt.Errorf("ILM requires at least Elasticsearch 6.6.0. Used version: %s", esV.String()) + if esVersion.LessThan(requiredVersion) { + return fmt.Errorf("ILM requires at least Elasticsearch 6.6.0. Used version: %s", esVersion.String()) } return nil diff --git a/libbeat/common/version.go b/libbeat/common/version.go index 795eb2b8ddb..54293e9ff79 100644 --- a/libbeat/common/version.go +++ b/libbeat/common/version.go @@ -31,6 +31,16 @@ type Version struct { Meta string } +// MustNewVersion creates a version from the given version string. +// If the version string is invalid, MustNewVersion panics. +func MustNewVersion(version string) *Version { + v, err := NewVersion(version) + if err != nil { + panic(err) + } + return v +} + // NewVersion expects a string in the format: // major.minor.bugfix(-meta) func NewVersion(version string) (*Version, error) { @@ -69,6 +79,11 @@ func NewVersion(version string) (*Version, error) { return &v, nil } +// IsValid returns true if the version object stores a successfully parsed version number. +func (v *Version) IsValid() bool { + return v.version != "" +} + func (v *Version) IsMajor(major int) bool { return major == v.Major } diff --git a/libbeat/dashboards/dashboards.go b/libbeat/dashboards/dashboards.go index 81106f6064f..4edf29c4019 100644 --- a/libbeat/dashboards/dashboards.go +++ b/libbeat/dashboards/dashboards.go @@ -22,8 +22,6 @@ import ( "errors" "fmt" "path/filepath" - "strconv" - "strings" errw "github.com/pkg/errors" @@ -106,12 +104,7 @@ func ImportDashboards( esLoader.statusMsg("Elasticsearch URL %v", esLoader.client.Connection.URL) - majorVersion, _, err := getMajorAndMinorVersion(esLoader.version) - if err != nil { - return fmt.Errorf("wrong Elasticsearch version: %v", err) - } - - if majorVersion < 6 { + if esLoader.version.Major < 6 { importVia = importViaES } else { importVia = useKibana @@ -145,17 +138,16 @@ func setupAndImportDashboardsViaKibana(ctx context.Context, hostname string, kib } func ImportDashboardsViaKibana(kibanaLoader *KibanaLoader) error { - - if !isKibanaAPIavailable(kibanaLoader.version) { - return fmt.Errorf("Kibana API is not available in Kibana version %s", kibanaLoader.version) + version := kibanaLoader.version + if !version.IsValid() { + return errors.New("No valid kibana version available") } - version, err := common.NewVersion(kibanaLoader.version) - if err != nil { - return fmt.Errorf("Invalid Kibana version: %s", kibanaLoader.version) + if !isKibanaAPIavailable(version) { + return fmt.Errorf("Kibana API is not available in Kibana version %s", version.String()) } - importer, err := NewImporter(*version, kibanaLoader.config, kibanaLoader) + importer, err := NewImporter(version, kibanaLoader.config, kibanaLoader) if err != nil { return fmt.Errorf("fail to create a Kibana importer for loading the dashboards: %v", err) } @@ -187,40 +179,6 @@ func ImportDashboardsViaElasticsearch(esLoader *ElasticsearchLoader) error { return nil } -func getMajorAndMinorVersion(version string) (int, int, error) { - fields := strings.Split(version, ".") - if len(fields) != 3 { - return 0, 0, fmt.Errorf("wrong version %s", version) - } - majorVersion := fields[0] - minorVersion := fields[1] - - majorVersionInt, err := strconv.Atoi(majorVersion) - if err != nil { - return 0, 0, err - } - - minorVersionInt, err := strconv.Atoi(minorVersion) - if err != nil { - return 0, 0, err - } - - return majorVersionInt, minorVersionInt, nil -} - -func isKibanaAPIavailable(version string) bool { - majorVersion, minorVersion, err := getMajorAndMinorVersion(version) - if err != nil { - return false - } - - if majorVersion == 5 && minorVersion >= 6 { - return true - } - - if majorVersion >= 6 { - return true - } - - return false +func isKibanaAPIavailable(version common.Version) bool { + return (version.Major == 5 && version.Minor >= 6) || version.Major >= 6 } diff --git a/libbeat/dashboards/es_loader.go b/libbeat/dashboards/es_loader.go index 270e2f26dd0..4787bb3faf5 100644 --- a/libbeat/dashboards/es_loader.go +++ b/libbeat/dashboards/es_loader.go @@ -19,6 +19,7 @@ package dashboards import ( "encoding/json" + "errors" "fmt" "io/ioutil" "path" @@ -33,7 +34,7 @@ import ( type ElasticsearchLoader struct { client *elasticsearch.Client config *Config - version string + version common.Version msgOutputter MessageOutputter } @@ -48,6 +49,9 @@ func NewElasticsearchLoader(cfg *common.Config, dashboardsConfig *Config, msgOut } version := esClient.GetVersion() + if !version.IsValid() { + return nil, errors.New("No valid Elasticsearch version available") + } loader := ElasticsearchLoader{ client: esClient, @@ -56,7 +60,7 @@ func NewElasticsearchLoader(cfg *common.Config, dashboardsConfig *Config, msgOut msgOutputter: msgOutputter, } - loader.statusMsg("Initialize the Elasticsearch %s loader", version) + loader.statusMsg("Initialize the Elasticsearch %s loader", version.String()) return &loader, nil } diff --git a/libbeat/dashboards/es_loader_test.go b/libbeat/dashboards/es_loader_test.go index 4b9c468a335..29f5684da90 100644 --- a/libbeat/dashboards/es_loader_test.go +++ b/libbeat/dashboards/es_loader_test.go @@ -20,7 +20,6 @@ package dashboards import ( - "strings" "testing" "github.com/stretchr/testify/assert" @@ -40,8 +39,9 @@ func TestImporter(t *testing.T) { } client := estest.GetTestingElasticsearch(t) - if strings.HasPrefix(client.Connection.GetVersion(), "6.") || - strings.HasPrefix(client.Connection.GetVersion(), "7.") { + major := client.GetVersion().Major + + if major == 6 || major == 7 { t.Skip("Skipping tests for Elasticsearch 6.x releases") } @@ -76,8 +76,8 @@ func TestImporterEmptyBeat(t *testing.T) { } client := estest.GetTestingElasticsearch(t) - if strings.HasPrefix(client.Connection.GetVersion(), "6.") || - strings.HasPrefix(client.Connection.GetVersion(), "7.") { + major := client.GetVersion().Major + if major == 6 || major == 7 { t.Skip("Skipping tests for Elasticsearch 6.x releases") } diff --git a/libbeat/dashboards/export.go b/libbeat/dashboards/export.go index d74a1d84b7b..cfb3074ae05 100644 --- a/libbeat/dashboards/export.go +++ b/libbeat/dashboards/export.go @@ -84,14 +84,9 @@ func ExportAll(client *kibana.Client, list ListYML) ([]common.MapStr, error) { } // SaveToFile creates the required directories if needed and saves dashboard. -func SaveToFile(dashboard common.MapStr, filename, root, versionStr string) error { - version, err := common.NewVersion(versionStr) - if err != nil { - return err - } - +func SaveToFile(dashboard common.MapStr, filename, root string, version common.Version) error { dashboardsPath := "_meta/kibana/" + strconv.Itoa(version.Major) + "/dashboard" - err = generator.CreateDirectories(root, dashboardsPath) + err := generator.CreateDirectories(root, dashboardsPath) if err != nil { return err } diff --git a/libbeat/dashboards/kibana_loader.go b/libbeat/dashboards/kibana_loader.go index 8a09e72bfae..6c6e291e4ae 100644 --- a/libbeat/dashboards/kibana_loader.go +++ b/libbeat/dashboards/kibana_loader.go @@ -35,7 +35,7 @@ var importAPI = "/api/kibana/dashboards/import" type KibanaLoader struct { client *kibana.Client config *Config - version string + version common.Version hostname string msgOutputter MessageOutputter } @@ -59,7 +59,8 @@ func NewKibanaLoader(ctx context.Context, cfg *common.Config, dashboardsConfig * msgOutputter: msgOutputter, } - loader.statusMsg("Initialize the Kibana %s loader", client.GetVersion()) + version := client.GetVersion() + loader.statusMsg("Initialize the Kibana %s loader", version.String()) return &loader, nil } diff --git a/libbeat/kibana/client.go b/libbeat/kibana/client.go index fe21af40dcb..115a1bffc29 100644 --- a/libbeat/kibana/client.go +++ b/libbeat/kibana/client.go @@ -43,7 +43,7 @@ type Connection struct { Headers map[string]string http *http.Client - version string + version common.Version } type Client struct { @@ -146,7 +146,7 @@ func NewClientWithConfig(config *ClientConfig) (*Client, error) { } if !config.IgnoreVersion { - if err = client.SetVersion(); err != nil { + if err = client.readVersion(); err != nil { return nil, fmt.Errorf("fail to get the Kibana version: %v", err) } } @@ -178,7 +178,7 @@ func (conn *Connection) Request(method, extraPath string, } if method != "GET" { - req.Header.Set("kbn-version", conn.version) + req.Header.Set("kbn-version", conn.version.String()) } resp, err := conn.http.Do(req) @@ -201,7 +201,7 @@ func (conn *Connection) Request(method, extraPath string, return resp.StatusCode, result, retError } -func (client *Client) SetVersion() error { +func (client *Client) readVersion() error { type kibanaVersionResponse struct { Name string `json:"name"` Version struct { @@ -221,11 +221,12 @@ func (client *Client) SetVersion() error { err, truncateString(result)) } - var kibanaVersion kibanaVersionResponse - var kibanaVersion5x kibanaVersionResponse5x + var versionString string + var kibanaVersion kibanaVersionResponse err = json.Unmarshal(result, &kibanaVersion) if err != nil { + var kibanaVersion5x kibanaVersionResponse5x // The response returned by /api/status is different in Kibana 5.x than in Kibana 6.x err5x := json.Unmarshal(result, &kibanaVersion5x) @@ -234,21 +235,28 @@ func (client *Client) SetVersion() error { return fmt.Errorf("fail to unmarshal the response from GET %s/api/status. Response: %s. Kibana 5.x status api returns: %v. Kibana 6.x status api returns: %v", client.Connection.URL, truncateString(result), err5x, err) } - client.version = kibanaVersion5x.Version + versionString = kibanaVersion5x.Version } else { - - client.version = kibanaVersion.Version.Number + versionString = kibanaVersion.Version.Number if kibanaVersion.Version.Snapshot { // needed for the tests - client.version = client.version + "-SNAPSHOT" + versionString += "-SNAPSHOT" } } + version, err := common.NewVersion(versionString) + if err != nil { + return fmt.Errorf("fail to parse kibana version (%v): %+v", versionString, err) + } + + client.version = *version return nil } -func (client *Client) GetVersion() string { return client.version } +// GetVersion returns the version read from kibana. The version is not set if +// IgnoreVersion was set when creating the client. +func (client *Client) GetVersion() common.Version { return client.version } func (client *Client) ImportJSON(url string, params url.Values, jsonBody map[string]interface{}) error { diff --git a/libbeat/ml-importer/importer.go b/libbeat/ml-importer/importer.go index 1c384f7c445..fa0c84e0f0d 100644 --- a/libbeat/ml-importer/importer.go +++ b/libbeat/ml-importer/importer.go @@ -56,13 +56,13 @@ type MLConfig struct { type MLLoader interface { Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) LoadJSON(path string, json map[string]interface{}) ([]byte, error) - GetVersion() string + GetVersion() common.Version } // MLSetupper is a subset of the Kibana client API capable of setting up ML objects. type MLSetupper interface { Request(method, path string, params url.Values, headers http.Header, body io.Reader) (int, []byte, error) - GetVersion() string + GetVersion() common.Version } // MLResponse stores the relevant parts of the response from Kibana to check for errors. @@ -125,10 +125,11 @@ func ImportMachineLearningJob(esClient MLLoader, cfg *MLConfig) error { datafeedURL := fmt.Sprintf(esDataFeedURL, cfg.ID) if len(cfg.MinVersion) > 0 { - esVersion, err := common.NewVersion(esClient.GetVersion()) - if err != nil { - return errors.Errorf("Error parsing ES version: %s: %v", esClient.GetVersion(), err) + esVersion := esClient.GetVersion() + if !esVersion.IsValid() { + return errors.New("Invalid Elasticsearch version") } + minVersion, err := common.NewVersion(cfg.MinVersion) if err != nil { return errors.Errorf("Error parsing min_version: %s: %v", minVersion, err) @@ -136,7 +137,7 @@ func ImportMachineLearningJob(esClient MLLoader, cfg *MLConfig) error { if esVersion.LessThan(minVersion) { logp.Debug("machine-learning", "Skipping job %s, because ES version (%s) is smaller than min version (%s)", - cfg.ID, esVersion, minVersion) + cfg.ID, esVersion.String(), minVersion) return nil } } diff --git a/libbeat/outputs/elasticsearch/api_integration_test.go b/libbeat/outputs/elasticsearch/api_integration_test.go index 33294c1012c..8ac9a0ac233 100644 --- a/libbeat/outputs/elasticsearch/api_integration_test.go +++ b/libbeat/outputs/elasticsearch/api_integration_test.go @@ -24,7 +24,6 @@ import ( "fmt" "net/http" "os" - "strings" "testing" "github.com/stretchr/testify/assert" @@ -105,8 +104,8 @@ func TestIngest(t *testing.T) { } client := getTestingElasticsearch(t) - if strings.HasPrefix(client.Connection.version, "2.") { - t.Skip("Skipping tests as pipeline not available in 2.x releases") + if client.Connection.version.Major < 5 { + t.Skip("Skipping tests as pipeline not available in <5.x releases") } status, _, err := client.DeletePipeline(pipeline, nil) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index f9d13388233..f5b8252f034 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -28,6 +28,8 @@ import ( "net/url" "time" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" @@ -89,7 +91,7 @@ type Connection struct { onConnectCallback func() error encoder bodyEncoder - version string + version common.Version } type bulkIndexAction struct { @@ -629,7 +631,7 @@ func (client *Client) LoadJSON(path string, json map[string]interface{}) ([]byte } // GetVersion returns the elasticsearch version the client is connected to -func (client *Client) GetVersion() string { +func (client *Client) GetVersion() common.Version { return client.Connection.version } @@ -661,7 +663,7 @@ func (client *Client) Test(d testing.Driver) { err = client.Connect() d.Fatal("talk to server", err) - d.Info("version", client.version) + d.Info("version", client.version.String()) }) } @@ -671,12 +673,18 @@ func (client *Client) String() string { // Connect connects the client. func (conn *Connection) Connect() error { - var err error - conn.version, err = conn.Ping() + versionString, err := conn.Ping() if err != nil { return err } + if version, err := common.NewVersion(versionString); err != nil { + logp.Err("Invalid version from Elasticsearch: %v", versionString) + conn.version = common.Version{} + } else { + conn.version = *version + } + err = conn.onConnectCallback() if err != nil { return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %v", err) @@ -803,7 +811,9 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error) return status, obj, err } -func (conn *Connection) GetVersion() string { +// GetVersion returns the elasticsearch version the client is connected to. +// The version is read and updated on 'Connect'. +func (conn *Connection) GetVersion() common.Version { return conn.version } diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 9588e5ee162..8a3a7d5f5b9 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -21,7 +21,6 @@ package elasticsearch import ( "math/rand" - "strings" "testing" "time" @@ -92,8 +91,8 @@ func TestClientPublishEventWithPipeline(t *testing.T) { client.Delete(index, "", "", nil) // Check version - if strings.HasPrefix(client.Connection.version, "2.") { - t.Skip("Skipping tests as pipeline not available in 2.x releases") + if client.Connection.version.Major < 5 { + t.Skip("Skipping tests as pipeline not available in <5.x releases") } publish := func(event beat.Event) { @@ -173,8 +172,8 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) { }) client.Delete(index, "", "", nil) - if strings.HasPrefix(client.Connection.version, "2.") { - t.Skip("Skipping tests as pipeline not available in 2.x releases") + if client.Connection.version.Major < 5 { + t.Skip("Skipping tests as pipeline not available in <5.x releases") } publish := func(events ...beat.Event) { diff --git a/libbeat/template/load.go b/libbeat/template/load.go index 157e2c7e915..34fb5638194 100644 --- a/libbeat/template/load.go +++ b/libbeat/template/load.go @@ -34,7 +34,7 @@ import ( type ESClient interface { LoadJSON(path string, json map[string]interface{}) ([]byte, error) Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) - GetVersion() string + GetVersion() common.Version } type Loader struct { @@ -79,7 +79,8 @@ func (l *Loader) Load() error { exists := l.CheckTemplate(templateName) if !exists || l.config.Overwrite { - logp.Info("Loading template for Elasticsearch version: %s", l.client.GetVersion()) + version := l.client.GetVersion() + logp.Info("Loading template for Elasticsearch version: %s", version.String()) if l.config.Overwrite { logp.Info("Existing template will be overwritten, as overwrite is enabled.") } diff --git a/libbeat/template/template.go b/libbeat/template/template.go index 49e274f3727..1e7070f13aa 100644 --- a/libbeat/template/template.go +++ b/libbeat/template/template.go @@ -22,11 +22,12 @@ import ( "sync" "time" + "github.com/elastic/go-ucfg/yaml" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/common/fmtstr" - "github.com/elastic/go-ucfg/yaml" ) var ( @@ -51,7 +52,7 @@ type Template struct { } // New creates a new template instance -func New(beatVersion string, beatName string, esVersion string, config TemplateConfig) (*Template, error) { +func New(beatVersion string, beatName string, esVersion common.Version, config TemplateConfig) (*Template, error) { bV, err := common.NewVersion(beatVersion) if err != nil { return nil, err @@ -95,21 +96,15 @@ func New(beatVersion string, beatName string, esVersion string, config TemplateC return nil, err } - // In case no esVersion is set, it is assumed the same as beat version - if esVersion == "" { - esVersion = beatVersion - } - - esV, err := common.NewVersion(esVersion) - if err != nil { - return nil, err + if !esVersion.IsValid() { + esVersion = *bV } return &Template{ pattern: pattern, name: name, beatVersion: *bV, - esVersion: *esV, + esVersion: esVersion, config: config, }, nil } diff --git a/libbeat/template/template_test.go b/libbeat/template/template_test.go index 09eb50b93a1..701f692737f 100644 --- a/libbeat/template/template_test.go +++ b/libbeat/template/template_test.go @@ -34,7 +34,8 @@ func TestNumberOfRoutingShards(t *testing.T) { config := TemplateConfig{} // Test it exists in 6.1 - template, err := New(beatVersion, beatName, "6.1.0", config) + ver := common.MustNewVersion("6.1.0") + template, err := New(beatVersion, beatName, *ver, config) assert.NoError(t, err) data := template.Generate(nil, nil) @@ -44,7 +45,8 @@ func TestNumberOfRoutingShards(t *testing.T) { assert.Equal(t, 30, shards.(int)) // Test it does not exist in 6.0 - template, err = New(beatVersion, beatName, "6.0.0", config) + ver = common.MustNewVersion("6.0.0") + template, err = New(beatVersion, beatName, *ver, config) assert.NoError(t, err) data = template.Generate(nil, nil) @@ -64,7 +66,8 @@ func TestNumberOfRoutingShardsOverwrite(t *testing.T) { } // Test it exists in 6.1 - template, err := New(beatVersion, beatName, "6.1.0", config) + ver := common.MustNewVersion("6.1.0") + template, err := New(beatVersion, beatName, *ver, config) assert.NoError(t, err) data := template.Generate(nil, nil)