diff --git a/broker/options.go b/broker/options.go index 78948c6..4e1ee03 100644 --- a/broker/options.go +++ b/broker/options.go @@ -44,8 +44,9 @@ func NewBrokerOptions(opts ...BrokerOption) BrokerOptions { type PublishOption func(o *PublishOptions) type PublishOptions struct { - Topic string - Context context.Context + Topic string + Endpoint string + Context context.Context } func PublishWithTopic(topic string) PublishOption { @@ -54,6 +55,12 @@ func PublishWithTopic(topic string) PublishOption { } } +func PublishWithEndpoint(endpoint string) PublishOption { + return func(o *PublishOptions) { + o.Endpoint = endpoint + } +} + func NewPublishOptions(opts ...PublishOption) PublishOptions { options := PublishOptions{ Context: context.Background(), @@ -69,8 +76,9 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions { type SubscribeOption func(o *SubscribeOptions) type SubscribeOptions struct { - Group string - Context context.Context + Group string + Endpoint string + Context context.Context } func SubscribeWithGroup(group string) SubscribeOption { @@ -79,6 +87,12 @@ func SubscribeWithGroup(group string) SubscribeOption { } } +func SubscribeWithEndpoint(endpoint string) SubscribeOption { + return func(o *SubscribeOptions) { + o.Endpoint = endpoint + } +} + func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { options := SubscribeOptions{ Context: context.Background(), diff --git a/broker/snssqs/resolvers.go b/broker/snssqs/resolvers.go new file mode 100644 index 0000000..7d1c061 --- /dev/null +++ b/broker/snssqs/resolvers.go @@ -0,0 +1,49 @@ +package snssqs + +import ( + "context" + "net/url" + + "github.com/aws/aws-sdk-go-v2/service/sns" + "github.com/aws/aws-sdk-go-v2/service/sqs" + transport "github.com/aws/smithy-go/endpoints" + "github.com/w-h-a/pkg/broker" +) + +type snsResolver struct { + options broker.PublishOptions +} + +func (r *snsResolver) ResolveEndpoint(ctx context.Context, params sns.EndpointParameters) (transport.Endpoint, error) { + if len(r.options.Endpoint) == 0 { + return sns.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params) + } + + u, err := url.Parse(r.options.Endpoint) + if err != nil { + return transport.Endpoint{}, err + } + + return transport.Endpoint{ + URI: *u, + }, nil +} + +type sqsResolver struct { + options broker.SubscribeOptions +} + +func (r *sqsResolver) ResolveEndpoint(ctx context.Context, params sqs.EndpointParameters) (transport.Endpoint, error) { + if len(r.options.Endpoint) == 0 { + return sqs.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params) + } + + u, err := url.Parse(r.options.Endpoint) + if err != nil { + return transport.Endpoint{}, err + } + + return transport.Endpoint{ + URI: *u, + }, nil +} diff --git a/broker/snssqs/snssqs.go b/broker/snssqs/snssqs.go index 53cc90c..66ce84e 100644 --- a/broker/snssqs/snssqs.go +++ b/broker/snssqs/snssqs.go @@ -88,13 +88,21 @@ func (b *snssqs) configure() error { return nil } - cfg, err := awsconfig.LoadDefaultConfig(context.Background(), awsconfig.WithRegion("us-west-2")) + cfg, err := awsconfig.LoadDefaultConfig( + context.Background(), + awsconfig.WithRegion("us-west-2"), + ) if err != nil { return err } if b.options.PublishOptions != nil { - b.snsClient = &snsClient{sns.NewFromConfig(cfg)} + b.snsClient = &snsClient{sns.NewFromConfig( + cfg, + func(o *sns.Options) { + o.EndpointResolverV2 = &snsResolver{*b.options.PublishOptions} + }, + )} } if b.options.SubscribeOptions != nil { @@ -110,7 +118,12 @@ func (b *snssqs) configure() error { waitTimeSeconds = waitTime } - client := sqs.NewFromConfig(cfg) + client := sqs.NewFromConfig( + cfg, + func(o *sqs.Options) { + o.EndpointResolverV2 = &sqsResolver{*b.options.SubscribeOptions} + }, + ) url, err := client.GetQueueUrl(context.Background(), &sqs.GetQueueUrlInput{ QueueName: aws.String(b.options.SubscribeOptions.Group), diff --git a/go.mod b/go.mod index 840041e..eea9ffa 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.27.30 github.com/aws/aws-sdk-go-v2/service/sns v1.31.5 github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5 + github.com/aws/smithy-go v1.20.4 github.com/docker/docker v27.2.0+incompatible github.com/golang-jwt/jwt v3.2.2+incompatible github.com/google/uuid v1.6.0 @@ -37,7 +38,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 // indirect - github.com/aws/smithy-go v1.20.4 // indirect github.com/containerd/log v0.1.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/distribution/reference v0.6.0 // indirect