Octopus is a golang library for managing a goroutine pool that can dynamic adjust the number of goroutine, the api is a bit like java concurrent pool api.
Octopus can new a pool to submit Callable job or Runnable job, Callable job is a function with a interface{} return value and no arguments, Runnable job is a function without arguments and return value, a job will be allocated to a worker when it becomes available.
- dynamic adjust the number of goroutine according the idle of goroutine
- support synchronous and asynchronous to get calculating result
- support timeout to get calculating result
- support to get status of a job
- can drop jobs when pool is busy
- automatic recovery from a job's panic
- can set a log function to record pool's log infos
- the api is a bit like java concurrent pool and more easily to use
https://godoc.org/github.com/Comdex/octopus
go get github.com/Comdex/octopus
package main
import (
"time"
"fmt"
"github.com/Comdex/octopus"
)
func main() {
// the cachedpool will dynamic adjust the number of goroutine called worker according
// the timeout of workers process job and idle time of workers
pool, err := octopus.NewCachedWorkerPool()
if err != nil {
fmt.Println(err)
}
// you can set a log func to get pool's log info
pool.SetLogFunc(func(msg string){
fmt.Println(msg)
})
// the Runnable is a simple function
var r Runnable = func() {
fmt.Println("test runnable var")
}
pool.SubmitRunnable(r)
// the Callable is a function with a return value
var c Callable = func() interface{} {
s := "test callable var"
return
}
pool.SubmitCallable(c)
pool.SubmitRunnable(func(){
fmt.Println("test1")
})
future, err2 := pool.SubmitCallable(func() interface{} {
time.Sleep(2*time.Second)
return "test2"
})
if err2 != nil {
fmt.Println(err2)
}
// the Get method of future will wait for return value is prepared
// Is it like a java concurrent pool api?
value, err3 := future.Get()
if err3 != nil {
fmt.Println(err3)
}
fmt.Println("value: ", value)
future2 , _ := pool.SubmitCallable(func() interface{} {
time.Sleep(2*time.Second)
return "test3"
})
//Get Value support timeout
value2, timeoutErr := future2.GetTimed(1*time.Second)
if timeoutErr != nil {
fmt.Println(timeoutErr)
}
fmt.Println(value2)
// close the pool and wait for all goroutines done
pool.Shutdown()
}
package main
import (
"fmt"
"github.com/Comdex/octopus"
)
func main() {
pool, err := octopus.NewCachedDataProcessPool(func(object interface{}) interface{} {
v := object.(int)
return "data: " + strconv.Itoa(v)
})
if err != nil {
fmt.Println(err)
}
pool.Submit(8)
pool.Submit(29)
future, err2 := pool.Submit(100)
if err != nil {
fmt.Println(err)
}
// the api is synchronous
value, err3 := future.Get()
if err3 != nil {
fmt.Println(err3)
}
fmt.Println("100 value: ", value)
future2, _ := pool.Submit(200)
// Get method support timeout
value2, _ := future2.GetTimed(2*time.Second)
fmt.Println("200 value: ", value2)
// close the pool and wait for all goroutine done
pool.Shutdown()
}
Apache License
more api usage please refer to docs