From 8314c8a41f6e1e75fe047f28f0eb2d24f07fd6e7 Mon Sep 17 00:00:00 2001 From: Jason Hall Date: Tue, 5 Mar 2019 22:18:37 -0500 Subject: [PATCH 1/4] Dedupe layer uploads where possible --- pkg/v1/remote/write.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/v1/remote/write.go b/pkg/v1/remote/write.go index 0f6b0f68f..ae6147763 100644 --- a/pkg/v1/remote/write.go +++ b/pkg/v1/remote/write.go @@ -57,9 +57,21 @@ func Write(ref name.Reference, img v1.Image, auth authn.Authenticator, t http.Ro } // Upload individual layers in goroutines and collect any errors. + // If we can dedupe by the layer digest, try to do so. If the layer is + // a stream.Layer, we can't dedupe and might re-upload. var g errgroup.Group + seen := map[v1.Hash]struct{}{} for _, l := range ls { l := l + if h, err := l.Digest(); err == nil { + // If we can determine the layer's digest ahead of + // time, use it to dedupe uploads. + if _, found := seen[h]; found { + continue // Already uploading. + } + seen[h] = struct{}{} + } + g.Go(func() error { return w.uploadOne(l) }) From d6c2ec4062f2bc7efb678df80b7c6dbea32f060e Mon Sep 17 00:00:00 2001 From: Jason Hall Date: Wed, 6 Mar 2019 08:36:23 -0500 Subject: [PATCH 2/4] Add test --- pkg/v1/remote/write_test.go | 95 ++++++++++++++++++++++++++++++++++++- 1 file changed, 94 insertions(+), 1 deletion(-) diff --git a/pkg/v1/remote/write_test.go b/pkg/v1/remote/write_test.go index 915ce755c..07b5b3752 100644 --- a/pkg/v1/remote/write_test.go +++ b/pkg/v1/remote/write_test.go @@ -33,10 +33,12 @@ import ( "github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" + "github.com/google/go-containerregistry/pkg/v1/mutate" "github.com/google/go-containerregistry/pkg/v1/partial" "github.com/google/go-containerregistry/pkg/v1/random" "github.com/google/go-containerregistry/pkg/v1/remote/transport" "github.com/google/go-containerregistry/pkg/v1/stream" + "github.com/google/go-containerregistry/pkg/v1/tarball" ) func mustNewTag(t *testing.T, s string) name.Tag { @@ -128,7 +130,7 @@ func setupImage(t *testing.T) v1.Image { func setupIndex(t *testing.T, children int64) v1.ImageIndex { rnd, err := random.Index(1024, 1, children) if err != nil { - t.Fatalf("random.ImageIndex() = %v", err) + t.Fatalf("random.Index() = %v", err) } return rnd } @@ -417,6 +419,97 @@ func TestInitiateUploadMountsWithMountFromTheSameRegistry(t *testing.T) { } } +func TestDedupeLayers(t *testing.T) { + newBlob := func() io.ReadCloser { return ioutil.NopCloser(bytes.NewReader(bytes.Repeat([]byte{'a'}, 10000))) } + + img, err := random.Image(1024, 3) + if err != nil { + t.Fatalf("random.Image: %v", err) + } + + // Append three identical tarball.Layers, which should be deduped + // because contents can be hashed before uploading. + for i := 0; i < 3; i++ { + tl, err := tarball.LayerFromOpener(func() (io.ReadCloser, error) { return newBlob(), nil }) + if err != nil { + t.Fatalf("LayerFromOpener(#%d): %v", i, err) + } + img, err = mutate.AppendLayers(img, tl) + if err != nil { + t.Fatalf("mutate.AppendLayer(#%d): %v", i, err) + } + } + + // Append three identical stream.Layers, whose uploads will *not* be + // deduped since Write can't tell they're identical ahead of time. + for i := 0; i < 3; i++ { + sl := stream.NewLayer(newBlob()) + img, err = mutate.AppendLayers(img, sl) + if err != nil { + t.Fatalf("mutate.AppendLayer(#%d): %v", i, err) + } + } + + expectedRepo := "write/time" + headPathPrefix := fmt.Sprintf("/v2/%s/blobs/", expectedRepo) + initiatePath := fmt.Sprintf("/v2/%s/blobs/uploads/", expectedRepo) + manifestPath := fmt.Sprintf("/v2/%s/manifests/latest", expectedRepo) + uploadPath := "/upload" + commitPath := "/commit" + numUploads := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodHead && strings.HasPrefix(r.URL.Path, headPathPrefix) && r.URL.Path != initiatePath { + http.Error(w, "NotFound", http.StatusNotFound) + return + } + switch r.URL.Path { + case "/v2/": + w.WriteHeader(http.StatusOK) + case initiatePath: + if r.Method != http.MethodPost { + t.Errorf("Method; got %v, want %v", r.Method, http.MethodPost) + } + w.Header().Set("Location", uploadPath) + http.Error(w, "Accepted", http.StatusAccepted) + case uploadPath: + if r.Method != http.MethodPatch { + t.Errorf("Method; got %v, want %v", r.Method, http.MethodPatch) + } + numUploads++ + w.Header().Set("Location", commitPath) + http.Error(w, "Created", http.StatusCreated) + case commitPath: + http.Error(w, "Created", http.StatusCreated) + case manifestPath: + if r.Method != http.MethodPut { + t.Errorf("Method; got %v, want %v", r.Method, http.MethodPut) + } + http.Error(w, "Created", http.StatusCreated) + default: + t.Fatalf("Unexpected path: %v", r.URL.Path) + } + })) + defer server.Close() + u, err := url.Parse(server.URL) + if err != nil { + t.Fatalf("url.Parse(%v) = %v", server.URL, err) + } + tag, err := name.NewTag(fmt.Sprintf("%s/%s:latest", u.Host, expectedRepo), name.WeakValidation) + if err != nil { + t.Fatalf("NewTag() = %v", err) + } + + if err := Write(tag, img, authn.Anonymous, http.DefaultTransport); err != nil { + t.Errorf("Write: %v", err) + } + + // 3 random layers, 1 tarball layer (deduped), 3 stream layers (not deduped), 1 image config blob + wantUploads := 3 + 1 + 3 + 1 + if numUploads != wantUploads { + t.Fatalf("Write uploaded %d blobs, want %d", numUploads, wantUploads) + } +} + func TestStreamBlob(t *testing.T) { img := setupImage(t) expectedPath := "/vWhatever/I/decide" From 2c5d5a3220d93a1286318f340109d611b7480ddf Mon Sep 17 00:00:00 2001 From: Jason Hall Date: Wed, 6 Mar 2019 09:08:43 -0500 Subject: [PATCH 3/4] fix races --- pkg/v1/remote/write.go | 6 +++++- pkg/v1/remote/write_test.go | 7 ++++--- pkg/v1/stream/layer.go | 11 +++++++++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/pkg/v1/remote/write.go b/pkg/v1/remote/write.go index ae6147763..167cd3fac 100644 --- a/pkg/v1/remote/write.go +++ b/pkg/v1/remote/write.go @@ -63,7 +63,11 @@ func Write(ref name.Reference, img v1.Image, auth authn.Authenticator, t http.Ro seen := map[v1.Hash]struct{}{} for _, l := range ls { l := l - if h, err := l.Digest(); err == nil { + if _, ok := l.(*stream.Layer); !ok { + h, err := l.Digest() + if err != nil { + return err + } // If we can determine the layer's digest ahead of // time, use it to dedupe uploads. if _, found := seen[h]; found { diff --git a/pkg/v1/remote/write_test.go b/pkg/v1/remote/write_test.go index 07b5b3752..7442a999f 100644 --- a/pkg/v1/remote/write_test.go +++ b/pkg/v1/remote/write_test.go @@ -26,6 +26,7 @@ import ( "net/http/httptest" "net/url" "strings" + "sync/atomic" "testing" "github.com/google/go-cmp/cmp" @@ -456,7 +457,7 @@ func TestDedupeLayers(t *testing.T) { manifestPath := fmt.Sprintf("/v2/%s/manifests/latest", expectedRepo) uploadPath := "/upload" commitPath := "/commit" - numUploads := 0 + var numUploads int32 = 0 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodHead && strings.HasPrefix(r.URL.Path, headPathPrefix) && r.URL.Path != initiatePath { http.Error(w, "NotFound", http.StatusNotFound) @@ -475,7 +476,7 @@ func TestDedupeLayers(t *testing.T) { if r.Method != http.MethodPatch { t.Errorf("Method; got %v, want %v", r.Method, http.MethodPatch) } - numUploads++ + atomic.AddInt32(&numUploads, 1) w.Header().Set("Location", commitPath) http.Error(w, "Created", http.StatusCreated) case commitPath: @@ -504,7 +505,7 @@ func TestDedupeLayers(t *testing.T) { } // 3 random layers, 1 tarball layer (deduped), 3 stream layers (not deduped), 1 image config blob - wantUploads := 3 + 1 + 3 + 1 + wantUploads := int32(3 + 1 + 3 + 1) if numUploads != wantUploads { t.Fatalf("Write uploaded %d blobs, want %d", numUploads, wantUploads) } diff --git a/pkg/v1/stream/layer.go b/pkg/v1/stream/layer.go index bbad99aef..f8895a226 100644 --- a/pkg/v1/stream/layer.go +++ b/pkg/v1/stream/layer.go @@ -21,6 +21,7 @@ import ( "errors" "hash" "io" + "sync" v1 "github.com/google/go-containerregistry/pkg/v1" ) @@ -40,6 +41,7 @@ type Layer struct { blob io.ReadCloser consumed bool + mu sync.Mutex digest, diffID *v1.Hash size int64 } @@ -51,6 +53,8 @@ func NewLayer(rc io.ReadCloser) *Layer { return &Layer{blob: rc} } // Digest implements v1.Layer. func (l *Layer) Digest() (v1.Hash, error) { + l.mu.Lock() + defer l.mu.Unlock() if l.digest == nil { return v1.Hash{}, ErrNotComputed } @@ -59,6 +63,8 @@ func (l *Layer) Digest() (v1.Hash, error) { // DiffID implements v1.Layer. func (l *Layer) DiffID() (v1.Hash, error) { + l.mu.Lock() + defer l.mu.Unlock() if l.diffID == nil { return v1.Hash{}, ErrNotComputed } @@ -67,6 +73,8 @@ func (l *Layer) DiffID() (v1.Hash, error) { // Size implements v1.Layer. func (l *Layer) Size() (int64, error) { + l.mu.Lock() + defer l.mu.Unlock() if l.size == 0 { return 0, ErrNotComputed } @@ -136,6 +144,9 @@ func newCompressedReader(l *Layer) (*compressedReader, error) { func (cr *compressedReader) Read(b []byte) (int, error) { return cr.pr.Read(b) } func (cr *compressedReader) Close() error { + cr.l.mu.Lock() + defer cr.l.mu.Unlock() + // Close the inner ReadCloser. if err := cr.closer.Close(); err != nil { return err From 0d3185af7d91d7a732cc6a4d438d5a18c64981e9 Mon Sep 17 00:00:00 2001 From: Jason Hall Date: Wed, 6 Mar 2019 11:42:43 -0500 Subject: [PATCH 4/4] review comments --- pkg/v1/remote/write.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/v1/remote/write.go b/pkg/v1/remote/write.go index 167cd3fac..6e2c38a34 100644 --- a/pkg/v1/remote/write.go +++ b/pkg/v1/remote/write.go @@ -60,7 +60,7 @@ func Write(ref name.Reference, img v1.Image, auth authn.Authenticator, t http.Ro // If we can dedupe by the layer digest, try to do so. If the layer is // a stream.Layer, we can't dedupe and might re-upload. var g errgroup.Group - seen := map[v1.Hash]struct{}{} + uploaded := map[v1.Hash]bool{} for _, l := range ls { l := l if _, ok := l.(*stream.Layer); !ok { @@ -70,10 +70,10 @@ func Write(ref name.Reference, img v1.Image, auth authn.Authenticator, t http.Ro } // If we can determine the layer's digest ahead of // time, use it to dedupe uploads. - if _, found := seen[h]; found { + if uploaded[h] { continue // Already uploading. } - seen[h] = struct{}{} + uploaded[h] = true } g.Go(func() error {