Skip to content

Commit

Permalink
Add initial MergeOp implementation.
Browse files Browse the repository at this point in the history
This consists of just the base MergeOp with support for merging LLB
results that include deletions using hardlinks as the efficient path
and copies as fallback.

Signed-off-by: Erik Sipsma <[email protected]>
  • Loading branch information
sipsma committed Nov 18, 2021
1 parent 9321ec2 commit d73e62f
Show file tree
Hide file tree
Showing 39 changed files with 3,518 additions and 568 deletions.
237 changes: 147 additions & 90 deletions api/services/control/control.pb.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion api/services/control/control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ message UsageRecord {
bool Mutable = 2;
bool InUse = 3;
int64 Size = 4;
string Parent = 5;
string Parent = 5 [deprecated=true];
google.protobuf.Timestamp CreatedAt = 6 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp LastUsedAt = 7 [(gogoproto.stdtime) = true];
int64 UsageCount = 8;
string Description = 9;
string RecordType = 10;
bool Shared = 11;
repeated string Parents = 12;
}

message SolveRequest {
Expand Down
306 changes: 172 additions & 134 deletions cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,170 +54,177 @@ type compressor func(dest io.Writer, requiredMediaType string) (io.WriteCloser,

func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) error {
eg, ctx := errgroup.WithContext(ctx)
if sr.parent != nil {
switch sr.kind() {
case Merge:
for _, parent := range sr.mergeParents {
parent := parent
eg.Go(func() error {
return computeBlobChain(ctx, parent, createIfNeeded, compressionType, forceCompression, s)
})
}
case Layer:
eg.Go(func() error {
return computeBlobChain(ctx, sr.parent, createIfNeeded, compressionType, forceCompression, s)
return computeBlobChain(ctx, sr.layerParent, createIfNeeded, compressionType, forceCompression, s)
})
}

eg.Go(func() error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) {
if sr.getBlob() != "" {
return nil, nil
}
if !createIfNeeded {
return nil, errors.WithStack(ErrNoBlobs)
}
fallthrough
case BaseLayer:
eg.Go(func() error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) {
if sr.getBlob() != "" {
return nil, nil
}
if !createIfNeeded {
return nil, errors.WithStack(ErrNoBlobs)
}

var mediaType string
var compressorFunc compressor
var finalize func(context.Context, content.Store) (map[string]string, error)
switch compressionType {
case compression.Uncompressed:
mediaType = ocispecs.MediaTypeImageLayer
case compression.Gzip:
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.EStargz:
compressorFunc, finalize = compressEStargz()
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.Zstd:
compressorFunc = zstdWriter
mediaType = ocispecs.MediaTypeImageLayer + "+zstd"
default:
return nil, errors.Errorf("unknown layer compression type: %q", compressionType)
}
var mediaType string
var compressorFunc compressor
var finalize func(context.Context, content.Store) (map[string]string, error)
switch compressionType {
case compression.Uncompressed:
mediaType = ocispecs.MediaTypeImageLayer
case compression.Gzip:
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.EStargz:
compressorFunc, finalize = compressEStargz()
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.Zstd:
compressorFunc = zstdWriter
mediaType = ocispecs.MediaTypeImageLayer + "+zstd"
default:
return nil, errors.Errorf("unknown layer compression type: %q", compressionType)
}

var lower []mount.Mount
if sr.parent != nil {
m, err := sr.parent.Mount(ctx, true, s)
var lower []mount.Mount
if sr.layerParent != nil {
m, err := sr.layerParent.Mount(ctx, true, s)
if err != nil {
return nil, err
}
var release func() error
lower, release, err = m.Mount()
if err != nil {
return nil, err
}
if release != nil {
defer release()
}
}
m, err := sr.Mount(ctx, true, s)
if err != nil {
return nil, err
}
var release func() error
lower, release, err = m.Mount()
upper, release, err := m.Mount()
if err != nil {
return nil, err
}
if release != nil {
defer release()
}
}
m, err := sr.Mount(ctx, true, s)
if err != nil {
return nil, err
}
upper, release, err := m.Mount()
if err != nil {
return nil, err
}
if release != nil {
defer release()
}
var desc ocispecs.Descriptor

// Determine differ and error/log handling according to the platform, envvar and the snapshotter.
var enableOverlay, fallback, logWarnOnErr bool
if forceOvlStr := os.Getenv("BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF"); forceOvlStr != "" {
enableOverlay, err = strconv.ParseBool(forceOvlStr)
if err != nil {
return nil, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF")
}
fallback = false // prohibit fallback on debug
} else if !isTypeWindows(sr) {
enableOverlay, fallback = true, true
switch sr.cm.ManagerOpt.Snapshotter.Name() {
case "overlayfs", "stargz":
// overlayfs-based snapshotters should support overlay diff. so print warn log on failure.
logWarnOnErr = true
case "fuse-overlayfs":
// not supported with fuse-overlayfs snapshotter which doesn't provide overlayfs mounts.
// TODO: add support for fuse-overlayfs
enableOverlay = false
var desc ocispecs.Descriptor

// Determine differ and error/log handling according to the platform, envvar and the snapshotter.
var enableOverlay, fallback, logWarnOnErr bool
if forceOvlStr := os.Getenv("BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF"); forceOvlStr != "" {
enableOverlay, err = strconv.ParseBool(forceOvlStr)
if err != nil {
return nil, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF")
}
fallback = false // prohibit fallback on debug
} else if !isTypeWindows(sr) {
enableOverlay, fallback = true, true
switch sr.cm.Snapshotter.Name() {
case "overlayfs", "stargz":
// overlayfs-based snapshotters should support overlay diff. so print warn log on failure.
logWarnOnErr = true
case "fuse-overlayfs":
// not supported with fuse-overlayfs snapshotter which doesn't provide overlayfs mounts.
// TODO: add support for fuse-overlayfs
enableOverlay = false
}
}
}
if enableOverlay {
computed, ok, err := sr.tryComputeOverlayBlob(ctx, lower, upper, mediaType, sr.ID(), compressorFunc)
if !ok || err != nil {
if !fallback {
if !ok {
return nil, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper)
if enableOverlay {
computed, ok, err := sr.tryComputeOverlayBlob(ctx, lower, upper, mediaType, sr.ID(), compressorFunc)
if !ok || err != nil {
if !fallback {
if !ok {
return nil, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper)
}
if err != nil {
return nil, errors.Wrapf(err, "failed to compute overlay diff")
}
}
if err != nil {
return nil, errors.Wrapf(err, "failed to compute overlay diff")
if logWarnOnErr {
logrus.Warnf("failed to compute blob by overlay differ (ok=%v): %v", ok, err)
}
}
if logWarnOnErr {
logrus.Warnf("failed to compute blob by overlay differ (ok=%v): %v", ok, err)
if ok {
desc = computed
}
}
if ok {
desc = computed

if desc.Digest == "" {
desc, err = sr.cm.Differ.Compare(ctx, lower, upper,
diff.WithMediaType(mediaType),
diff.WithReference(sr.ID()),
diff.WithCompressor(compressorFunc),
)
if err != nil {
return nil, err
}
}
}

if desc.Digest == "" {
desc, err = sr.cm.Differ.Compare(ctx, lower, upper,
diff.WithMediaType(mediaType),
diff.WithReference(sr.ID()),
diff.WithCompressor(compressorFunc),
)
if desc.Annotations == nil {
desc.Annotations = map[string]string{}
}
if finalize != nil {
a, err := finalize(ctx, sr.cm.ContentStore)
if err != nil {
return nil, errors.Wrapf(err, "failed to finalize compression")
}
for k, v := range a {
desc.Annotations[k] = v
}
}
info, err := sr.cm.ContentStore.Info(ctx, desc.Digest)
if err != nil {
return nil, err
}
}

if desc.Annotations == nil {
desc.Annotations = map[string]string{}
}
if finalize != nil {
a, err := finalize(ctx, sr.cm.ContentStore)
if err != nil {
return nil, errors.Wrapf(err, "failed to finalize compression")
if diffID, ok := info.Labels[containerdUncompressed]; ok {
desc.Annotations[containerdUncompressed] = diffID
} else if mediaType == ocispecs.MediaTypeImageLayer {
desc.Annotations[containerdUncompressed] = desc.Digest.String()
} else {
return nil, errors.Errorf("unknown layer compression type")
}
for k, v := range a {
desc.Annotations[k] = v
}
}

info, err := sr.cm.ContentStore.Info(ctx, desc.Digest)
if err := sr.setBlob(ctx, compressionType, desc); err != nil {
return nil, err
}
return nil, nil
})
if err != nil {
return nil, err
return err
}

if diffID, ok := info.Labels[containerdUncompressed]; ok {
desc.Annotations[containerdUncompressed] = diffID
} else if mediaType == ocispecs.MediaTypeImageLayer {
desc.Annotations[containerdUncompressed] = desc.Digest.String()
} else {
return nil, errors.Errorf("unknown layer compression type")
}

if err := sr.setBlob(ctx, compressionType, desc); err != nil {
return nil, err
if forceCompression {
if err := ensureCompression(ctx, sr, compressionType, s); err != nil {
return errors.Wrapf(err, "failed to ensure compression type of %q", compressionType)
}
}

return nil, nil
return nil
})
if err != nil {
return err
}
if forceCompression {
if err := ensureCompression(ctx, sr, compressionType, s); err != nil {
return errors.Wrapf(err, "failed to ensure compression type of %q", compressionType)
}
}
return nil
})
}

if err := eg.Wait(); err != nil {
return err
}
return sr.setChains(ctx)
return sr.computeChainMetadata(ctx)
}

// setBlob associates a blob with the cache record.
// A lease must be held for the blob when calling this function
// Caller should call Info() for knowing what current values are actually set
func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression.Type, desc ocispecs.Descriptor) error {
if _, ok := leases.FromContext(ctx); !ok {
return errors.Errorf("missing lease requirement for setBlob")
Expand Down Expand Up @@ -267,9 +274,9 @@ func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression
return nil
}

func (sr *immutableRef) setChains(ctx context.Context) error {
func (sr *immutableRef) computeChainMetadata(ctx context.Context) error {
if _, ok := leases.FromContext(ctx); !ok {
return errors.Errorf("missing lease requirement for setChains")
return errors.Errorf("missing lease requirement for computeChainMetadata")
}

sr.mu.Lock()
Expand All @@ -281,13 +288,37 @@ func (sr *immutableRef) setChains(ctx context.Context) error {

var chainIDs []digest.Digest
var blobChainIDs []digest.Digest
if sr.parent != nil {
chainIDs = append(chainIDs, digest.Digest(sr.parent.getChainID()))
blobChainIDs = append(blobChainIDs, digest.Digest(sr.parent.getBlobChainID()))

// Blobs should be set the actual layers in the ref's chain, no
// any merge refs.
layerChain := sr.layerChain()
var layerParent *cacheRecord
switch sr.kind() {
case Merge:
layerParent = layerChain[len(layerChain)-1].cacheRecord
case Layer:
// skip the last layer in the chain, which is this ref itself
layerParent = layerChain[len(layerChain)-2].cacheRecord
}
if layerParent != nil {
if parentChainID := layerParent.getChainID(); parentChainID != "" {
chainIDs = append(chainIDs, parentChainID)
} else {
return errors.Errorf("failed to set chain for reference with non-addressable parent")
}
if parentBlobChainID := layerParent.getBlobChainID(); parentBlobChainID != "" {
blobChainIDs = append(blobChainIDs, parentBlobChainID)
} else {
return errors.Errorf("failed to set blobchain for reference with non-addressable parent")
}
}

switch sr.kind() {
case Layer, BaseLayer:
diffID := digest.Digest(sr.getDiffID())
chainIDs = append(chainIDs, diffID)
blobChainIDs = append(blobChainIDs, imagespecidentity.ChainID([]digest.Digest{digest.Digest(sr.getBlob()), diffID}))
}
diffID := digest.Digest(sr.getDiffID())
chainIDs = append(chainIDs, diffID)
blobChainIDs = append(blobChainIDs, imagespecidentity.ChainID([]digest.Digest{digest.Digest(sr.getBlob()), diffID}))

chainID := imagespecidentity.ChainID(chainIDs)
blobChainID := imagespecidentity.ChainID(blobChainIDs)
Expand All @@ -304,8 +335,15 @@ func isTypeWindows(sr *immutableRef) bool {
if sr.GetLayerType() == "windows" {
return true
}
if parent := sr.parent; parent != nil {
return isTypeWindows(parent)
switch sr.kind() {
case Merge:
for _, p := range sr.mergeParents {
if isTypeWindows(p) {
return true
}
}
case Layer:
return isTypeWindows(sr.layerParent)
}
return false
}
Expand Down
Loading

0 comments on commit d73e62f

Please sign in to comment.