Skip to content

Commit

Permalink
Dedupe layer uploads where possible (#396)
Browse files Browse the repository at this point in the history
* Dedupe layer uploads where possible

* Add test

* fix races

* review comments
  • Loading branch information
imjasonh authored Mar 6, 2019
1 parent 4aac97b commit 1b9964c
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 1 deletion.
16 changes: 16 additions & 0 deletions pkg/v1/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,25 @@ func Write(ref name.Reference, img v1.Image, auth authn.Authenticator, t http.Ro
}

// Upload individual layers in goroutines and collect any errors.
// If we can dedupe by the layer digest, try to do so. If the layer is
// a stream.Layer, we can't dedupe and might re-upload.
var g errgroup.Group
uploaded := map[v1.Hash]bool{}
for _, l := range ls {
l := l
if _, ok := l.(*stream.Layer); !ok {
h, err := l.Digest()
if err != nil {
return err
}
// If we can determine the layer's digest ahead of
// time, use it to dedupe uploads.
if uploaded[h] {
continue // Already uploading.
}
uploaded[h] = true
}

g.Go(func() error {
return w.uploadOne(l)
})
Expand Down
96 changes: 95 additions & 1 deletion pkg/v1/remote/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ import (
"net/http/httptest"
"net/url"
"strings"
"sync/atomic"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/mutate"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/google/go-containerregistry/pkg/v1/random"
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
"github.com/google/go-containerregistry/pkg/v1/stream"
"github.com/google/go-containerregistry/pkg/v1/tarball"
)

func mustNewTag(t *testing.T, s string) name.Tag {
Expand Down Expand Up @@ -128,7 +131,7 @@ func setupImage(t *testing.T) v1.Image {
func setupIndex(t *testing.T, children int64) v1.ImageIndex {
rnd, err := random.Index(1024, 1, children)
if err != nil {
t.Fatalf("random.ImageIndex() = %v", err)
t.Fatalf("random.Index() = %v", err)
}
return rnd
}
Expand Down Expand Up @@ -417,6 +420,97 @@ func TestInitiateUploadMountsWithMountFromTheSameRegistry(t *testing.T) {
}
}

func TestDedupeLayers(t *testing.T) {
newBlob := func() io.ReadCloser { return ioutil.NopCloser(bytes.NewReader(bytes.Repeat([]byte{'a'}, 10000))) }

img, err := random.Image(1024, 3)
if err != nil {
t.Fatalf("random.Image: %v", err)
}

// Append three identical tarball.Layers, which should be deduped
// because contents can be hashed before uploading.
for i := 0; i < 3; i++ {
tl, err := tarball.LayerFromOpener(func() (io.ReadCloser, error) { return newBlob(), nil })
if err != nil {
t.Fatalf("LayerFromOpener(#%d): %v", i, err)
}
img, err = mutate.AppendLayers(img, tl)
if err != nil {
t.Fatalf("mutate.AppendLayer(#%d): %v", i, err)
}
}

// Append three identical stream.Layers, whose uploads will *not* be
// deduped since Write can't tell they're identical ahead of time.
for i := 0; i < 3; i++ {
sl := stream.NewLayer(newBlob())
img, err = mutate.AppendLayers(img, sl)
if err != nil {
t.Fatalf("mutate.AppendLayer(#%d): %v", i, err)
}
}

expectedRepo := "write/time"
headPathPrefix := fmt.Sprintf("/v2/%s/blobs/", expectedRepo)
initiatePath := fmt.Sprintf("/v2/%s/blobs/uploads/", expectedRepo)
manifestPath := fmt.Sprintf("/v2/%s/manifests/latest", expectedRepo)
uploadPath := "/upload"
commitPath := "/commit"
var numUploads int32 = 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodHead && strings.HasPrefix(r.URL.Path, headPathPrefix) && r.URL.Path != initiatePath {
http.Error(w, "NotFound", http.StatusNotFound)
return
}
switch r.URL.Path {
case "/v2/":
w.WriteHeader(http.StatusOK)
case initiatePath:
if r.Method != http.MethodPost {
t.Errorf("Method; got %v, want %v", r.Method, http.MethodPost)
}
w.Header().Set("Location", uploadPath)
http.Error(w, "Accepted", http.StatusAccepted)
case uploadPath:
if r.Method != http.MethodPatch {
t.Errorf("Method; got %v, want %v", r.Method, http.MethodPatch)
}
atomic.AddInt32(&numUploads, 1)
w.Header().Set("Location", commitPath)
http.Error(w, "Created", http.StatusCreated)
case commitPath:
http.Error(w, "Created", http.StatusCreated)
case manifestPath:
if r.Method != http.MethodPut {
t.Errorf("Method; got %v, want %v", r.Method, http.MethodPut)
}
http.Error(w, "Created", http.StatusCreated)
default:
t.Fatalf("Unexpected path: %v", r.URL.Path)
}
}))
defer server.Close()
u, err := url.Parse(server.URL)
if err != nil {
t.Fatalf("url.Parse(%v) = %v", server.URL, err)
}
tag, err := name.NewTag(fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo), name.WeakValidation)
if err != nil {
t.Fatalf("NewTag() = %v", err)
}

if err := Write(tag, img, authn.Anonymous, http.DefaultTransport); err != nil {
t.Errorf("Write: %v", err)
}

// 3 random layers, 1 tarball layer (deduped), 3 stream layers (not deduped), 1 image config blob
wantUploads := int32(3 + 1 + 3 + 1)
if numUploads != wantUploads {
t.Fatalf("Write uploaded %d blobs, want %d", numUploads, wantUploads)
}
}

func TestStreamBlob(t *testing.T) {
img := setupImage(t)
expectedPath := "/vWhatever/I/decide"
Expand Down
11 changes: 11 additions & 0 deletions pkg/v1/stream/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"hash"
"io"
"sync"

v1 "github.com/google/go-containerregistry/pkg/v1"
)
Expand All @@ -40,6 +41,7 @@ type Layer struct {
blob io.ReadCloser
consumed bool

mu sync.Mutex
digest, diffID *v1.Hash
size int64
}
Expand All @@ -51,6 +53,8 @@ func NewLayer(rc io.ReadCloser) *Layer { return &Layer{blob: rc} }

// Digest implements v1.Layer.
func (l *Layer) Digest() (v1.Hash, error) {
l.mu.Lock()
defer l.mu.Unlock()
if l.digest == nil {
return v1.Hash{}, ErrNotComputed
}
Expand All @@ -59,6 +63,8 @@ func (l *Layer) Digest() (v1.Hash, error) {

// DiffID implements v1.Layer.
func (l *Layer) DiffID() (v1.Hash, error) {
l.mu.Lock()
defer l.mu.Unlock()
if l.diffID == nil {
return v1.Hash{}, ErrNotComputed
}
Expand All @@ -67,6 +73,8 @@ func (l *Layer) DiffID() (v1.Hash, error) {

// Size implements v1.Layer.
func (l *Layer) Size() (int64, error) {
l.mu.Lock()
defer l.mu.Unlock()
if l.size == 0 {
return 0, ErrNotComputed
}
Expand Down Expand Up @@ -136,6 +144,9 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
func (cr *compressedReader) Read(b []byte) (int, error) { return cr.pr.Read(b) }

func (cr *compressedReader) Close() error {
cr.l.mu.Lock()
defer cr.l.mu.Unlock()

// Close the inner ReadCloser.
if err := cr.closer.Close(); err != nil {
return err
Expand Down

0 comments on commit 1b9964c

Please sign in to comment.