Skip to content

Commit

Permalink
chore(job): serialize a struct to bytes
Browse files Browse the repository at this point in the history
Signed-off-by: Bo-Yi.Wu <[email protected]>
  • Loading branch information
appleboy committed Jan 29, 2023
1 parent bd08b4a commit 321f27e
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 81 deletions.
73 changes: 38 additions & 35 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package queue

import (
"context"
"log"
"testing"
"time"

Expand Down Expand Up @@ -55,59 +54,63 @@ func BenchmarkQueueTask(b *testing.B) {
)
b.ReportAllocs()
b.ResetTimer()

m := job.NewTask(func(context.Context) error {
return nil
})

for n := 0; n < b.N; n++ {
err := q.QueueTask(func(context.Context) error {
return nil
})
if err != nil {
log.Fatal(err)
if err := q.queue(m); err != nil {
b.Fatal(err)
}
}
}

func BenchmarkQueue(b *testing.B) {
m := &mockMessage{
message: "foo",
}
w := NewRing()
q, _ := NewQueue(
WithWorker(w),
WithLogger(emptyLogger{}),
)
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
err := q.Queue(m)
if err != nil {
log.Fatal(err)
}
}
}

func BenchmarkRingPayload(b *testing.B) {
b.ReportAllocs()

task := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte(`{"timeout":3600000000000}`),
}
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
return nil
}),
)

q, _ := NewQueue(
WithWorker(w),
WithLogger(emptyLogger{}),
)
m := job.NewMessage(&mockMessage{
message: "foo",
})
m.Encode()

for n := 0; n < b.N; n++ {
_ = q.run(task)
if err := q.queue(m); err != nil {
b.Fatal(err)
}
}
}

func BenchmarkRingTask(b *testing.B) {
// func BenchmarkRingPayload(b *testing.B) {
// b.ReportAllocs()

// task := &job.Message{
// Timeout: 100 * time.Millisecond,
// Payload: []byte(`{"timeout":3600000000000}`),
// }
// w := NewRing(
// WithFn(func(ctx context.Context, m core.QueuedMessage) error {
// return nil
// }),
// )

// q, _ := NewQueue(
// WithWorker(w),
// WithLogger(emptyLogger{}),
// )

// for n := 0; n < b.N; n++ {
// _ = q.run(task)
// }
// }

func BenchmarkRingWithTask(b *testing.B) {
b.ReportAllocs()

task := &job.Message{
Expand Down
4 changes: 3 additions & 1 deletion core/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package core

import "context"
import (
"context"
)

// Worker interface
type Worker interface {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/golang-queue/queue
go 1.18

require (
github.com/goccy/go-json v0.10.0
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.8.1
go.uber.org/goleak v1.2.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand Down
39 changes: 20 additions & 19 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package job
import (
"context"
"time"
"unsafe"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
)

Expand All @@ -30,30 +30,23 @@ type Message struct {
// RetryDelay set delay between retry
// default is 100ms
RetryDelay time.Duration `json:"retry_delay"`
}

// Bytes get string body
func (m *Message) Bytes() []byte {
if m.Task != nil {
return nil
}
return m.Payload
// Data to save Unsafe cast
Data []byte
}

// Encode for encoding the structure
func (m *Message) Encode() []byte {
b, _ := json.Marshal(m)
const (
movementSize = int(unsafe.Sizeof(Message{}))
)

return b
// Bytes get internal data
func (m *Message) Bytes() []byte {
return m.Data
}

// Rest for reset default value
func (m *Message) Rest() {
m.Task = nil
m.Payload = nil
m.RetryCount = 0
m.Timeout = 0
m.RetryDelay = 0
// Encode for encoding the structure
func (m *Message) Encode() {
m.Data = Encode(m)
}

func NewMessage(m core.QueuedMessage, opts ...AllowOption) *Message {
Expand All @@ -77,3 +70,11 @@ func NewTask(task TaskFunc, opts ...AllowOption) *Message {
Task: task,
}
}

func Encode(m *Message) []byte {
return (*[movementSize]byte)(unsafe.Pointer(m))[:]
}

func Decode(m []byte) *Message {
return (*Message)(unsafe.Pointer(&m[0]))
}
52 changes: 52 additions & 0 deletions job/job_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,53 @@
package job

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTaskBinaryEncode(t *testing.T) {
m := NewTask(func(context.Context) error {
return nil
},
AllowOption{
RetryCount: Int64(100),
RetryDelay: Time(30 * time.Millisecond),
Timeout: Time(3 * time.Millisecond),
},
)

out := Decode(Encode(m))

assert.Equal(t, int64(100), out.RetryCount)
assert.Equal(t, 30*time.Millisecond, out.RetryDelay)
}

type mockMessage struct {
message string
}

func (m mockMessage) Bytes() []byte {
return []byte(m.message)
}

func TestMessageBinaryEncode(t *testing.T) {
m := NewMessage(&mockMessage{
message: "foo",
},
AllowOption{
RetryCount: Int64(100),
RetryDelay: Time(30 * time.Millisecond),
Timeout: Time(3 * time.Millisecond),
},
)

m.Encode()
out := Decode(m.Bytes())

assert.Equal(t, int64(100), out.RetryCount)
assert.Equal(t, 30*time.Millisecond, out.RetryDelay)
assert.Equal(t, "foo", string(out.Payload))
}
37 changes: 14 additions & 23 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
)
Expand Down Expand Up @@ -113,35 +112,26 @@ func (q *Queue) Wait() {
q.routineGroup.Wait()
}

// Queue to queue all job
func (q *Queue) Queue(m core.QueuedMessage, opts ...job.AllowOption) error {
if atomic.LoadInt32(&q.stopFlag) == 1 {
return ErrQueueShutdown
}

message := job.NewMessage(m, opts...)
payload := message.Encode()
message.Rest()
message.Payload = payload

if err := q.worker.Queue(message); err != nil {
return err
}
// Queue to queue single job with binary
func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error {
data := job.NewMessage(message, opts...)
data.Encode()

q.metric.IncSubmittedTask()

return nil
return q.queue(data)
}

// QueueTask to queue job task
// QueueTask to queue single task
func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error {
data := job.NewTask(task, opts...)
return q.queue(data)
}

func (q *Queue) queue(m *job.Message) error {
if atomic.LoadInt32(&q.stopFlag) == 1 {
return ErrQueueShutdown
}

message := job.NewTask(task, opts...)

if err := q.worker.Queue(message); err != nil {
if err := q.worker.Queue(m); err != nil {
return err
}

Expand Down Expand Up @@ -178,7 +168,8 @@ func (q *Queue) work(task core.QueuedMessage) {
func (q *Queue) run(task core.QueuedMessage) error {
data := task.(*job.Message)
if data.Task == nil {
_ = json.Unmarshal(task.Bytes(), data)
data = job.Decode(task.Bytes())
data.Data = data.Payload
}

return q.handle(data)
Expand Down

0 comments on commit 321f27e

Please sign in to comment.