Skip to content

Commit

Permalink
Merge pull request #62 from w-h-a/pubsub
Browse files Browse the repository at this point in the history
feat: pub sub
  • Loading branch information
w-h-a authored Aug 29, 2024
2 parents 0e818a0 + fd92f47 commit ef15e57
Show file tree
Hide file tree
Showing 10 changed files with 425 additions and 229 deletions.
13 changes: 10 additions & 3 deletions broker/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,16 @@ func (b *memory) Publish(data interface{}, options broker.PublishOptions) error
}
b.mtx.RUnlock()

bs, err := json.Marshal(data)
if err != nil {
return err
var bs []byte

if p, ok := data.([]byte); ok {
bs = p
} else {
p, err := json.Marshal(data)
if err != nil {
return err
}
bs = p
}

for _, sub := range subsOfThisTopic {
Expand Down
8 changes: 4 additions & 4 deletions broker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ type BrokerOption func(o *BrokerOptions)

type BrokerOptions struct {
Nodes []string
PublishOptions PublishOptions
SubscribeOptions SubscribeOptions
PublishOptions *PublishOptions
SubscribeOptions *SubscribeOptions
Context context.Context
}

Expand All @@ -17,13 +17,13 @@ func BrokerWithNodes(addrs ...string) BrokerOption {
}
}

func BrokerWithPublishOptions(options PublishOptions) BrokerOption {
func BrokerWithPublishOptions(options *PublishOptions) BrokerOption {
return func(o *BrokerOptions) {
o.PublishOptions = options
}
}

func BrokerWithSubscribeOptions(options SubscribeOptions) BrokerOption {
func BrokerWithSubscribeOptions(options *SubscribeOptions) BrokerOption {
return func(o *BrokerOptions) {
o.SubscribeOptions = options
}
Expand Down
31 changes: 23 additions & 8 deletions broker/snssqs/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package snssqs

import (
"context"
"encoding/json"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sns"
Expand Down Expand Up @@ -37,13 +38,18 @@ type SqsClient interface {

type sqsClient struct {
*sqs.Client
queueUrl *string
visibilityTimeout int32
waitTimeSeconds int32
}

type sqsMsg struct {
Message string `json:"message"`
}

func (c *sqsClient) ConsumeFromGroup(sub broker.Subscriber) {
result, err := c.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
QueueUrl: aws.String(sub.Options().Group),
QueueUrl: c.queueUrl,
MaxNumberOfMessages: 1,
VisibilityTimeout: c.visibilityTimeout,
WaitTimeSeconds: c.waitTimeSeconds,
Expand All @@ -60,14 +66,23 @@ func (c *sqsClient) ConsumeFromGroup(sub broker.Subscriber) {

for _, msg := range result.Messages {
body := msg.Body
if err := sub.Handler([]byte(*body)); err != nil {

var sqsMsg sqsMsg

if err := json.Unmarshal([]byte(*body), &sqsMsg); err != nil {
log.Errorf("failed to unmarshal message from group %s: %s", sub.Options().Group, err)
continue
}

if err := sub.Handler([]byte(sqsMsg.Message)); err != nil {
log.Errorf("failed to handle message from group %s: %s", sub.Options().Group, err)
} else {
msgHandle := msg.ReceiptHandle
c.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: aws.String(sub.Options().Group),
ReceiptHandle: msgHandle,
})
continue
}

msgHandle := msg.ReceiptHandle
c.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: aws.String(sub.Options().Group),
ReceiptHandle: msgHandle,
})
}
}
51 changes: 36 additions & 15 deletions broker/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sqs"
Expand All @@ -14,7 +15,7 @@ import (
)

const (
defaultVisibilityTimeout int32 = 4
defaultVisibilityTimeout int32 = 8
defaultWaitSeconds int32 = 8
)

Expand All @@ -29,9 +30,16 @@ func (b *snssqs) Options() broker.BrokerOptions {
}

func (b *snssqs) Publish(data interface{}, options broker.PublishOptions) error {
bs, err := json.Marshal(data)
if err != nil {
return err
var bs []byte

if p, ok := data.([]byte); ok {
bs = p
} else {
p, err := json.Marshal(data)
if err != nil {
return err
}
bs = p
}

if err := b.snsClient.ProduceToTopic(bs, options.Topic); err != nil {
Expand Down Expand Up @@ -77,7 +85,7 @@ func (b *snssqs) configure() error {
b.sqsClient = sqs
}

if b.snsClient != nil && b.sqsClient != nil {
if b.snsClient != nil || b.sqsClient != nil {
return nil
}

Expand All @@ -86,21 +94,34 @@ func (b *snssqs) configure() error {
return err
}

b.snsClient = &snsClient{sns.NewFromConfig(cfg)}
if b.options.PublishOptions != nil {
b.snsClient = &snsClient{sns.NewFromConfig(cfg)}
}

visibilityTimeout := defaultVisibilityTimeout
if b.options.SubscribeOptions != nil {
visibilityTimeout := defaultVisibilityTimeout

waitTimeSeconds := defaultWaitSeconds
waitTimeSeconds := defaultWaitSeconds

if timeout, ok := GetVisibilityTimeoutFromContext(b.options.SubscribeOptions.Context); ok {
visibilityTimeout = timeout
}
if timeout, ok := GetVisibilityTimeoutFromContext(b.options.SubscribeOptions.Context); ok {
visibilityTimeout = timeout
}

if waitTime, ok := GetWaitTimeSecondsFromContext(b.options.SubscribeOptions.Context); ok {
waitTimeSeconds = waitTime
}
if waitTime, ok := GetWaitTimeSecondsFromContext(b.options.SubscribeOptions.Context); ok {
waitTimeSeconds = waitTime
}

client := sqs.NewFromConfig(cfg)

b.sqsClient = &sqsClient{sqs.NewFromConfig(cfg), visibilityTimeout, waitTimeSeconds}
url, err := client.GetQueueUrl(context.Background(), &sqs.GetQueueUrlInput{
QueueName: aws.String(b.options.SubscribeOptions.Group),
})
if err != nil {
return err
}

b.sqsClient = &sqsClient{client, url.QueueUrl, visibilityTimeout, waitTimeSeconds}
}

return nil
}
Expand Down
Loading

0 comments on commit ef15e57

Please sign in to comment.