Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for OpenSearch to existing ElasticSearch output plugin #10390

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions plugins/outputs/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
This plugin writes to [Elasticsearch](https://www.elastic.co) via HTTP using Elastic (<http://olivere.github.io/elastic/).>

It supports Elasticsearch releases from 5.x up to 7.x.
OpenSearch is also supported but as it has its own versioning schema it needs to be enabled by setting the configuration value `opensearch = true`.

## Elasticsearch indexes and templates

Expand Down Expand Up @@ -95,7 +96,6 @@ Example of an index template created by telegraf on Elasticsearch 5.x:
},
"aliases": {}
}

```

### Example events
Expand Down Expand Up @@ -163,6 +163,8 @@ This plugin will format the events in the following way:
## HTTP basic authentication details.
# username = "telegraf"
# password = "mypassword"
## Optional enable support for OpenSearch
# opensearch = true
## HTTP bearer token authentication details
# auth_bearer_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"

Expand Down Expand Up @@ -216,8 +218,8 @@ This plugin will format the events in the following way:

If you are using authentication within your Elasticsearch cluster, you need
to create a account and create a role with at least the manage role in the
Cluster Privileges category. Overwise, your account will not be able to
connect to your Elasticsearch cluster and send logs to your cluster. After
Cluster Privileges category. Otherwise, your account will not be able to
connect to your Elasticsearch cluster and send logs to your cluster. After
that, you need to add "create_indice" and "write" permission to your specific
index pattern.

Expand Down Expand Up @@ -249,6 +251,7 @@ Additionally, you can specify dynamic index names by using tags with the notatio
* `force_document_id`: Set to true will compute a unique hash from as sha256(concat(timestamp,measurement,series-hash)),enables resend or update data withoud ES duplicated documents.
* `float_handling`: Specifies how to handle `NaN` and infinite field values. `"none"` (default) will do nothing, `"drop"` will drop the field and `replace` will replace the field value by the number in `float_replacement_value`
* `float_replacement_value`: Value (defaulting to `0.0`) to replace `NaN`s and `inf`s if `float_handling` is set to `replace`. Negative `inf` will be replaced by the negative value in this number to respect the sign of the field's original value.
* `opensearch`: Set to true if you want to connect to OpenSearch instances.

## Known issues

Expand Down
12 changes: 11 additions & 1 deletion plugins/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Elasticsearch struct {
Password string
AuthBearerToken string
EnableSniffer bool
OpenSearch bool `toml:"opensearch"`
Timeout config.Duration
HealthCheckInterval config.Duration
EnableGzip bool
Expand Down Expand Up @@ -67,6 +68,8 @@ var sampleConfig = `
# password = "mypassword"
## HTTP bearer token authentication details
# auth_bearer_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
## Optional enable support for OpenSearch
# opensearch = true

## Index Config
## The target index for metrics (Elasticsearch will create if it not exists).
Expand Down Expand Up @@ -271,7 +274,14 @@ func (a *Elasticsearch) Connect() error {

// quit if ES version is not supported
majorReleaseNumber, err := strconv.Atoi(strings.Split(esVersion, ".")[0])
if err != nil || majorReleaseNumber < 5 {
if err != nil {
return fmt.Errorf("unable to parse major release number from ES version: %s", err)
}

// OpenSearch is based on ES version 7.x but has its own versioning and reports 1.x
if a.OpenSearch {
majorReleaseNumber = 7
} else if majorReleaseNumber < 5 {
return fmt.Errorf("elasticsearch version not supported: %s", esVersion)
}

Expand Down
31 changes: 30 additions & 1 deletion plugins/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func TestConnectAndWriteIntegration(t *testing.T) {
Expand Down Expand Up @@ -515,3 +516,31 @@ func TestAuthorizationHeaderWhenBearerTokenIsPresent(t *testing.T) {
err = e.Write(testutil.MockMetrics())
require.NoError(t, err)
}

func TestVersionCheckOpenSearch(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte(`{"version": {"number": "1.2.3"}}`))
require.NoError(t, err)
}))
defer ts.Close()

urls := []string{"http://" + ts.Listener.Addr().String()}

e := &Elasticsearch{
URLs: urls,
IndexName: "{{host}}-%Y.%m.%d",
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: false,
OpenSearch: true,
Log: testutil.Logger{},
}

err := e.Connect()
require.NoError(t, err)

require.Equal(t, 7, e.MajorReleaseNumber)

err = e.Write(testutil.MockMetrics())
require.NoError(t, err)
}