
Add compressed writes to cas.go.
This follows the current tentative API being worked on in
bazelbuild/remote-apis#168. While there's technically room for it to
change, it has reached a stable enough point to be worth implementing.
rubensf committed Nov 9, 2020
1 parent 0371e47 commit 13788e3
Showing 7 changed files with 167 additions and 53 deletions.
4 changes: 4 additions & 0 deletions go/pkg/client/bytestream.go
@@ -37,6 +37,10 @@ func (c *Client) WriteChunked(ctx context.Context, name string, ch *chunker.Chun
return err
}
if chunk.Offset == 0 {
// Notice that the digest in the chunker can be misleading:
// for compressed blob uploads, the resource name must
// include the uncompressed blob's digest, while the chunker
// carries the compressed digest.
req.ResourceName = name
}
req.WriteOffset = chunk.Offset
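The comment above is the heart of the bytestream change: for compressed uploads, the resource name is derived from the uncompressed digest even though the bytes on the wire are compressed. A minimal sketch of what the first WriteRequest of such an upload could look like (not from this commit; the UUID and hash below are hypothetical placeholders):

package main

import (
	"fmt"

	"github.com/klauspost/compress/zstd"
	bspb "google.golang.org/genproto/googleapis/bytestream"
)

func main() {
	blob := []byte("hello, compressed CAS")

	enc, err := zstd.NewWriter(nil)
	if err != nil {
		panic(err)
	}
	compressed := enc.EncodeAll(blob, nil)

	// The resource name embeds the hash and size of the *uncompressed*
	// blob, so len(Data) need not match the size segment of the name.
	req := &bspb.WriteRequest{
		ResourceName: fmt.Sprintf("instance/uploads/%s/compressed-blobs/zstd/%s/%d",
			"00000000-0000-0000-0000-000000000000", "<uncompressed-hash>", len(blob)),
		WriteOffset: 0,
		FinishWrite: true,
		Data:        compressed,
	}
	fmt.Printf("%s -> %d compressed bytes\n", req.ResourceName, len(req.Data))
}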
39 changes: 36 additions & 3 deletions go/pkg/client/cas.go
Expand Up @@ -25,6 +25,9 @@ import (
log "github.com/golang/glog"
)

// DefaultCompressedBytestreamThreshold is the default threshold for transferring blobs compressed on ByteStream.Write RPCs.
const DefaultCompressedBytestreamThreshold = 1024

// UploadIfMissing stores a number of uploadable items.
// It first queries the CAS to see which items are missing and only uploads those that are.
// Returns a slice of the missing digests.
@@ -87,8 +90,12 @@ func (c *Client) UploadIfMissing(ctx context.Context, data ...*chunker.Chunker)
} else {
log.V(3).Infof("Uploading single blob with digest %s", batch[0])
ch := chunkers[batch[0]]
dg := ch.Digest()
if err := c.WriteChunked(eCtx, c.ResourceNameWrite(dg.Hash, dg.Size), ch); err != nil {
var rscName string
var err error
if rscName, err = c.maybeCompressBlob(ch); err != nil {
return err
}
if err = c.WriteChunked(eCtx, rscName, ch); err != nil {
return err
}
}
@@ -135,7 +142,26 @@ func (c *Client) WriteProto(ctx context.Context, msg proto.Message) (digest.Dige
func (c *Client) WriteBlob(ctx context.Context, blob []byte) (digest.Digest, error) {
ch := chunker.NewFromBlob(blob, int(c.ChunkMaxSize))
dg := ch.Digest()
return dg, c.WriteChunked(ctx, c.ResourceNameWrite(dg.Hash, dg.Size), ch)

name, err := c.maybeCompressBlob(ch)
if err != nil {
return dg, err
}

return dg, c.WriteChunked(ctx, name, ch)
}

// maybeCompressBlob will, depending on the client configuration, compress the blob
// for upload. It returns the appropriate write resource name.
func (c *Client) maybeCompressBlob(ch *chunker.Chunker) (string, error) {
dg := ch.Digest()
if c.CompressedBytestreamThreshold < 0 || int64(c.CompressedBytestreamThreshold) > dg.Size {
return c.ResourceNameWrite(dg.Hash, dg.Size), nil
}
if err := chunker.CompressChunker(ch); err != nil {
return "", err
}
return c.ResourceNameCompressedWrite(dg.Hash, dg.Size), nil
}

// BatchWriteBlobs uploads a number of blobs to the CAS. They must collectively be below the
@@ -514,6 +540,13 @@ func (c *Client) ResourceNameWrite(hash string, sizeBytes int64) string {
return fmt.Sprintf("%s/uploads/%s/blobs/%s/%d", c.InstanceName, uuid.New(), hash, sizeBytes)
}

// ResourceNameCompressedWrite generates a valid write resource name for compressed blobs.
// TODO(rubensf): Converge compressor to proto in https://github.com/bazelbuild/remote-apis/pull/168 once
// that gets merged in.
func (c *Client) ResourceNameCompressedWrite(hash string, sizeBytes int64) string {
return fmt.Sprintf("%s/uploads/%s/compressed-blobs/zstd/%s/%d", c.InstanceName, uuid.New(), hash, sizeBytes)
}

// GetDirectoryTree returns the entire directory tree rooted at the given digest (which must target
// a Directory stored in the CAS).
func (c *Client) GetDirectoryTree(ctx context.Context, d *repb.Digest) (result []*repb.Directory, err error) {
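To make the new write path concrete, here is a self-contained sketch (instance, UUID, and hash values are placeholders) of the decision maybeCompressBlob encodes: a negative threshold disables compression entirely, while any blob at least as large as the threshold is written compressed, under a resource name that still describes the uncompressed blob:

package main

import "fmt"

// resourceName mirrors maybeCompressBlob's branch; it is an illustration,
// not the SDK code.
func resourceName(instance, uuid, hash string, size, threshold int64) string {
	if threshold < 0 || threshold > size {
		// Uncompressed path: instance/uploads/<uuid>/blobs/<hash>/<size>.
		return fmt.Sprintf("%s/uploads/%s/blobs/%s/%d", instance, uuid, hash, size)
	}
	// Compressed path: note the extra "compressed-blobs/zstd" segments;
	// hash and size still refer to the uncompressed blob.
	return fmt.Sprintf("%s/uploads/%s/compressed-blobs/zstd/%s/%d", instance, uuid, hash, size)
}

func main() {
	// 2048 >= DefaultCompressedBytestreamThreshold (1024): compressed.
	fmt.Println(resourceName("instance", "1234-uuid", "abcd1234", 2048, 1024))
	// 512 < 1024: uncompressed.
	fmt.Println(resourceName("instance", "1234-uuid", "abcd1234", 512, 1024))
}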
29 changes: 19 additions & 10 deletions go/pkg/client/cas_test.go
@@ -284,7 +284,7 @@ func TestWrite(t *testing.T) {
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
testFunc := func(t *testing.T) {
gotDg, err := c.WriteBlob(ctx, tc.blob)
if err != nil {
t.Errorf("c.WriteBlob(ctx, blob) gave error %s, wanted nil", err)
@@ -299,7 +299,13 @@ func TestWrite(t *testing.T) {
if dg != gotDg {
t.Errorf("c.WriteBlob(ctx, blob) had diff on digest returned (want %s, got %s)", dg, gotDg)
}
})
}

// Harder to write as a for loop, since -1/0 isn't an intuitive "enabled/disabled" pair.
c.CompressedBytestreamThreshold = -1
t.Run(tc.name+" - no compression", testFunc)
c.CompressedBytestreamThreshold = 0
t.Run(tc.name+" - with compression", testFunc)
}
}

@@ -442,7 +448,7 @@ func TestUpload(t *testing.T) {
t.Run(fmt.Sprintf("UsingBatch:%t", ub), func(t *testing.T) {
ub.Apply(c)
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
testFunc := func(t *testing.T) {
fake.Clear()
if tc.concurrency > 0 {
tc.concurrency.Apply(c)
@@ -467,12 +473,9 @@
for _, dg := range missing {
missingSet[dg] = struct{}{}
}
for _, ch := range input {
for i, ch := range input {
dg := ch.Digest()
blob, err := ch.FullData()
if err != nil {
t.Errorf("ch.FullData() returned an error: %v", err)
}
blob := tc.input[i]
if present[dg] {
if fake.BlobWrites(dg) > 0 {
t.Errorf("blob %v with digest %s was uploaded even though it was already present in the CAS", blob, dg)
@@ -485,7 +488,7 @@
if gotBlob, ok := fake.Get(dg); !ok {
t.Errorf("blob %v with digest %s was not uploaded, expected it to be present in the CAS", blob, dg)
} else if !bytes.Equal(blob, gotBlob) {
t.Errorf("blob digest %s had diff on uploaded blob: want %v, got %v", dg, blob, gotBlob)
t.Errorf("blob digest %s had diff on uploaded blob: want %s, got %s", dg, blob, gotBlob)
}
if _, ok := missingSet[dg]; !ok {
t.Errorf("Stats said that blob %v with digest %s was present in the CAS", blob, dg)
@@ -494,7 +497,13 @@
if fake.MaxConcurrency() > defaultCASConcurrency {
t.Errorf("CAS concurrency %v was higher than max %v", fake.MaxConcurrency(), defaultCASConcurrency)
}
})
}

// Harder to write as a for loop, since -1/0 isn't an intuitive "enabled/disabled" pair.
c.CompressedBytestreamThreshold = -1
t.Run(tc.name+" - no compression", testFunc)
c.CompressedBytestreamThreshold = 0
t.Run(tc.name+" - with compression", testFunc)
}
})
}
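The -1/0 toggle above works, but as the repeated comment concedes, it doesn't read naturally. A hypothetical table-driven alternative, assuming the surrounding test's c, tc, and testFunc, and that the test file lives in package client (so the option type needs no qualifier):

// Hypothetical refactor (not part of the commit): name the sentinel
// threshold values instead of assigning bare -1/0 before each t.Run.
for _, mode := range []struct {
	name      string
	threshold CompressedBytestreamThreshold
}{
	{"no compression", -1},  // negative: never compress on ByteStream writes
	{"with compression", 0}, // zero: compress every ByteStream write
} {
	mode.threshold.Apply(c)
	t.Run(tc.name+" - "+mode.name, testFunc)
}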
53 changes: 33 additions & 20 deletions go/pkg/client/client.go
@@ -70,6 +70,11 @@ type Client struct {
StartupCapabilities StartupCapabilities
// ChunkMaxSize is maximum chunk size to use for CAS uploads/downloads.
ChunkMaxSize ChunkMaxSize
// CompressedBytestreamThreshold is the size in bytes at or above which blobs are read and
// written compressed. Use 0 to compress all writes, and a negative number to leave all
// writes uncompressed. TODO(rubensf): Make sure this throws an error if the server doesn't
// support compression, pending https://github.com/bazelbuild/remote-apis/pull/168 being submitted.
CompressedBytestreamThreshold CompressedBytestreamThreshold
// MaxBatchDigests is maximum amount of digests to batch in batched operations.
MaxBatchDigests MaxBatchDigests
// MaxBatchSize is maximum size in bytes of a batch request for batch operations.
@@ -136,6 +141,13 @@ func (s ChunkMaxSize) Apply(c *Client) {
c.ChunkMaxSize = s
}

// CompressedBytestreamThreshold is the size in bytes at or above which blobs are
// compressed on ByteStream writes.
type CompressedBytestreamThreshold int64

// Apply sets the client's compressed bytestream threshold to s.
func (s CompressedBytestreamThreshold) Apply(c *Client) {
c.CompressedBytestreamThreshold = s
}

// UtilizeLocality is to specify whether client downloads files utilizing disk access locality.
type UtilizeLocality bool

@@ -427,26 +439,27 @@ func NewClient(ctx context.Context, instanceName string, params DialParams, opts
return nil, err
}
client := &Client{
InstanceName: instanceName,
actionCache: regrpc.NewActionCacheClient(casConn),
byteStream: bsgrpc.NewByteStreamClient(casConn),
cas: regrpc.NewContentAddressableStorageClient(casConn),
execution: regrpc.NewExecutionClient(conn),
operations: opgrpc.NewOperationsClient(conn),
rpcTimeouts: DefaultRPCTimeouts,
Connection: conn,
CASConnection: casConn,
ChunkMaxSize: chunker.DefaultChunkSize,
MaxBatchDigests: DefaultMaxBatchDigests,
MaxBatchSize: DefaultMaxBatchSize,
DirMode: DefaultDirMode,
ExecutableMode: DefaultExecutableMode,
RegularMode: DefaultRegularMode,
useBatchOps: true,
StartupCapabilities: true,
casUploaders: make(chan bool, DefaultCASConcurrency),
casDownloaders: make(chan bool, DefaultCASConcurrency),
Retrier: RetryTransient(),
InstanceName: instanceName,
actionCache: regrpc.NewActionCacheClient(casConn),
byteStream: bsgrpc.NewByteStreamClient(casConn),
cas: regrpc.NewContentAddressableStorageClient(casConn),
execution: regrpc.NewExecutionClient(conn),
operations: opgrpc.NewOperationsClient(conn),
rpcTimeouts: DefaultRPCTimeouts,
Connection: conn,
CASConnection: casConn,
ChunkMaxSize: chunker.DefaultChunkSize,
CompressedBytestreamThreshold: DefaultCompressedBytestreamThreshold,
MaxBatchDigests: DefaultMaxBatchDigests,
MaxBatchSize: DefaultMaxBatchSize,
DirMode: DefaultDirMode,
ExecutableMode: DefaultExecutableMode,
RegularMode: DefaultRegularMode,
useBatchOps: true,
StartupCapabilities: true,
casUploaders: make(chan bool, DefaultCASConcurrency),
casDownloaders: make(chan bool, DefaultCASConcurrency),
Retrier: RetryTransient(),
}
for _, o := range opts {
o.Apply(client)
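From a caller's perspective, the new threshold composes with NewClient like any other option. A usage sketch, with a placeholder endpoint and instance name (treat the DialParams setup as illustrative rather than a canonical dial):

package main

import (
	"context"
	"log"

	"github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
)

func main() {
	ctx := context.Background()
	c, err := client.NewClient(ctx, "instance",
		client.DialParams{Service: "remote.example.com:443"},
		client.CompressedBytestreamThreshold(0), // compress every ByteStream write
	)
	if err != nil {
		log.Fatalf("NewClient: %v", err)
	}
	defer c.Close()

	// Blobs now go over ByteStream.Write zstd-compressed.
	if _, err := c.WriteBlob(ctx, []byte("some blob")); err != nil {
		log.Fatalf("WriteBlob: %v", err)
	}
}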
1 change: 1 addition & 0 deletions go/pkg/fakes/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
"@com_github_golang_glog//:go_default_library",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_golang_protobuf//ptypes:go_default_library_gen",
"@com_github_klauspost_compress//zstd:go_default_library",
"@com_github_pborman_uuid//:go_default_library",
"@go_googleapis//google/bytestream:bytestream_go_proto",
"@go_googleapis//google/longrunning:longrunning_go_proto",
89 changes: 70 additions & 19 deletions go/pkg/fakes/cas.go
@@ -14,6 +14,7 @@ import (
"github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/golang/protobuf/proto"
"github.com/klauspost/compress/zstd"
"github.com/pborman/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -127,19 +128,29 @@ func (f *Writer) Write(stream bsgrpc.ByteStream_WriteServer) (err error) {
}

path := strings.Split(req.ResourceName, "/")
if len(path) != 6 || path[0] != "instance" || path[1] != "uploads" || path[3] != "blobs" {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs/<hash>/<size>\"")
if (len(path) != 6 && len(path) != 7) || path[0] != "instance" || path[1] != "uploads" || (path[3] != "blobs" && path[3] != "compressed-blobs") {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}
// indexOffset adjusts the later path indices, since `compressed-blobs` paths have one extra element (the compressor).
indexOffset := 0
if path[3] == "compressed-blobs" {
indexOffset = 1
// TODO(rubensf): Change this to all the possible compressors in https://github.com/bazelbuild/remote-apis/pull/168.
if path[4] != "zstd" {
return status.Error(codes.InvalidArgument, "test fake expected valid compressor, eg zstd")
}
}
size, err := strconv.ParseInt(path[5], 10, 64)

size, err := strconv.ParseInt(path[5+indexOffset], 10, 64)
if err != nil {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs/<hash>/<size>\"")
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}
dg, e := digest.New(path[4], size)
dg, e := digest.New(path[4+indexOffset], size)
if e != nil {
return status.Error(codes.InvalidArgument, "test fake expected valid digest as part of resource name of the form \"instance/uploads/<uuid>/blobs/<hash>/<size>\"")
return status.Error(codes.InvalidArgument, "test fake expected valid digest as part of resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}
if uuid.Parse(path[2]) == nil {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs/<hash>/<size>\"")
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}

res := req.ResourceName
@@ -179,7 +190,22 @@ func (f *Writer) Write(stream bsgrpc.ByteStream_WriteServer) (err error) {
return status.Errorf(codes.InvalidArgument, "reached end of stream before the client finished writing")
}

f.Buf = buf.Bytes()
if path[3] == "compressed-blobs" {
if path[4] != "zstd" {
return status.Errorf(codes.InvalidArgument, "%s compressor isn't supported", path[4])
}
decoder, err := zstd.NewReader(nil)
if err != nil {
return status.Errorf(codes.Internal, "failed to initialize internal decoder: %v", err)
}
f.Buf, err = decoder.DecodeAll(buf.Bytes(), nil)
if err != nil {
return status.Errorf(codes.InvalidArgument, "served bytes can't be decompressed: %v", err)
}
} else {
f.Buf = buf.Bytes()
}

cDg := digest.NewFromBlob(f.Buf)
if dg != cDg {
return status.Errorf(codes.InvalidArgument, "mismatched digest: received %s, computed %s", dg, cDg)
@@ -210,13 +236,17 @@ type CAS struct {
writeReqs int
concReqs int
maxConcReqs int
decoder *zstd.Decoder
}

// NewCAS returns a new empty fake CAS.
func NewCAS() *CAS {
func NewCAS() (*CAS, error) {
c := &CAS{BatchSize: client.DefaultMaxBatchSize}
c.Clear()
return c

var err error
c.decoder, err = zstd.NewReader(nil)
return c, err
}

// Clear removes all results from the cache.
@@ -470,19 +500,28 @@ func (f *CAS) Write(stream bsgrpc.ByteStream_WriteServer) (err error) {
}

path := strings.Split(req.ResourceName, "/")
if len(path) != 6 || path[0] != "instance" || path[1] != "uploads" || path[3] != "blobs" {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs/<hash>/<size>\"")
if (len(path) != 6 && len(path) != 7) || path[0] != "instance" || path[1] != "uploads" || (path[3] != "blobs" && path[3] != "compressed-blobs") {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}
// indexOffset adjusts the later path indices, since `compressed-blobs` paths have one extra element (the compressor).
indexOffset := 0
if path[3] == "compressed-blobs" {
indexOffset = 1
// TODO(rubensf): Change this to all the possible compressors in https://github.com/bazelbuild/remote-apis/pull/168.
if path[4] != "zstd" {
return status.Error(codes.InvalidArgument, "test fake expected valid compressor, eg zstd")
}
}
size, err := strconv.ParseInt(path[5], 10, 64)
size, err := strconv.ParseInt(path[5+indexOffset], 10, 64)
if err != nil {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs/<hash>/<size>\"")
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}
dg, err := digest.New(path[4], size)
dg, err := digest.New(path[4+indexOffset], size)
if err != nil {
return status.Error(codes.InvalidArgument, "test fake expected a valid digest as part of the resource name: \"instance/uploads/<uuid>/blobs/<hash>/<size>\"")
return status.Error(codes.InvalidArgument, "test fake expected a valid digest as part of the resource name: \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}
if uuid.Parse(path[2]) == nil {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs/<hash>/<size>\"")
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
}

res := req.ResourceName
@@ -522,11 +561,23 @@ func (f *CAS) Write(stream bsgrpc.ByteStream_WriteServer) (err error) {
return status.Errorf(codes.InvalidArgument, "reached end of stream before the client finished writing")
}

uncompressedBuf := buf.Bytes()
if path[3] == "compressed-blobs" {
if path[4] != "zstd" {
return status.Errorf(codes.InvalidArgument, "%s compressor isn't supported", path[4])
}
var err error
uncompressedBuf, err = f.decoder.DecodeAll(buf.Bytes(), nil)
if err != nil {
return status.Errorf(codes.InvalidArgument, "served bytes can't be decompressed: %v", err)
}
}

f.mu.Lock()
f.blobs[dg] = buf.Bytes()
f.blobs[dg] = uncompressedBuf
f.writes[dg]++
f.mu.Unlock()
cDg := digest.NewFromBlob(buf.Bytes())
cDg := digest.NewFromBlob(uncompressedBuf)
if dg != cDg {
return status.Errorf(codes.InvalidArgument, "mismatched digest: received %s, computed %s", dg, cDg)
}
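Both fakes validate the digest against the decompressed payload, using the stateless EncodeAll/DecodeAll pattern from the klauspost/compress zstd package. A standalone round-trip with the same calls:

package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/compress/zstd"
)

func main() {
	blob := []byte("round-trip through zstd, as the fakes do on Write")

	// Passing nil to NewWriter/NewReader yields an encoder/decoder meant
	// for whole-buffer EncodeAll/DecodeAll use - no streaming state.
	enc, err := zstd.NewWriter(nil)
	if err != nil {
		panic(err)
	}
	compressed := enc.EncodeAll(blob, nil)

	dec, err := zstd.NewReader(nil)
	if err != nil {
		panic(err)
	}
	decompressed, err := dec.DecodeAll(compressed, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(blob, decompressed)) // true
}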
5 changes: 4 additions & 1 deletion go/pkg/fakes/server.go
@@ -41,7 +41,10 @@ type Server struct {

// NewServer creates a server that is ready to accept requests.
func NewServer(t *testing.T) (s *Server, err error) {
cas := NewCAS()
cas, err := NewCAS()
if err != nil {
return nil, err
}
ac := NewActionCache()
s = &Server{Exec: NewExec(t, ac, cas), CAS: cas, ActionCache: ac}
s.listener, err = net.Listen("tcp", ":0")
