Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http): support aws prometheus #10202

Merged
merged 1 commit into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions plugins/outputs/http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,27 @@ batch format by default.
## Maximum amount of time before idle connection is closed.
## Zero means no limit.
# idle_conn_timeout = 0

## Amazon Region
#region = "us-east-1"

## Amazon Credentials
## Credentials are loaded in the following order
## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
## 2) Assumed credentials via STS if role_arn is specified
## 3) explicit credentials from 'access_key' and 'secret_key'
## 4) shared profile from 'profile'
## 5) environment variables
## 6) shared credentials file
## 7) EC2 Instance Profile
#access_key = ""
#secret_key = ""
#token = ""
#role_arn = ""
#web_identity_token_file = ""
#role_session_name = ""
#profile = ""
#shared_credential_file = ""
```

### Optional Cookie Authentication Settings
Expand Down
69 changes: 69 additions & 0 deletions plugins/outputs/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"fmt"
"io"
"net/http"
"strings"
"time"

awsV2 "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/config/aws"
"github.com/influxdata/telegraf/internal"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down Expand Up @@ -81,6 +86,27 @@ var sampleConfig = `
## Maximum amount of time before idle connection is closed.
## Zero means no limit.
# idle_conn_timeout = 0

## Amazon Region
#region = "us-east-1"

## Amazon Credentials
## Credentials are loaded in the following order
## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
## 2) Assumed credentials via STS if role_arn is specified
## 3) explicit credentials from 'access_key' and 'secret_key'
## 4) shared profile from 'profile'
## 5) environment variables
## 6) shared credentials file
## 7) EC2 Instance Profile
#access_key = ""
#secret_key = ""
#token = ""
#role_arn = ""
#web_identity_token_file = ""
#role_session_name = ""
#profile = ""
#shared_credential_file = ""
`

const (
Expand All @@ -97,18 +123,29 @@ type HTTP struct {
Headers map[string]string `toml:"headers"`
ContentEncoding string `toml:"content_encoding"`
UseBatchFormat bool `toml:"use_batch_format"`
AwsService string `toml:"aws_service"`
httpconfig.HTTPClientConfig
Log telegraf.Logger `toml:"-"`

client *http.Client
serializer serializers.Serializer

awsCfg *awsV2.Config
internalaws.CredentialConfig
}

func (h *HTTP) SetSerializer(serializer serializers.Serializer) {
h.serializer = serializer
}

func (h *HTTP) Connect() error {
if h.AwsService != "" {
cfg, err := h.CredentialConfig.Credentials()
if err == nil {
h.awsCfg = &cfg
}
}

if h.Method == "" {
h.Method = http.MethodPost
}
Expand Down Expand Up @@ -180,11 +217,43 @@ func (h *HTTP) writeMetric(reqBody []byte) error {
reqBodyBuffer = rc
}

var payloadHash *string
if h.awsCfg != nil {
// We need a local copy of the full buffer, the signature scheme requires a sha256 of the request body.
buf := new(bytes.Buffer)
_, err = io.Copy(buf, reqBodyBuffer)
if err != nil {
return err
}

sum := sha256.Sum256(buf.Bytes())
reqBodyBuffer = buf

// sha256 is hex encoded
hash := fmt.Sprintf("%x", sum)
payloadHash = &hash
}

req, err := http.NewRequest(h.Method, h.URL, reqBodyBuffer)
if err != nil {
return err
}

if h.awsCfg != nil {
signer := v4.NewSigner()
ctx := context.Background()

credentials, err := h.awsCfg.Credentials.Retrieve(ctx)
if err != nil {
return err
}

err = signer.SignHTTP(ctx, credentials, req, *payloadHash, h.AwsService, h.Region, time.Now().UTC())
if err != nil {
return err
}
}

if h.Username != "" || h.Password != "" {
req.SetBasicAuth(h.Username, h.Password)
}
Expand Down
51 changes: 51 additions & 0 deletions plugins/outputs/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/config/aws"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
Expand Down Expand Up @@ -516,3 +517,53 @@ func TestBatchedUnbatched(t *testing.T) {
})
}
}

func TestAwsCredentials(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

tests := []struct {
name string
plugin *HTTP
tokenHandler TestHandlerFunc
handler TestHandlerFunc
}{
{
name: "simple credentials",
plugin: &HTTP{
URL: u.String(),
AwsService: "aps",
CredentialConfig: internalaws.CredentialConfig{
Region: "us-east-1",
AccessKey: "dummy",
SecretKey: "dummy",
},
},
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Contains(t, r.Header["Authorization"][0], "AWS4-HMAC-SHA256")
require.Contains(t, r.Header["Authorization"][0], "=dummy/")
require.Contains(t, r.Header["Authorization"][0], "/us-east-1/")
w.WriteHeader(http.StatusOK)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tt.handler(t, w, r)
})

serializer := influx.NewSerializer()
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)

err = tt.plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err)
})
}
}