Skip to content

Commit

Permalink
[8.14] Fix handling of custom Endpoint when using S3 + SQS (#39709)
Browse files Browse the repository at this point in the history
Fix handling of custom Endpoint when using S3 + SQS
  • Loading branch information
strawgate authored May 28, 2024
1 parent 497f7ed commit 35eccb8
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 17 deletions.
92 changes: 76 additions & 16 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,38 @@ type s3Input struct {

func newInput(config config, store beater.StateStore) (*s3Input, error) {
awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err)
}

if config.AWSConfig.Endpoint != "" {
// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{
PartitionID: "aws",
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
}, nil
})
// The awsConfig now contains the region from the credential profile or default region
// if the region is explicitly set in the config, then it wins
if config.RegionName != "" {
awsConfig.Region = config.RegionName
}

if err != nil {
return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err)
// A custom endpoint has been specified!
if config.AWSConfig.Endpoint != "" {

// Parse a URL for the host regardless of it missing the scheme
endpointUri, err := url.Parse(config.AWSConfig.Endpoint)
if err != nil {
return nil, fmt.Errorf("failed to parse endpoint: %w", err)
}

// For backwards compat:
// If the endpoint does not start with S3, we will use the endpoint resolver to make all SDK requests use the specified endpoint
// If the endpoint does start with S3, we will use the default resolver, which uses the endpoint field but can replace s3 with the desired service name (e.g. sqs)
if !strings.HasPrefix(endpointUri.Hostname(), "s3") {
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{
PartitionID: "aws",
Source: awssdk.EndpointSourceCustom,
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
}, nil
})
}
}

return &s3Input{
Expand Down Expand Up @@ -112,16 +130,23 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
defer cancelInputCtx()

if in.config.QueueURL != "" {
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName)
if err != nil && in.config.RegionName == "" {
return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.AWSConfig.DefaultRegion)

// If we can't get a region from anywhere, error out
if err != nil && regionName == "" && in.config.RegionName == "" {
return fmt.Errorf("region not specified and failed to get AWS region from queue_url: %w", err)
}
var warn regionMismatchError
if errors.As(err, &warn) {
// Warn of mismatch, but go ahead with configured region name.
inputContext.Logger.Warnf("%v: using %q", err, regionName)
}
in.awsConfig.Region = regionName

// Ensure we don't overwrite region when getRegionFromQueueURL fails
// Ensure we don't overwrite a user-specified region with a parsed region.
if regionName != "" && in.config.RegionName == "" {
in.awsConfig.Region = regionName
}

// Create SQS receiver and S3 notification processor.
receiver, err := in.createSQSReceiver(inputContext, pipeline)
Expand Down Expand Up @@ -186,7 +211,11 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
if in.config.AWSConfig.Endpoint != "" {
o.EndpointResolver = sqs.EndpointResolverFromURL(in.config.AWSConfig.Endpoint)
}
}),

queueURL: in.config.QueueURL,
apiTimeout: in.config.APITimeout,
visibilityTimeout: in.config.VisibilityTimeout,
Expand All @@ -198,6 +227,9 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
if in.config.AWSConfig.Endpoint != "" {
o.EndpointResolver = s3.EndpointResolverFromURL(in.config.AWSConfig.Endpoint)
}
o.UsePathStyle = in.config.PathStyle
}),
}
Expand Down Expand Up @@ -322,17 +354,45 @@ var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_

func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (region string, err error) {
// get region from queueURL
// Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs
// Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs
// Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue
u, err := url.Parse(queueURL)
if err != nil {
return "", fmt.Errorf(queueURL + " is not a valid URL")
}

e, err := url.Parse(endpoint)
if err != nil {
return "", fmt.Errorf(endpoint + " is not a valid URL")
}

if (u.Scheme == "https" || u.Scheme == "http") && u.Host != "" {
queueHostSplit := strings.SplitN(u.Host, ".", 3)
endpointSplit := strings.SplitN(e.Host, ".", 3)
// check for sqs queue url

// Parse a user-provided custom endpoint
if endpoint != "" && queueHostSplit[0] == "sqs" && len(queueHostSplit) == 3 && len(endpointSplit) == 3 {
// Check if everything after the second dot in the queue url matches everything after the second dot in the endpoint
endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 3)[2] == strings.SplitN(e.Hostname(), ".", 3)[2]
if !endpointMatchesQueueUrl {
// We couldn't resolve the URL
// We cannot infer the region by matching the endpoint and queue url, return the default region with a region mismatch warning
return defaultRegion, regionMismatchError{queueURLRegion: queueHostSplit[1], defaultRegion: endpointSplit[1]}
}

region = queueHostSplit[1]
if defaultRegion != "" && region != defaultRegion {
return region, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion}
}
return region, nil
}

// Parse a standard SQS url
if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" {
if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) {
// handle endpoint with no scheme, handle endpoint with scheme
if queueHostSplit[2] == endpoint || queueHostSplit[2] == e.Host || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) {
region = queueHostSplit[1]
if defaultRegion != "" && region != defaultRegion {
return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion}
Expand Down
170 changes: 169 additions & 1 deletion x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,174 @@ func TestInputRunSQSOnLocalstack(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end
}

// TestInputRunSQSWithConfig runs the input against a table of queue_url,
// region, default_region and endpoint combinations. For each case it checks
// which region and which endpoint resolvers are present on the input's AWS
// config once Run has returned.
//
// Each case runs as its own subtest, so a failure in one combination is
// reported under its name and does not stop the remaining cases.
func TestInputRunSQSWithConfig(t *testing.T) {
	tests := []struct {
		name          string
		queueURL      string
		endpoint      string
		region        string
		defaultRegion string
		want          string // expected awsConfig.Region after Run
		wantErr       error  // expected error from Run, if any
	}{
		{
			name:     "no region",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			want:     "us-east-1",
		},
		{
			name:     "no region but with long endpoint",
			queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
			endpoint: "https://s3.us-east-1.abc.xyz",
			want:     "us-east-1",
		},
		{
			name:     "no region but with short endpoint",
			queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
			endpoint: "https://abc.xyz",
			want:     "us-east-1",
		},
		{
			name:     "no region custom queue domain",
			queueURL: "https://sqs.us-east-1.xyz.abc/627959692251/test-s3-logs",
			wantErr:  errBadQueueURL,
		},
		{
			name:     "region",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			want:     "us-west-2",
		},
		{
			name:          "default_region",
			queueURL:      "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			defaultRegion: "us-west-2",
			want:          "us-west-2",
		},
		{
			name:          "region and default_region",
			queueURL:      "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:        "us-east-2",
			defaultRegion: "us-east-3",
			want:          "us-east-2",
		},
		{
			name:     "short_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			endpoint: "https://amazonaws.com",
			want:     "us-east-1",
		},
		{
			name:     "long_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			endpoint: "https://s3.us-east-1.amazonaws.com",
			want:     "us-east-1",
		},
		{
			name:     "region and custom short_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			endpoint: "https://.elastic.co",
			want:     "us-west-2",
		},
		{
			name:     "region and custom long_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			endpoint: "https://s3.us-east-1.elastic.co",
			want:     "us-west-2",
		},
		{
			name:     "region and short_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			endpoint: "https://amazonaws.com",
			want:     "us-west-2",
		},
		{
			name:     "region and long_endpoint",
			queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:   "us-west-2",
			endpoint: "https://s3.us-east-1.amazonaws.com",
			want:     "us-west-2",
		},
		{
			name:          "region and default region and short_endpoint",
			queueURL:      "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:        "us-west-2",
			defaultRegion: "us-east-1",
			endpoint:      "https://amazonaws.com",
			want:          "us-west-2",
		},
		{
			name:          "region and default region and long_endpoint",
			queueURL:      "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
			region:        "us-west-2",
			defaultRegion: "us-east-1",
			endpoint:      "https://s3.us-east-1.amazonaws.com",
			want:          "us-west-2",
		},
	}

	logp.TestingSetup()

	for _, test := range tests {
		test := test // keep a per-iteration copy for toolchains older than Go 1.22
		t.Run(test.name, func(t *testing.T) {
			// Build the filebeat config from the non-empty test parameters,
			// in a fixed key order.
			settings := []struct{ key, value string }{
				{"queue_url", test.queueURL},
				{"region", test.region},
				{"default_region", test.defaultRegion},
				{"endpoint", test.endpoint},
			}
			var config strings.Builder
			for _, s := range settings {
				if s.value != "" {
					fmt.Fprintf(&config, "%s: %s \n", s.key, s.value)
				}
			}

			s3Input := createInput(t, conf.MustNewConfigFrom(config.String()))

			inputCtx, cancel := newV2Context()
			t.Cleanup(cancel)
			// Cancel the input context after 5s so Run cannot hold the test
			// open indefinitely; stop the timer once the subtest is done so
			// it does not linger after an early return.
			timer := time.AfterFunc(5*time.Second, cancel)
			t.Cleanup(func() { timer.Stop() })

			var errGroup errgroup.Group
			errGroup.Go(func() error {
				return s3Input.Run(inputCtx, &fakePipeline{})
			})
			err := errGroup.Wait()

			// When an error is expected it must be the expected one: neither
			// a nil error nor an unrelated failure satisfies the case.
			if test.wantErr != nil {
				assert.ErrorIs(t, err, test.wantErr)
				return
			}
			if err != nil {
				t.Fatal(err)
			}

			// The plain EndpointResolver is never expected to be set on the
			// shared AWS config.
			assert.Nil(t, s3Input.awsConfig.EndpointResolver)

			// EndpointResolverWithOptions is expected only for a custom
			// endpoint that does not start with "s3". With no endpoint, or an
			// s3-prefixed one, it stays nil here (S3 resolvers are added later
			// in the code than this integration test covers).
			if test.endpoint == "" || strings.HasPrefix(test.endpoint, "https://s3") {
				assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions)
			} else {
				assert.NotNil(t, s3Input.awsConfig.EndpointResolverWithOptions)
			}

			assert.EqualValues(t, test.want, s3Input.awsConfig.Region)
		})
	}
}

func TestInputRunSQS(t *testing.T) {
logp.TestingSetup()

Expand Down Expand Up @@ -353,7 +521,7 @@ func TestInputRunS3(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3ObjectsListedTotal.Get(), 8)
assert.EqualValues(t, s3Input.metrics.s3ObjectsProcessedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3ObjectsAckedTotal.Get(), 6)
assert.EqualValues(t, s3Input.metrics.s3ObjectsAckedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
}

Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/input/awss3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ func TestGetRegionFromQueueURL(t *testing.T) {
endpoint: "abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_matching_endpoint_and_scheme",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_matching_url_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://s3.us-east-1.abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_blank_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
Expand Down

0 comments on commit 35eccb8

Please sign in to comment.