From 321f27ee1387c851271a5aa470debdc372839565 Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sun, 29 Jan 2023 21:10:29 +0800 Subject: [PATCH] chore(job): serialize a struct to bytes Signed-off-by: Bo-Yi.Wu --- benchmark_test.go | 73 ++++++++++++++++++++++++----------------------- core/worker.go | 4 ++- go.mod | 1 - go.sum | 2 -- job/job.go | 39 +++++++++++++------------ job/job_test.go | 52 +++++++++++++++++++++++++++++++++ queue.go | 37 +++++++++--------------- 7 files changed, 127 insertions(+), 81 deletions(-) diff --git a/benchmark_test.go b/benchmark_test.go index 5c30bfc..4731bcb 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -2,7 +2,6 @@ package queue import ( "context" - "log" "testing" "time" @@ -55,20 +54,19 @@ func BenchmarkQueueTask(b *testing.B) { ) b.ReportAllocs() b.ResetTimer() + + m := job.NewTask(func(context.Context) error { + return nil + }) + for n := 0; n < b.N; n++ { - err := q.QueueTask(func(context.Context) error { - return nil - }) - if err != nil { - log.Fatal(err) + if err := q.queue(m); err != nil { + b.Fatal(err) } } } func BenchmarkQueue(b *testing.B) { - m := &mockMessage{ - message: "foo", - } w := NewRing() q, _ := NewQueue( WithWorker(w), @@ -76,38 +74,43 @@ func BenchmarkQueue(b *testing.B) { ) b.ReportAllocs() b.ResetTimer() - for n := 0; n < b.N; n++ { - err := q.Queue(m) - if err != nil { - log.Fatal(err) - } - } -} - -func BenchmarkRingPayload(b *testing.B) { - b.ReportAllocs() - - task := &job.Message{ - Timeout: 100 * time.Millisecond, - Payload: []byte(`{"timeout":3600000000000}`), - } - w := NewRing( - WithFn(func(ctx context.Context, m core.QueuedMessage) error { - return nil - }), - ) - q, _ := NewQueue( - WithWorker(w), - WithLogger(emptyLogger{}), - ) + m := job.NewMessage(&mockMessage{ + message: "foo", + }) + m.Encode() for n := 0; n < b.N; n++ { - _ = q.run(task) + if err := q.queue(m); err != nil { + b.Fatal(err) + } } } -func BenchmarkRingTask(b *testing.B) { +// func BenchmarkRingPayload(b *testing.B) { +// b.ReportAllocs() + +// task := &job.Message{ +// Timeout: 100 * time.Millisecond, +// Payload: []byte(`{"timeout":3600000000000}`), +// } +// w := NewRing( +// WithFn(func(ctx context.Context, m core.QueuedMessage) error { +// return nil +// }), +// ) + +// q, _ := NewQueue( +// WithWorker(w), +// WithLogger(emptyLogger{}), +// ) + +// for n := 0; n < b.N; n++ { +// _ = q.run(task) +// } +// } + +func BenchmarkRingWithTask(b *testing.B) { b.ReportAllocs() task := &job.Message{ diff --git a/core/worker.go b/core/worker.go index 56c4c53..d23d28c 100644 --- a/core/worker.go +++ b/core/worker.go @@ -1,6 +1,8 @@ package core -import "context" +import ( + "context" +) // Worker interface type Worker interface { diff --git a/go.mod b/go.mod index b473921..f9e0b53 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/golang-queue/queue go 1.18 require ( - github.com/goccy/go-json v0.10.0 github.com/golang/mock v1.6.0 github.com/stretchr/testify v1.8.1 go.uber.org/goleak v1.2.0 diff --git a/go.sum b/go.sum index 685c278..f4447a6 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= -github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= diff --git a/job/job.go b/job/job.go index 609d1cc..794adb8 100644 --- a/job/job.go +++ b/job/job.go @@ -3,8 +3,8 @@ package job import ( "context" "time" + "unsafe" - "github.com/goccy/go-json" "github.com/golang-queue/queue/core" ) @@ -30,30 +30,23 @@ type Message struct { // RetryDelay set delay between retry // default is 100ms RetryDelay time.Duration `json:"retry_delay"` -} -// Bytes get string body -func (m *Message) Bytes() []byte { - if m.Task != nil { - return nil - } - return m.Payload + // Data to save Unsafe cast + Data []byte } -// Encode for encoding the structure -func (m *Message) Encode() []byte { - b, _ := json.Marshal(m) +const ( + movementSize = int(unsafe.Sizeof(Message{})) +) - return b +// Bytes get internal data +func (m *Message) Bytes() []byte { + return m.Data } -// Rest for reset default value -func (m *Message) Rest() { - m.Task = nil - m.Payload = nil - m.RetryCount = 0 - m.Timeout = 0 - m.RetryDelay = 0 +// Encode for encoding the structure +func (m *Message) Encode() { + m.Data = Encode(m) } func NewMessage(m core.QueuedMessage, opts ...AllowOption) *Message { @@ -77,3 +70,11 @@ func NewTask(task TaskFunc, opts ...AllowOption) *Message { Task: task, } } + +func Encode(m *Message) []byte { + return (*[movementSize]byte)(unsafe.Pointer(m))[:] +} + +func Decode(m []byte) *Message { + return (*Message)(unsafe.Pointer(&m[0])) +} diff --git a/job/job_test.go b/job/job_test.go index 8d7765a..a18bb08 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -1 +1,53 @@ package job + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestTaskBinaryEncode(t *testing.T) { + m := NewTask(func(context.Context) error { + return nil + }, + AllowOption{ + RetryCount: Int64(100), + RetryDelay: Time(30 * time.Millisecond), + Timeout: Time(3 * time.Millisecond), + }, + ) + + out := Decode(Encode(m)) + + assert.Equal(t, int64(100), out.RetryCount) + assert.Equal(t, 30*time.Millisecond, out.RetryDelay) +} + +type mockMessage struct { + message string +} + +func (m mockMessage) Bytes() []byte { + return []byte(m.message) +} + +func TestMessageBinaryEncode(t *testing.T) { + m := NewMessage(&mockMessage{ + message: "foo", + }, + AllowOption{ + RetryCount: Int64(100), + RetryDelay: Time(30 * time.Millisecond), + Timeout: Time(3 * time.Millisecond), + }, + ) + + m.Encode() + out := Decode(m.Bytes()) + + assert.Equal(t, int64(100), out.RetryCount) + assert.Equal(t, 30*time.Millisecond, out.RetryDelay) + assert.Equal(t, "foo", string(out.Payload)) +} diff --git a/queue.go b/queue.go index 44dfdc1..ad7b788 100644 --- a/queue.go +++ b/queue.go @@ -7,7 +7,6 @@ import ( "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/golang-queue/queue/core" "github.com/golang-queue/queue/job" ) @@ -113,35 +112,26 @@ func (q *Queue) Wait() { q.routineGroup.Wait() } -// Queue to queue all job -func (q *Queue) Queue(m core.QueuedMessage, opts ...job.AllowOption) error { - if atomic.LoadInt32(&q.stopFlag) == 1 { - return ErrQueueShutdown - } - - message := job.NewMessage(m, opts...) - payload := message.Encode() - message.Rest() - message.Payload = payload - - if err := q.worker.Queue(message); err != nil { - return err - } +// Queue to queue single job with binary +func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error { + data := job.NewMessage(message, opts...) + data.Encode() - q.metric.IncSubmittedTask() - - return nil + return q.queue(data) } -// QueueTask to queue job task +// QueueTask to queue single task func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error { + data := job.NewTask(task, opts...) + return q.queue(data) +} + +func (q *Queue) queue(m *job.Message) error { if atomic.LoadInt32(&q.stopFlag) == 1 { return ErrQueueShutdown } - message := job.NewTask(task, opts...) - - if err := q.worker.Queue(message); err != nil { + if err := q.worker.Queue(m); err != nil { return err } @@ -178,7 +168,8 @@ func (q *Queue) work(task core.QueuedMessage) { func (q *Queue) run(task core.QueuedMessage) error { data := task.(*job.Message) if data.Task == nil { - _ = json.Unmarshal(task.Bytes(), data) + data = job.Decode(task.Bytes()) + data.Data = data.Payload } return q.handle(data)