Skip to content

Commit

Permalink
Merge branch 'master' into feat-mq-kpush
Browse files Browse the repository at this point in the history
  • Loading branch information
kevwan authored Jul 24, 2024
2 parents dbbb9a3 + 59d5072 commit 2c75b16
Show file tree
Hide file tree
Showing 25 changed files with 1,081 additions and 1,462 deletions.
9 changes: 5 additions & 4 deletions dq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

const (
expiration = 3600 // seconds
guardValue = "1"
tolerance = time.Minute * 30
)

Expand Down Expand Up @@ -45,17 +44,19 @@ func NewConsumer(c DqConf) Consumer {
func (c *consumerCluster) Consume(consume Consume) {
guardedConsume := func(body []byte) {
key := hash.Md5Hex(body)
body, ok := c.unwrap(body)
taskBody, ok := c.unwrap(body)
if !ok {
logx.Errorf("discarded: %q", string(body))
return
}

ok, err := c.red.SetnxEx(key, guardValue, expiration)
redisLock := redis.NewRedisLock(c.red, key)
redisLock.SetExpire(expiration)
ok, err := redisLock.Acquire()
if err != nil {
logx.Error(err)
} else if ok {
consume(body)
consume(taskBody)
}
}

Expand Down
26 changes: 19 additions & 7 deletions dq/consumernode.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dq

import (
"errors"
"time"

"github.com/beanstalkd/go-beanstalk"
Expand Down Expand Up @@ -59,14 +60,25 @@ func (c *consumerNode) consumeEvents(consume Consume) {
}

// the error can only be beanstalk.NameError or beanstalk.ConnError
switch cerr := err.(type) {
case beanstalk.ConnError:
switch cerr.Err {
case beanstalk.ErrTimeout:
var cerr beanstalk.ConnError
switch {
case errors.As(err, &cerr):
switch {
case errors.Is(cerr.Err, beanstalk.ErrTimeout):
// timeout error on timeout, just continue the loop
case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline,
beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig,
beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong:
case
errors.Is(cerr.Err, beanstalk.ErrBadChar),
errors.Is(cerr.Err, beanstalk.ErrBadFormat),
errors.Is(cerr.Err, beanstalk.ErrBuried),
errors.Is(cerr.Err, beanstalk.ErrDeadline),
errors.Is(cerr.Err, beanstalk.ErrDraining),
errors.Is(cerr.Err, beanstalk.ErrEmpty),
errors.Is(cerr.Err, beanstalk.ErrInternal),
errors.Is(cerr.Err, beanstalk.ErrJobTooBig),
errors.Is(cerr.Err, beanstalk.ErrNoCRLF),
errors.Is(cerr.Err, beanstalk.ErrNotFound),
errors.Is(cerr.Err, beanstalk.ErrNotIgnored),
errors.Is(cerr.Err, beanstalk.ErrTooLong):
// won't reset
logx.Error(err)
default:
Expand Down
44 changes: 24 additions & 20 deletions dq/producer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package dq

import (
"bytes"
"log"
"math/rand"
"strconv"
"strings"
"time"

Expand All @@ -25,15 +23,21 @@ type (
Close() error
Delay(body []byte, delay time.Duration) (string, error)
Revoke(ids string) error

at(body []byte, at time.Time) (string, error)
delay(body []byte, delay time.Duration) (string, error)
}

producerCluster struct {
nodes []Producer
}
)

var rng *rand.Rand

func init() {
rand.Seed(time.Now().UnixNano())
source := rand.NewSource(time.Now().UnixNano())
rng = rand.New(source)
}

func NewProducer(beanstalks []Beanstalk) Producer {
Expand All @@ -56,10 +60,8 @@ func NewProducer(beanstalks []Beanstalk) Producer {
}

func (p *producerCluster) At(body []byte, at time.Time) (string, error) {
wrapped := p.wrap(body, at)
return p.insert(func(node Producer) (string, error) {
return node.At(wrapped, at)
})
wrapped := wrap(body, at)
return p.at(wrapped, at)
}

func (p *producerCluster) Close() error {
Expand All @@ -73,10 +75,8 @@ func (p *producerCluster) Close() error {
}

func (p *producerCluster) Delay(body []byte, delay time.Duration) (string, error) {
wrapped := p.wrap(body, time.Now().Add(delay))
return p.insert(func(node Producer) (string, error) {
return node.Delay(wrapped, delay)
})
wrapped := wrap(body, time.Now().Add(delay))
return p.delay(wrapped, delay)
}

func (p *producerCluster) Revoke(ids string) error {
Expand All @@ -98,17 +98,29 @@ func (p *producerCluster) Revoke(ids string) error {
return be.Err()
}

func (p *producerCluster) at(body []byte, at time.Time) (string, error) {
return p.insert(func(node Producer) (string, error) {
return node.at(body, at)
})
}

func (p *producerCluster) cloneNodes() []Producer {
return append([]Producer(nil), p.nodes...)
}

func (p *producerCluster) delay(body []byte, delay time.Duration) (string, error) {
return p.insert(func(node Producer) (string, error) {
return node.delay(body, delay)
})
}

func (p *producerCluster) getWriteNodes() []Producer {
if len(p.nodes) <= replicaNodes {
return p.nodes
}

nodes := p.cloneNodes()
rand.Shuffle(len(nodes), func(i, j int) {
rng.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})
return nodes[:replicaNodes]
Expand Down Expand Up @@ -156,11 +168,3 @@ func (p *producerCluster) insert(fn func(node Producer) (string, error)) (string

return "", be.Err()
}

func (p *producerCluster) wrap(body []byte, at time.Time) []byte {
var builder bytes.Buffer
builder.WriteString(strconv.FormatInt(at.UnixNano(), 10))
builder.WriteByte(timeSep)
builder.Write(body)
return builder.Bytes()
}
88 changes: 55 additions & 33 deletions dq/producernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/beanstalkd/go-beanstalk"
"github.com/zeromicro/go-zero/core/logx"
)

var ErrTimeBeforeNow = errors.New("can't schedule task to past time")
Expand All @@ -27,46 +28,15 @@ func NewProducerNode(endpoint, tube string) Producer {
}

func (p *producerNode) At(body []byte, at time.Time) (string, error) {
now := time.Now()
if at.Before(now) {
return "", ErrTimeBeforeNow
}

duration := at.Sub(now)
return p.Delay(body, duration)
return p.at(wrap(body, at), at)
}

func (p *producerNode) Close() error {
return p.conn.Close()
}

func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) {
conn, err := p.conn.get()
if err != nil {
return "", err
}

id, err := conn.Put(body, PriNormal, delay, defaultTimeToRun)
if err == nil {
return fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id), nil
}

// the error can only be beanstalk.NameError or beanstalk.ConnError
// just return when the error is beanstalk.NameError, don't reset
switch cerr := err.(type) {
case beanstalk.ConnError:
switch cerr.Err {
case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline,
beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig,
beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong:
// won't reset
default:
// beanstalk.ErrOOM, beanstalk.ErrTimeout, beanstalk.ErrUnknown and other errors
p.conn.reset()
}
}

return "", err
return p.delay(wrap(body, time.Now().Add(delay)), delay)
}

func (p *producerNode) Revoke(jointId string) error {
Expand Down Expand Up @@ -96,3 +66,55 @@ func (p *producerNode) Revoke(jointId string) error {
// if not in this beanstalk, ignore
return nil
}

func (p *producerNode) at(body []byte, at time.Time) (string, error) {
now := time.Now()
if at.Before(now) {
return "", ErrTimeBeforeNow
}

duration := at.Sub(now)
return p.delay(body, duration)
}

func (p *producerNode) delay(body []byte, delay time.Duration) (string, error) {
conn, err := p.conn.get()
if err != nil {
return "", err
}

id, err := conn.Put(body, PriNormal, delay, defaultTimeToRun)
if err == nil {
return fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id), nil
}

// the error can only be beanstalk.NameError or beanstalk.ConnError
// just return when the error is beanstalk.NameError, don't reset
var cerr beanstalk.ConnError
switch {
case errors.As(err, &cerr):
switch {
case
errors.Is(cerr.Err, beanstalk.ErrBadChar),
errors.Is(cerr.Err, beanstalk.ErrBadFormat),
errors.Is(cerr.Err, beanstalk.ErrBuried),
errors.Is(cerr.Err, beanstalk.ErrDeadline),
errors.Is(cerr.Err, beanstalk.ErrDraining),
errors.Is(cerr.Err, beanstalk.ErrEmpty),
errors.Is(cerr.Err, beanstalk.ErrInternal),
errors.Is(cerr.Err, beanstalk.ErrJobTooBig),
errors.Is(cerr.Err, beanstalk.ErrNoCRLF),
errors.Is(cerr.Err, beanstalk.ErrNotFound),
errors.Is(cerr.Err, beanstalk.ErrNotIgnored),
errors.Is(cerr.Err, beanstalk.ErrTooLong):
// won't reset
default:
// beanstalk.ErrOOM, beanstalk.ErrTimeout, beanstalk.ErrUnknown and other errors
p.conn.reset()
}
default:
logx.Error(err)
}

return "", err
}
15 changes: 15 additions & 0 deletions dq/wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package dq

import (
"bytes"
"strconv"
"time"
)

func wrap(body []byte, at time.Time) []byte {
var builder bytes.Buffer
builder.WriteString(strconv.FormatInt(at.UnixNano(), 10))
builder.WriteByte(timeSep)
builder.Write(body)
return builder.Bytes()
}
File renamed without changes.
20 changes: 20 additions & 0 deletions example/dq/producer/node/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package main

import (
"fmt"
"strconv"
"time"

"github.com/zeromicro/go-queue/dq"
)

func main() {
producer := dq.NewProducerNode("localhost:11300", "tube")

for i := 1000; i < 1005; i++ {
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
if err != nil {
fmt.Println(err)
}
}
}
3 changes: 2 additions & 1 deletion example/kq/consumer/queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"

"github.com/zeromicro/go-queue/kq"
Expand All @@ -11,7 +12,7 @@ func main() {
var c kq.KqConf
conf.MustLoad("config.yaml", &c)

q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
q := kq.MustNewQueue(c, kq.WithHandle(func(ctx context.Context, k, v string) error {
fmt.Printf("=> %s\n", v)
return nil
}))
Expand Down
2 changes: 0 additions & 2 deletions example/kq/producer/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ func main() {
log.Fatal(err)
}

fmt.Println(string(body))
if err := pusher.Push(context.Background(), string(body)); err != nil {
log.Fatal(err)
}

fmt.Println(string(body))
if err := pusher.KPush(context.Background(), "test", string(body)); err != nil {
log.Fatal(err)
}
Expand Down
Loading

0 comments on commit 2c75b16

Please sign in to comment.