Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Moved to #4057] Apply SOURCE_DATE_EPOCH to the files and the whiteouts inside OCI tar layers #3560

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"strconv"
"time"

"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/diff/walking"
Expand All @@ -19,6 +20,7 @@ import (
imagespecidentity "github.com/opencontainers/image-spec/identity"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

Expand All @@ -32,7 +34,7 @@ var ErrNoBlobs = errors.Errorf("no blobs for snapshot")
// a blob is missing and createIfNeeded is true, then the blob will be created, otherwise ErrNoBlobs will
// be returned. Caller must hold a lease when calling this function.
// If forceCompression is specified but the blob of compressionType doesn't exist, this function creates it.
func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded bool, comp compression.Config, s session.Group) error {
func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded bool, comp compression.Config, s session.Group, sourceDateEpoch *time.Time) error {
if _, ok := leases.FromContext(ctx); !ok {
return errors.Errorf("missing lease requirement for computeBlobChain")
}
Expand All @@ -58,36 +60,58 @@ func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded boo
// refs rather than every single layer present among their ancestors.
filter := sr.layerSet()

return computeBlobChain(ctx, sr, createIfNeeded, comp, s, filter)
return computeBlobChain(ctx, sr, createIfNeeded, comp, s, filter, sourceDateEpoch)
}

func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, comp compression.Config, s session.Group, filter map[string]struct{}) error {
// blobExistsWithSourceDateEpoch reports whether sr already has a blob whose
// recorded SOURCE_DATE_EPOCH matches the requested one. A nil sourceDateEpoch
// acts as a wildcard: any existing blob is acceptable.
func blobExistsWithSourceDateEpoch(sr *immutableRef, sourceDateEpoch *time.Time) bool {
	switch {
	case sr.getBlob() == "":
		// No blob has been associated with this ref at all.
		return false
	case sourceDateEpoch == nil:
		// nil means "any epoch is ok"
		return true
	default:
		recorded := sr.GetSourceDateEpoch()
		return recorded != nil && recorded.Equal(*sourceDateEpoch)
	}
}

func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, comp compression.Config, s session.Group, filter map[string]struct{}, sourceDateEpoch *time.Time) error {
if blob := sr.getBlob(); blob != "" {
imageRefs := sr.getImageRefs()
logrus.Debugf("blob=%q, imageRefs=%v", blob, imageRefs)
if len(imageRefs) > 0 {
// Do not check the epoch for the blobs of the base images.
// https://github.com/moby/buildkit/pull/3560#pullrequestreview-1353829200
sourceDateEpoch = nil
}
}

eg, ctx := errgroup.WithContext(ctx)
switch sr.kind() {
case Merge:
for _, parent := range sr.mergeParents {
parent := parent
eg.Go(func() error {
return computeBlobChain(ctx, parent, createIfNeeded, comp, s, filter)
return computeBlobChain(ctx, parent, createIfNeeded, comp, s, filter, sourceDateEpoch)
})
}
case Diff:
if _, ok := filter[sr.ID()]; !ok && sr.diffParents.upper != nil {
// This diff is just re-using the upper blob, compute that
eg.Go(func() error {
return computeBlobChain(ctx, sr.diffParents.upper, createIfNeeded, comp, s, filter)
return computeBlobChain(ctx, sr.diffParents.upper, createIfNeeded, comp, s, filter, sourceDateEpoch)
})
}
case Layer:
eg.Go(func() error {
return computeBlobChain(ctx, sr.layerParent, createIfNeeded, comp, s, filter)
return computeBlobChain(ctx, sr.layerParent, createIfNeeded, comp, s, filter, sourceDateEpoch)
})
}

if _, ok := filter[sr.ID()]; ok {
eg.Go(func() error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) {
if sr.getBlob() != "" {
if blobExistsWithSourceDateEpoch(sr, sourceDateEpoch) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that if there is no blob for epoch but there is one for another one, we will run the differ again? Is it a good idea? I think it would be better to do something similar to compressions there we convert a blob to another one.

return nil, nil
}
if !createIfNeeded {
Expand Down Expand Up @@ -169,7 +193,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
}
}
if enableOverlay {
computed, ok, err := sr.tryComputeOverlayBlob(ctx, lower, upper, mediaType, sr.ID(), compressorFunc)
computed, ok, err := sr.tryComputeOverlayBlob(ctx, lower, upper, mediaType, sr.ID(), compressorFunc, sourceDateEpoch)
if !ok || err != nil {
Comment on lines 195 to 197
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably yes, but we can defer that, as overlaybd will stay optional.

if !fallback {
if !ok {
Expand All @@ -196,6 +220,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
diff.WithMediaType(mediaType),
diff.WithReference(sr.ID()),
diff.WithCompressor(compressorFunc),
diff.WithSourceDateEpoch(sourceDateEpoch),
)
if err != nil {
bklog.G(ctx).WithError(err).Warnf("failed to compute blob by buildkit differ")
Expand All @@ -207,6 +232,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
diff.WithMediaType(mediaType),
diff.WithReference(sr.ID()),
diff.WithCompressor(compressorFunc),
diff.WithSourceDateEpoch(sourceDateEpoch),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -238,7 +264,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.Errorf("unknown layer compression type")
}

if err := sr.setBlob(ctx, desc); err != nil {
if err := sr.setBlob(ctx, desc, sourceDateEpoch); err != nil {
return nil, err
}
return nil, nil
Expand All @@ -264,7 +290,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool

// setBlob associates a blob with the cache record.
// A lease must be held for the blob when calling this function
func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) (rerr error) {
func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor, sourceDateEpoch *time.Time) (rerr error) {
if _, ok := leases.FromContext(ctx); !ok {
return errors.Errorf("missing lease requirement for setBlob")
}
Expand All @@ -285,7 +311,7 @@ func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) (
sr.mu.Lock()
defer sr.mu.Unlock()

if sr.getBlob() != "" {
if blobExistsWithSourceDateEpoch(sr, sourceDateEpoch) {
return nil
}

Expand All @@ -305,6 +331,9 @@ func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) (
sr.queueMediaType(desc.MediaType)
sr.queueBlobSize(desc.Size)
sr.appendURLs(desc.URLs)
if sourceDateEpoch != nil {
sr.queueSourceDateEpoch(*sourceDateEpoch)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand how this works. Doesn't it just fix a ref to a specific epoch forever? Instead, it should track multiple variants of blobs that are all associated with a ref (similar to compressions).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should track multiple variants of blobs that are all associated with a ref (similar to compressions).

Does that require putting the epoch value into an OCI descriptor?

Copy link
Member

@tonistiigi tonistiigi Mar 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean for the remote build-cache or for something else? In build-cache we already have "created time" but not exactly the same(although both would be epoch value). We might need a separate field for that.

If you think for the case of avoiding regenerating blobs for base image layers if they were already generated with the same epoch, then yes, I think saving epoch would be needed. Otoh, I don't think any user would want us to regenerate the layers for the base image, just to change the timestamps in it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sr.queueSourceDateEpoch() is only called for new diffs, so it seems safe for now

}
if err := sr.commitMetadata(); err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions cache/blobs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bufio"
"context"
"io"
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
Expand All @@ -25,7 +26,7 @@ var emptyDesc = ocispecs.Descriptor{}
// diff between lower and upper snapshot. If the passed mounts cannot
// be computed (e.g. because the mounts aren't overlayfs), it returns
// an error.
func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compression.Compressor) (_ ocispecs.Descriptor, ok bool, err error) {
func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compression.Compressor, sourceDateEpoch *time.Time) (_ ocispecs.Descriptor, ok bool, err error) {
// Get upperdir location if mounts are overlayfs that can be processed by this differ.
upperdir, err := overlay.GetUpperdir(lower, upper)
if err != nil {
Expand Down Expand Up @@ -60,7 +61,7 @@ func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper
}
// Close ensure compressorFunc does some finalization works.
defer compressed.Close()
if err := overlay.WriteUpperdir(ctx, io.MultiWriter(compressed, dgstr.Hash()), upperdir, lower); err != nil {
if err := overlay.WriteUpperdir(ctx, io.MultiWriter(compressed, dgstr.Hash()), upperdir, lower, sourceDateEpoch); err != nil {
return emptyDesc, false, errors.Wrap(err, "failed to write compressed diff")
}
if err := compressed.Close(); err != nil {
Expand All @@ -71,7 +72,7 @@ func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper
}
labels[containerdUncompressed] = dgstr.Digest().String()
} else {
if err = overlay.WriteUpperdir(ctx, bufW, upperdir, lower); err != nil {
if err = overlay.WriteUpperdir(ctx, bufW, upperdir, lower, sourceDateEpoch); err != nil {
return emptyDesc, false, errors.Wrap(err, "failed to write diff")
}
}
Expand Down
5 changes: 3 additions & 2 deletions cache/blobs_nolinux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ package cache

import (
"context"
"time"

"github.com/moby/buildkit/util/compression"
"github.com/containerd/containerd/mount"
"github.com/moby/buildkit/util/compression"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compression.Compressor) (_ ocispecs.Descriptor, ok bool, err error) {
func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compression.Compressor, sourceDateEpoch *time.Time) (_ ocispecs.Descriptor, ok bool, err error) {
return ocispecs.Descriptor{}, true, errors.Errorf("overlayfs-based diff computing is unsupported")
}
26 changes: 13 additions & 13 deletions cache/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func TestMergeBlobchainID(t *testing.T) {
mergeRef, err := cm.Merge(ctx, mergeInputs, nil)
require.NoError(t, err)

_, err = mergeRef.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compression.Default)}, false, nil)
_, err = mergeRef.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compression.Default)}, false, nil, nil)
require.NoError(t, err)

// verify the merge blobchain ID isn't just set to one of the inputs (regression test)
Expand Down Expand Up @@ -616,7 +616,7 @@ func TestExtractOnMutable(t *testing.T) {
leaseCtx, done, err := leaseutil.WithLease(ctx, co.lm, leases.WithExpiration(0))
require.NoError(t, err)

err = snap.(*immutableRef).setBlob(leaseCtx, desc)
err = snap.(*immutableRef).setBlob(leaseCtx, desc, nil)
done(context.TODO())
require.NoError(t, err)
err = snap.(*immutableRef).computeChainMetadata(leaseCtx, map[string]struct{}{snap.ID(): {}})
Expand Down Expand Up @@ -733,10 +733,10 @@ func TestSetBlob(t *testing.T) {
Annotations: map[string]string{
"containerd.io/uncompressed": digest.FromBytes([]byte("foobar2")).String(),
},
})
}, nil)
require.Error(t, err)

err = snap.(*immutableRef).setBlob(ctx, desc)
err = snap.(*immutableRef).setBlob(ctx, desc, nil)
require.NoError(t, err)
err = snap.(*immutableRef).computeChainMetadata(ctx, map[string]struct{}{snap.ID(): {}})
require.NoError(t, err)
Expand All @@ -762,7 +762,7 @@ func TestSetBlob(t *testing.T) {
err = content.WriteBlob(ctx, co.cs, "ref2", bytes.NewBuffer(b2), desc2)
require.NoError(t, err)

err = snap2.(*immutableRef).setBlob(ctx, desc2)
err = snap2.(*immutableRef).setBlob(ctx, desc2, nil)
require.NoError(t, err)
err = snap2.(*immutableRef).computeChainMetadata(ctx, map[string]struct{}{snap.ID(): {}, snap2.ID(): {}})
require.NoError(t, err)
Expand Down Expand Up @@ -1171,7 +1171,7 @@ func TestLoopLeaseContent(t *testing.T) {
}()
var chain []ocispecs.Descriptor
for _, compressionType := range compressionLoop {
remotes, err := ref.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}, false, nil)
remotes, err := ref.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}, false, nil, nil)
require.NoError(t, err)
require.Equal(t, 1, len(remotes))
require.Equal(t, 1, len(remotes[0].Descriptors))
Expand Down Expand Up @@ -1368,7 +1368,7 @@ func testSharingCompressionVariant(ctx context.Context, t *testing.T, co *cmOut,
defer aRef.Release(ctx)
var bDesc ocispecs.Descriptor
for _, compressionType := range append([]compression.Type{testCase.a}, testCase.aVariants...) {
remotes, err := aRef.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}, false, nil)
remotes, err := aRef.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}, false, nil, nil)
require.NoError(t, err)
require.Equal(t, 1, len(remotes))
require.Equal(t, 1, len(remotes[0].Descriptors))
Expand All @@ -1381,7 +1381,7 @@ func testSharingCompressionVariant(ctx context.Context, t *testing.T, co *cmOut,
require.NoError(t, err)
defer bRef.Release(ctx)
for _, compressionType := range append([]compression.Type{testCase.b}, testCase.bVariants...) {
remotes, err := bRef.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}, false, nil)
remotes, err := bRef.GetRemotes(ctx, true, config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}, false, nil, nil)
require.NoError(t, err)
require.Equal(t, 1, len(remotes))
require.Equal(t, 1, len(remotes[0].Descriptors))
Expand Down Expand Up @@ -1730,7 +1730,7 @@ func TestGetRemotes(t *testing.T) {
compressionType := compressionType
refCfg := config.RefConfig{Compression: compression.New(compressionType).SetForce(true)}
eg.Go(func() error {
remotes, err := ir.GetRemotes(egctx, true, refCfg, false, nil)
remotes, err := ir.GetRemotes(egctx, true, refCfg, false, nil, nil)
require.NoError(t, err)
require.Equal(t, 1, len(remotes))
remote := remotes[0]
Expand Down Expand Up @@ -1819,13 +1819,13 @@ func TestGetRemotes(t *testing.T) {
compressionType := compressionType
refCfg := config.RefConfig{Compression: compression.New(compressionType)}
eg.Go(func() error {
remotes, err := ir.GetRemotes(egctx, false, refCfg, true, nil)
remotes, err := ir.GetRemotes(egctx, false, refCfg, true, nil, nil)
require.NoError(t, err)
require.True(t, len(remotes) > 0, "for %s : %d", compressionType, len(remotes))
gotMain, gotVariants := remotes[0], remotes[1:]

// Check the main blob is compatible with all == false
mainOnly, err := ir.GetRemotes(egctx, false, refCfg, false, nil)
mainOnly, err := ir.GetRemotes(egctx, false, refCfg, false, nil, nil)
require.NoError(t, err)
require.Equal(t, 1, len(mainOnly))
mainRemote := mainOnly[0]
Expand Down Expand Up @@ -1944,15 +1944,15 @@ func TestNondistributableBlobs(t *testing.T) {
ref, err := cm.GetByBlob(ctx, desc, nil, descHandlers)
require.NoError(t, err)

remotes, err := ref.GetRemotes(ctx, true, config.RefConfig{PreferNonDistributable: true}, false, nil)
remotes, err := ref.GetRemotes(ctx, true, config.RefConfig{PreferNonDistributable: true}, false, nil, nil)
require.NoError(t, err)

desc2 := remotes[0].Descriptors[0]

require.Equal(t, desc.MediaType, desc2.MediaType)
require.Equal(t, desc.URLs, desc2.URLs)

remotes, err = ref.GetRemotes(ctx, true, config.RefConfig{PreferNonDistributable: false}, false, nil)
remotes, err = ref.GetRemotes(ctx, true, config.RefConfig{PreferNonDistributable: false}, false, nil, nil)
require.NoError(t, err)

desc2 = remotes[0].Descriptors[0]
Expand Down
24 changes: 21 additions & 3 deletions cache/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const keyEqualMutable = "cache.equalMutable"
const keyCachePolicy = "cache.cachePolicy"
const keyDescription = "cache.description"
const keyCreatedAt = "cache.createdAt"
const keySourceDateEpoch = "cache.sourceDateEpoch"
const keyLastUsedAt = "cache.lastUsedAt"
const keyUsageCount = "cache.usageCount"
const keyLayerType = "cache.layerType"
Expand Down Expand Up @@ -191,6 +192,14 @@ func (md *cacheMetadata) GetCreatedAt() time.Time {
return md.getTime(keyCreatedAt)
}

// queueSourceDateEpoch stages the SOURCE_DATE_EPOCH timestamp associated with
// this record's blob; like other queue* helpers, the value is only persisted
// by a later commitMetadata call.
func (md *cacheMetadata) queueSourceDateEpoch(tm time.Time) error {
	return md.queueTime(keySourceDateEpoch, tm, "")
}

// GetSourceDateEpoch returns the SOURCE_DATE_EPOCH recorded for this record's
// blob, or nil when none has been stored.
func (md *cacheMetadata) GetSourceDateEpoch() *time.Time {
	return md.getTimeOrNil(keySourceDateEpoch)
}

func (md *cacheMetadata) HasCachePolicyDefault() bool {
return md.getCachePolicy() == cachePolicyDefault
}
Expand Down Expand Up @@ -506,16 +515,25 @@ func (md *cacheMetadata) queueTime(key string, value time.Time, index string) er
return md.queueValue(key, value.UnixNano(), index)
}

func (md *cacheMetadata) getTime(key string) time.Time {
// getTimeOrNil loads the time stored under key, or nil when the key is absent
// or its value cannot be decoded. Values are persisted as UnixNano int64 (see
// queueTime).
func (md *cacheMetadata) getTimeOrNil(key string) *time.Time {
	v := md.si.Get(key)
	if v == nil {
		return nil
	}
	var tm int64
	if err := v.Unmarshal(&tm); err != nil {
		return nil
	}
	// Split nanoseconds into (sec, nsec) for time.Unix.
	u := time.Unix(tm/1e9, tm%1e9)
	return &u
}

// getTime loads the time stored under key, returning the zero time.Time when
// the key is absent or its value is malformed. It is the non-pointer
// convenience wrapper around getTimeOrNil.
func (md *cacheMetadata) getTime(key string) time.Time {
	v := md.getTimeOrNil(key)
	if v == nil {
		return time.Time{}
	}
	return *v
}

func (md *cacheMetadata) getBool(key string) bool {
Expand Down
2 changes: 1 addition & 1 deletion cache/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type ImmutableRef interface {
Finalize(context.Context) error

Extract(ctx context.Context, s session.Group) error // +progress
GetRemotes(ctx context.Context, createIfNeeded bool, cfg config.RefConfig, all bool, s session.Group) ([]*solver.Remote, error)
GetRemotes(ctx context.Context, createIfNeeded bool, cfg config.RefConfig, all bool, s session.Group, sourceDateEpoch *time.Time) ([]*solver.Remote, error)
LayerChain() RefList
FileList(ctx context.Context, s session.Group) ([]string, error)
}
Expand Down
Loading