diff --git a/cache/contenthash/filehash.go b/cache/contenthash/filehash.go
index 84018e785226..27b6570ba74f 100644
--- a/cache/contenthash/filehash.go
+++ b/cache/contenthash/filehash.go
@@ -40,6 +40,9 @@ func NewFileHash(path string, fi os.FileInfo) (hash.Hash, error) {
 }
 
 func NewFromStat(stat *fstypes.Stat) (hash.Hash, error) {
+	// Clear the socket bit since archive/tar.FileInfoHeader does not handle it
+	stat.Mode &^= uint32(os.ModeSocket)
+
 	fi := &statInfo{stat}
 	hdr, err := tar.FileInfoHeader(fi, stat.Linkname)
 	if err != nil {
diff --git a/cache/contenthash/tarsum.go b/cache/contenthash/tarsum.go
index 3327ab2c208c..de72d6cdd0ab 100644
--- a/cache/contenthash/tarsum.go
+++ b/cache/contenthash/tarsum.go
@@ -39,7 +39,7 @@ func v1TarHeaderSelect(h *tar.Header) (orderedHeaders [][2]string) {
 	// Get extended attributes.
 	xAttrKeys := make([]string, len(h.Xattrs))
 	for k := range h.Xattrs {
-		if !strings.HasPrefix(k, "security.") && !strings.HasPrefix(k, "system.") {
+		if k == "security.capability" || !strings.HasPrefix(k, "security.") && !strings.HasPrefix(k, "system.") {
 			xAttrKeys = append(xAttrKeys, k)
 		}
 	}
diff --git a/cache/refs.go b/cache/refs.go
index cfedd6ae8b3f..5990edb5f41e 100644
--- a/cache/refs.go
+++ b/cache/refs.go
@@ -180,7 +180,10 @@ func (cr *cacheRecord) Size(ctx context.Context) (int64, error) {
 		cr.mu.Unlock()
 		return usage.Size, nil
 	})
-	return s.(int64), err
+	if err != nil {
+		return 0, err
+	}
+	return s.(int64), nil
 }
 
 func (cr *cacheRecord) Parent() ImmutableRef {
diff --git a/cache/remotecache/inline/inline.go b/cache/remotecache/inline/inline.go
index 96b979bdd362..3cddb13d212e 100644
--- a/cache/remotecache/inline/inline.go
+++ b/cache/remotecache/inline/inline.go
@@ -72,7 +72,7 @@ func (ce *exporter) ExportForLayers(layers []digest.Digest) ([]byte, error) {
 		return nil, nil
 	}
 
-	cache := map[digest.Digest]int{}
+	cache := map[int]int{}
 
 	// reorder layers based on the order in the image
 	for i, r := range cfg.Records {
@@ -93,14 +93,14 @@ func (ce *exporter) ExportForLayers(layers []digest.Digest) ([]byte, error) {
 	return dt, nil
 }
 
-func getSortedLayerIndex(idx int, layers []v1.CacheLayer, cache map[digest.Digest]int) int {
+func getSortedLayerIndex(idx int, layers []v1.CacheLayer, cache map[int]int) int {
 	if idx == -1 {
 		return -1
 	}
 	l := layers[idx]
-	if i, ok := cache[l.Blob]; ok {
+	if i, ok := cache[idx]; ok {
 		return i
 	}
-	cache[l.Blob] = getSortedLayerIndex(l.ParentIndex, layers, cache) + 1
-	return cache[l.Blob]
+	cache[idx] = getSortedLayerIndex(l.ParentIndex, layers, cache) + 1
+	return cache[idx]
 }
diff --git a/client/client_test.go b/client/client_test.go
index ba33121a4e0f..1214b1951c6f 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -107,6 +107,7 @@ func TestIntegration(t *testing.T) {
 		testCacheMountNoCache,
 		testExporterTargetExists,
 		testTarExporterWithSocket,
+		testTarExporterWithSocketCopy,
 		testTarExporterSymlink,
 		testMultipleRegistryCacheImportExport,
 	}, mirrors)
@@ -1630,6 +1631,30 @@ func testTarExporterWithSocket(t *testing.T, sb integration.Sandbox) {
 	require.NoError(t, err)
 }
 
+func testTarExporterWithSocketCopy(t *testing.T, sb integration.Sandbox) {
+	if os.Getenv("TEST_DOCKERD") == "1" {
+		t.Skip("tar exporter is temporarily broken on dockerd")
+	}
+
+	requiresLinux(t)
+	c, err := New(context.TODO(), sb.Address())
+	require.NoError(t, err)
+	defer c.Close()
+
+	alpine := llb.Image("docker.io/library/alpine:latest")
+	state := alpine.Run(llb.Args([]string{"sh", "-c", "nc -l -s local:/root/socket.sock & usleep 100000; kill %1"})).Root()
+
+	fa := llb.Copy(state, "/root", "/roo2", &llb.CopyInfo{})
+
+	scratchCopy := llb.Scratch().File(fa)
+
+	def, err := scratchCopy.Marshal()
+	require.NoError(t, err)
+
+	_, err = c.Solve(context.TODO(), def, SolveOpt{}, nil)
+	require.NoError(t, err)
+}
+
 // moby/buildkit#1418
 func testTarExporterSymlink(t *testing.T, sb integration.Sandbox) {
 	requiresLinux(t)
diff --git a/solver/edge.go b/solver/edge.go
index 0a076a2168b9..52613adbc542 100644
--- a/solver/edge.go
+++ b/solver/edge.go
@@ -26,12 +26,13 @@ func (t edgeStatusType) String() string {
 
 func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
 	e := &edge{
-		edge:         ed,
-		op:           op,
-		depRequests:  map[pipe.Receiver]*dep{},
-		keyMap:       map[string]struct{}{},
-		cacheRecords: map[string]*CacheRecord{},
-		index:        index,
+		edge:               ed,
+		op:                 op,
+		depRequests:        map[pipe.Receiver]*dep{},
+		keyMap:             map[string]struct{}{},
+		cacheRecords:       map[string]*CacheRecord{},
+		cacheRecordsLoaded: map[string]struct{}{},
+		index:              index,
 	}
 	return e
 }
@@ -44,14 +45,16 @@ type edge struct {
 	depRequests map[pipe.Receiver]*dep
 	deps        []*dep
 
-	cacheMapReq     pipe.Receiver
-	cacheMapDone    bool
-	cacheMapIndex   int
-	cacheMapDigests []digest.Digest
-	execReq         pipe.Receiver
-	err             error
-	cacheRecords    map[string]*CacheRecord
-	keyMap          map[string]struct{}
+	cacheMapReq        pipe.Receiver
+	cacheMapDone       bool
+	cacheMapIndex      int
+	cacheMapDigests    []digest.Digest
+	execReq            pipe.Receiver
+	execCacheLoad      bool
+	err                error
+	cacheRecords       map[string]*CacheRecord
+	cacheRecordsLoaded map[string]struct{}
+	keyMap             map[string]struct{}
 
 	noCacheMatchPossible      bool
 	allDepsCompletedCacheFast bool
@@ -425,7 +428,11 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
 	if upt == e.execReq && upt.Status().Completed {
 		if err := upt.Status().Err; err != nil {
 			e.execReq = nil
-			if !upt.Status().Canceled && e.err == nil {
+			if e.execCacheLoad {
+				for k := range e.cacheRecordsLoaded {
+					delete(e.cacheRecords, k)
+				}
+			} else if !upt.Status().Canceled && e.err == nil {
 				e.err = err
 			}
 		} else {
@@ -561,7 +568,9 @@ func (e *edge) recalcCurrentState() {
 		}
 
 		for _, r := range records {
-			e.cacheRecords[r.ID] = r
+			if _, ok := e.cacheRecordsLoaded[r.ID]; !ok {
+				e.cacheRecords[r.ID] = r
+			}
 		}
 
 		e.keys = append(e.keys, e.makeExportable(mergedKey, records))
@@ -821,6 +830,7 @@ func (e *edge) execIfPossible(f *pipeFactory) bool {
 			return true
 		}
 		e.execReq = f.NewFuncRequest(e.loadCache)
+		e.execCacheLoad = true
 		for req := range e.depRequests {
 			req.Cancel()
 		}
@@ -831,6 +841,7 @@ func (e *edge) execIfPossible(f *pipeFactory) bool {
 			return true
 		}
 		e.execReq = f.NewFuncRequest(e.execOp)
+		e.execCacheLoad = false
 		return true
 	}
 	return false
@@ -851,6 +862,7 @@ func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
 	}
 
 	rec := getBestResult(recs)
+	e.cacheRecordsLoaded[rec.ID] = struct{}{}
 
 	logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
 	res, err := e.op.LoadCache(ctx, rec)
diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go
index 4e3814686cf0..81b2853283e2 100644
--- a/solver/scheduler_test.go
+++ b/solver/scheduler_test.go
@@ -3102,6 +3102,106 @@ func TestMergedEdgesLookup(t *testing.T) {
 	}
 }
 
+func TestCacheLoadError(t *testing.T) {
+	t.Parallel()
+
+	rand.Seed(time.Now().UnixNano())
+
+	ctx := context.TODO()
+
+	cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())
+
+	l := NewSolver(SolverOpt{
+		ResolveOpFunc: testOpResolver,
+		DefaultCache:  cacheManager,
+	})
+	defer l.Close()
+
+	j0, err := l.NewJob("j0")
+	require.NoError(t, err)
+
+	defer func() {
+		if j0 != nil {
+			j0.Discard()
+		}
+	}()
+
+	g := Edge{
+		Vertex: vtxSum(3, vtxOpt{inputs: []Edge{
+			{Vertex: vtxSum(0, vtxOpt{inputs: []Edge{
+				{Vertex: vtxSum(2, vtxOpt{inputs: []Edge{
+					{Vertex: vtxConst(2, vtxOpt{})},
+				}})},
+				{Vertex: vtxConst(0, vtxOpt{})},
+			}})},
+			{Vertex: vtxSum(2, vtxOpt{inputs: []Edge{
+				{Vertex: vtxConst(2, vtxOpt{})},
+			}})},
+		}}),
+	}
+	g.Vertex.(*vertexSum).setupCallCounters()
+
+	res, err := j0.Build(ctx, g)
+	require.NoError(t, err)
+	require.Equal(t, unwrapInt(res), 11)
+	require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
+	require.Equal(t, int64(5), *g.Vertex.(*vertexSum).execCallCount)
+	require.Equal(t, int64(0), cacheManager.loadCounter)
+
+	require.NoError(t, j0.Discard())
+	j0 = nil
+
+	// repeat with cache
+	j1, err := l.NewJob("j1")
+	require.NoError(t, err)
+
+	defer func() {
+		if j1 != nil {
+			j1.Discard()
+		}
+	}()
+
+	g1 := g
+
+	g1.Vertex.(*vertexSum).setupCallCounters()
+
+	res, err = j1.Build(ctx, g1)
+	require.NoError(t, err)
+	require.Equal(t, unwrapInt(res), 11)
+	require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
+	require.Equal(t, int64(0), *g.Vertex.(*vertexSum).execCallCount)
+	require.Equal(t, int64(1), cacheManager.loadCounter)
+
+	require.NoError(t, j1.Discard())
+	j1 = nil
+
+	// repeat with cache but loading will now fail
+	j2, err := l.NewJob("j2")
+	require.NoError(t, err)
+
+	defer func() {
+		if j2 != nil {
+			j2.Discard()
+		}
+	}()
+
+	g2 := g
+
+	g2.Vertex.(*vertexSum).setupCallCounters()
+
+	cacheManager.forceFail = true
+
+	res, err = j2.Build(ctx, g2)
+	require.NoError(t, err)
+	require.Equal(t, unwrapInt(res), 11)
+	require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
+	require.Equal(t, int64(5), *g.Vertex.(*vertexSum).execCallCount)
+	require.Equal(t, int64(6), cacheManager.loadCounter)
+
+	require.NoError(t, j2.Discard())
+	j2 = nil
+}
+
 func TestInputRequestDeadlock(t *testing.T) {
 	t.Parallel()
 	ctx := context.TODO()
@@ -3584,10 +3684,14 @@ func newTrackingCacheManager(cm CacheManager) *trackingCacheManager {
 type trackingCacheManager struct {
 	CacheManager
 	loadCounter int64
+	forceFail   bool
 }
 
 func (cm *trackingCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
 	atomic.AddInt64(&cm.loadCounter, 1)
+	if cm.forceFail {
+		return nil, errors.Errorf("force fail")
+	}
 	return cm.CacheManager.Load(ctx, rec)
 }
 
diff --git a/util/contentutil/pusher.go b/util/contentutil/pusher.go
index ab88128aa2ad..693dcfea93c9 100644
--- a/util/contentutil/pusher.go
+++ b/util/contentutil/pusher.go
@@ -2,21 +2,34 @@ package contentutil
 
 import (
 	"context"
+	"runtime"
+	"sync"
+	"time"
 
 	"github.com/containerd/containerd/content"
 	"github.com/containerd/containerd/errdefs"
 	"github.com/containerd/containerd/remotes"
+	digest "github.com/opencontainers/go-digest"
 	"github.com/pkg/errors"
 )
 
 func FromPusher(p remotes.Pusher) content.Ingester {
+	var mu sync.Mutex
+	c := sync.NewCond(&mu)
 	return &pushingIngester{
-		p: p,
+		mu:     &mu,
+		c:      c,
+		p:      p,
+		active: map[digest.Digest]struct{}{},
 	}
 }
 
 type pushingIngester struct {
 	p remotes.Pusher
+
+	mu     *sync.Mutex
+	c      *sync.Cond
+	active map[digest.Digest]struct{}
 }
 
 // Writer implements content.Ingester. desc.MediaType must be set for manifest blobs.
@@ -30,20 +43,55 @@ func (i *pushingIngester) Writer(ctx context.Context, opts ...content.WriterOpt)
 	if wOpts.Ref == "" {
 		return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
 	}
+
+	st := time.Now()
+
+	i.mu.Lock()
+	for {
+		if time.Since(st) > time.Hour {
+			i.mu.Unlock()
+			return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %v locked", wOpts.Desc.Digest)
+		}
+		if _, ok := i.active[wOpts.Desc.Digest]; ok {
+			i.c.Wait()
+		} else {
+			break
+		}
+	}
+
+	i.active[wOpts.Desc.Digest] = struct{}{}
+	i.mu.Unlock()
+
+	var once sync.Once
+	release := func() {
+		once.Do(func() {
+			i.mu.Lock()
+			delete(i.active, wOpts.Desc.Digest)
+			i.c.Broadcast()
+			i.mu.Unlock()
+		})
+	}
+
 	// pusher requires desc.MediaType to determine the PUT URL, especially for manifest blobs.
 	contentWriter, err := i.p.Push(ctx, wOpts.Desc)
 	if err != nil {
+		release()
 		return nil, err
 	}
+	runtime.SetFinalizer(contentWriter, func(_ content.Writer) {
+		release()
+	})
 	return &writer{
 		Writer:           contentWriter,
 		contentWriterRef: wOpts.Ref,
+		release:          release,
 	}, nil
 }
 
 type writer struct {
 	content.Writer          // returned from pusher.Push
 	contentWriterRef string // ref passed for Writer()
+	release          func()
 }
 
 func (w *writer) Status() (content.Status, error) {
@@ -56,3 +104,19 @@ func (w *writer) Status() (content.Status, error) {
 	}
 	return st, nil
 }
+
+func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
+	err := w.Writer.Commit(ctx, size, expected, opts...)
+	if w.release != nil {
+		w.release()
+	}
+	return err
+}
+
+func (w *writer) Close() error {
+	err := w.Writer.Close()
+	if w.release != nil {
+		w.release()
+	}
+	return err
+}
diff --git a/util/push/push.go b/util/push/push.go
index 428010c39d71..b8d372d7f3f4 100644
--- a/util/push/push.go
+++ b/util/push/push.go
@@ -14,6 +14,7 @@ import (
 	"github.com/containerd/containerd/remotes/docker"
 	"github.com/docker/distribution/reference"
 	"github.com/moby/buildkit/session"
+	"github.com/moby/buildkit/util/flightcontrol"
 	"github.com/moby/buildkit/util/imageutil"
 	"github.com/moby/buildkit/util/progress"
 	"github.com/moby/buildkit/util/resolver"
@@ -73,7 +74,7 @@ func Push(ctx context.Context, sm *session.Manager, cs content.Store, dgst diges
 	handlers := append([]images.Handler{},
 		images.HandlerFunc(annotateDistributionSourceHandler(cs, childrenHandler(cs))),
 		filterHandler,
-		pushUpdateSourceHandler,
+		dedupeHandler(pushUpdateSourceHandler),
 	)
 
 	ra, err := cs.ReaderAt(ctx, desc)
@@ -248,3 +249,37 @@ func updateDistributionSourceHandler(cs content.Store, pushF images.HandlerFunc,
 		return children, nil
 	}), nil
 }
+
+func dedupeHandler(h images.HandlerFunc) images.HandlerFunc {
+	var g flightcontrol.Group
+	res := map[digest.Digest][]ocispec.Descriptor{}
+	var mu sync.Mutex
+
+	return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
+		res, err := g.Do(ctx, desc.Digest.String(), func(ctx context.Context) (interface{}, error) {
+			mu.Lock()
+			if r, ok := res[desc.Digest]; ok {
+				mu.Unlock()
+				return r, nil
+			}
+			mu.Unlock()
+
+			children, err := h(ctx, desc)
+			if err != nil {
+				return nil, err
+			}
+
+			mu.Lock()
+			res[desc.Digest] = children
+			mu.Unlock()
+			return children, nil
+		})
+		if err != nil {
+			return nil, err
+		}
+		if res == nil {
+			return nil, nil
+		}
+		return res.([]ocispec.Descriptor), nil
+	})
+}