Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws-s3 input: fix SQS region selection #39327

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 42 additions & 27 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"github.com/elastic/beats/v7/libbeat/feature"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/unison"
)

Expand Down Expand Up @@ -72,7 +73,7 @@

if config.AWSConfig.Endpoint != "" {
// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {

Check failure on line 76 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (windows)

SA1019: awsConfig.EndpointResolverWithOptions is deprecated: with the release of endpoint resolution v2 in API clients, EndpointResolver and EndpointResolverWithOptions are deprecated. Providing a value for this field will likely prevent you from using newer endpoint-related service features. See API client options EndpointResolverV2 and BaseEndpoint. (staticcheck)

Check failure on line 76 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA1019: awsConfig.EndpointResolverWithOptions is deprecated: with the release of endpoint resolution v2 in API clients, EndpointResolver and EndpointResolverWithOptions are deprecated. Providing a value for this field will likely prevent you from using newer endpoint-related service features. See API client options EndpointResolverV2 and BaseEndpoint. (staticcheck)
return awssdk.Endpoint{
PartitionID: "aws",
URL: config.AWSConfig.Endpoint,
Expand Down Expand Up @@ -117,17 +118,12 @@
inputContext v2.Context,
pipeline beat.Pipeline,
) error {
configRegion := in.config.RegionName
urlRegion, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint)
if err != nil && configRegion == "" {
// Only report an error if we don't have a configured region
// to fall back on.
return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
} else if configRegion != "" && configRegion != urlRegion {
inputContext.Logger.Warnf("configured region disagrees with queue_url region (%q != %q): using %q", configRegion, urlRegion, urlRegion)
// Set awsConfig.Region based on the config and queue URL
region, err := chooseRegion(inputContext.Logger, in.config)
if err != nil {
return err
}

in.awsConfig.Region = urlRegion
in.awsConfig.Region = region

// Create SQS receiver and S3 notification processor.
receiver, err := in.createSQSReceiver(inputContext, pipeline)
Expand Down Expand Up @@ -326,32 +322,51 @@

var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME} or https://{VPC_ENDPOINT}.sqs.{REGION_ENDPOINT}.vpce.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")

func getRegionFromQueueURL(queueURL, endpoint string) (string, error) {
func chooseRegion(log *logp.Logger, config config) (string, error) {
urlRegion := getRegionFromQueueURL(config.QueueURL, config.AWSConfig.Endpoint)
if config.RegionName != "" {
// If a region is configured, that takes precedence over the URL.
if log != nil && config.RegionName != urlRegion {
log.Warnf("configured region disagrees with queue_url region (%q != %q): using %q", config.RegionName, urlRegion, config.RegionName)
}
return config.RegionName, nil
}
if urlRegion != "" {
// If no region is configured, fall back on the URL.
return urlRegion, nil
}
// If we can't get the region from the config or the URL, report an error.
return "", fmt.Errorf("failed to get AWS region from queue_url: %w", errBadQueueURL)
}

// getRegionFromQueueURL returns the region from standard queue URLs, or the
// empty string if it couldn't be determined.
func getRegionFromQueueURL(queueURL, endpoint string) string {
// get region from queueURL
// Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs
// Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue
u, err := url.Parse(queueURL)
if err != nil {
return "", fmt.Errorf(queueURL + " is not a valid URL")
}
if (u.Scheme == "https" || u.Scheme == "http") && u.Host != "" {
queueHostSplit := strings.SplitN(u.Host, ".", 3)
// check for sqs queue url
if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" {
if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) {
return queueHostSplit[1], nil
}
return ""
}

// check for sqs queue url
host := strings.SplitN(u.Host, ".", 3)
if len(host) == 3 && host[0] == "sqs" {
if host[2] == endpoint || (endpoint == "" && strings.HasPrefix(host[2], "amazonaws.")) {
return host[1]
}
}

// check for vpce url
queueHostSplitVPC := strings.SplitN(u.Host, ".", 5)
if len(queueHostSplitVPC) == 5 && queueHostSplitVPC[1] == "sqs" {
if queueHostSplitVPC[4] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplitVPC[4], "amazonaws.")) {
return queueHostSplitVPC[2], nil
}
// check for vpce url
host = strings.SplitN(u.Host, ".", 5)
if len(host) == 5 && host[1] == "sqs" {
if host[4] == endpoint || (endpoint == "" && strings.HasPrefix(host[4], "amazonaws.")) {
return host[2]
}
}
return "", errBadQueueURL

return ""
}

func getRegionForBucket(ctx context.Context, s3Client *s3.Client, bucketName string) (string, error) {
Expand Down
68 changes: 50 additions & 18 deletions x-pack/filebeat/input/awss3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package awss3

import (
"errors"
"testing"

aws "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"

Check failure on line 11 in x-pack/filebeat/input/awss3/input_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

File is not `goimports`-ed with -local github.com/elastic (goimports)

Check failure on line 11 in x-pack/filebeat/input/awss3/input_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

File is not `goimports`-ed with -local github.com/elastic (goimports)
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -51,23 +53,36 @@

func TestGetRegionFromQueueURL(t *testing.T) {
tests := []struct {
name string
queueURL string
endpoint string
want string
wantErr error
name string
queueURL string
regionName string
endpoint string
want string
wantErr error
}{
{
name: "amazonaws.com_domain_with_blank_endpoint",
queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
want: "us-east-1",
},
{
name: "amazonaws.com_domain_with_region_override",
queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
regionName: "us-east-2",
want: "us-east-2",
},
{
name: "abc.xyz_and_domain_with_matching_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_with_region_override",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
regionName: "us-west-3",
want: "us-west-3",
},
{
name: "abc.xyz_and_domain_with_blank_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
Expand All @@ -78,18 +93,46 @@
queueURL: "https://vpce-test.sqs.us-east-2.vpce.amazonaws.com/12345678912/sqs-queue",
want: "us-east-2",
},
{
name: "vpce_endpoint_with_region_override",
queueURL: "https://vpce-test.sqs.us-east-2.vpce.amazonaws.com/12345678912/sqs-queue",
regionName: "us-west-1",
want: "us-west-1",
},
{
name: "vpce_endpoint_with_endpoint",
queueURL: "https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue",
endpoint: "amazonaws.com",
want: "us-east-1",
},
{
name: "non_aws_vpce_with_endpoint",
queueURL: "https://vpce-test.sqs.us-east-1.vpce.abc.xyz/12345678912/sqs-queue",
endpoint: "abc.xyz",
want: "us-east-1",
},
{
name: "non_aws_vpce_without_endpoint",
queueURL: "https://vpce-test.sqs.us-east-1.vpce.abc.xyz/12345678912/sqs-queue",
wantErr: errBadQueueURL,
},
{
name: "non_aws_vpce_with_region_override",
queueURL: "https://vpce-test.sqs.us-east-1.vpce.abc.xyz/12345678912/sqs-queue",
regionName: "us-west-1",
want: "us-west-1",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := getRegionFromQueueURL(test.queueURL, test.endpoint)
if !sameError(err, test.wantErr) {
config := config{
QueueURL: test.queueURL,
RegionName: test.regionName,
AWSConfig: aws.ConfigAWS{Endpoint: test.endpoint},
}
got, err := chooseRegion(nil, config)
if !errors.Is(err, test.wantErr) {
t.Errorf("unexpected error: got:%v want:%v", err, test.wantErr)
}
if got != test.want {
Expand All @@ -98,14 +141,3 @@
})
}
}

func sameError(a, b error) bool {
switch {
case a == nil && b == nil:
return true
case a == nil, b == nil:
return false
default:
return a.Error() == b.Error()
}
}
Loading