Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.14] Fix handling of custom Endpoint when using S3 + SQS #39709

Merged
merged 14 commits into from
May 28, 2024
Merged
Next Next commit
Initial fix of endpoint in a backwards compat way
  • Loading branch information
strawgate committed May 24, 2024
commit 0aa6f22d54d4ab6992fe3599f5ad63a2cd73f1d7
44 changes: 33 additions & 11 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,31 @@ type s3Input struct {
func newInput(config config, store beater.StateStore) (*s3Input, error) {
awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig)

strawgate marked this conversation as resolved.
Show resolved Hide resolved
// A custom endpoint has been specified!
if config.AWSConfig.Endpoint != "" {
// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{
PartitionID: "aws",
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
}, nil
})
endpointUri, err := url.Parse(config.AWSConfig.Endpoint)

if err != nil {
// Log the error and continue with the default endpoint
fmt.Printf("Failed to parse the endpoint: %v", err)
}

// For backwards compat:
// If the endpoint does not start with S3, we will use the endpoint resolver to all SDK requests through this endpoint
// If the endpoint does start with S3, we will use the default resolver which can replace s3 with the service name

if !strings.HasPrefix(endpointUri.Hostname(), "s3") {
// Get the resolver from the endpoint url
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{
PartitionID: "aws",
Source: awssdk.EndpointSourceCustom,
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
HostnameImmutable: true,
}, nil
})
}
}

if err != nil {
Expand Down Expand Up @@ -112,7 +128,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
defer cancelInputCtx()

if in.config.QueueURL != "" {
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName)
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.AWSConfig.DefaultRegion)
strawgate marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's uncommon to return an error and a value that the caller should use. Typically these are mutually exclusive. You either get an error OR you get values that you should use. I suggest trying to a do a small bit of refactoring to keep with those conventions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@faec has refactored basically all of this plugin on main including undoing this but it's too different to backport.

I made a series of integration tests which cover all the various combinations of settings but I'm worried refactoring this might not be worth it given it's all going away soon

if err != nil && in.config.RegionName == "" {
return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
}
Expand Down Expand Up @@ -328,11 +344,17 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg
if err != nil {
return "", fmt.Errorf(queueURL + " is not a valid URL")
}

e, err := url.Parse(endpoint)
if err != nil {
return "", fmt.Errorf(endpoint + " is not a valid URL")
}

if (u.Scheme == "https" || u.Scheme == "http") && u.Host != "" {
queueHostSplit := strings.SplitN(u.Host, ".", 3)
queueHostSplit := strings.SplitN(u.Hostname(), ".", 3)
// check for sqs queue url
if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" {
if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) {
if queueHostSplit[2] == e.Hostname() || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) {
region = queueHostSplit[1]
if defaultRegion != "" && region != defaultRegion {
return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion}
Expand Down