Skip to content

Commit

Permalink
feat: add consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
georgehao committed Apr 11, 2022
1 parent 2ed66f3 commit 7c0a165
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 67 deletions.
6 changes: 4 additions & 2 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package rocketmq

import (
"context"
"github.com/apache/rocketmq-client-go/v2/errors"
"time"

"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/errors"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
Expand All @@ -34,7 +35,8 @@ type Producer interface {
SendAsync(ctx context.Context, mq func(ctx context.Context, result *primitive.SendResult, err error),
msg ...*primitive.Message) error
SendOneWay(ctx context.Context, mq ...*primitive.Message) error
Request(ctx context.Context, ttl time.Duration, mq ...*primitive.Message) (*primitive.Message, error)
Request(ctx context.Context, ttl time.Duration, msg ...*primitive.Message) (*primitive.Message, error)
RequestAsync(ctx context.Context, ttl time.Duration, callback internal.RequestCallback, msg ...*primitive.Message) error
}

func NewProducer(opts ...producer.Option) (Producer, error) {
Expand Down
3 changes: 1 addition & 2 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@ package consumer
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2/errors"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/apache/rocketmq-client-go/v2/errors"
jsoniter "github.com/json-iterator/go"

"github.com/tidwall/gjson"

"github.com/apache/rocketmq-client-go/v2/internal"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package main
import (
"context"
"fmt"
"time"

"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
Expand Down Expand Up @@ -38,36 +40,27 @@ func main() {
consumer.WithPullInterval(0),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)

err = c.Subscribe(topic, consumer.MessageSelector{
Type: consumer.TAG,
Expression: "*",
}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
Type: consumer.TAG, Expression: "*"}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

fmt.Printf("subscribe callback: %v \n", msgs)
for _, msg := range msgs {
fmt.Printf("handle message: %s", msg.String())
replyTo := msg.GetProperty("REPLY_TO_CLIENT")
// why sleep here ? because if not sleep, consumer reply message, but producer not register to broker, it will
// make the broker can't find the reply channel.
// only the go client has this problem.
time.Sleep(time.Millisecond * 1000)
fmt.Println("consumer sleep over, start reply")

replyContent := []byte("reply message contents.")
// create reply message with given util, do not create reply message by yourself
cluster := msg.GetProperty("CLUSTER")
correlationId := msg.GetProperty("CORRELATION_ID")
ttl := msg.GetProperty("TTL")
var replyMessage *primitive.Message
if cluster != "" {
replyMessage = primitive.NewMessage(
cluster+"_REPLY_TOPIC",
replyContent,
)
} else {
replyMessage = primitive.NewMessage(
"",
replyContent,
)
replyMessage, err := internal.CreateReplyMessage(msg, replyContent)
if err != nil {
fmt.Printf("create reply message err:%v\n", err)
continue
}
replyMessage.WithProperty("MSG_TYPE", "reply")
replyMessage.WithProperty("CORRELATION_ID", correlationId)
replyMessage.WithProperty("REPLY_TO_CLIENT", replyTo)
replyMessage.WithProperty("TTL", ttl)

replyTo := internal.GetReplyToClient(msg)
replyResult, err := replyProducer.SendSync(context.Background(), replyMessage)
if err != nil {
fmt.Printf("send message error: %s\n", err)
Expand All @@ -87,4 +80,10 @@ func main() {
return
}
fmt.Printf("Consumer Started.\n")

time.Sleep(time.Hour)
err = c.Shutdown()
if err != nil {
fmt.Printf("shutdown Consumer error: %s", err.Error())
}
}
52 changes: 52 additions & 0 deletions examples/producer/rpc/async/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"context"
"fmt"
"os"
"time"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
p, _ := rocketmq.NewProducer(
producer.WithGroupName("please_rename_unique_group_name"),
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}

topic := "RequestTopic"
ttl := 5 * time.Second
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RPC RocketMQ Go Client!"),
}

now := time.Now()
f := func(ctx context.Context, responseMsg *primitive.Message, respErr error) {
if respErr != nil {
fmt.Printf("request to <%s> fail, err:%v \n", topic, respErr)
return
}
fmt.Printf("Requst to %s cost:%d ms responseMsg:%s\n", topic, time.Since(now).Milliseconds(), responseMsg.String())
}
err = p.RequestAsync(context.Background(), ttl, f, msg)
if err != nil {
fmt.Printf("Request Async message error: %s\n", err)
return
}

time.Sleep(time.Minute)
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
File renamed without changes.
5 changes: 5 additions & 0 deletions internal/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ const (
ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER"
SystemTopicPrefix = "rmq_sys_"
ReplyMessageFlag = "reply"
ReplyTopicPostfix = "REPLY_TOPIC"
)

func GetReplyTopic(clusterName string) string {
return clusterName + "_" + ReplyTopicPostfix
}

func GetRetryTopic(group string) string {
return RetryGroupTopicPrefix + group
}
39 changes: 39 additions & 0 deletions internal/message_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package internal

import (
"errors"
"fmt"

"github.com/apache/rocketmq-client-go/v2/primitive"
)

// CreateReplyMessage build reply message from the request message
func CreateReplyMessage(requestMessage *primitive.MessageExt, body []byte) (*primitive.Message, error) {
if requestMessage == nil {
return nil, errors.New("create reply message fail, requestMessage cannot be null")
}

cluster := requestMessage.GetProperty(primitive.PropertyCluster)
replyTo := requestMessage.GetProperty(primitive.PropertyMessageReplyToClient)
correlationId := requestMessage.GetProperty(primitive.PropertyCorrelationID)
ttl := requestMessage.GetProperty(primitive.PropertyMessageTTL)

if cluster == "" {
return nil, fmt.Errorf("create reply message fail, requestMessage error, property[\"%s\"] is null", cluster)
}

var replayMessage primitive.Message

replayMessage.UnmarshalProperties(body)
replayMessage.Topic = GetReplyTopic(cluster)
replayMessage.WithProperty(primitive.PropertyMsgType, ReplyMessageFlag)
replayMessage.WithProperty(primitive.PropertyCorrelationID, correlationId)
replayMessage.WithProperty(primitive.PropertyMessageReplyToClient, replyTo)
replayMessage.WithProperty(primitive.PropertyMessageTTL, ttl)

return &replayMessage, nil
}

func GetReplyToClient(msg *primitive.MessageExt) string {
return msg.GetProperty(primitive.PropertyMessageReplyToClient)
}
16 changes: 15 additions & 1 deletion internal/request_response_future.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@ func NewRequestResponseFutureMap() *requestResponseFutureCache {

// OnEvicted delete the timeout RequestResponseFuture, trigger set the failure cause.
tmpRrfCache.cache.OnEvicted(func(s string, i interface{}) {
err := fmt.Errorf("correlationId:%s request timeout, no reply message", s)
rrf, ok := i.(*RequestResponseFuture)
if !ok {
rlog.Error("convert i to RequestResponseFuture err", map[string]interface{}{
"correlationId": s,
})
return
}
if !rrf.IsTimeout() {
return
}

err := fmt.Errorf("correlationId:%s request timeout, no reply message", s)
rrf.CauseErr = err
rrf.ExecuteRequestCallback()
})
Expand All @@ -52,6 +56,9 @@ func (fm *requestResponseFutureCache) SetResponseToRequestResponseFuture(correla
return errors.Wrapf(nil, "correlationId:%s not exist in map", correlationId)
}
rrf.PutResponseMessage(reply)
if rrf.RequestCallback != nil {
rrf.ExecuteRequestCallback()
}
return nil
}

Expand Down Expand Up @@ -81,6 +88,7 @@ type RequestResponseFuture struct {
SendRequestOk bool
Done chan struct{}
CauseErr error
BeginTime time.Time
}

func NewRequestResponseFuture(correlationId string, timeout time.Duration, callback RequestCallback) *RequestResponseFuture {
Expand All @@ -89,6 +97,7 @@ func NewRequestResponseFuture(correlationId string, timeout time.Duration, callb
Timeout: timeout,
RequestCallback: callback,
Done: make(chan struct{}),
BeginTime: time.Now(),
}
}

Expand Down Expand Up @@ -119,3 +128,8 @@ func (rf *RequestResponseFuture) PutResponseMessage(message *primitive.Message)
rf.ResponseMsg = message
close(rf.Done)
}

func (rf *RequestResponseFuture) IsTimeout() bool {
diff := time.Since(rf.BeginTime)
return diff > rf.Timeout
}
38 changes: 0 additions & 38 deletions internal/utils/message_util.go

This file was deleted.

1 change: 1 addition & 0 deletions primitive/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const (
PropertyMessageTTL = "TTL"
PropertyReplyMessageArriveTime = "ARRIVE_TIME"
PropertyMsgType = "MSG_TYPE"
PropertyCluster = "CLUSTER"
)

type Message struct {
Expand Down
40 changes: 40 additions & 0 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,46 @@ func (p *defaultProducer) Request(ctx context.Context, timeout time.Duration, ms
return requestResponseFuture.WaitResponseMessage(msg)
}

// RequestAsync Async Send messages to consumer
func (p *defaultProducer) RequestAsync(ctx context.Context, timeout time.Duration, callback internal.RequestCallback, msgs ...*primitive.Message) error {
if err := p.checkMsg(msgs...); err != nil {
return err
}

p.messagesWithNamespace(msgs...)
msg := p.encodeBatch(msgs...)

correlationId, err := p.prepareSendRequest(msg, timeout)
if err != nil {
return err
}

requestResponseFuture := internal.NewRequestResponseFuture(correlationId, timeout, callback)
internal.RequestResponseFutureMap.SetRequestResponseFuture(requestResponseFuture)

f := func(ctx context.Context, result *primitive.SendResult, err error) {
if err != nil {
requestResponseFuture.SendRequestOk = false
requestResponseFuture.ResponseMsg = nil
requestResponseFuture.CauseErr = err
return
}
requestResponseFuture.SendRequestOk = true
}

if p.interceptor != nil {
primitive.WithMethod(ctx, primitive.SendAsync)

return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
return p.sendAsync(ctx, msg, f)
})
}
if err := p.sendAsync(ctx, msg, f); err != nil {
return errors.Wrap(err, "sendAsync error")
}
return nil
}

func (p *defaultProducer) SendSync(ctx context.Context, msgs ...*primitive.Message) (*primitive.SendResult, error) {
if err := p.checkMsg(msgs...); err != nil {
return nil, err
Expand Down

0 comments on commit 7c0a165

Please sign in to comment.