Skip to content

Commit

Permalink
feat: removed our awaitable and stoppable interfaces
Browse files Browse the repository at this point in the history
I came the conclusion that the interfaces were just causing more work for minimal benefit.

Yes technically this is breaking but again, I'm the only user of this lib and I can;t be bothered with the whole branching / module renaming thing until I can get semantic release to automate the whole process.
  • Loading branch information
brad-jones committed Sep 18, 2020
1 parent 2fa58e5 commit 2594a17
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 72 deletions.
86 changes: 30 additions & 56 deletions await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@ import (
"time"

"github.com/brad-jones/goasync/v2/stop"
"github.com/brad-jones/goasync/v2/task"
"github.com/brad-jones/goerr/v2"
)

// Awaitable refers to any type that has a Result() method
type Awaitable interface {
Result() (interface{}, error)
}

// All will wait for every given task to emit a result, the results (& errors)
// will be returned in a slice ordered the same as the input.
func All(awaitables ...Awaitable) ([]interface{}, error) {
func All(awaitables ...*task.Task) ([]interface{}, error) {
awaited := []interface{}{}
awaitedErrors := []error{}

Expand All @@ -37,19 +33,19 @@ func All(awaitables ...Awaitable) ([]interface{}, error) {
}

// MustAll does the same thing as All but panics if an error is encountered
func MustAll(awaitables ...Awaitable) []interface{} {
func MustAll(awaitables ...*task.Task) []interface{} {
v, e := All(awaitables...)
goerr.Check(e)
return v
}

// AllOrError will wait for every given task to emit a result or
// return as soon as an error is encountered, stopping all other tasks.
func AllOrError(awaitables ...Awaitable) ([]interface{}, error) {
defer stop.All(awaitableToStopable(awaitables...)...)
func AllOrError(awaitables ...*task.Task) ([]interface{}, error) {
defer stop.All(awaitables...)

errCh := make(chan error, 1)
valueCh := make(chan map[Awaitable]interface{}, 1)
valueCh := make(chan map[*task.Task]interface{}, 1)

for _, v := range awaitables {
awaitable := v
Expand All @@ -59,13 +55,13 @@ func AllOrError(awaitables ...Awaitable) ([]interface{}, error) {
errCh <- goerr.Wrap(err)
return
}
valueCh <- map[Awaitable]interface{}{
valueCh <- map[*task.Task]interface{}{
awaitable: v,
}
}()
}

values := map[Awaitable]interface{}{}
values := map[*task.Task]interface{}{}
for {
select {
case err := <-errCh:
Expand All @@ -86,19 +82,19 @@ func AllOrError(awaitables ...Awaitable) ([]interface{}, error) {
}

// MustAllOrError does the same thing as AllOrError but panics if an error is encountered
func MustAllOrError(awaitables ...Awaitable) []interface{} {
func MustAllOrError(awaitables ...*task.Task) []interface{} {
v, e := AllOrError(awaitables...)
goerr.Check(e)
return v
}

// AllOrErrorWithTimeout does the same as AllOrError but allows you to set a
// timeout for waiting for other tasks to stop.
func AllOrErrorWithTimeout(timeout time.Duration, awaitables ...Awaitable) ([]interface{}, error) {
defer stop.AllWithTimeout(timeout, awaitableToStopableWithTimeout(awaitables...)...)
func AllOrErrorWithTimeout(timeout time.Duration, awaitables ...*task.Task) ([]interface{}, error) {
defer stop.AllWithTimeout(timeout, awaitables...)

errCh := make(chan error, 1)
valueCh := make(chan map[Awaitable]interface{}, 1)
valueCh := make(chan map[*task.Task]interface{}, 1)

for _, v := range awaitables {
awaitable := v
Expand All @@ -108,13 +104,13 @@ func AllOrErrorWithTimeout(timeout time.Duration, awaitables ...Awaitable) ([]in
errCh <- goerr.Wrap(err)
return
}
valueCh <- map[Awaitable]interface{}{
valueCh <- map[*task.Task]interface{}{
awaitable: v,
}
}()
}

values := map[Awaitable]interface{}{}
values := map[*task.Task]interface{}{}
for {
select {
case err := <-errCh:
Expand All @@ -135,19 +131,19 @@ func AllOrErrorWithTimeout(timeout time.Duration, awaitables ...Awaitable) ([]in
}

// MustAllOrErrorWithTimeout does the same thing as AllOrErrorWithTimeout but panics if an error is encountered
func MustAllOrErrorWithTimeout(timeout time.Duration, awaitables ...Awaitable) []interface{} {
func MustAllOrErrorWithTimeout(timeout time.Duration, awaitables ...*task.Task) []interface{} {
v, e := AllOrErrorWithTimeout(timeout, awaitables...)
goerr.Check(e)
return v
}

// FastAllOrError does the same as AllOrError but does not wait for all other
// tasks to stop, it does tell them to stop it just doesn't wait for them to stop.
func FastAllOrError(awaitables ...Awaitable) ([]interface{}, error) {
defer stop.AllAsync(awaitableToStopable(awaitables...)...)
func FastAllOrError(awaitables ...*task.Task) ([]interface{}, error) {
defer stop.AllAsync(awaitables...)

errCh := make(chan error, 1)
valueCh := make(chan map[Awaitable]interface{}, 1)
valueCh := make(chan map[*task.Task]interface{}, 1)

for _, v := range awaitables {
awaitable := v
Expand All @@ -157,13 +153,13 @@ func FastAllOrError(awaitables ...Awaitable) ([]interface{}, error) {
errCh <- goerr.Wrap(err)
return
}
valueCh <- map[Awaitable]interface{}{
valueCh <- map[*task.Task]interface{}{
awaitable: v,
}
}()
}

values := map[Awaitable]interface{}{}
values := map[*task.Task]interface{}{}
for {
select {
case err := <-errCh:
Expand All @@ -184,16 +180,16 @@ func FastAllOrError(awaitables ...Awaitable) ([]interface{}, error) {
}

// MustFastAllOrError does the same thing as FastAllOrError but panics if an error is encountered
func MustFastAllOrError(awaitables ...Awaitable) []interface{} {
func MustFastAllOrError(awaitables ...*task.Task) []interface{} {
v, e := FastAllOrError(awaitables...)
goerr.Check(e)
return v
}

// Any will wait for the first task to emit a result (or an error)
// and return that, stopping all other tasks.
func Any(awaitables ...Awaitable) (interface{}, error) {
defer stop.All(awaitableToStopable(awaitables...)...)
func Any(awaitables ...*task.Task) (interface{}, error) {
defer stop.All(awaitables...)

valueCh := make(chan interface{}, 1)
errCh := make(chan error, 1)
Expand All @@ -218,16 +214,16 @@ func Any(awaitables ...Awaitable) (interface{}, error) {
}

// MustAny does the same thing as Any but panics if an error is encountered
func MustAny(awaitables ...Awaitable) interface{} {
func MustAny(awaitables ...*task.Task) interface{} {
v, e := Any(awaitables...)
goerr.Check(e)
return v
}

// AnyWithTimeout does the same as Any but allows you to set a
// timeout for waiting for other tasks to stop.
func AnyWithTimeout(timeout time.Duration, awaitables ...Awaitable) (interface{}, error) {
defer stop.AllWithTimeout(timeout, awaitableToStopableWithTimeout(awaitables...)...)
func AnyWithTimeout(timeout time.Duration, awaitables ...*task.Task) (interface{}, error) {
defer stop.AllWithTimeout(timeout, awaitables...)

valueCh := make(chan interface{}, 1)
errCh := make(chan error, 1)
Expand All @@ -252,16 +248,16 @@ func AnyWithTimeout(timeout time.Duration, awaitables ...Awaitable) (interface{}
}

// MustAnyWithTimeout does the same thing as AnyWithTimeout but panics if an error is encountered
func MustAnyWithTimeout(timeout time.Duration, awaitables ...Awaitable) interface{} {
func MustAnyWithTimeout(timeout time.Duration, awaitables ...*task.Task) interface{} {
v, e := AnyWithTimeout(timeout, awaitables...)
goerr.Check(e)
return v
}

// FastAny does the same as Any but does not wait for all other tasks to stop,
// it does tell them to stop it just doesn't wait for them to stop.
func FastAny(awaitables ...Awaitable) (interface{}, error) {
defer stop.AllAsync(awaitableToStopable(awaitables...)...)
func FastAny(awaitables ...*task.Task) (interface{}, error) {
defer stop.AllAsync(awaitables...)

valueCh := make(chan interface{}, 1)
errCh := make(chan error, 1)
Expand All @@ -286,7 +282,7 @@ func FastAny(awaitables ...Awaitable) (interface{}, error) {
}

// MustFastAny does the same thing as FastAny but panics if an error is encountered
func MustFastAny(awaitables ...Awaitable) interface{} {
func MustFastAny(awaitables ...*task.Task) interface{} {
v, e := FastAny(awaitables...)
goerr.Check(e)
return v
Expand All @@ -300,25 +296,3 @@ type ErrTaskFailed struct {
func (e *ErrTaskFailed) Error() string {
return "await: one or more errors were returned from the awaited tasks"
}

func awaitableToStopable(awaitables ...Awaitable) []stop.Stopable {
stopables := []stop.Stopable{}
for _, awaitable := range awaitables {
v, ok := awaitable.(stop.Stopable)
if ok {
stopables = append(stopables, v)
}
}
return stopables
}

func awaitableToStopableWithTimeout(awaitables ...Awaitable) []stop.StopableWithTimeout {
stopables := []stop.StopableWithTimeout{}
for _, awaitable := range awaitables {
v, ok := awaitable.(stop.StopableWithTimeout)
if ok {
stopables = append(stopables, v)
}
}
return stopables
}
20 changes: 4 additions & 16 deletions stop/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,30 @@ import (
"github.com/brad-jones/goasync/v2/task"
)

// Stopable is any object that has a Stop method,
// the functions in this package then call that method for you.
type Stopable interface {
Stop()
}

// All will loop through all provided objects and call their Stop method.
func All(stopables ...Stopable) {
func All(stopables ...*task.Task) {
for _, stopable := range stopables {
stopable.Stop()
}
}

// AllAsync does exactly the same thing as All but does so asynchronously.
func AllAsync(stopables ...Stopable) *task.Task {
func AllAsync(stopables ...*task.Task) *task.Task {
return task.New(func(t *task.Internal) {
All(stopables...)
})
}

// StopableWithTimeout is any object that has a StopWithTimeout method,
// the functions in this package then call that method for you.
type StopableWithTimeout interface {
StopWithTimeout(timeout time.Duration) error
}

// AllWithTimeout all provided objects and call their StopWithTimeout
// method with the given timeout value.
func AllWithTimeout(timeout time.Duration, stopables ...StopableWithTimeout) {
func AllWithTimeout(timeout time.Duration, stopables ...*task.Task) {
for _, stopable := range stopables {
stopable.StopWithTimeout(timeout)
}
}

// AllWithTimeoutAsync does exactly the same thing as AllWithTimeout but does so asynchronously.
func AllWithTimeoutAsync(timeout time.Duration, stopables ...StopableWithTimeout) *task.Task {
func AllWithTimeoutAsync(timeout time.Duration, stopables ...*task.Task) *task.Task {
return task.New(func(t *task.Internal) {
AllWithTimeout(timeout, stopables...)
})
Expand Down

0 comments on commit 2594a17

Please sign in to comment.