diff --git a/.bazelversion b/.bazelversion new file mode 100644 index 00000000..09b254e9 --- /dev/null +++ b/.bazelversion @@ -0,0 +1 @@ +6.0.0 diff --git a/pkg/blobstore/BUILD.bazel b/pkg/blobstore/BUILD.bazel index de205475..c02ee2c7 100644 --- a/pkg/blobstore/BUILD.bazel +++ b/pkg/blobstore/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "read_canarying_blob_access.go", "redis_blob_access.go", "reference_expanding_blob_access.go", + "routing_blob_access.go", "size_distinguishing_blob_access.go", "validation_caching_read_buffer_factory.go", "zip_reading_blob_access.go", diff --git a/pkg/blobstore/configuration/new_blob_access.go b/pkg/blobstore/configuration/new_blob_access.go index d3befa9d..cf4b1d46 100644 --- a/pkg/blobstore/configuration/new_blob_access.go +++ b/pkg/blobstore/configuration/new_blob_access.go @@ -3,6 +3,7 @@ package configuration import ( "archive/zip" "context" + "fmt" "net/http" "os" "sync" @@ -643,6 +644,29 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration * BlobAccess: blobAccess, DigestKeyFormat: digestKeyFormat, }, "zip_writing", nil + + case *pb.BlobAccessConfiguration_Routing: + config := backend.Routing + nested, err := nc.NewNestedBlobAccess(config.Backend, creator) + if err != nil { + return BlobAccessInfo{}, "", err + } + + var router blobstore.Router + switch routerConfig := config.Router.(type) { + case *pb.RoutingBlobAccessConfiguration_InstanceNameReplacing_: + router, err = blobstore.NewInstanceNameReplacing(routerConfig.InstanceNameReplacing.GetInstanceName()) + if err != nil { + return BlobAccessInfo{}, "", err + } + default: + return BlobAccessInfo{}, "", fmt.Errorf("unimplemented RoutingBlobAccess Router: %T", routerConfig) + } + + return BlobAccessInfo{ + BlobAccess: blobstore.NewRoutingBlobAccess(nested.BlobAccess, router), + DigestKeyFormat: nested.DigestKeyFormat, + }, "routing", nil } return creator.NewCustomBlobAccess(configuration, nc) } diff --git a/pkg/blobstore/routing_blob_access.go b/pkg/blobstore/routing_blob_access.go new file mode 100644 index 00000000..67c5cd4d --- /dev/null +++ b/pkg/blobstore/routing_blob_access.go @@ -0,0 +1,112 @@ +package blobstore + +import ( + "context" + "fmt" + + remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" + "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" + "github.com/buildbarn/bb-storage/pkg/blobstore/slicing" + "github.com/buildbarn/bb-storage/pkg/digest" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type Router interface { + Route(digest.InstanceName) digest.InstanceName +} + +func newRouter(config interface{}) Router { + switch config.(type) { + } + return &instanceNameReplacing{newInstanceName: digest.MustNewInstanceName("foo")} +} + +type instanceNameReplacing struct { + newInstanceName digest.InstanceName +} + +func NewInstanceNameReplacing(newName string) (Router, error) { + newInstanceName, err := digest.NewInstanceName(newName) + if err != nil { + return nil, fmt.Errorf("failed to create InstanceNameReplacing router with name %q: %w", newName, err) + } + return &instanceNameReplacing{ + newInstanceName: newInstanceName, + }, nil +} + +func (r *instanceNameReplacing) Route(in digest.InstanceName) digest.InstanceName { + return r.newInstanceName +} + +type routingBlobAccess struct { + BlobAccess + router Router +} + +func NewRoutingBlobAccess(blobAccess BlobAccess, router Router) BlobAccess { + return &routingBlobAccess{ + BlobAccess: blobAccess, + router: router, + } +} + +func (ba *routingBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer { + return ba.BlobAccess.Get(ctx, reroutedDigest(digest, ba.router)) +} + +func (ba *routingBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer { + // TODO: Should both digests be rerouted, or just one? + return buffer.NewBufferFromError(status.Error(codes.Unimplemented, "GetFromComposite not implemented for RoutingBlobAccess")) +} + +func (ba *routingBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error { + return ba.BlobAccess.Put(ctx, reroutedDigest(digest, ba.router), b) +} + +func (ba *routingBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) { + // Make a set with all instance names rerouted + // Also make a map of underlying digest -> original digest(s), so that if an + // underlying digest is missing, we can return the corresponding original + // digests as also missing. + reroutedBuilder := digest.NewSetBuilder() + underlyingToOriginalMap := map[string][]digest.Digest{} + for _, d := range digests.Items() { + newDigest := reroutedDigest(d, ba.router) + reroutedBuilder.Add(newDigest) + underlyingToOriginalMap[newDigest.GetKey(digest.KeyWithInstance)] = append( + underlyingToOriginalMap[newDigest.GetKey(digest.KeyWithInstance)], + d, + ) + } + rerouted := reroutedBuilder.Build() + + // Find missing from underlying backend + underlyingMissing, err := ba.BlobAccess.FindMissing(ctx, rerouted) + if err != nil { + return digest.EmptySet, err + } + + if underlyingMissing.Length() == 0 { + return underlyingMissing, nil + } + + originalMissing := digest.NewSetBuilder() + for _, missing := range underlyingMissing.Items() { + for _, originalDigest := range underlyingToOriginalMap[missing.GetKey(digest.KeyWithInstance)] { + originalMissing.Add(originalDigest) + } + } + return originalMissing.Build(), nil +} + +func (ba *routingBlobAccess) GetCapabilities(ctx context.Context, instanceName digest.InstanceName) (*remoteexecution.ServerCapabilities, error) { + return ba.BlobAccess.GetCapabilities(ctx, ba.router.Route(instanceName)) +} + +func reroutedDigest(d digest.Digest, router Router) digest.Digest { + oldName := d.GetDigestFunction().GetInstanceName() + return digest.NewInstanceNamePatcher(oldName, router.Route(oldName)).PatchDigest(d) +} diff --git a/pkg/proto/configuration/blobstore/blobstore.proto b/pkg/proto/configuration/blobstore/blobstore.proto index 181bbb6a..c7040974 100644 --- a/pkg/proto/configuration/blobstore/blobstore.proto +++ b/pkg/proto/configuration/blobstore/blobstore.proto @@ -206,6 +206,12 @@ message BlobAccessConfiguration { // Refer to a BlobAccess object declared through 'with_labels'. string label = 27; + + // Route accesses to alternative instance names. + // + // Prefixing names is possible using DemultiplexingBlobAccessConfiguration; + // this backend is useful for rewriting the instance name entirely. + RoutingBlobAccessConfiguration routing = 28; } // Was 'circular' (CircularBlobAccess). This backend has been replaced @@ -981,3 +987,32 @@ message WithLabelsBlobAccessConfiguration { // A map of string labels to backends that can be referenced. map labels = 2; } + +message RoutingBlobAccessConfiguration { + message InstanceNameReplacing { + // Instance name that blobs in requests should be changed to. + // + // Blobs in responses will have the instance name set to the original + // incoming instance name, before it was rewritten. + string instance_name = 1; + } + + // The backend to which requests are forwarded. + BlobAccessConfiguration backend = 1; + + oneof router { + // Rewrite the instance name wholesale. + // + // All accesses for blobs for a given instance name will have the instance + // name rewritten before the access, and any references returned will have + // the instance name rewritten back to that of the outer context. + // + // For instance: a RoutingBlobAccess with InstanceNameReplacing set to `baz` + // might have an incoming request referencing a blob in instance name + // `foo/bar`, in which case it will: + // * rewrite instance names on blobs in the request to `baz` + // * perform the access on `backend` + // * rewrite instance names on blobs in the response back to `foo/bar` + InstanceNameReplacing instance_name_replacing = 2; + } +}