
proposal: sync: add golang.org/x/sync/errgroup #57534

Open
changkun opened this issue Dec 31, 2022 · 39 comments

@changkun
Member

changkun commented Dec 31, 2022

As briefly discussed in #56102 (comment), I propose to promote errgroup.Group to the sync package. The proposed API set is listed below.

Rationale

Compared to sync.WaitGroup, errgroup.Group does not impose the cognitive load of managing Add() and Done(), and it makes it easy to cap the number of concurrent tasks with SetLimit. For example,

g := sync.WaitGroup{}
sem := make(chan struct{}, 5)
for i := 0; i < n; i++ {
	sem <- struct{}{}
	g.Add(1)
	go func() {
		defer func() {
			g.Done()
			<-sem
		}()

		// ...
	}()
}
g.Wait()

vs.

g := errgroup.Group{}
g.SetLimit(5)
for i := 0; i < n; i++ {
	g.Go(func() error {
		// ...
		return nil
	})
}
g.Wait()

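Tu et al. [1] also reported that WaitGroup is often misused and causes concurrency bugs. For instance, consider this example taken from Figure 9 of that paper: calling p.wg.Add(1) inside the goroutine can race with p.wg.Wait() in stop(), and the fix (shown as a diff) moves Add before the go statement: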

func (p *peer) send() {
	p.mu.Lock()
	defer p.mu.Unlock()
	switch p.status {
		case idle:
+			p.wg.Add(1)
			go func() {
-				p.wg.Add(1)
				...
				p.wg.Done()
			}()
		case stopped:
	}
}

func (p *peer) stop() {
	p.mu.Lock()
	p.status = stopped
	p.mu.Unlock()
	p.wg.Wait()
}

[1] Tu, Tengfei, et al. "Understanding real-world concurrency bugs in go." Proceedings of the Twenty-Fourth International Conference on Architectural Support for Programming Languages and Operating Systems. 2019. https://doi.org/10.1145/3297858.3304069
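As a sketch of the corrected pattern, the following makes the Figure 9 peer example compile with the fix applied: Add runs under the mutex before the goroutine starts, so the Wait in stop cannot miss it. The peerStatus constants and the sent counter are hypothetical stand-ins for the elided work.

```go
package main

import (
	"fmt"
	"sync"
)

type peerStatus int

const (
	idle peerStatus = iota
	stopped
)

// peer mirrors the Figure 9 example with the fix applied.
type peer struct {
	mu     sync.Mutex
	status peerStatus
	wg     sync.WaitGroup
	sent   int // hypothetical counter standing in for the real work
}

func (p *peer) send() {
	p.mu.Lock()
	defer p.mu.Unlock()
	switch p.status {
	case idle:
		p.wg.Add(1) // fixed: Add before `go`, not inside the goroutine
		go func() {
			defer p.wg.Done()
			p.mu.Lock() // runs once send has released the mutex
			p.sent++
			p.mu.Unlock()
		}()
	case stopped:
		// no new work after stop
	}
}

func (p *peer) stop() {
	p.mu.Lock()
	p.status = stopped
	p.mu.Unlock()
	p.wg.Wait() // every Add made while idle happened before this point
}

func main() {
	p := &peer{}
	for i := 0; i < 10; i++ {
		p.send()
	}
	p.stop()
	p.send()            // ignored: the peer is stopped
	fmt.Println(p.sent) // prints 10
}
```

With the original placement of Add, stop could return from Wait before some goroutines had registered themselves, so fewer than 10 sends might be observed.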

Existing Usage

A GitHub code search finds 16.1k files that use errgroup.Group, and pkg.go.dev shows 10,456 imports.

APIs

package sync

// An ErrGroup is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero ErrGroup is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type ErrGroup struct {
	// contains filtered or unexported fields
}

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *ErrGroup) SetLimit(n int)

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling NewErrGroupWithContext. The error will be returned by Wait.
func (g *ErrGroup) Go(f func() error)

// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *ErrGroup) TryGo(f func() error) bool

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *ErrGroup) Wait() error

// NewErrGroupWithContext returns a new ErrGroup and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func NewErrGroupWithContext(ctx context.Context) (*ErrGroup, context.Context)

Update: WithContext is renamed to NewErrGroupWithContext.

@gopherbot gopherbot added this to the Proposal milestone Dec 31, 2022
@itstarsun

Note that sync cannot import context.

@changkun
Member Author

Note that sync cannot import context because context uses sync.Mutex, so the import would create a cycle. This is not an absolute constraint, though: context could switch to a different mutex implementation to avoid importing the sync package.

@zephyrtronium
Contributor

Having a function named just "WithContext" in package sync doesn't seem great, since it would be called as g, ctx := sync.WithContext(ctx).

@changkun
Member Author

That's a good point. However, the name can be revised to whatever form people prefer; the core idea is to include errgroup.WithContext. Another possibility is NewErrGroup, e.g., g, ctx := sync.NewErrGroup(ctx).

@ianlancetaylor ianlancetaylor moved this to Incoming in Proposals Dec 31, 2022
@ianlancetaylor
Member

Personally I don't think the low-level sync package is the right place for the high-level errgroup concept. Let's not assume that x/sync has to mirror the standard library's sync.

@earthboundkid
Contributor

I think this is a common enough need and has proven its worth. Most users shouldn’t use the original WaitGroup anymore. My only question would be if there should be a sync/v2 with this plus generics for Map etc.

@changkun
Member Author

changkun commented Jan 2, 2023

@ianlancetaylor I am also fine with it being a subpackage of sync, sync/errgroup; then the API signatures wouldn't need any renaming. 😃

The primary purpose is to get it into the standard library; the naming can be whatever the community prefers.

If the decision is to have a standalone package, a comment in the WaitGroup documentation pointing to errgroup could increase its public exposure.

@apparentlymart

With the addition of errors.Join in Go 1.20, would it make sense for WaitGroup.Wait to promise to return a "join" of all of the errors returned across all of the functions?

I don't think it must necessarily use errors.Join directly, but more that whatever value it does return could support the same errors.Is idiom that the errors.Join result supports, so that the caller can recognize potentially many distinct errors, which I believe just means the error type having a method Unwrap() []error that returns all of them.

@earthboundkid
Contributor

I had been joining errors in my errutil.ExecParallel helper, which uses my pre-Go 1.20 multierror type. I'm not sure though if it would work as well for errgroup.

@bcmills
Contributor

bcmills commented Jan 5, 2023

Hi, original errgroup author here. 👋
I would not want errgroup to enter the standard library without some API changes. It has served us well, but now we have the benefit of hindsight.

There are two significant problems with the API:

  1. An errgroup.WithContext today cancels the Context when its Wait method returns, which makes it easier to avoid leaking resources but somewhat prone to bugs involving accidental reuse of the Context after the call to Wait.

  2. The need to call Wait in order to free resources makes errgroup.WithContext unfortunately still somewhat prone to leaks. If you start a bunch of goroutines and then encounter an error while setting up another one, it's easy to accidentally leak all of the goroutines started so far — along with their associated Context — by writing

return err

instead of

cancel()
g.Wait()
return err

Those can both be remedied by adding a method Stop that cancels the Context and also waits for in-flight goroutines to exit:

g, ctx := errgroup.New(ctx)
defer g.Stop()
…
if err != nil {
	return err
}

Ideally, that would be accompanied by an improvement to the lostcancel check in cmd/vet to report missing calls to Stop, and/or a finalizer that verifies that Stop has been called by the time the group becomes unreachable.

@bcmills
Contributor

bcmills commented Jan 5, 2023

@apparentlymart, I do not think it would be appropriate for errgroup to join the errors together. Generally only the first error matters, because it causes the Context to be canceled. After that happens, it can be difficult to tell whether subsequent errors are errors in their own right, or secondary effects of that cancellation.

@anjmao

anjmao commented Mar 28, 2023

Also, even after an error has canceled the context, the group still starts goroutines for all of the remaining items.

Consider this example with a batch of 10k items.

errg, ctx := errgroup.WithContext(context.Background())
items := []int{1, 2, 3, ...... 10000}
for _, item := range items {
	item := item
	errg.Go(func() error {
		if err := process(ctx, item); err != nil {
			return fmt.Errorf("something went wrong while processing item %d: %w", item, err)
		}
		return nil
	})
}
err := errg.Wait()

The usual fix is to check for a canceled context before each call to Go, but this isn't always obvious:

errg, ctx := errgroup.WithContext(context.Background())
items := []int{}
loop:
for _, item := range items {
	item := item
	select {
	case <-ctx.Done():
		break loop
	default:
	}

	errg.Go(func() error {
		if err := process(ctx, item); err != nil {
			return fmt.Errorf("something went wrong while processing item %d: %w", item, err)
		}
		return nil
	})
}
err := errg.Wait()

@bcmills Do you have thoughts on what an API that bakes this cancellation into the errgroup package could look like?

@jimen0
Contributor

jimen0 commented Mar 28, 2023

Hi @anjmao, if I understand your proposal correctly, it's the same as https://pkg.go.dev/github.com/sourcegraph/conc/pool#ContextPool.WithCancelOnError. Maybe conc's API can be used as inspiration for this.

WithCancelOnError configures the pool to cancel its context as soon as any task returns an error or panics. By default, the pool's context is not canceled until the parent context is canceled.

@anjmao

anjmao commented Mar 28, 2023

@jimen0 The errgroup package already works like that: it cancels the context when the group is created using WithContext. The issue is that even if your context is canceled, errgroup.Go will still spawn a new goroutine and run your function.
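One possible shape for this, sketched under the assumption of a hypothetical cancelAwareGroup whose Go checks ctx.Err() before launching and reports whether it started the task. The check is best-effort: the Context can still be canceled right after it passes. processAll is a hypothetical driver that waits for cancellation after item 3 only so that the output is deterministic.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type cancelAwareGroup struct {
	ctx     context.Context
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
}

func newCancelAwareGroup(parent context.Context) (*cancelAwareGroup, context.Context) {
	ctx, cancel := context.WithCancel(parent)
	return &cancelAwareGroup{ctx: ctx, cancel: cancel}, ctx
}

// Go starts f unless the Context is already done, and reports whether it did.
func (g *cancelAwareGroup) Go(f func() error) bool {
	if g.ctx.Err() != nil {
		return false
	}
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() { g.err = err; g.cancel() })
		}
	}()
	return true
}

func (g *cancelAwareGroup) Wait() error {
	g.wg.Wait()
	g.cancel()
	return g.err
}

func processAll(n int) (started int, err error) {
	g, ctx := newCancelAwareGroup(context.Background())
	for item := 0; item < n; item++ {
		item := item // per-item copy, needed before Go 1.22
		if !g.Go(func() error {
			if item == 3 { // simulated failure
				return fmt.Errorf("processing item %d: %w", item, errors.New("bad input"))
			}
			return nil
		}) {
			break // the group is canceled; stop producing work
		}
		started++
		if item == 3 {
			<-ctx.Done() // only to make the example deterministic
		}
	}
	return started, g.Wait()
}

func main() {
	started, err := processAll(10000)
	fmt.Println(started, err) // prints "4 processing item 3: bad input"
}
```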

@bjwrk

bjwrk commented Sep 15, 2023

I do not think it would be appropriate for errgroup to join the errors together. Generally only the first error matters, because it causes the Context to be canceled. After that happens, it can be difficult to tell whether subsequent errors are errors in their own right, or secondary effects of that cancellation.

I agree, errgroup.Wait() should definitely return the first error, but it's also reasonable to provide a way to get all of the errors too, perhaps via Errors() []error. This is useful in all the ways that Promise.allSettled is useful in JavaScript. Several of the errors would just be context.Canceled or similar if WithContext is used, but that would be fine in practice. If I ask the errgroup for all the errors it can give me, I'm happy to get what I get.

@josharian
Contributor

Re: @bcmills

I do not think it would be appropriate for errgroup to join the errors together. Generally only the first error matters, because it causes the Context to be canceled.

That is only true if you construct the errgroup using errgroup.WithContext. If you use the zero value new(errgroup.Group), then all errors are meaningful. (And in fact I found this issue searching for a way to get all errors back in exactly this situation.)

To thread that needle, perhaps it could return errors.Join of all errors such that errors.Is(err, context.Canceled) returns false.

@bcmills
Contributor

bcmills commented Oct 25, 2023

To thread that needle, perhaps it could return errors.Join of all errors such that errors.Is(err, context.Canceled) returns false.

That's an interesting suggestion, but I'm not sure how well it would work out in practice — errors that stem from a Context cancellation don't necessarily wrap context.Canceled, and it can be very difficult for libraries that propagate those errors to reliably convert them back.

For example, a function that performs a network operation might use context.AfterFunc and net.Conn.SetDeadline to cancel it, resulting in an error wrapping syscall.ETIMEDOUT instead of context.Canceled.

@josharian
Contributor

@bcmills good point.

Strictly speaking, if you returned a composite, ordered error, anyone who wanted could pick out just the first error, but that's a common enough use case that it'd be annoying to have to do the dance.

Yet another perspective is that Wait could return the first error when there is a cancelable context set up, but an errors.Join of all errors when there isn't. That should track the likely meaningfulness of the errors.

@bcmills
Contributor

bcmills commented Oct 25, 2023

Strictly speaking, if you returned a composite, ordered error, anyone who wanted could pick out just the first error, but that's a common enough use case that it'd be annoying to have to do the dance.

That, and worse: if someone has a very long-lived errgroup, collecting all of the errors when only one is wanted could cause a significant memory leak (to store errors that are ultimately going to be ignored).

Yet another perspective is that Wait could return the first error when there is a cancelable context set up, but an errors.Join of all errors when there isn't.

That would be possible, but it seems a little too magical to me. If we're going down this road, I would rather have an explicit call or parameter to toggle the behavior.

@josharian
Contributor

If we're going down this road, I would rather have an explicit call or parameter to toggle the behavior.

Works for me. It'd be much nicer than my current code that separately tracks and juggles a slice of errors...

@bcmills
Contributor

bcmills commented Oct 25, 2023

The cleanest alternative right now is a defer, I think:

errs := make(chan []error, 1)
errs <- nil
saveErr := func(p *error) {
	if *p != nil {
		errs <- append(<-errs, *p)
	}
}
g.Go(func() (err error) {
	defer saveErr(&err)
	…
})

@dolmen
Contributor

dolmen commented Oct 28, 2023

My own rendezvous.WaitFirstError(... TaskCtx) (designed independently before discovering this thread) has the following features:

  • a child context is shared between all the tasks (to inform them of failure of a sibling) (like errgroup.WithContext)
  • all non-nil errors are reported, with errors.Join
  • if the parent context is canceled AND at least one task fails, the error of the parent context (ctx.Err()) is reported as the first error, as that error may have triggered the others in cascade. This ensures that if the parent context has been canceled or timed out and at least one error occurred, then errors.Is(err, context.Canceled /* or Timeout*/) is true; but if the context has been canceled or timed out and no task returned an error (the tasks succeeded, or the cancellation occurred even before they started), then errors.Is(err, context.Canceled /* or Timeout*/) is false.

@bcmills wrote:

That's an interesting suggestion, but I'm not sure how well it would work out in practice — errors that stem from a Context cancellation don't necessarily wrap context.Canceled, and it can be very difficult for libraries that propagate those errors to reliably convert them back.

That's the reason for my last feature.

@betamos

betamos commented Jan 13, 2024

I often find myself using an errgroup where Go/TryGo and Wait run in different goroutines, when the set of tasks isn't known up front:

for e := range events {
  if !group.TryGo(task) {
    // Here I need to know that the task couldn't be started
    // I might e.g. need a cancellation handle for ad-hoc task spawning
  }
}

In another goroutine:

<-ctx.Done()
group.Wait()
// All tasks should be complete.

With current x/sync functionality, reuse is possible (is this a bug?):

g, ctx := errgroup.WithContext(context.Background())
g.Wait() // Done, right?
g.TryGo(fn) // Reports true and runs the goroutine, even though the group was already waited on. Risk of leaks

So I'm very much in favor of a Stop method, especially if it can prevent accidental group reuse:

  • TryGo after a group is stopped returns false forever.
  • Go after stop should probably panic?

As an aside, would it be wise to break out and remove the semaphore features (SetLimit) for simplicity? Concurrency is notoriously difficult already, so the bar for complexity should imo be extremely high.

@gspeicher

gspeicher commented Aug 12, 2024

My 2 cents: the problem of managing a long-running pool of workers seems very different to me than what errgroup was designed to do. I think of errgroup as more of a Go implementation of Javascript's Promise.All than a worker pool. Trying to bend it to support a long-running for loop in a server process seems to conflate two issues and unnecessarily complicate the simple concept encapsulated by this package.

@shayn-orca

I'm for it. Also (not sure it's very relevant information): this proposal was just mentioned by Yarden Laifenfeld at GopherCon Israel, and there was general consensus in the room to add this to sync instead of keeping it in x, since it seems so useful (even if it's rather high-level, it seems like the "one way to solve it" that Go likes).

@jub0bs

jub0bs commented Nov 11, 2024

I don't think errgroup should make its way into the standard library with the current API.

One gripe I have with it is the SetLimit setter. It's somewhat error-prone since it must not be called while there are active goroutines. Incidentally, it attempts to detect this situation, but only on a best-effort basis (by checking whether the length of the underlying channel is zero).

How often do people change the limit of an existing errgroup.Group anyway? Without any data to back up my claim, I would guess: not very often. Wouldn't it be safer to make the limit effectively constant after instantiation of a group? If a different limit is needed, clients could simply create another group.

The only remaining question is what the factory function(s) for groups would look like. I don't have a satisfying answer, but I've been experimenting with the following API:

package errgroup

type Group struct {
  // contains filtered or unexported fields
}

func New(opts ...Option) (*Group, error)
func (g *Group) Context() context.Context
func (g *Group) Go(f func() error)
func (g *Group) TryGo(f func() error) bool
func (g *Group) Wait() error

type Option interface {
  // contains filtered or unexported methods
}
func WithContext(ctx context.Context) Option
func WithLimit(n int) Option

I know that many members of the community (@bcmills included) don't like functional options, and I've certainly softened my stance about them since I gave a talk about the pattern at GopherCon Europe 2023, but if you can stomach them, client code is quite nice:

g, err := errgroup.New(
  errgroup.WithContext(ctx),
  errgroup.WithLimit(64),
)
if err != nil {
  // handle error
}

In particular, a single factory function would be enough and its error result, though superfluous now, would enable the future addition of fallible configuration options.

One main downside is that it allocates more than the current API.

Anyway, I'm not seriously suggesting that the hypothetical new API should look like this; I just wanted to share my thoughts here.
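For readers less familiar with the pattern, here is one common way to implement an Option interface with an unexported method for the API above. The config fields, the defaults (context.Background() and no limit), and the optionFunc adapter are assumptions; Go, TryGo, and Wait are omitted.

```go
package main

import (
	"context"
	"fmt"
)

// config collects the settings an Option can change.
type config struct {
	ctx   context.Context
	limit int // negative means no limit
}

// Option's unexported method keeps other packages from implementing it.
type Option interface {
	apply(*config) error
}

type optionFunc func(*config) error

func (f optionFunc) apply(c *config) error { return f(c) }

func WithContext(ctx context.Context) Option {
	return optionFunc(func(c *config) error { c.ctx = ctx; return nil })
}

func WithLimit(n int) Option {
	return optionFunc(func(c *config) error { c.limit = n; return nil })
}

// Group only records its configuration in this sketch.
type Group struct {
	ctx   context.Context
	limit int
}

// New applies options over the defaults. The error result is unused today
// but would let a future option reject invalid input.
func New(opts ...Option) (*Group, error) {
	c := config{ctx: context.Background(), limit: -1}
	for _, o := range opts {
		if err := o.apply(&c); err != nil {
			return nil, err
		}
	}
	return &Group{ctx: c.ctx, limit: c.limit}, nil
}

func main() {
	g, err := New(WithContext(context.Background()), WithLimit(64))
	if err != nil {
		panic(err)
	}
	fmt.Println(g.limit) // prints 64
}
```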

@earthboundkid
Contributor

Did you mean to include func (g *Group) SetLimit(n int) as part of the API? I imagined you would want to drop it.

@earthboundkid
Contributor

FWIW, I wrote it on some other issue, but I really like https://pkg.go.dev/cmd/internal/par and wish it were exported. It's a very slick and useful API. You could add errgroup to it and that would be a fairly complete set of high level tools for concurrency management.

@thepudds
Contributor

thepudds commented Nov 11, 2024

Maybe worth noting that https://pkg.go.dev/github.com/rogpeppe/go-internal/par is available, though I think it is an earlier snapshot of the cmd/go internal package (including it has less functionality than what is currently used in cmd/go).

@earthboundkid
Contributor

It's out of date because it's from before generics.

@jub0bs

jub0bs commented Nov 11, 2024

@earthboundkid

Did you mean to include func (g *Group) SetLimit(n int) as part of the API? I imagined you would want to drop it.

You're correct. My mistake. Fixed. Thanks!

I really like https://pkg.go.dev/cmd/internal/par

That package wasn't on my radar! I'll check it out.

@jub0bs

jub0bs commented Nov 12, 2024

How often do people change the limit of an existing errgroup.Group anyway? Without any data to back up my claim, I would guess: not very often.

I'm realising now that a context-aware errgroup.Group (i.e. one created with errgroup.WithContext) cannot in fact be reused, whereas a zero-value errgroup.Group can. It would be incorrect to call (*errgroup.Group) SetLimit after (*errgroup.Group) Wait in the former case, but correct in the latter case.

If anything, this subtlety reinforces my belief that the API would be improved by the removal of the (*errgroup.Group) SetLimit setter.

@earthboundkid
Contributor

earthboundkid commented Nov 12, 2024

Another idea for the API is to move context into the callback:

package errgroup

type Group struct {
  // contains filtered or unexported fields
}

func New(opts ...Option) (*Group, error)
func (g *Group) Go(f func(context.Context) error)
func (g *Group) TryGo(f func(context.Context) error) bool
func (g *Group) Wait() error

type Option interface {
  // contains filtered or unexported methods
}
func WithContext(ctx context.Context) Option
func WithLimit(n int) Option

That makes the scope of the derived context pretty clear.

Having two options leaves the door open to more, but will this API really grow more options? New(ctx, limit) would be fairly simple, and you could pass nil and -1 if you want the defaults.

With the par proposal, you get

package par // import "sync/par"

type Group struct {
  // contains filtered or unexported fields
}

func NewGroup(context.Context, int) (*Group, error)
func (g *Group) Go(f func(context.Context) error)
func (g *Group) TryGo(f func(context.Context) error) bool
func (g *Group) Wait() error

@jub0bs

jub0bs commented Nov 12, 2024

@earthboundkid

Another idea for the API is to move context into the callback [...]

I like the idea! I think that's one improvement that Peter Bourgon hinted at years ago in one of his talks.

Having two options leaves open the door to more, but will this really develop more options?

I could live without functional options.

New(ctx, limit) would be fairly simple and you can pass nil and -1 if you want the defaults.

I would expect New to panic if passed nil, though. All factory functions in the context package that have a context.Context param (context.WithCancel & friends) panic when called with a nil argument; and http.NewRequestWithContext returns a non-nil error when called with a nil context. I don't think we should deviate from this convention that a nil context.Context argument is a programming error.

By the way, would your factory function even need to return an error? AFAICT, aside from the situation where it's passed a nil context, it would be infallible.

@earthboundkid
Contributor

errgroup.Group behaves differently if it has a context or not. If we wanted to lose the behavior of not cancelling that comes from not having a context, then we could require the context to be non-nil.

@jub0bs

jub0bs commented Nov 12, 2024

I'd be in favour of requiring a non-nil context, with context.Background() or context.TODO() if nothing else will do.

@jub0bs

jub0bs commented Nov 13, 2024

it's also reasonable to provide a way to get all of the errors too, perhaps via Errors() []error

If errgroup's API is augmented with a method for returning all the errors, that method should return, not a []error, but an iter.Seq[error], now that Go has standardised iterators.

@Merovius
Contributor

Merovius commented Jan 2, 2025

If errgroup's API is augmented with a method for returning all the errors, that method should return, not a []error, but an iter.Seq[error], now that Go has standardised iterators.

I'm not sure. If the group has to store the errors in a slice anyway, I don't really see the harm in returning a slice (note that an iter.Seq can't be used with ..., for example, while it's easy to go from an []error to an iter.Seq). On the other hand, the iter.Seq could be intended to stream the errors as they happen. But I would argue that such an API fails in its mission to structure concurrency: you would really have to consume it in a goroutine or something, concurrently with Wait.

I'd want to see a fuller API design that addresses these.

@bcmills

I do not think it would be appropriate for errgroup to join the errors together. Generally only the first error matters, because it causes the Context to be canceled. After that happens, it can be difficult to tell whether subsequent errors are errors in their own right, or secondary effects of that cancellation.

Just to mention an alternative: we have context.Cause now. Wait could return all of the errors joined together, while context.Cause(ctx) returns the first error, the one that caused the cancellation.

However, I generally like the idea of passing the Context through to the callback. The scoping implications of the current API tend to annoy me, especially if I need to do things after the errgroup is done. But context.Cause requires that scoping to work. On the other, other side, passing the Context through to the callback does not work well with the idea of a zero errgroup that does not cancel its context.

I don't know what I prefer, at the end of the day. But I don't really find any of the options presented so far clearly right.

@bcmills
Contributor

bcmills commented Jan 2, 2025

I remain opposed to a method to get all of the errors: that would make the storage requirement of an errgroup O(N) with the number of tasks processed instead of O(1) as it is today.

In typical errgroup usage, all errors after the first will be caused by the cancellation triggered by the first error, and thus unreliable. Moreover, since Go calls may be made concurrently there is no obvious order of results, and thus no obvious way to collate them or correlate them with the associated Go calls. So, I prefer to keep the API focused on collecting the first (guaranteed-relevant) error and coordinating shutdown, and leave any more complex error collection up to the caller to add.
