From 032a69fc28557cefebf3040ffcf947e9ef5b64f8 Mon Sep 17 00:00:00 2001 From: Steven Hartland Date: Mon, 28 Oct 2024 11:36:34 +0000 Subject: [PATCH] fix!: data races (#2843) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ci: enable test race checks Enable race checks for all tests in CI. * fix!: various data races Fix data race when determining default network; this required making the DockerProviderOptions.DefaultNetwork field private, which is a breaking change. Fix data race in test bufLogger. Fix data races on log production context cancellation and context timeout not being cancelled in read loop. BREAKING_CHANGE! --------- Co-authored-by: Manuel de la Peña --- commons-test.mk | 3 +- docker.go | 179 ++++++++++++++++++++++++++---------------------- docker_test.go | 11 +-- network.go | 4 +- provider.go | 2 +- reaper.go | 7 +- 6 files changed, 115 insertions(+), 91 deletions(-) diff --git a/commons-test.mk b/commons-test.mk index d168ff5c65..91ed6a1244 100644 --- a/commons-test.mk +++ b/commons-test.mk @@ -47,7 +47,8 @@ test-%: $(GOBIN)/gotestsum -- \ -v \ -coverprofile=coverage.out \ -timeout=30m + -timeout=30m \ + -race .PHONY: tools tools: diff --git a/docker.go b/docker.go index 2ef8c6973a..f82cd55381 100644 --- a/docker.go +++ b/docker.go @@ -16,6 +16,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "time" "github.com/cenkalti/backoff/v4" @@ -762,11 +763,15 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro // Setup the log production context which will be used to stop the log production. c.logProductionCtx, c.logProductionCancel = context.WithCancelCause(ctx) - go func() { - err := c.logProducer(stdout, stderr) - // Set context cancel cause, if not already set. - c.logProductionCancel(err) - }() + // We capture context cancel function to avoid data race with multiple + // calls to startLogProduction. 
+ go func(cancel context.CancelCauseFunc) { + // Ensure the context is cancelled when log productions completes + // so that GetLogProductionErrorChannel functions correctly. + defer cancel(nil) + + c.logProducer(stdout, stderr) + }(c.logProductionCancel) return nil } @@ -775,40 +780,49 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro // - logProductionCtx is done // - A fatal error occurs // - No more logs are available -func (c *DockerContainer) logProducer(stdout, stderr io.Writer) error { +func (c *DockerContainer) logProducer(stdout, stderr io.Writer) { // Clean up idle client connections. defer c.provider.Close() // Setup the log options, start from the beginning. - options := container.LogsOptions{ + options := &container.LogsOptions{ ShowStdout: true, ShowStderr: true, Follow: true, } - for { - timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout) - defer cancel() + // Use a separate method so that timeout cancel function is + // called correctly. + for c.copyLogsTimeout(stdout, stderr, options) { + } +} - err := c.copyLogs(timeoutCtx, stdout, stderr, options) - switch { - case err == nil: - // No more logs available. - return nil - case c.logProductionCtx.Err() != nil: - // Log production was stopped or caller context is done. - return nil - case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed): - // Timeout or client connection closed, retry. - default: - // Unexpected error, retry. - Logger.Printf("Unexpected error reading logs: %v", err) - } +// copyLogsTimeout copies logs from the container to stdout and stderr with a timeout. +// It returns true if the log production should be retried, false otherwise. +func (c *DockerContainer) copyLogsTimeout(stdout, stderr io.Writer, options *container.LogsOptions) bool { + timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout) + defer cancel() - // Retry from the last log received. 
- now := time.Now() - options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond())) + err := c.copyLogs(timeoutCtx, stdout, stderr, *options) + switch { + case err == nil: + // No more logs available. + return false + case c.logProductionCtx.Err() != nil: + // Log production was stopped or caller context is done. + return false + case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed): + // Timeout or client connection closed, retry. + default: + // Unexpected error, retry. + Logger.Printf("Unexpected error reading logs: %v", err) } + + // Retry from the last log received. + now := time.Now() + options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond())) + + return true } // copyLogs copies logs from the container to stdout and stderr. @@ -866,10 +880,12 @@ func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error { } errCh := make(chan error, 1) - go func() { - <-c.logProductionCtx.Done() - errCh <- context.Cause(c.logProductionCtx) - }() + go func(ctx context.Context) { + <-ctx.Done() + errCh <- context.Cause(ctx) + close(errCh) + }(c.logProductionCtx) + return errCh } @@ -906,6 +922,7 @@ type DockerProvider struct { host string hostCache string config config.Config + mtx sync.Mutex } // Client gets the docker client used by the provider @@ -984,29 +1001,26 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque // defer the close of the Docker client connection the soonest defer p.Close() - // Make sure that bridge network exists - // In case it is disabled we will create reaper_default network - if p.DefaultNetwork == "" { - p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client) - if err != nil { - return nil, err - } + var defaultNetwork string + defaultNetwork, err = p.ensureDefaultNetwork(ctx) + if err != nil { + return nil, fmt.Errorf("ensure default network: %w", err) } // If default network is not bridge make sure it is attached to the request // as container won't be attached to 
it automatically // in case of Podman the bridge network is called 'podman' as 'bridge' would conflict - if p.DefaultNetwork != p.defaultBridgeNetworkName { + if defaultNetwork != p.defaultBridgeNetworkName { isAttached := false for _, net := range req.Networks { - if net == p.DefaultNetwork { + if net == defaultNetwork { isAttached = true break } } if !isAttached { - req.Networks = append(req.Networks, p.DefaultNetwork) + req.Networks = append(req.Networks, defaultNetwork) } } @@ -1461,12 +1475,8 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) // defer the close of the Docker client connection the soonest defer p.Close() - // Make sure that bridge network exists - // In case it is disabled we will create reaper_default network - if p.DefaultNetwork == "" { - if p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client); err != nil { - return nil, err - } + if _, err = p.ensureDefaultNetwork(ctx); err != nil { + return nil, fmt.Errorf("ensure default network: %w", err) } if req.Labels == nil { @@ -1537,14 +1547,12 @@ func (p *DockerProvider) GetNetwork(ctx context.Context, req NetworkRequest) (ne func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) { // Use a default network as defined in the DockerProvider - if p.DefaultNetwork == "" { - var err error - p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client) - if err != nil { - return "", err - } + defaultNetwork, err := p.ensureDefaultNetwork(ctx) + if err != nil { + return "", fmt.Errorf("ensure default network: %w", err) } - nw, err := p.GetNetwork(ctx, NetworkRequest{Name: p.DefaultNetwork}) + + nw, err := p.GetNetwork(ctx, NetworkRequest{Name: defaultNetwork}) if err != nil { return "", err } @@ -1563,43 +1571,50 @@ func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) { return ip, nil } -func (p *DockerProvider) getDefaultNetwork(ctx context.Context, cli client.APIClient) (string, error) { - // Get list of available networks - 
networkResources, err := cli.NetworkList(ctx, network.ListOptions{}) - if err != nil { - return "", err - } +// ensureDefaultNetwork ensures that defaultNetwork is set and creates +// it if it does not exist, returning its value. +// It is safe to call this method concurrently. +func (p *DockerProvider) ensureDefaultNetwork(ctx context.Context) (string, error) { + p.mtx.Lock() + defer p.mtx.Unlock() - reaperNetwork := ReaperDefault + if p.defaultNetwork != "" { + // Already set. + return p.defaultNetwork, nil + } - reaperNetworkExists := false + networkResources, err := p.client.NetworkList(ctx, network.ListOptions{}) + if err != nil { + return "", fmt.Errorf("network list: %w", err) + } for _, net := range networkResources { - if net.Name == p.defaultBridgeNetworkName { - return p.defaultBridgeNetworkName, nil - } - - if net.Name == reaperNetwork { - reaperNetworkExists = true + switch net.Name { + case p.defaultBridgeNetworkName: + p.defaultNetwork = p.defaultBridgeNetworkName + return p.defaultNetwork, nil + case ReaperDefault: + p.defaultNetwork = ReaperDefault + return p.defaultNetwork, nil } } - // Create a bridge network for the container communications - if !reaperNetworkExists { - _, err = cli.NetworkCreate(ctx, reaperNetwork, network.CreateOptions{ - Driver: Bridge, - Attachable: true, - Labels: GenericLabels(), - }) - // If the network already exists, we can ignore the error as that can - // happen if we are running multiple tests in parallel and we only - // need to ensure that the network exists. - if err != nil && !errdefs.IsConflict(err) { - return "", err - } + // Create a bridge network for the container communications. + _, err = p.client.NetworkCreate(ctx, ReaperDefault, network.CreateOptions{ + Driver: Bridge, + Attachable: true, + Labels: GenericLabels(), + }) + // If the network already exists, we can ignore the error as that can + // happen if we are running multiple tests in parallel and we only + // need to ensure that the network exists. 
+ if err != nil && !errdefs.IsConflict(err) { + return "", fmt.Errorf("network create: %w", err) } - return reaperNetwork, nil + p.defaultNetwork = ReaperDefault + + return p.defaultNetwork, nil } // containerFromDockerResponse builds a Docker container struct from the response of the Docker API diff --git a/docker_test.go b/docker_test.go index a9fcbd03d6..4be29f8fcd 100644 --- a/docker_test.go +++ b/docker_test.go @@ -1790,11 +1790,14 @@ func TestGetGatewayIP(t *testing.T) { require.NoError(t, err) defer provider.Close() - ip, err := provider.(*DockerProvider).GetGatewayIP(context.Background()) - require.NoError(t, err) - if ip == "" { - t.Fatal("could not get gateway ip") + dockerProvider, ok := provider.(*DockerProvider) + if !ok { + t.Skip("provider is not a DockerProvider") } + + ip, err := dockerProvider.GetGatewayIP(context.Background()) + require.NoError(t, err) + require.NotEmpty(t, ip) } func TestNetworkModeWithContainerReference(t *testing.T) { diff --git a/network.go b/network.go index 9544bee129..5a145ac668 100644 --- a/network.go +++ b/network.go @@ -23,12 +23,12 @@ type DefaultNetwork string // Deprecated: will be removed in the future. func (n DefaultNetwork) ApplyGenericTo(opts *GenericProviderOptions) { - opts.DefaultNetwork = string(n) + opts.defaultNetwork = string(n) } // Deprecated: will be removed in the future. 
func (n DefaultNetwork) ApplyDockerTo(opts *DockerProviderOptions) { - opts.DefaultNetwork = string(n) + opts.defaultNetwork = string(n) } // Deprecated: will be removed in the future diff --git a/provider.go b/provider.go index b5e5ffa997..31714c0c14 100644 --- a/provider.go +++ b/provider.go @@ -25,7 +25,7 @@ type ( // GenericProviderOptions defines options applicable to all providers GenericProviderOptions struct { Logger Logging - DefaultNetwork string + defaultNetwork string } // GenericProviderOption defines a common interface to modify GenericProviderOptions diff --git a/reaper.go b/reaper.go index 8f2bde8ab6..650bfad0bd 100644 --- a/reaper.go +++ b/reaper.go @@ -402,7 +402,12 @@ func (r *reaperSpawner) newReaper(ctx context.Context, sessionID string, provide // Attach reaper container to a requested network if it is specified if p, ok := provider.(*DockerProvider); ok { - req.Networks = append(req.Networks, p.DefaultNetwork) + defaultNetwork, err := p.ensureDefaultNetwork(ctx) + if err != nil { + return nil, fmt.Errorf("ensure default network: %w", err) + } + + req.Networks = append(req.Networks, defaultNetwork) } c, err := provider.RunContainer(ctx, req)