From 4339ee59df557abbaf389416b5c007147b4e48c1 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Sun, 7 Jan 2024 22:23:32 -0800 Subject: [PATCH] fix lease management with flightcontrol When one flightcontrol callback gets canceled, ctx.Value() stops working for acquiring leases for the remaining callbacks. While this behavior should also be looked at more carefully, returning a lease for the first callback or for the remaining callbacks would not be correct, as some objects can be tracked by the first lease and that lease could already be deleted by the first callpath. This fixes it so that any object tracked by a flightcontrol callback will be copied to the lease of every codepath after the callback has returned. Signed-off-by: Tonis Tiigi --- cache/blobs.go | 97 +++++++++++++++++++++++++++------------ cache/refs.go | 25 +++++----- util/leaseutil/manager.go | 59 ++++++++++++++++++++++-- 3 files changed, 137 insertions(+), 44 deletions(-) diff --git a/cache/blobs.go b/cache/blobs.go index eea33db099db..824b1d0a9fd0 100644 --- a/cache/blobs.go +++ b/cache/blobs.go @@ -16,6 +16,7 @@ import ( "github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/util/converter" "github.com/moby/buildkit/util/flightcontrol" + "github.com/moby/buildkit/util/leaseutil" "github.com/moby/buildkit/util/winlayers" digest "github.com/opencontainers/go-digest" imagespecidentity "github.com/opencontainers/image-spec/identity" @@ -24,7 +25,7 @@ import ( "golang.org/x/sync/errgroup" ) -var g flightcontrol.Group[struct{}] +var g flightcontrol.Group[*leaseutil.LeaseRef] var gFileList flightcontrol.Group[[]string] var ErrNoBlobs = errors.Errorf("no blobs for snapshot") @@ -87,14 +88,24 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool if _, ok := filter[sr.ID()]; ok { eg.Go(func() error { - _, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (struct{}, error) { + l, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), 
func(ctx context.Context) (_ *leaseutil.LeaseRef, err error) { if sr.getBlob() != "" { - return struct{}{}, nil + return nil, nil } if !createIfNeeded { - return struct{}{}, errors.WithStack(ErrNoBlobs) + return nil, errors.WithStack(ErrNoBlobs) } + l, ctx, err := leaseutil.NewLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + l.Discard() + } + }() + compressorFunc, finalize := comp.Type.Compress(ctx, comp) mediaType := comp.Type.MediaType() @@ -109,12 +120,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool if lowerRef != nil { m, err := lowerRef.Mount(ctx, true, s) if err != nil { - return struct{}{}, err + return nil, err } var release func() error lower, release, err = m.Mount() if err != nil { - return struct{}{}, err + return nil, err } if release != nil { defer release() @@ -132,12 +143,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool if upperRef != nil { m, err := upperRef.Mount(ctx, true, s) if err != nil { - return struct{}{}, err + return nil, err } var release func() error upper, release, err = m.Mount() if err != nil { - return struct{}{}, err + return nil, err } if release != nil { defer release() @@ -145,14 +156,13 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool } var desc ocispecs.Descriptor - var err error // Determine differ and error/log handling according to the platform, envvar and the snapshotter. 
var enableOverlay, fallback, logWarnOnErr bool if forceOvlStr := os.Getenv("BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF"); forceOvlStr != "" && sr.kind() != Diff { enableOverlay, err = strconv.ParseBool(forceOvlStr) if err != nil { - return struct{}{}, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF") + return nil, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF") } fallback = false // prohibit fallback on debug } else if !isTypeWindows(sr) { @@ -174,10 +184,10 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool if !ok || err != nil { if !fallback { if !ok { - return struct{}{}, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper) + return nil, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper) } if err != nil { - return struct{}{}, errors.Wrapf(err, "failed to compute overlay diff") + return nil, errors.Wrapf(err, "failed to compute overlay diff") } } if logWarnOnErr { @@ -210,7 +220,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool diff.WithCompressor(compressorFunc), ) if err != nil { - return struct{}{}, err + return nil, err } } @@ -220,7 +230,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool if finalize != nil { a, err := finalize(ctx, sr.cm.ContentStore) if err != nil { - return struct{}{}, errors.Wrapf(err, "failed to finalize compression") + return nil, errors.Wrapf(err, "failed to finalize compression") } for k, v := range a { desc.Annotations[k] = v @@ -228,7 +238,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool } info, err := sr.cm.ContentStore.Info(ctx, desc.Digest) if err != nil { - return struct{}{}, err + return nil, err } if diffID, ok := info.Labels[labels.LabelUncompressed]; ok { @@ -236,18 +246,24 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool } else if mediaType == 
ocispecs.MediaTypeImageLayer { desc.Annotations[labels.LabelUncompressed] = desc.Digest.String() } else { - return struct{}{}, errors.Errorf("unknown layer compression type") + return nil, errors.Errorf("unknown layer compression type") } if err := sr.setBlob(ctx, desc); err != nil { - return struct{}{}, err + return nil, err } - return struct{}{}, nil + return l, nil }) if err != nil { return err } + if l != nil { + if err := l.Adopt(ctx); err != nil { + return err + } + } + if comp.Force { if err := ensureCompression(ctx, sr, comp, s); err != nil { return errors.Wrapf(err, "failed to ensure compression type of %q", comp.Type) @@ -416,29 +432,42 @@ func isTypeWindows(sr *immutableRef) bool { // ensureCompression ensures the specified ref has the blob of the specified compression Type. func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) error { - _, err := g.Do(ctx, fmt.Sprintf("ensureComp-%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (struct{}, error) { + l, err := g.Do(ctx, fmt.Sprintf("ensureComp-%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (_ *leaseutil.LeaseRef, err error) { desc, err := ref.ociDesc(ctx, ref.descHandlers, true) if err != nil { - return struct{}{}, err + return nil, err + } + + l, ctx, err := leaseutil.NewLease(ctx, ref.cm.LeaseManager, leaseutil.MakeTemporary) + if err != nil { + return nil, err } + defer func() { + if err != nil { + l.Discard() + } + }() // Resolve converters layerConvertFunc, err := converter.New(ctx, ref.cm.ContentStore, desc, comp) if err != nil { - return struct{}{}, err + return nil, err } else if layerConvertFunc == nil { if isLazy, err := ref.isLazy(ctx); err != nil { - return struct{}{}, err + return nil, err } else if isLazy { // This ref can be used as the specified compressionType. Keep it lazy. 
- return struct{}{}, nil + return l, nil } - return struct{}{}, ref.linkBlob(ctx, desc) + if err := ref.linkBlob(ctx, desc); err != nil { + return nil, err + } + return l, nil } // First, lookup local content store if _, err := ref.getBlobWithCompression(ctx, comp.Type); err == nil { - return struct{}{}, nil // found the compression variant. no need to convert. + return l, nil // found the compression variant. no need to convert. } // Convert layer compression type @@ -448,18 +477,26 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression. dh: ref.descHandlers[desc.Digest], session: s, }).Unlazy(ctx); err != nil { - return struct{}{}, err + return l, err } newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc) if err != nil { - return struct{}{}, errors.Wrapf(err, "failed to convert") + return nil, errors.Wrapf(err, "failed to convert") } // Start to track converted layer if err := ref.linkBlob(ctx, *newDesc); err != nil { - return struct{}{}, errors.Wrapf(err, "failed to add compression blob") + return nil, errors.Wrapf(err, "failed to add compression blob") } - return struct{}{}, nil + return l, nil }) - return err + if err != nil { + return err + } + if l != nil { + if err := l.Adopt(ctx); err != nil { + return err + } + } + return nil } diff --git a/cache/refs.go b/cache/refs.go index 0d1f089ce836..fbb47db9c9c7 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -1062,7 +1062,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context, } func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s session.Group) error { - _, err := g.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ struct{}, rerr error) { + _, err := g.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ *leaseutil.LeaseRef, rerr error) { dhs := sr.descHandlers for _, r := range sr.layerChain() { r := r @@ -1074,7 +1074,7 @@ func (sr *immutableRef) 
prepareRemoteSnapshotsStargzMode(ctx context.Context, s dh := dhs[digest.Digest(r.getBlob())] if dh == nil { // We cannot prepare remote snapshots without descHandler. - return struct{}{}, nil + return nil, nil } // tmpLabels contains dh.SnapshotLabels + session IDs. All keys contain @@ -1135,7 +1135,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s break } - return struct{}{}, nil + return nil, nil }) return err } @@ -1158,28 +1158,31 @@ func makeTmpLabelsStargzMode(labels map[string]string, s session.Group) (fields } func (sr *immutableRef) unlazy(ctx context.Context, dhs DescHandlers, pg progress.Controller, s session.Group, topLevel bool, ensureContentStore bool) error { - _, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ struct{}, rerr error) { + _, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ *leaseutil.LeaseRef, rerr error) { if _, err := sr.cm.Snapshotter.Stat(ctx, sr.getSnapshotID()); err == nil { if !ensureContentStore { - return struct{}{}, nil + return nil, nil } if blob := sr.getBlob(); blob == "" { - return struct{}{}, nil + return nil, nil } if _, err := sr.cm.ContentStore.Info(ctx, sr.getBlob()); err == nil { - return struct{}{}, nil + return nil, nil } } switch sr.kind() { case Merge, Diff: - return struct{}{}, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel, ensureContentStore) + return nil, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel, ensureContentStore) case Layer, BaseLayer: - return struct{}{}, sr.unlazyLayer(ctx, dhs, pg, s, ensureContentStore) + return nil, sr.unlazyLayer(ctx, dhs, pg, s, ensureContentStore) } - return struct{}{}, nil + return nil, nil }) - return err + if err != nil { + return err + } + return nil } // should be called within sizeG.Do call for this ref's ID diff --git a/util/leaseutil/manager.go b/util/leaseutil/manager.go index a02fb9613c2d..42381b6d07ac 100644 --- a/util/leaseutil/manager.go +++ b/util/leaseutil/manager.go @@ -2,10 +2,12 @@ package 
leaseutil import ( "context" + "sync" "time" "github.com/containerd/containerd/leases" "github.com/containerd/containerd/namespaces" + "github.com/pkg/errors" ) func WithLease(ctx context.Context, ls leases.Manager, opts ...leases.Opt) (context.Context, func(context.Context) error, error) { @@ -16,17 +18,68 @@ func WithLease(ctx context.Context, ls leases.Manager, opts ...leases.Opt) (cont }, nil } - l, err := ls.Create(ctx, append([]leases.Opt{leases.WithRandomID(), leases.WithExpiration(time.Hour)}, opts...)...) + lr, ctx, err := NewLease(ctx, ls, opts...) if err != nil { return nil, nil, err } - ctx = leases.WithLease(ctx, l.ID) return ctx, func(ctx context.Context) error { - return ls.Delete(ctx, l) + return ls.Delete(ctx, lr.l) }, nil } +func NewLease(ctx context.Context, lm leases.Manager, opts ...leases.Opt) (*LeaseRef, context.Context, error) { + l, err := lm.Create(ctx, append([]leases.Opt{leases.WithRandomID(), leases.WithExpiration(time.Hour)}, opts...)...) + if err != nil { + return nil, nil, err + } + + ctx = leases.WithLease(ctx, l.ID) + return &LeaseRef{lm: lm, l: l}, ctx, nil +} + +type LeaseRef struct { + lm leases.Manager + l leases.Lease + + once sync.Once + resources []leases.Resource + err error +} + +func (l *LeaseRef) Discard() error { + return l.lm.Delete(context.Background(), l.l) +} + +func (l *LeaseRef) Adopt(ctx context.Context) error { + l.once.Do(func() { + resources, err := l.lm.ListResources(ctx, l.l) + if err != nil { + l.err = err + return + } + l.resources = resources + }) + if l.err != nil { + return l.err + } + currentID, ok := leases.FromContext(ctx) + if !ok { + return errors.Errorf("missing lease requirement for adopt") + } + for _, r := range l.resources { + if err := l.lm.AddResource(ctx, leases.Lease{ID: currentID}, r); err != nil { + return err + } + } + if len(l.resources) == 0 { + l.Discard() + return nil + } + go l.Discard() + return nil +} + func MakeTemporary(l *leases.Lease) error { if l.Labels == nil { l.Labels = 
map[string]string{}