Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(job): serialize a struct to bytes #103

Merged
merged 1 commit into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 38 additions & 35 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package queue

import (
"context"
"log"
"testing"
"time"

Expand Down Expand Up @@ -55,59 +54,63 @@ func BenchmarkQueueTask(b *testing.B) {
)
b.ReportAllocs()
b.ResetTimer()

m := job.NewTask(func(context.Context) error {
return nil
})

for n := 0; n < b.N; n++ {
err := q.QueueTask(func(context.Context) error {
return nil
})
if err != nil {
log.Fatal(err)
if err := q.queue(m); err != nil {
b.Fatal(err)
}
}
}

func BenchmarkQueue(b *testing.B) {
m := &mockMessage{
message: "foo",
}
w := NewRing()
q, _ := NewQueue(
WithWorker(w),
WithLogger(emptyLogger{}),
)
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
err := q.Queue(m)
if err != nil {
log.Fatal(err)
}
}
}

func BenchmarkRingPayload(b *testing.B) {
b.ReportAllocs()

task := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte(`{"timeout":3600000000000}`),
}
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
return nil
}),
)

q, _ := NewQueue(
WithWorker(w),
WithLogger(emptyLogger{}),
)
m := job.NewMessage(&mockMessage{
message: "foo",
})
m.Encode()

for n := 0; n < b.N; n++ {
_ = q.run(task)
if err := q.queue(m); err != nil {
b.Fatal(err)
}
}
}

func BenchmarkRingTask(b *testing.B) {
// func BenchmarkRingPayload(b *testing.B) {
// b.ReportAllocs()

// task := &job.Message{
// Timeout: 100 * time.Millisecond,
// Payload: []byte(`{"timeout":3600000000000}`),
// }
// w := NewRing(
// WithFn(func(ctx context.Context, m core.QueuedMessage) error {
// return nil
// }),
// )

// q, _ := NewQueue(
// WithWorker(w),
// WithLogger(emptyLogger{}),
// )

// for n := 0; n < b.N; n++ {
// _ = q.run(task)
// }
// }

func BenchmarkRingWithTask(b *testing.B) {
b.ReportAllocs()

task := &job.Message{
Expand Down
4 changes: 3 additions & 1 deletion core/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package core

import "context"
import (
"context"
)

// Worker interface
type Worker interface {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/golang-queue/queue
go 1.18

require (
github.com/goccy/go-json v0.10.0
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.8.1
go.uber.org/goleak v1.2.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand Down
39 changes: 20 additions & 19 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package job
import (
"context"
"time"
"unsafe"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
)

Expand All @@ -30,30 +30,23 @@ type Message struct {
// RetryDelay set delay between retry
// default is 100ms
RetryDelay time.Duration `json:"retry_delay"`
}

// Bytes get string body
func (m *Message) Bytes() []byte {
if m.Task != nil {
return nil
}
return m.Payload
// Data to save Unsafe cast
Data []byte
}

// Encode for encoding the structure
func (m *Message) Encode() []byte {
b, _ := json.Marshal(m)
const (
movementSize = int(unsafe.Sizeof(Message{}))
)

return b
// Bytes get internal data
func (m *Message) Bytes() []byte {
return m.Data
}

// Rest for reset default value
func (m *Message) Rest() {
m.Task = nil
m.Payload = nil
m.RetryCount = 0
m.Timeout = 0
m.RetryDelay = 0
// Encode for encoding the structure
func (m *Message) Encode() {
m.Data = Encode(m)
}

func NewMessage(m core.QueuedMessage, opts ...AllowOption) *Message {
Expand All @@ -77,3 +70,11 @@ func NewTask(task TaskFunc, opts ...AllowOption) *Message {
Task: task,
}
}

func Encode(m *Message) []byte {
return (*[movementSize]byte)(unsafe.Pointer(m))[:]
}

func Decode(m []byte) *Message {
return (*Message)(unsafe.Pointer(&m[0]))
}
52 changes: 52 additions & 0 deletions job/job_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,53 @@
package job

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTaskBinaryEncode(t *testing.T) {
m := NewTask(func(context.Context) error {
return nil
},
AllowOption{
RetryCount: Int64(100),
RetryDelay: Time(30 * time.Millisecond),
Timeout: Time(3 * time.Millisecond),
},
)

out := Decode(Encode(m))

assert.Equal(t, int64(100), out.RetryCount)
assert.Equal(t, 30*time.Millisecond, out.RetryDelay)
}

type mockMessage struct {
message string
}

func (m mockMessage) Bytes() []byte {
return []byte(m.message)
}

func TestMessageBinaryEncode(t *testing.T) {
m := NewMessage(&mockMessage{
message: "foo",
},
AllowOption{
RetryCount: Int64(100),
RetryDelay: Time(30 * time.Millisecond),
Timeout: Time(3 * time.Millisecond),
},
)

m.Encode()
out := Decode(m.Bytes())

assert.Equal(t, int64(100), out.RetryCount)
assert.Equal(t, 30*time.Millisecond, out.RetryDelay)
assert.Equal(t, "foo", string(out.Payload))
}
37 changes: 14 additions & 23 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
)
Expand Down Expand Up @@ -113,35 +112,26 @@ func (q *Queue) Wait() {
q.routineGroup.Wait()
}

// Queue to queue all job
func (q *Queue) Queue(m core.QueuedMessage, opts ...job.AllowOption) error {
if atomic.LoadInt32(&q.stopFlag) == 1 {
return ErrQueueShutdown
}

message := job.NewMessage(m, opts...)
payload := message.Encode()
message.Rest()
message.Payload = payload

if err := q.worker.Queue(message); err != nil {
return err
}
// Queue to queue single job with binary
func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error {
data := job.NewMessage(message, opts...)
data.Encode()

q.metric.IncSubmittedTask()

return nil
return q.queue(data)
}

// QueueTask to queue job task
// QueueTask to queue single task
func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error {
data := job.NewTask(task, opts...)
return q.queue(data)
}

func (q *Queue) queue(m *job.Message) error {
if atomic.LoadInt32(&q.stopFlag) == 1 {
return ErrQueueShutdown
}

message := job.NewTask(task, opts...)

if err := q.worker.Queue(message); err != nil {
if err := q.worker.Queue(m); err != nil {
return err
}

Expand Down Expand Up @@ -178,7 +168,8 @@ func (q *Queue) work(task core.QueuedMessage) {
func (q *Queue) run(task core.QueuedMessage) error {
data := task.(*job.Message)
if data.Task == nil {
_ = json.Unmarshal(task.Bytes(), data)
data = job.Decode(task.Bytes())
data.Data = data.Payload
}

return q.handle(data)
Expand Down