Skip to content

Commit

Permalink
chore(consumer): move run model to queue main package (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
appleboy authored Dec 4, 2022
1 parent 787f08f commit 0ef81ac
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 209 deletions.
81 changes: 2 additions & 79 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
)

var _ core.Worker = (*Consumer)(nil)
Expand All @@ -29,84 +27,9 @@ type Consumer struct {
requestTimeout time.Duration
}

func (s *Consumer) handle(m *job.Message) error {
// create channel with buffer size 1 to avoid goroutine leak
done := make(chan error, 1)
panicChan := make(chan interface{}, 1)
startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), m.Timeout)
defer func() {
cancel()
}()

// run the job
go func() {
// handle panic issue
defer func() {
if p := recover(); p != nil {
panicChan <- p
}
}()

// run custom process function
var err error
loop:
for {
if m.Task != nil {
err = m.Task(ctx)
} else {
err = s.runFunc(ctx, m)
}

// check error and retry count
if err == nil || m.RetryCount == 0 {
break
}
m.RetryCount--

select {
case <-time.After(m.RetryDelay): // retry delay time
case <-ctx.Done(): // timeout reached
err = ctx.Err()
break loop
}
}

done <- err
}()

select {
case p := <-panicChan:
panic(p)
case <-ctx.Done(): // timeout reached
return ctx.Err()
case <-s.stop: // shutdown service
// cancel job
cancel()

leftTime := m.Timeout - time.Since(startTime)
// wait job
select {
case <-time.After(leftTime):
return context.DeadlineExceeded
case err := <-done: // job finish
return err
case p := <-panicChan:
panic(p)
}
case err := <-done: // job finish
return err
}
}

// Run to execute new task
func (s *Consumer) Run(task core.QueuedMessage) error {
data := task.(*job.Message)
if data.Task == nil {
_ = json.Unmarshal(task.Bytes(), data)
}

return s.handle(data)
func (s *Consumer) Run(ctx context.Context, task core.QueuedMessage) error {
return s.runFunc(ctx, task)
}

// Shutdown the worker
Expand Down
115 changes: 0 additions & 115 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,121 +207,6 @@ func TestGoroutinePanic(t *testing.T) {
q.Release()
}

func TestHandleTimeout(t *testing.T) {
m := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
w := NewConsumer(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(200 * time.Millisecond)
return nil
}),
)

err := w.handle(m)
assert.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)

m = &job.Message{
Timeout: 150 * time.Millisecond,
Payload: []byte("foo"),
}

w = NewConsumer(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(200 * time.Millisecond)
return nil
}),
)

done := make(chan error)
go func() {
done <- w.handle(m)
}()

err = <-done
assert.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)
}

func TestJobComplete(t *testing.T) {
m := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
w := NewConsumer(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
return errors.New("job completed")
}),
)

err := w.handle(m)
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)

m = &job.Message{
Timeout: 250 * time.Millisecond,
Payload: []byte("foo"),
}

w = NewConsumer(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(200 * time.Millisecond)
return errors.New("job completed")
}),
)

done := make(chan error)
go func() {
done <- w.handle(m)
}()

err = <-done
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)
}

func TestTaskJobComplete(t *testing.T) {
m := &job.Message{
Timeout: 100 * time.Millisecond,
Task: func(ctx context.Context) error {
return errors.New("job completed")
},
}
w := NewConsumer()

err := w.handle(m)
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)

m = &job.Message{
Timeout: 250 * time.Millisecond,
Task: func(ctx context.Context) error {
return nil
},
}

w = NewConsumer()
done := make(chan error)
go func() {
done <- w.handle(m)
}()

err = <-done
assert.NoError(t, err)

// job timeout
m = &job.Message{
Timeout: 50 * time.Millisecond,
Task: func(ctx context.Context) error {
time.Sleep(60 * time.Millisecond)
return nil
},
}
assert.Equal(t, context.DeadlineExceeded, w.handle(m))
}

func TestIncreaseWorkerCount(t *testing.T) {
w := NewConsumer(
WithLogger(NewEmptyLogger()),
Expand Down
4 changes: 3 additions & 1 deletion core/worker.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package core

import "context"

// Worker interface
type Worker interface {
// Run is called to start the worker
Run(task QueuedMessage) error
Run(ctx context.Context, task QueuedMessage) error
// Shutdown is called if stop all worker
Shutdown() error
// Queue to send message in Queue
Expand Down
9 changes: 5 additions & 4 deletions mocks/mock_worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 82 additions & 1 deletion queue.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package queue

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
)
Expand Down Expand Up @@ -167,11 +169,90 @@ func (q *Queue) work(task core.QueuedMessage) {
}
}()

if err = q.worker.Run(task); err != nil {
if err = q.run(task); err != nil {
q.logger.Errorf("runtime error: %s", err.Error())
}
}

func (q *Queue) run(task core.QueuedMessage) error {
data := task.(*job.Message)
if data.Task == nil {
_ = json.Unmarshal(task.Bytes(), data)
}

return q.handle(data)
}

func (q *Queue) handle(m *job.Message) error {
// create channel with buffer size 1 to avoid goroutine leak
done := make(chan error, 1)
panicChan := make(chan interface{}, 1)
startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), m.Timeout)
defer func() {
cancel()
}()

// run the job
go func() {
// handle panic issue
defer func() {
if p := recover(); p != nil {
panicChan <- p
}
}()

// run custom process function
var err error
loop:
for {
if m.Task != nil {
err = m.Task(ctx)
} else {
err = q.worker.Run(ctx, m)
}

// check error and retry count
if err == nil || m.RetryCount == 0 {
break
}
m.RetryCount--

select {
case <-time.After(m.RetryDelay): // retry delay time
case <-ctx.Done(): // timeout reached
err = ctx.Err()
break loop
}
}

done <- err
}()

select {
case p := <-panicChan:
panic(p)
case <-ctx.Done(): // timeout reached
return ctx.Err()
case <-q.quit: // shutdown service
// cancel job
cancel()

leftTime := m.Timeout - time.Since(startTime)
// wait job
select {
case <-time.After(leftTime):
return context.DeadlineExceeded
case err := <-done: // job finish
return err
case p := <-panicChan:
panic(p)
}
case err := <-done: // job finish
return err
}
}

// UpdateWorkerCount to update worker number dynamically.
func (q *Queue) UpdateWorkerCount(num int) {
q.workerCount = num
Expand Down
Loading

0 comments on commit 0ef81ac

Please sign in to comment.