Commit 753c685

Merge pull request #1597 from tonistiigi/v0.7-picks

[v0.7] cherry picks

AkihiroSuda authored Jul 25, 2020
2 parents: 4b35ebc + b7759e1
Showing 9 changed files with 271 additions and 25 deletions.
cache/contenthash/filehash.go (3 changes: 3 additions & 0 deletions)
@@ -40,6 +40,9 @@ func NewFileHash(path string, fi os.FileInfo) (hash.Hash, error) {
 }

 func NewFromStat(stat *fstypes.Stat) (hash.Hash, error) {
+	// Clear the socket bit since archive/tar.FileInfoHeader does not handle it
+	stat.Mode &^= uint32(os.ModeSocket)
+
 	fi := &statInfo{stat}
 	hdr, err := tar.FileInfoHeader(fi, stat.Linkname)
 	if err != nil {
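
The cleared bit matters because archive/tar has no representation for sockets, so tar.FileInfoHeader returns an error when the mode carries os.ModeSocket. A standalone sketch of the failure and the workaround; fakeSocketInfo is illustrative only, not BuildKit code:

package main

import (
	"archive/tar"
	"fmt"
	"os"
	"time"
)

// fakeSocketInfo is a minimal os.FileInfo whose mode can carry the socket bit.
type fakeSocketInfo struct{ mode os.FileMode }

func (f fakeSocketInfo) Name() string       { return "socket.sock" }
func (f fakeSocketInfo) Size() int64        { return 0 }
func (f fakeSocketInfo) Mode() os.FileMode  { return f.mode }
func (f fakeSocketInfo) ModTime() time.Time { return time.Time{} }
func (f fakeSocketInfo) IsDir() bool        { return false }
func (f fakeSocketInfo) Sys() interface{}   { return nil }

func main() {
	fi := fakeSocketInfo{mode: 0o600 | os.ModeSocket}
	if _, err := tar.FileInfoHeader(fi, ""); err != nil {
		fmt.Println("with socket bit:", err) // archive/tar refuses sockets
	}

	// Clearing the socket bit first, as the patch does on stat.Mode,
	// lets FileInfoHeader build a regular-file header instead of failing.
	fi.mode &^= os.ModeSocket
	hdr, err := tar.FileInfoHeader(fi, "")
	fmt.Println("without socket bit:", hdr != nil, err)
}
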
cache/contenthash/tarsum.go (2 changes: 1 addition & 1 deletion)
@@ -39,7 +39,7 @@ func v1TarHeaderSelect(h *tar.Header) (orderedHeaders [][2]string) {
 	// Get extended attributes.
 	xAttrKeys := make([]string, len(h.Xattrs))
 	for k := range h.Xattrs {
-		if !strings.HasPrefix(k, "security.") && !strings.HasPrefix(k, "system.") {
+		if k == "security.capability" || !strings.HasPrefix(k, "security.") && !strings.HasPrefix(k, "system.") {
 			xAttrKeys = append(xAttrKeys, k)
 		}
 	}
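
The added clause whitelists the security.capability attribute, which the old filter stripped along with the rest of the security.* and system.* namespaces, so file capabilities now contribute to the checksum. A standalone sketch of the same predicate; includeXattr is an illustrative name, not part of the package:

package main

import (
	"fmt"
	"strings"
)

// includeXattr mirrors the patched filter: keep security.capability,
// drop every other security.* or system.* key, keep everything else.
func includeXattr(k string) bool {
	if k == "security.capability" {
		return true
	}
	return !strings.HasPrefix(k, "security.") && !strings.HasPrefix(k, "system.")
}

func main() {
	keys := []string{"security.capability", "security.selinux", "system.posix_acl_access", "user.note"}
	for _, k := range keys {
		fmt.Println(k, includeXattr(k)) // true, false, false, true
	}
}
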
cache/refs.go (5 changes: 4 additions & 1 deletion)
@@ -180,7 +180,10 @@ func (cr *cacheRecord) Size(ctx context.Context) (int64, error) {
 		cr.mu.Unlock()
 		return usage.Size, nil
 	})
-	return s.(int64), err
+	if err != nil {
+		return 0, err
+	}
+	return s.(int64), nil
 }

 func (cr *cacheRecord) Parent() ImmutableRef {
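
The early return matters because the deduplicated size computation can fail and hand back a nil result, and the old "return s.(int64), err" would then panic on the type assertion instead of returning the error. A minimal illustration of that failure mode; compute is a stand-in for the wrapped call, not BuildKit code:

package main

import (
	"errors"
	"fmt"
)

// compute stands in for the deduplicated call inside Size: on failure it
// returns a nil result alongside the error.
func compute() (interface{}, error) {
	return nil, errors.New("disk usage failed")
}

func main() {
	s, err := compute()
	if err != nil {
		fmt.Println("checked first:", err) // safe: error reported, no assertion attempted
		return
	}
	fmt.Println(s.(int64)) // asserting on a nil interface would panic here
}
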
cache/remotecache/inline/inline.go (10 changes: 5 additions & 5 deletions)
@@ -72,7 +72,7 @@ func (ce *exporter) ExportForLayers(layers []digest.Digest) ([]byte, error) {
 		return nil, nil
 	}

-	cache := map[digest.Digest]int{}
+	cache := map[int]int{}

 	// reorder layers based on the order in the image
 	for i, r := range cfg.Records {
@@ -93,14 +93,14 @@ func (ce *exporter) ExportForLayers(layers []digest.Digest) ([]byte, error) {
 	return dt, nil
 }

-func getSortedLayerIndex(idx int, layers []v1.CacheLayer, cache map[digest.Digest]int) int {
+func getSortedLayerIndex(idx int, layers []v1.CacheLayer, cache map[int]int) int {
 	if idx == -1 {
 		return -1
 	}
 	l := layers[idx]
-	if i, ok := cache[l.Blob]; ok {
+	if i, ok := cache[idx]; ok {
 		return i
 	}
-	cache[l.Blob] = getSortedLayerIndex(l.ParentIndex, layers, cache) + 1
-	return cache[l.Blob]
+	cache[idx] = getSortedLayerIndex(l.ParentIndex, layers, cache) + 1
+	return cache[idx]
 }
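
The memo table is now keyed by layer index rather than blob digest; presumably this guards against chains in which two distinct layers carry the same blob digest, where a digest-keyed memo could return the depth computed for the wrong occurrence. A standalone sketch of the index-keyed recursion; Layer and sortedIndex are simplified stand-ins, not the package's types:

package main

import "fmt"

// Layer is a simplified stand-in for v1.CacheLayer: a blob digest plus the
// index of its parent layer (-1 for the base layer).
type Layer struct {
	Blob        string
	ParentIndex int
}

// sortedIndex returns the depth of layers[idx] in its chain, memoized by
// index so that duplicate blob digests cannot collide.
func sortedIndex(idx int, layers []Layer, memo map[int]int) int {
	if idx == -1 {
		return -1
	}
	if i, ok := memo[idx]; ok {
		return i
	}
	memo[idx] = sortedIndex(layers[idx].ParentIndex, layers, memo) + 1
	return memo[idx]
}

func main() {
	// Two layers share the same blob digest but sit at different depths.
	layers := []Layer{
		{Blob: "sha256:aaa", ParentIndex: -1},
		{Blob: "sha256:aaa", ParentIndex: 0},
		{Blob: "sha256:bbb", ParentIndex: 1},
	}
	memo := map[int]int{}
	for i := range layers {
		fmt.Println(i, sortedIndex(i, layers, memo)) // 0 0, 1 1, 2 2
	}
}
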
client/client_test.go (25 changes: 25 additions & 0 deletions)
@@ -107,6 +107,7 @@ func TestIntegration(t *testing.T) {
 		testCacheMountNoCache,
 		testExporterTargetExists,
 		testTarExporterWithSocket,
+		testTarExporterWithSocketCopy,
 		testTarExporterSymlink,
 		testMultipleRegistryCacheImportExport,
 	}, mirrors)
@@ -1630,6 +1631,30 @@ func testTarExporterWithSocket(t *testing.T, sb integration.Sandbox) {
 	require.NoError(t, err)
 }

+func testTarExporterWithSocketCopy(t *testing.T, sb integration.Sandbox) {
+	if os.Getenv("TEST_DOCKERD") == "1" {
+		t.Skip("tar exporter is temporarily broken on dockerd")
+	}
+
+	requiresLinux(t)
+	c, err := New(context.TODO(), sb.Address())
+	require.NoError(t, err)
+	defer c.Close()
+
+	alpine := llb.Image("docker.io/library/alpine:latest")
+	state := alpine.Run(llb.Args([]string{"sh", "-c", "nc -l -s local:/root/socket.sock & usleep 100000; kill %1"})).Root()
+
+	fa := llb.Copy(state, "/root", "/roo2", &llb.CopyInfo{})
+
+	scratchCopy := llb.Scratch().File(fa)
+
+	def, err := scratchCopy.Marshal()
+	require.NoError(t, err)
+
+	_, err = c.Solve(context.TODO(), def, SolveOpt{}, nil)
+	require.NoError(t, err)
+}
+
 // moby/buildkit#1418
 func testTarExporterSymlink(t *testing.T, sb integration.Sandbox) {
 	requiresLinux(t)
solver/edge.go (44 changes: 28 additions & 16 deletions)
@@ -26,12 +26,13 @@ func (t edgeStatusType) String() string {

 func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
 	e := &edge{
-		edge:         ed,
-		op:           op,
-		depRequests:  map[pipe.Receiver]*dep{},
-		keyMap:       map[string]struct{}{},
-		cacheRecords: map[string]*CacheRecord{},
-		index:        index,
+		edge:               ed,
+		op:                 op,
+		depRequests:        map[pipe.Receiver]*dep{},
+		keyMap:             map[string]struct{}{},
+		cacheRecords:       map[string]*CacheRecord{},
+		cacheRecordsLoaded: map[string]struct{}{},
+		index:              index,
 	}
 	return e
 }
@@ -44,14 +45,16 @@ type edge struct {
 	depRequests map[pipe.Receiver]*dep
 	deps        []*dep

-	cacheMapReq     pipe.Receiver
-	cacheMapDone    bool
-	cacheMapIndex   int
-	cacheMapDigests []digest.Digest
-	execReq         pipe.Receiver
-	err             error
-	cacheRecords    map[string]*CacheRecord
-	keyMap          map[string]struct{}
+	cacheMapReq        pipe.Receiver
+	cacheMapDone       bool
+	cacheMapIndex      int
+	cacheMapDigests    []digest.Digest
+	execReq            pipe.Receiver
+	execCacheLoad      bool
+	err                error
+	cacheRecords       map[string]*CacheRecord
+	cacheRecordsLoaded map[string]struct{}
+	keyMap             map[string]struct{}

 	noCacheMatchPossible      bool
 	allDepsCompletedCacheFast bool
@@ -425,7 +428,11 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
 	if upt == e.execReq && upt.Status().Completed {
 		if err := upt.Status().Err; err != nil {
 			e.execReq = nil
-			if !upt.Status().Canceled && e.err == nil {
+			if e.execCacheLoad {
+				for k := range e.cacheRecordsLoaded {
+					delete(e.cacheRecords, k)
+				}
+			} else if !upt.Status().Canceled && e.err == nil {
 				e.err = err
 			}
 		} else {
@@ -561,7 +568,9 @@ func (e *edge) recalcCurrentState() {
 		}

 		for _, r := range records {
-			e.cacheRecords[r.ID] = r
+			if _, ok := e.cacheRecordsLoaded[r.ID]; !ok {
+				e.cacheRecords[r.ID] = r
+			}
 		}

 		e.keys = append(e.keys, e.makeExportable(mergedKey, records))
@@ -821,6 +830,7 @@ func (e *edge) execIfPossible(f *pipeFactory) bool {
 			return true
 		}
 		e.execReq = f.NewFuncRequest(e.loadCache)
+		e.execCacheLoad = true
 		for req := range e.depRequests {
 			req.Cancel()
 		}
@@ -831,6 +841,7 @@
 			return true
 		}
 		e.execReq = f.NewFuncRequest(e.execOp)
+		e.execCacheLoad = false
 		return true
 	}
 	return false
@@ -851,6 +862,7 @@
 	}

 	rec := getBestResult(recs)
+	e.cacheRecordsLoaded[rec.ID] = struct{}{}

 	logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
 	res, err := e.op.LoadCache(ctx, rec)
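
Taken together, these hunks make a failed cache load recoverable: the edge remembers which cache records it tried to load (cacheRecordsLoaded), and if that load fails it drops those records and schedules a real execution instead of surfacing the error. A heavily simplified, hedged model of that fallback; none of these names or signatures are BuildKit's:

package main

import (
	"errors"
	"fmt"
)

// edgeModel is a toy version of the scheduling change: candidate cache
// records are tracked, attempted loads are remembered, and a failed load
// causes the attempted records to be forgotten so execution runs instead.
type edgeModel struct {
	cacheRecords       map[string]bool // candidate cache records by ID
	cacheRecordsLoaded map[string]bool // records we attempted to load
}

func (e *edgeModel) run(loadCache func(id string) error, exec func() error) error {
	for id := range e.cacheRecords {
		e.cacheRecordsLoaded[id] = true
		if err := loadCache(id); err == nil {
			return nil // cache record loaded successfully
		}
		// Loading failed: drop the attempted records so the edge no longer
		// looks cacheable and falls through to execution.
		for k := range e.cacheRecordsLoaded {
			delete(e.cacheRecords, k)
		}
		break
	}
	return exec()
}

func main() {
	e := &edgeModel{
		cacheRecords:       map[string]bool{"rec-1": true},
		cacheRecordsLoaded: map[string]bool{},
	}
	err := e.run(
		func(id string) error { return errors.New("blob missing") },
		func() error { fmt.Println("fell back to exec"); return nil },
	)
	fmt.Println("err:", err)
}
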
solver/scheduler_test.go (104 changes: 104 additions & 0 deletions)
@@ -3102,6 +3102,106 @@ func TestMergedEdgesLookup(t *testing.T) {
 	}
 }

+func TestCacheLoadError(t *testing.T) {
+	t.Parallel()
+
+	rand.Seed(time.Now().UnixNano())
+
+	ctx := context.TODO()
+
+	cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())
+
+	l := NewSolver(SolverOpt{
+		ResolveOpFunc: testOpResolver,
+		DefaultCache:  cacheManager,
+	})
+	defer l.Close()
+
+	j0, err := l.NewJob("j0")
+	require.NoError(t, err)
+
+	defer func() {
+		if j0 != nil {
+			j0.Discard()
+		}
+	}()
+
+	g := Edge{
+		Vertex: vtxSum(3, vtxOpt{inputs: []Edge{
+			{Vertex: vtxSum(0, vtxOpt{inputs: []Edge{
+				{Vertex: vtxSum(2, vtxOpt{inputs: []Edge{
+					{Vertex: vtxConst(2, vtxOpt{})},
+				}})},
+				{Vertex: vtxConst(0, vtxOpt{})},
+			}})},
+			{Vertex: vtxSum(2, vtxOpt{inputs: []Edge{
+				{Vertex: vtxConst(2, vtxOpt{})},
+			}})},
+		}}),
+	}
+	g.Vertex.(*vertexSum).setupCallCounters()
+
+	res, err := j0.Build(ctx, g)
+	require.NoError(t, err)
+	require.Equal(t, unwrapInt(res), 11)
+	require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
+	require.Equal(t, int64(5), *g.Vertex.(*vertexSum).execCallCount)
+	require.Equal(t, int64(0), cacheManager.loadCounter)
+
+	require.NoError(t, j0.Discard())
+	j0 = nil
+
+	// repeat with cache
+	j1, err := l.NewJob("j1")
+	require.NoError(t, err)
+
+	defer func() {
+		if j1 != nil {
+			j1.Discard()
+		}
+	}()
+
+	g1 := g
+
+	g1.Vertex.(*vertexSum).setupCallCounters()
+
+	res, err = j1.Build(ctx, g1)
+	require.NoError(t, err)
+	require.Equal(t, unwrapInt(res), 11)
+	require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
+	require.Equal(t, int64(0), *g.Vertex.(*vertexSum).execCallCount)
+	require.Equal(t, int64(1), cacheManager.loadCounter)
+
+	require.NoError(t, j1.Discard())
+	j1 = nil
+
+	// repeat with cache but loading will now fail
+	j2, err := l.NewJob("j2")
+	require.NoError(t, err)
+
+	defer func() {
+		if j2 != nil {
+			j2.Discard()
+		}
+	}()
+
+	g2 := g
+
+	g2.Vertex.(*vertexSum).setupCallCounters()
+
+	cacheManager.forceFail = true
+
+	res, err = j2.Build(ctx, g2)
+	require.NoError(t, err)
+	require.Equal(t, unwrapInt(res), 11)
+	require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
+	require.Equal(t, int64(5), *g.Vertex.(*vertexSum).execCallCount)
+	require.Equal(t, int64(6), cacheManager.loadCounter)
+
+	require.NoError(t, j2.Discard())
+	j2 = nil
+}
+
 func TestInputRequestDeadlock(t *testing.T) {
 	t.Parallel()
 	ctx := context.TODO()
@@ -3584,10 +3684,14 @@ func newTrackingCacheManager(cm CacheManager) *trackingCacheManager {
 type trackingCacheManager struct {
 	CacheManager
 	loadCounter int64
+	forceFail   bool
 }

 func (cm *trackingCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
 	atomic.AddInt64(&cm.loadCounter, 1)
+	if cm.forceFail {
+		return nil, errors.Errorf("force fail")
+	}
 	return cm.CacheManager.Load(ctx, rec)
 }
