Skip to content

Commit

Permalink
♻️ Use mutex for waiting
Browse files Browse the repository at this point in the history
  • Loading branch information
elgohr committed Dec 26, 2023
1 parent 770f78b commit a4e9efb
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 21 deletions.
44 changes: 24 additions & 20 deletions localstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,20 +268,22 @@ func (i *Instance) start(ctx context.Context, services ...Service) error {
}
}

containerId, err := i.startLocalstack(ctx, services...)
if err != nil {
if err := i.startLocalstack(ctx, services...); err != nil {
return err
}

i.log.Info("waiting for localstack to start...")
return i.waitToBeAvailable(ctx, containerId)
return i.waitToBeAvailable(ctx)
}

const imageName = "go-localstack"

func (i *Instance) startLocalstack(ctx context.Context, services ...Service) (string, error) {
func (i *Instance) startLocalstack(ctx context.Context, services ...Service) error {
i.containerIdMutex.Lock()
defer i.containerIdMutex.Unlock()

if err := i.buildLocalImage(ctx); err != nil {
return "", fmt.Errorf("localstack: could not build image: %w", err)
return fmt.Errorf("localstack: could not build image: %w", err)
}

pm := nat.PortMap{}
Expand Down Expand Up @@ -317,22 +319,21 @@ func (i *Instance) startLocalstack(ctx context.Context, services ...Service) (st
AutoRemove: true,
}, nil, nil, "")
if err != nil {
return "", fmt.Errorf("localstack: could not create container: %w", err)
return fmt.Errorf("localstack: could not create container: %w", err)
}

i.setContainerId(resp.ID)
i.containerId = resp.ID

i.log.Info("starting localstack")
containerId := resp.ID
if err := i.cli.ContainerStart(ctx, containerId, types.ContainerStartOptions{}); err != nil {
return "", fmt.Errorf("localstack: could not start container: %w", err)
if err := i.cli.ContainerStart(ctx, i.containerId, types.ContainerStartOptions{}); err != nil {
return fmt.Errorf("localstack: could not start container: %w", err)
}

if i.log.Level == logrus.DebugLevel {
go i.writeContainerLogToLogger(ctx, containerId)
go i.writeContainerLogToLogger(ctx, i.containerId)
}

return containerId, i.mapPorts(ctx, services, containerId, 0)
return i.mapPorts(ctx, services, i.containerId, 0)
}

//go:embed Dockerfile
Expand Down Expand Up @@ -412,29 +413,30 @@ func (i *Instance) mapPorts(ctx context.Context, services []Service, containerId
}

func (i *Instance) stop() error {
containerId := i.getContainerId()
if containerId == "" {
i.containerIdMutex.Lock()
defer i.containerIdMutex.Unlock()
if i.containerId == "" {
return nil
}
if err := i.cli.ContainerStop(context.Background(), containerId, container.StopOptions{
if err := i.cli.ContainerStop(context.Background(), i.containerId, container.StopOptions{
Signal: "SIGKILL",
}); err != nil {
return err
}
i.setContainerId("")
i.containerId = ""
i.resetPortMapping()
return nil
}

func (i *Instance) waitToBeAvailable(ctx context.Context, containerId string) error {
func (i *Instance) waitToBeAvailable(ctx context.Context) error {
ticker := time.NewTicker(300 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := i.isRunning(ctx, containerId); err != nil {
if err := i.isRunning(ctx); err != nil {
return err
}
if err := i.checkAvailable(ctx); err != nil {
Expand All @@ -447,8 +449,10 @@ func (i *Instance) waitToBeAvailable(ctx context.Context, containerId string) er
}
}

func (i *Instance) isRunning(ctx context.Context, containerId string) error {
_, err := i.cli.ContainerInspect(ctx, containerId)
func (i *Instance) isRunning(ctx context.Context) error {
i.containerIdMutex.RLock()
defer i.containerIdMutex.RUnlock()
_, err := i.cli.ContainerInspect(ctx, i.containerId)
if err != nil {
i.log.Debug(err)
return errors.New("localstack container has been stopped")
Expand Down
2 changes: 1 addition & 1 deletion localstack_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestInstance_waitToBeAvailable_Context_Expired(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
cancel()
i := &Instance{log: logrus.StandardLogger()}
require.Error(t, i.waitToBeAvailable(ctx, "any"))
require.Error(t, i.waitToBeAvailable(ctx))
}

func ErrCloser(r io.Reader, err error) io.ReadCloser {
Expand Down

0 comments on commit a4e9efb

Please sign in to comment.