Skip to content

Commit

Permalink
Add concurrency limit for task
Browse files Browse the repository at this point in the history
  • Loading branch information
nekohasekai committed Nov 30, 2023
1 parent eb56a60 commit 1f82310
Showing 1 changed file with 19 additions and 0 deletions.
19 changes: 19 additions & 0 deletions common/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Group struct {
tasks []taskItem
cleanup func()
fastFail bool
queue chan struct{}
}

func (g *Group) Append(name string, f func(ctx context.Context) error) {
Expand All @@ -46,6 +47,13 @@ func (g *Group) FastFail() {
g.fastFail = true
}

func (g *Group) Concurrency(n int) {
g.queue = make(chan struct{}, n)
for i := 0; i < n; i++ {
g.queue <- struct{}{}
}
}

func (g *Group) Run(contextList ...context.Context) error {
return g.RunContextList(contextList)
}
Expand All @@ -65,6 +73,14 @@ func (g *Group) RunContextList(contextList []context.Context) error {
for _, task := range g.tasks {
currentTask := task
go func() {
if g.queue != nil {
<-g.queue
select {
case <-taskCancelContext.Done():
return
default:
}
}
err := currentTask.Run(taskCancelContext)
errorAccess.Lock()
if err != nil {
Expand All @@ -83,6 +99,9 @@ func (g *Group) RunContextList(contextList []context.Context) error {
taskCancel(errTaskSucceed{})
taskFinish(errTaskSucceed{})
}
if g.queue != nil {
g.queue <- struct{}{}
}
}()
}

Expand Down

0 comments on commit 1f82310

Please sign in to comment.