Skip to content

Commit

Permalink
feat: broker endpoint options
Browse files Browse the repository at this point in the history
  • Loading branch information
w-h-a committed Sep 13, 2024
1 parent a6c5fc4 commit b58324b
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 8 deletions.
22 changes: 18 additions & 4 deletions broker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ func NewBrokerOptions(opts ...BrokerOption) BrokerOptions {
type PublishOption func(o *PublishOptions)

type PublishOptions struct {
Topic string
Context context.Context
Topic string
Endpoint string
Context context.Context
}

func PublishWithTopic(topic string) PublishOption {
Expand All @@ -54,6 +55,12 @@ func PublishWithTopic(topic string) PublishOption {
}
}

func PublishWithEndpoint(endpoint string) PublishOption {
return func(o *PublishOptions) {
o.Endpoint = endpoint
}
}

func NewPublishOptions(opts ...PublishOption) PublishOptions {
options := PublishOptions{
Context: context.Background(),
Expand All @@ -69,8 +76,9 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions {
type SubscribeOption func(o *SubscribeOptions)

type SubscribeOptions struct {
Group string
Context context.Context
Group string
Endpoint string
Context context.Context
}

func SubscribeWithGroup(group string) SubscribeOption {
Expand All @@ -79,6 +87,12 @@ func SubscribeWithGroup(group string) SubscribeOption {
}
}

func SubscribeWithEndpoint(endpoint string) SubscribeOption {
return func(o *SubscribeOptions) {
o.Endpoint = endpoint
}
}

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
options := SubscribeOptions{
Context: context.Background(),
Expand Down
49 changes: 49 additions & 0 deletions broker/snssqs/resolvers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package snssqs

import (
"context"
"net/url"

"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sqs"
transport "github.com/aws/smithy-go/endpoints"
"github.com/w-h-a/pkg/broker"
)

type snsResolver struct {
options broker.PublishOptions
}

func (r *snsResolver) ResolveEndpoint(ctx context.Context, params sns.EndpointParameters) (transport.Endpoint, error) {
if len(r.options.Endpoint) == 0 {
return sns.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params)
}

u, err := url.Parse(r.options.Endpoint)
if err != nil {
return transport.Endpoint{}, err
}

return transport.Endpoint{
URI: *u,
}, nil
}

type sqsResolver struct {
options broker.SubscribeOptions
}

func (r *sqsResolver) ResolveEndpoint(ctx context.Context, params sqs.EndpointParameters) (transport.Endpoint, error) {
if len(r.options.Endpoint) == 0 {
return sqs.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params)
}

u, err := url.Parse(r.options.Endpoint)
if err != nil {
return transport.Endpoint{}, err
}

return transport.Endpoint{
URI: *u,
}, nil
}
19 changes: 16 additions & 3 deletions broker/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,21 @@ func (b *snssqs) configure() error {
return nil
}

cfg, err := awsconfig.LoadDefaultConfig(context.Background(), awsconfig.WithRegion("us-west-2"))
cfg, err := awsconfig.LoadDefaultConfig(
context.Background(),
awsconfig.WithRegion("us-west-2"),
)
if err != nil {
return err
}

if b.options.PublishOptions != nil {
b.snsClient = &snsClient{sns.NewFromConfig(cfg)}
b.snsClient = &snsClient{sns.NewFromConfig(
cfg,
func(o *sns.Options) {
o.EndpointResolverV2 = &snsResolver{*b.options.PublishOptions}
},
)}
}

if b.options.SubscribeOptions != nil {
Expand All @@ -110,7 +118,12 @@ func (b *snssqs) configure() error {
waitTimeSeconds = waitTime
}

client := sqs.NewFromConfig(cfg)
client := sqs.NewFromConfig(
cfg,
func(o *sqs.Options) {
o.EndpointResolverV2 = &sqsResolver{*b.options.SubscribeOptions}
},
)

url, err := client.GetQueueUrl(context.Background(), &sqs.GetQueueUrlInput{
QueueName: aws.String(b.options.SubscribeOptions.Group),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.27.30
github.com/aws/aws-sdk-go-v2/service/sns v1.31.5
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5
github.com/aws/smithy-go v1.20.4
github.com/docker/docker v27.2.0+incompatible
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/google/uuid v1.6.0
Expand Down Expand Up @@ -37,7 +38,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 // indirect
github.com/aws/smithy-go v1.20.4 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.6.0 // indirect
Expand Down

0 comments on commit b58324b

Please sign in to comment.