Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pub sub #62

Merged
merged 8 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions broker/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,16 @@ func (b *memory) Publish(data interface{}, options broker.PublishOptions) error
}
b.mtx.RUnlock()

bs, err := json.Marshal(data)
if err != nil {
return err
var bs []byte

if p, ok := data.([]byte); ok {
bs = p
} else {
p, err := json.Marshal(data)
if err != nil {
return err
}
bs = p
}

for _, sub := range subsOfThisTopic {
Expand Down
8 changes: 4 additions & 4 deletions broker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ type BrokerOption func(o *BrokerOptions)

type BrokerOptions struct {
Nodes []string
PublishOptions PublishOptions
SubscribeOptions SubscribeOptions
PublishOptions *PublishOptions
SubscribeOptions *SubscribeOptions
Context context.Context
}

Expand All @@ -17,13 +17,13 @@ func BrokerWithNodes(addrs ...string) BrokerOption {
}
}

func BrokerWithPublishOptions(options PublishOptions) BrokerOption {
func BrokerWithPublishOptions(options *PublishOptions) BrokerOption {
return func(o *BrokerOptions) {
o.PublishOptions = options
}
}

func BrokerWithSubscribeOptions(options SubscribeOptions) BrokerOption {
func BrokerWithSubscribeOptions(options *SubscribeOptions) BrokerOption {
return func(o *BrokerOptions) {
o.SubscribeOptions = options
}
Expand Down
31 changes: 23 additions & 8 deletions broker/snssqs/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package snssqs

import (
"context"
"encoding/json"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sns"
Expand Down Expand Up @@ -37,13 +38,18 @@ type SqsClient interface {

type sqsClient struct {
*sqs.Client
queueUrl *string
visibilityTimeout int32
waitTimeSeconds int32
}

type sqsMsg struct {
Message string `json:"message"`
}

func (c *sqsClient) ConsumeFromGroup(sub broker.Subscriber) {
result, err := c.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
QueueUrl: aws.String(sub.Options().Group),
QueueUrl: c.queueUrl,
MaxNumberOfMessages: 1,
VisibilityTimeout: c.visibilityTimeout,
WaitTimeSeconds: c.waitTimeSeconds,
Expand All @@ -60,14 +66,23 @@ func (c *sqsClient) ConsumeFromGroup(sub broker.Subscriber) {

for _, msg := range result.Messages {
body := msg.Body
if err := sub.Handler([]byte(*body)); err != nil {

var sqsMsg sqsMsg

if err := json.Unmarshal([]byte(*body), &sqsMsg); err != nil {
log.Errorf("failed to unmarshal message from group %s: %s", sub.Options().Group, err)
continue
}

if err := sub.Handler([]byte(sqsMsg.Message)); err != nil {
log.Errorf("failed to handle message from group %s: %s", sub.Options().Group, err)
} else {
msgHandle := msg.ReceiptHandle
c.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: aws.String(sub.Options().Group),
ReceiptHandle: msgHandle,
})
continue
}

msgHandle := msg.ReceiptHandle
c.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: aws.String(sub.Options().Group),
ReceiptHandle: msgHandle,
})
}
}
51 changes: 36 additions & 15 deletions broker/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sqs"
Expand All @@ -14,7 +15,7 @@ import (
)

const (
defaultVisibilityTimeout int32 = 4
defaultVisibilityTimeout int32 = 8
defaultWaitSeconds int32 = 8
)

Expand All @@ -29,9 +30,16 @@ func (b *snssqs) Options() broker.BrokerOptions {
}

func (b *snssqs) Publish(data interface{}, options broker.PublishOptions) error {
bs, err := json.Marshal(data)
if err != nil {
return err
var bs []byte

if p, ok := data.([]byte); ok {
bs = p
} else {
p, err := json.Marshal(data)
if err != nil {
return err
}
bs = p
}

if err := b.snsClient.ProduceToTopic(bs, options.Topic); err != nil {
Expand Down Expand Up @@ -77,7 +85,7 @@ func (b *snssqs) configure() error {
b.sqsClient = sqs
}

if b.snsClient != nil && b.sqsClient != nil {
if b.snsClient != nil || b.sqsClient != nil {
return nil
}

Expand All @@ -86,21 +94,34 @@ func (b *snssqs) configure() error {
return err
}

b.snsClient = &snsClient{sns.NewFromConfig(cfg)}
if b.options.PublishOptions != nil {
b.snsClient = &snsClient{sns.NewFromConfig(cfg)}
}

visibilityTimeout := defaultVisibilityTimeout
if b.options.SubscribeOptions != nil {
visibilityTimeout := defaultVisibilityTimeout

waitTimeSeconds := defaultWaitSeconds
waitTimeSeconds := defaultWaitSeconds

if timeout, ok := GetVisibilityTimeoutFromContext(b.options.SubscribeOptions.Context); ok {
visibilityTimeout = timeout
}
if timeout, ok := GetVisibilityTimeoutFromContext(b.options.SubscribeOptions.Context); ok {
visibilityTimeout = timeout
}

if waitTime, ok := GetWaitTimeSecondsFromContext(b.options.SubscribeOptions.Context); ok {
waitTimeSeconds = waitTime
}
if waitTime, ok := GetWaitTimeSecondsFromContext(b.options.SubscribeOptions.Context); ok {
waitTimeSeconds = waitTime
}

client := sqs.NewFromConfig(cfg)

b.sqsClient = &sqsClient{sqs.NewFromConfig(cfg), visibilityTimeout, waitTimeSeconds}
url, err := client.GetQueueUrl(context.Background(), &sqs.GetQueueUrlInput{
QueueName: aws.String(b.options.SubscribeOptions.Group),
})
if err != nil {
return err
}

b.sqsClient = &sqsClient{client, url.QueueUrl, visibilityTimeout, waitTimeSeconds}
}

return nil
}
Expand Down
Loading
Loading