Skip to content

Commit

Permalink
feat(ctx) change close policy from chan to context
Browse files Browse the repository at this point in the history
  • Loading branch information
Stepan Pesternikov committed Sep 23, 2019
1 parent 695ad6b commit b029f76
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 40 deletions.
21 changes: 11 additions & 10 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type Config struct {
// as well as processing task.
type Pool struct {
config Config
close chan struct{}
cancel context.CancelFunc
ctx context.Context
syncClose sync.Once
taskDeque *customTaskDeque
arbiterWg sync.WaitGroup
Expand Down Expand Up @@ -95,14 +96,14 @@ func NewPool(c *Config) *Pool {

p.config = c.withDefaults()

p.close = make(chan struct{})
p.ctx, p.cancel = context.WithCancel(context.Background())
p.taskDeque = newCustomTaskDeque()
p.workers = make([]*customWorker, 0, p.config.UnstoppableWorkers)
p.waitChan = make(chan struct{}, 1)

p.workerWg.Add(p.config.UnstoppableWorkers)
for i := 0; i < p.config.UnstoppableWorkers; i++ {
worker := newCustomWorker(p.close)
worker := newCustomWorker(p.ctx)
p.workers = append(p.workers, worker)
go worker.Run(func() {
p.waitTask()
Expand Down Expand Up @@ -139,8 +140,8 @@ func (p *Pool) arbiter() {
select {
case <-p.waitChan:
break
case <-p.close:
p.taskDeque.put(t, true)
case <-p.ctx.Done():
_ = p.taskDeque.put(t, true)
p.arbiterWg.Done()
return
}
Expand Down Expand Up @@ -174,7 +175,7 @@ func (p *Pool) setUnstoppableWorkers(count int) {
for i := 0; i < p.config.UnstoppableWorkers-count; i++ {
p.workerWg.Add(p.config.UnstoppableWorkers - count)
for i := 0; i < p.config.UnstoppableWorkers; i++ {
worker := newCustomWorker(p.close)
worker := newCustomWorker(p.ctx)
p.workers = append(p.workers, worker)
go worker.Run(func() {
p.waitTask()
Expand All @@ -199,7 +200,7 @@ func (p *Pool) spawn(t *wrappedTask) bool {
}

atomic.AddInt64(&p.spawnCount, 1)
worker := newCustomWorker(p.close)
worker := newCustomWorker(p.ctx)
go worker.Spawn(t, func() {
atomic.AddInt64(&p.spawnCount, -1)
p.waitTask()
Expand Down Expand Up @@ -284,7 +285,7 @@ func (p *Pool) UnstoppableWorkers() int {
func (p *Pool) Close() {
p.syncClose.Do(func() {
p.guard.Lock()
close(p.close)
p.cancel()
p.guard.Unlock()
p.taskDeque.close()

Expand All @@ -296,7 +297,7 @@ func (p *Pool) Close() {
for _, worker := range workers {
worker.Release()
for task := range worker.taskChan {
p.taskDeque.put(task, true)
_ = p.taskDeque.put(task, true)
}
}

Expand All @@ -313,7 +314,7 @@ func (p *Pool) Close() {
// IsClosed returns true if this pool has been closed.
func (p *Pool) IsClosed() bool {
select {
case <-p.close:
case <-p.ctx.Done():
return true
default:
return false
Expand Down
46 changes: 23 additions & 23 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestSubmit(t *testing.T) {
return nil, nil
})

p.Submit(task)
_ = p.Submit(task)
}

activeCount := p.ActiveCount()
Expand Down Expand Up @@ -78,14 +78,14 @@ func TestSubmitClose1(t *testing.T) {
select {
case completion <- i:
break
case <-p.close:
case <-p.ctx.Done():
return nil, nil
}
atomic.AddUint32(&count, 1)
return nil, nil
}, i)

p.Submit(task)
_ = p.Submit(task)
}

resultCount := 0
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestSubmitClose2(t *testing.T) {
select {
case completion <- i:
break
case <-p.close:
case <-p.ctx.Done():
return nil, nil
}
atomic.AddUint32(&count, 1)
Expand Down Expand Up @@ -169,13 +169,13 @@ func TestSubmitWithCompletionClose1(t *testing.T) {
select {
case completion1 <- i:
break
case <-p.close:
case <-p.ctx.Done():
break
}
return 1, nil
}, i)

p.SubmitWithCompletion(completion2, task)
_ = p.SubmitWithCompletion(completion2, task)
}

resultCount := 0
Expand Down Expand Up @@ -205,13 +205,13 @@ func TestSubmitWithCompletionClose2(t *testing.T) {
select {
case completion1 <- i:
break
case <-p.close:
case <-p.ctx.Done():
break
}
return 1, nil
}, i)

p.SubmitWithCompletion(completion2, task)
_ = p.SubmitWithCompletion(completion2, task)
}

resultCount := 0
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestSubmitWithCompletion(t *testing.T) {
return 1, nil
})

p.SubmitWithCompletion(completion, task)
_ = p.SubmitWithCompletion(completion, task)
}

count := 0
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestSubmitWithCompletionPanic(t *testing.T) {
return 1, nil
}, i)

p.SubmitWithCompletion(completion, task)
_ = p.SubmitWithCompletion(completion, task)
}

countPanic := 0
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestSubmitWithCancel(t *testing.T) {
return nil, nil
}, i)

p.SubmitWithContext(ctx, task)
_ = p.SubmitWithContext(ctx, task)
}

wg1.Wait()
Expand Down Expand Up @@ -381,7 +381,7 @@ func TestSubmitWithCancel(t *testing.T) {
return nil, nil
}, i)

p.SubmitWithContext(ctx, task)
_ = p.SubmitWithContext(ctx, task)
}

wg1.Wait()
Expand Down Expand Up @@ -417,7 +417,7 @@ func TestSubmitCustom1(t *testing.T) {
return 1, nil
}, i)

p.SubmitCustom(ctx, completion, task)
_ = p.SubmitCustom(ctx, completion, task)
}

wg1.Wait()
Expand Down Expand Up @@ -447,7 +447,7 @@ func TestSubmitCustom2(t *testing.T) {
return 1, nil
}, i)

p.SubmitCustom(ctx, completion, task)
_ = p.SubmitCustom(ctx, completion, task)
}

wg1.Wait()
Expand Down Expand Up @@ -569,7 +569,7 @@ func TestWaitClose(t *testing.T) {
return nil, nil
})

p.SubmitWithCompletion(completion, task)
_ = p.SubmitWithCompletion(completion, task)
}

resultCount := 0
Expand Down Expand Up @@ -610,22 +610,22 @@ func TestResize1(t *testing.T) {
return nil, nil
}, i)

p.Submit(task)
_ = p.Submit(task)
}

wg1.Wait()

p.SetSize(7)
_ = p.SetSize(7)
if p.config.Size != 7 {
t.Fatalf("invalid pool size %d; want 7", p.Size())
}

p.SetSize(12)
_ = p.SetSize(12)
if p.config.Size != 12 {
t.Fatalf("invalid pool size %d; want 12", p.Size())
}

p.SetSize(3)
_ = p.SetSize(3)
if p.config.Size != 3 {
t.Fatalf("invalid pool size %d; want 3", p.Size())
}
Expand Down Expand Up @@ -665,22 +665,22 @@ func TestResize2(t *testing.T) {
return nil, nil
}, i)

p.Submit(task)
_ = p.Submit(task)
}

wg1.Wait()

p.SetSize(7)
_ = p.SetSize(7)
if p.config.Size != 7 {
t.Fatalf("invalid pool size %d; want 7", p.Size())
}

p.SetSize(12)
_ = p.SetSize(12)
if p.config.Size != 12 {
t.Fatalf("invalid pool size %d; want 12", p.Size())
}

p.SetSize(3)
_ = p.SetSize(3)
if p.config.Size != 3 {
t.Fatalf("invalid pool size %d; want 3", p.Size())
}
Expand Down
4 changes: 2 additions & 2 deletions task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestCustomTaskCancel(t *testing.T) {
}

task.Get()
task.Error()
_ = task.Error()
task.Panic()
}

Expand Down Expand Up @@ -44,6 +44,6 @@ func TestCustomTaskCancelDone(t *testing.T) {
}

task.Get()
task.Error()
_ = task.Error()
task.Panic()
}
10 changes: 5 additions & 5 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ import (
)

type customWorker struct {
ctx context.Context
taskChan chan *wrappedTask
freeChan chan struct{}
syncRelease sync.Once
release chan struct{}
kill <-chan struct{}
}

func newCustomWorker(kill <-chan struct{}) *customWorker {
func newCustomWorker(ctx context.Context) *customWorker {
return &customWorker{
ctx: ctx,
taskChan: make(chan *wrappedTask, 1),
release: make(chan struct{}),
freeChan: make(chan struct{}, 1),
kill: kill,
}
}

Expand All @@ -55,7 +55,7 @@ func (cw *customWorker) completeTask(t *wrappedTask) {
break
case <-cw.release:
break
case <-cw.kill:
case <-cw.ctx.Done():
break
case <-ctx.Done():
break
Expand Down Expand Up @@ -105,7 +105,7 @@ LOOP:
cw.runTask(t, onComplete)
case <-cw.release:
break LOOP
case <-cw.kill:
case <-cw.ctx.Done():
break LOOP
}
}
Expand Down

0 comments on commit b029f76

Please sign in to comment.