Skip to content

Commit

Permalink
Merge pull request #71 from w-h-a/broker-endpoints
Browse files Browse the repository at this point in the history
feat: use broker node option for custom resolution
  • Loading branch information
w-h-a authored Sep 14, 2024
2 parents a6c5fc4 + 2e496e9 commit 811b6ef
Show file tree
Hide file tree
Showing 16 changed files with 261 additions and 200 deletions.
48 changes: 48 additions & 0 deletions broker/snssqs/resolvers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package snssqs

import (
"context"
"net/url"

"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sqs"
transport "github.com/aws/smithy-go/endpoints"
)

type snsResolver struct {
nodes []string
}

func (r *snsResolver) ResolveEndpoint(ctx context.Context, params sns.EndpointParameters) (transport.Endpoint, error) {
if len(r.nodes) == 0 {
return sns.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params)
}

u, err := url.Parse(r.nodes[0])
if err != nil {
return transport.Endpoint{}, err
}

return transport.Endpoint{
URI: *u,
}, nil
}

type sqsResolver struct {
nodes []string
}

func (r *sqsResolver) ResolveEndpoint(ctx context.Context, params sqs.EndpointParameters) (transport.Endpoint, error) {
if len(r.nodes) == 0 {
return sqs.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params)
}

u, err := url.Parse(r.nodes[0])
if err != nil {
return transport.Endpoint{}, err
}

return transport.Endpoint{
URI: *u,
}, nil
}
19 changes: 16 additions & 3 deletions broker/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,21 @@ func (b *snssqs) configure() error {
return nil
}

cfg, err := awsconfig.LoadDefaultConfig(context.Background(), awsconfig.WithRegion("us-west-2"))
cfg, err := awsconfig.LoadDefaultConfig(
context.Background(),
awsconfig.WithRegion("us-west-2"),
)
if err != nil {
return err
}

if b.options.PublishOptions != nil {
b.snsClient = &snsClient{sns.NewFromConfig(cfg)}
b.snsClient = &snsClient{sns.NewFromConfig(
cfg,
func(o *sns.Options) {
o.EndpointResolverV2 = &snsResolver{b.options.Nodes}
},
)}
}

if b.options.SubscribeOptions != nil {
Expand All @@ -110,7 +118,12 @@ func (b *snssqs) configure() error {
waitTimeSeconds = waitTime
}

client := sqs.NewFromConfig(cfg)
client := sqs.NewFromConfig(
cfg,
func(o *sqs.Options) {
o.EndpointResolverV2 = &sqsResolver{b.options.Nodes}
},
)

url, err := client.GetQueueUrl(context.Background(), &sqs.GetQueueUrlInput{
QueueName: aws.String(b.options.SubscribeOptions.Group),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package controllers
package handlers

import (
"context"
Expand All @@ -9,13 +9,13 @@ import (
"github.com/w-h-a/pkg/utils/metadatautils"
)

type GreetController interface {
type GreetHandler interface {
Greet(ctx context.Context, req *proto.GreetRequest, rsp *proto.GreetResponse) error
}

type greetController struct {}
type greetHandler struct {}

func (c *greetController) Greet(ctx context.Context, req *proto.GreetRequest, rsp *proto.GreetResponse) error {
func (c *greetHandler) Greet(ctx context.Context, req *proto.GreetRequest, rsp *proto.GreetResponse) error {
md, _ := metadatautils.FromContext(ctx)
log.Infof("received Greeter.Greet request with metadata: %v", md)

Expand All @@ -24,17 +24,17 @@ func (c *greetController) Greet(ctx context.Context, req *proto.GreetRequest, rs
return nil
}

func NewGreetController() GreetController {
return &greetController{}
func NewGreetHandler() GreetHandler {
return &greetHandler{}
}

type Greeter struct {
GreetController
GreetHandler
}

func RegisterGreetController(s server.Server, controller GreetController, opts ...server.ControllerOption) error {
return s.RegisterController(
s.NewController(
func RegisterGreetHandler(s server.Server, controller GreetHandler, opts ...server.HandlerOption) error {
return s.Handle(
s.NewHandler(
&Greeter{controller},
opts...,
),
Expand Down
10 changes: 5 additions & 5 deletions examples/greeter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"strings"

"github.com/w-h-a/pkg/examples/greeter/controllers"
"github.com/w-h-a/pkg/examples/greeter/handlers"
"github.com/w-h-a/pkg/server"
"github.com/w-h-a/pkg/server/grpcserver"
"github.com/w-h-a/pkg/telemetry/log"
Expand All @@ -15,19 +15,19 @@ func main() {
server.ServerWithNamespace("app"),
server.ServerWithName("greeter"),
server.ServerWithVersion("v1.0.0"),
server.WrapController(controllerLogWrapper),
server.WrapHandler(handlerLogWrapper),
)

if err := controllers.RegisterGreetController(grpcServer, controllers.NewGreetController()); err != nil {
log.Fatalf("failed to register controller: %v", err)
if err := handlers.RegisterGreetHandler(grpcServer, handlers.NewGreetHandler()); err != nil {
log.Fatalf("failed to register handler: %v", err)
}

if err := grpcServer.Run(); err != nil {
log.Fatalf("failed to run server: %v", err)
}
}

func controllerLogWrapper(fn server.ControllerFunc) server.ControllerFunc {
func handlerLogWrapper(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
if strings.HasPrefix(req.Method(), "Health.") {
return fn(ctx, req, rsp)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.27.30
github.com/aws/aws-sdk-go-v2/service/sns v1.31.5
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5
github.com/aws/smithy-go v1.20.4
github.com/docker/docker v27.2.0+incompatible
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/google/uuid v1.6.0
Expand Down Expand Up @@ -37,7 +38,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 // indirect
github.com/aws/smithy-go v1.20.4 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.6.0 // indirect
Expand Down
7 changes: 0 additions & 7 deletions server/controller.go

This file was deleted.

83 changes: 0 additions & 83 deletions server/grpcserver/grpc_controller.go

This file was deleted.

83 changes: 83 additions & 0 deletions server/grpcserver/grpc_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package grpcserver

import (
"reflect"

"github.com/w-h-a/pkg/server"
)

type grpcHandler struct {
options server.HandlerOptions
name string
receiver reflect.Value
methods map[string]*grpcMethod
}

type grpcMethod struct {
name string
value reflect.Value
ctxType reflect.Type
reqType reflect.Type
rspType reflect.Type
stream bool
}

func (c *grpcHandler) Options() server.HandlerOptions {
return c.options
}

func (c *grpcHandler) Name() string {
return c.name
}

func (c *grpcHandler) String() string {
return "grpc"
}

func NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
// TODO: handler options
options := server.HandlerOptions{}

methods := map[string]*grpcMethod{}

// used to get method data
typeOfHandler := reflect.TypeOf(handler)

for i := 0; i < typeOfHandler.NumMethod(); i++ {
m := typeOfHandler.Method(i)

method := &grpcMethod{
name: m.Name,
value: m.Func,
}

// TODO: make this better/safer
switch m.Type.NumIn() {
case 3:
method.ctxType = m.Type.In(1)
method.rspType = m.Type.In(2)
method.stream = true
case 4:
method.ctxType = m.Type.In(1)
method.reqType = m.Type.In(2)
method.rspType = m.Type.In(3)
}

methods[m.Name] = method
}

// keep the value to use as a receiver in function invocation
valueOfHandler := reflect.ValueOf(handler)

// get the name of the handler struct
nameOfHandler := reflect.Indirect(valueOfHandler).Type().Name()

c := &grpcHandler{
options: options,
name: nameOfHandler,
receiver: valueOfHandler,
methods: methods,
}

return c
}
Loading

0 comments on commit 811b6ef

Please sign in to comment.