Skip to content

Commit

Permalink
feat: kq support trace
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkJoyMa committed Jul 14, 2024
1 parent 328f46c commit b3263d4
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 21 deletions.
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ require (
github.com/nats-io/stan.go v0.10.4
github.com/rabbitmq/amqp091-go v1.9.0
github.com/segmentio/kafka-go v0.4.38
github.com/stretchr/testify v1.9.0
github.com/zeromicro/go-zero v1.6.3
go.opentelemetry.io/otel v1.19.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
Expand All @@ -33,13 +36,13 @@ require (
github.com/openzipkin/zipkin-go v0.4.2 // indirect
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/redis/go-redis/v9 v9.4.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
Expand All @@ -60,4 +63,5 @@ require (
google.golang.org/grpc v1.62.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,16 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
Expand Down
34 changes: 34 additions & 0 deletions kq/internal/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package internal

import "github.com/segmentio/kafka-go"

type Message struct {
*kafka.Message
}

func NewMessage(msg *kafka.Message) *Message {
return &Message{Message: msg}
}

func (m *Message) GetHeader(key string) string {
for _, h := range m.Headers {
if h.Key == key {
return string(h.Value)
}
}
return ""
}

func (m *Message) SetHeader(key, val string) {
// Ensure uniqueness of keys
for i := 0; i < len(m.Headers); i++ {
if m.Headers[i].Key == key {
m.Headers = append(m.Headers[:i], m.Headers[i+1:]...)
i--
}
}
m.Headers = append(m.Headers, kafka.Header{
Key: key,
Value: []byte(val),
})
}
57 changes: 57 additions & 0 deletions kq/internal/message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package internal

import (
"testing"

"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
)

func TestMessageGetHeader(t *testing.T) {
testCases := []struct {
name string
msg *Message
key string
expected string
}{
{
name: "exists",
msg: &Message{
Message: &kafka.Message{Headers: []kafka.Header{
{Key: "foo", Value: []byte("bar")},
}}},
key: "foo",
expected: "bar",
},
{
name: "not exists",
msg: &Message{Message: &kafka.Message{Headers: []kafka.Header{}}},
key: "foo",
expected: "",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := tc.msg.GetHeader(tc.key)
assert.Equal(t, tc.expected, result)
})
}
}

func TestMessageSetHeader(t *testing.T) {
msg := &Message{Message: &kafka.Message{Headers: []kafka.Header{
{Key: "foo", Value: []byte("bar")}},
}}

msg.SetHeader("foo", "bar2")
msg.SetHeader("foo2", "bar2")
msg.SetHeader("foo2", "bar3")
msg.SetHeader("foo3", "bar4")

assert.ElementsMatch(t, msg.Headers, []kafka.Header{
{Key: "foo", Value: []byte("bar2")},
{Key: "foo2", Value: []byte("bar3")},
{Key: "foo3", Value: []byte("bar4")},
})
}
35 changes: 35 additions & 0 deletions kq/internal/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package internal

import "go.opentelemetry.io/otel/propagation"

var _ propagation.TextMapCarrier = (*MessageCarrier)(nil)

// MessageCarrier injects and extracts traces from a types.Message.
type MessageCarrier struct {
msg *Message
}

// NewMessageCarrier returns a new MessageCarrier.
func NewMessageCarrier(msg *Message) MessageCarrier {
return MessageCarrier{msg: msg}
}

// Get returns the value associated with the passed key.
func (m MessageCarrier) Get(key string) string {
return m.msg.GetHeader(key)
}

// Set stores the key-value pair.
func (m MessageCarrier) Set(key string, value string) {
m.msg.SetHeader(key, value)
}

// Keys lists the keys stored in this carrier.
func (m MessageCarrier) Keys() []string {
out := make([]string, len(m.msg.Headers))
for i, h := range m.msg.Headers {
out[i] = h.Key
}

return out
}
93 changes: 93 additions & 0 deletions kq/internal/trace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package internal

import (
"testing"

"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
)

func TestMessageCarrierGet(t *testing.T) {
testCases := []struct {
name string
carrier MessageCarrier
key string
expected string
}{
{
name: "exists",
carrier: NewMessageCarrier(&Message{&kafka.Message{Headers: []kafka.Header{
{Key: "foo", Value: []byte("bar")},
}}}),
key: "foo",
expected: "bar",
},
{
name: "not exists",
carrier: NewMessageCarrier(&Message{&kafka.Message{Headers: []kafka.Header{}}}),
key: "foo",
expected: "",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := tc.carrier.Get(tc.key)
assert.Equal(t, tc.expected, result)
})
}
}

func TestMessageCarrierSet(t *testing.T) {
msg := Message{&kafka.Message{Headers: []kafka.Header{
{Key: "foo", Value: []byte("bar")},
}}}
carrier := MessageCarrier{msg: &msg}

carrier.Set("foo", "bar2")
carrier.Set("foo2", "bar2")
carrier.Set("foo2", "bar3")
carrier.Set("foo3", "bar4")

assert.ElementsMatch(t, carrier.msg.Headers, []kafka.Header{
{Key: "foo", Value: []byte("bar2")},
{Key: "foo2", Value: []byte("bar3")},
{Key: "foo3", Value: []byte("bar4")},
})
}

func TestMessageCarrierKeys(t *testing.T) {
testCases := []struct {
name string
carrier MessageCarrier
expected []string
}{
{
name: "one",
carrier: MessageCarrier{msg: &Message{&kafka.Message{Headers: []kafka.Header{
{Key: "foo", Value: []byte("bar")},
}}}},
expected: []string{"foo"},
},
{
name: "none",
carrier: MessageCarrier{msg: &Message{&kafka.Message{Headers: []kafka.Header{}}}},
expected: []string{},
},
{
name: "many",
carrier: MessageCarrier{msg: &Message{&kafka.Message{Headers: []kafka.Header{
{Key: "foo", Value: []byte("bar")},
{Key: "baz", Value: []byte("quux")},
}}}},
expected: []string{"foo", "baz"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := tc.carrier.Keys()
assert.Equal(t, tc.expected, result)
})
}
}
36 changes: 30 additions & 6 deletions kq/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"time"

"github.com/segmentio/kafka-go"
"github.com/zeromicro/go-queue/kq/internal"
"github.com/zeromicro/go-zero/core/executors"
"github.com/zeromicro/go-zero/core/logx"
"go.opentelemetry.io/otel"
)

type (
Expand All @@ -26,6 +28,9 @@ type (
// executors.ChunkExecutor options
chunkSize int
flushInterval time.Duration

// enable sync push
enableSyncPush bool
}
)

Expand All @@ -46,6 +51,16 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
// apply kafka.Writer options
producer.AllowAutoTopicCreation = options.allowAutoTopicCreation

pusher := &Pusher{
producer: producer,
topic: topic,
}

// apply sync push
if options.enableSyncPush {
return pusher
}

// apply ChunkExecutor options
var chunkOpts []executors.ChunkOption
if options.chunkSize > 0 {
Expand All @@ -55,10 +70,6 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
}

pusher := &Pusher{
producer: producer,
topic: topic,
}
pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
chunk := make([]kafka.Message, len(tasks))
for i := range tasks {
Expand Down Expand Up @@ -87,15 +98,21 @@ func (p *Pusher) Name() string {
}

// Push sends a message to the Kafka topic.
func (p *Pusher) Push(v string) error {
func (p *Pusher) Push(ctx context.Context, v string) error {
msg := kafka.Message{
Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), // current timestamp
Value: []byte(v),
}

// wrap message into message carrier
mc := internal.NewMessageCarrier(internal.NewMessage(&msg))
// inject trace context into message
otel.GetTextMapPropagator().Inject(ctx, mc)

if p.executor != nil {
return p.executor.Add(msg, len(v))
} else {
return p.producer.WriteMessages(context.Background(), msg)
return p.producer.WriteMessages(ctx, msg)
}
}

Expand All @@ -119,3 +136,10 @@ func WithAllowAutoTopicCreation() PushOption {
options.allowAutoTopicCreation = true
}
}

// WithSyncPush enables the Pusher to push messages synchronously.
func WithSyncPush() PushOption {
return func(options *pushOptions) {
options.enableSyncPush = true
}
}
Loading

0 comments on commit b3263d4

Please sign in to comment.