Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: RoutingBlobAccess #196

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .bazelversion
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
6.0.0
1 change: 1 addition & 0 deletions pkg/blobstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"read_canarying_blob_access.go",
"redis_blob_access.go",
"reference_expanding_blob_access.go",
"routing_blob_access.go",
"size_distinguishing_blob_access.go",
"validation_caching_read_buffer_factory.go",
"zip_reading_blob_access.go",
Expand Down
24 changes: 24 additions & 0 deletions pkg/blobstore/configuration/new_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package configuration
import (
"archive/zip"
"context"
"fmt"
"net/http"
"os"
"sync"
Expand Down Expand Up @@ -643,6 +644,29 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
BlobAccess: blobAccess,
DigestKeyFormat: digestKeyFormat,
}, "zip_writing", nil

case *pb.BlobAccessConfiguration_Routing:
config := backend.Routing
nested, err := nc.NewNestedBlobAccess(config.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}

var router blobstore.Router
switch routerConfig := config.Router.(type) {
case *pb.RoutingBlobAccessConfiguration_InstanceNameReplacing_:
router, err = blobstore.NewInstanceNameReplacing(routerConfig.InstanceNameReplacing.GetInstanceName())
if err != nil {
return BlobAccessInfo{}, "", err
}
default:
return BlobAccessInfo{}, "", fmt.Errorf("unimplemented RoutingBlobAccess Router: %T", routerConfig)
}

return BlobAccessInfo{
BlobAccess: blobstore.NewRoutingBlobAccess(nested.BlobAccess, router),
DigestKeyFormat: nested.DigestKeyFormat,
}, "routing", nil
}
return creator.NewCustomBlobAccess(configuration, nc)
}
Expand Down
112 changes: 112 additions & 0 deletions pkg/blobstore/routing_blob_access.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package blobstore

import (
"context"
"fmt"

remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/buildbarn/bb-storage/pkg/blobstore/slicing"
"github.com/buildbarn/bb-storage/pkg/digest"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type Router interface {
Route(digest.InstanceName) digest.InstanceName
}

func newRouter(config interface{}) Router {
switch config.(type) {
}
return &instanceNameReplacing{newInstanceName: digest.MustNewInstanceName("foo")}
}

type instanceNameReplacing struct {
newInstanceName digest.InstanceName
}

func NewInstanceNameReplacing(newName string) (Router, error) {
newInstanceName, err := digest.NewInstanceName(newName)
if err != nil {
return nil, fmt.Errorf("failed to create InstanceNameReplacing router with name %q: %w", newName, err)
}
return &instanceNameReplacing{
newInstanceName: newInstanceName,
}, nil
}

func (r *instanceNameReplacing) Route(in digest.InstanceName) digest.InstanceName {
return r.newInstanceName
}

type routingBlobAccess struct {
BlobAccess
router Router
}

func NewRoutingBlobAccess(blobAccess BlobAccess, router Router) BlobAccess {
return &routingBlobAccess{
BlobAccess: blobAccess,
router: router,
}
}

func (ba *routingBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer {
return ba.BlobAccess.Get(ctx, reroutedDigest(digest, ba.router))
}

func (ba *routingBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
// TODO: Should both digests be rerouted, or just one?
return buffer.NewBufferFromError(status.Error(codes.Unimplemented, "GetFromComposite not implemented for RoutingBlobAccess"))
}

func (ba *routingBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
return ba.BlobAccess.Put(ctx, reroutedDigest(digest, ba.router), b)
}

func (ba *routingBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) {
// Make a set with all instance names rerouted
// Also make a map of underlying digest -> original digest(s), so that if an
// underlying digest is missing, we can return the corresponding original
// digests as also missing.
reroutedBuilder := digest.NewSetBuilder()
underlyingToOriginalMap := map[string][]digest.Digest{}
for _, d := range digests.Items() {
newDigest := reroutedDigest(d, ba.router)
reroutedBuilder.Add(newDigest)
underlyingToOriginalMap[newDigest.GetKey(digest.KeyWithInstance)] = append(
underlyingToOriginalMap[newDigest.GetKey(digest.KeyWithInstance)],
d,
)
}
rerouted := reroutedBuilder.Build()

// Find missing from underlying backend
underlyingMissing, err := ba.BlobAccess.FindMissing(ctx, rerouted)
if err != nil {
return digest.EmptySet, err
}

if underlyingMissing.Length() == 0 {
return underlyingMissing, nil
}

originalMissing := digest.NewSetBuilder()
for _, missing := range underlyingMissing.Items() {
for _, originalDigest := range underlyingToOriginalMap[missing.GetKey(digest.KeyWithInstance)] {
originalMissing.Add(originalDigest)
}
}
return originalMissing.Build(), nil
}

func (ba *routingBlobAccess) GetCapabilities(ctx context.Context, instanceName digest.InstanceName) (*remoteexecution.ServerCapabilities, error) {
return ba.BlobAccess.GetCapabilities(ctx, ba.router.Route(instanceName))
}

func reroutedDigest(d digest.Digest, router Router) digest.Digest {
oldName := d.GetDigestFunction().GetInstanceName()
return digest.NewInstanceNamePatcher(oldName, router.Route(oldName)).PatchDigest(d)
}
35 changes: 35 additions & 0 deletions pkg/proto/configuration/blobstore/blobstore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ message BlobAccessConfiguration {

// Refer to a BlobAccess object declared through 'with_labels'.
string label = 27;

// Route accesses to alternative instance names.
//
// Prefixing names is possible using DemultiplexingBlobAccessConfiguration;
// this backend is useful for rewriting the instance name entirely.
RoutingBlobAccessConfiguration routing = 28;
}

// Was 'circular' (CircularBlobAccess). This backend has been replaced
Expand Down Expand Up @@ -981,3 +987,32 @@ message WithLabelsBlobAccessConfiguration {
// A map of string labels to backends that can be referenced.
map<string, BlobAccessConfiguration> labels = 2;
}

message RoutingBlobAccessConfiguration {
message InstanceNameReplacing {
// Instance name that blobs in requests should be changed to.
//
// Blobs in responses will have the instance name set to the original
// incoming instance name, before it was rewritten.
string instance_name = 1;
}

// The backend to which requests are forwarded.
BlobAccessConfiguration backend = 1;

oneof router {
// Rewrite the instance name wholesale.
//
// All accesses for blobs for a given instance name will have the instance
// name rewritten before the access, and any references returned will have
// the instance name rewritten back to that of the outer context.
//
// For instance: a RoutingBlobAccess with InstanceNameReplacing set to `baz`
// might have an incoming request referencing a blob in instance name
// `foo/bar`, in which case it will:
// * rewrite instance names on blobs in the request to `baz`
// * perform the access on `backend`
// * rewrite instance names on blobs in the response back to `foo/bar`
InstanceNameReplacing instance_name_replacing = 2;
}
}