diff --git a/cmd/booster-bitswap/init.go b/cmd/booster-bitswap/init.go new file mode 100644 index 000000000..9b135e37c --- /dev/null +++ b/cmd/booster-bitswap/init.go @@ -0,0 +1,154 @@ +package main + +import ( + "context" + "crypto/rand" + "errors" + "fmt" + "os" + "path/filepath" + + lcli "github.com/filecoin-project/lotus/cli" + "github.com/libp2p/go-libp2p" + crypto "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/muxer/mplex" + "github.com/libp2p/go-libp2p/p2p/muxer/yamux" + quic "github.com/libp2p/go-libp2p/p2p/transport/quic" + "github.com/libp2p/go-libp2p/p2p/transport/tcp" + "github.com/urfave/cli/v2" +) + +type PeerIDAPI interface { + DealsGetBitswapPeerID(ctx context.Context) (peer.ID, error) + DealsSetBitswapPeerID(ctx context.Context, p peer.ID) error +} + +func configureRepo(ctx context.Context, cfgDir string, papi PeerIDAPI, overrideExistingPeerID bool) (crypto.PrivKey, error) { + if cfgDir == "" { + return nil, fmt.Errorf("dataDir must be set") + } + + if err := os.MkdirAll(cfgDir, 0744); err != nil { + return nil, err + } + + peerkey, err := loadPeerKey(cfgDir) + if err != nil { + return nil, err + } + + selfPid, err := peer.IDFromPrivateKey(peerkey) + if err != nil { + return nil, err + } + existingPid, err := papi.DealsGetBitswapPeerID(ctx) + if err != nil { + return nil, err + } + peerIDNotSet := existingPid == peer.ID("") + matchesPid := existingPid == selfPid + log.Infow("get/set peer id of bitswap from boost", "local", selfPid.String(), "boost", existingPid.String(), "boost not set", peerIDNotSet, "override", overrideExistingPeerID) + // error if a peer id is set that is different and we aren't overriding + if !peerIDNotSet && !matchesPid && !overrideExistingPeerID { + return nil, errors.New("bitswap peer id does not match boost node configuration. use --override-peer-id to force a change") + } + if peerIDNotSet || (!matchesPid && overrideExistingPeerID) { + err = papi.DealsSetBitswapPeerID(ctx, selfPid) + if err != nil { + return nil, err + } + } + return peerkey, nil +} + +func setupHost(ctx context.Context, cfgDir string, port int, papi PeerIDAPI) (host.Host, error) { + peerKey, err := configureRepo(ctx, cfgDir, papi, false) + if err != nil { + return nil, err + } + return libp2p.New( + libp2p.ListenAddrStrings( + fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port), + fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port), + ), + libp2p.Transport(tcp.NewTCPTransport), + libp2p.Transport(quic.NewTransport), + libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), + libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport), + libp2p.Identity(peerKey), + libp2p.ResourceManager(network.NullResourceManager), + ) +} + +func loadPeerKey(cfgDir string) (crypto.PrivKey, error) { + var peerkey crypto.PrivKey + keyPath := filepath.Join(cfgDir, "libp2p.key") + keyFile, err := os.ReadFile(keyPath) + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } + + log.Infof("Generating new peer key...") + + key, _, err := crypto.GenerateEd25519Key(rand.Reader) + if err != nil { + return nil, err + } + peerkey = key + + data, err := crypto.MarshalPrivateKey(key) + if err != nil { + return nil, err + } + + if err := os.WriteFile(keyPath, data, 0600); err != nil { + return nil, err + } + } else { + key, err := crypto.UnmarshalPrivateKey(keyFile) + if err != nil { + return nil, err + } + + peerkey = key + } + + if peerkey == nil { + panic("sanity check: peer key is uninitialized") + } + + return peerkey, nil +} + +var initCmd = &cli.Command{ + Name: "init", + Usage: "Init booster-bitswap config", + Before: before, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "api-boost", + Usage: "the endpoint for the boost API", + Required: true, + }, + }, + Action: func(cctx *cli.Context) error { + + ctx := lcli.ReqContext(cctx) + + // Connect to the Boost API + boostAPIInfo := cctx.String("api-boost") + bapi, bcloser, err := getBoostAPI(ctx, boostAPIInfo) + if err != nil { + return fmt.Errorf("getting boost API: %w", err) + } + defer bcloser() + repoDir := cctx.String(FlagRepo.Name) + + _, err = configureRepo(ctx, repoDir, bapi, true) + return err + }, +} diff --git a/cmd/booster-bitswap/main.go b/cmd/booster-bitswap/main.go index f270ebc75..8965b068d 100644 --- a/cmd/booster-bitswap/main.go +++ b/cmd/booster-bitswap/main.go @@ -11,6 +11,13 @@ import ( var log = logging.Logger("booster") +var FlagRepo = &cli.StringFlag{ + Name: "repo", + Usage: "repo directory for Booster bitswap", + Value: "~/.booster-bitswap", + EnvVars: []string{"BOOST_BITSWAP_REPO"}, +} + func main() { app := &cli.App{ Name: "booster-bitswap", @@ -19,8 +26,10 @@ func main() { Version: build.UserVersion(), Flags: []cli.Flag{ cliutil.FlagVeryVerbose, + FlagRepo, }, Commands: []*cli.Command{ + initCmd, runCmd, }, } diff --git a/cmd/booster-bitswap/run.go b/cmd/booster-bitswap/run.go index 9ddf48f5f..919a1fc16 100644 --- a/cmd/booster-bitswap/run.go +++ b/cmd/booster-bitswap/run.go @@ -5,8 +5,6 @@ import ( "fmt" "net/http" _ "net/http/pprof" - "os" - "path" "strings" "github.com/filecoin-project/boost/api" @@ -48,11 +46,6 @@ var runCmd = &cli.Command{ Usage: "the endpoint for the tracing exporter", Value: "http://tempo:14268/api/traces", }, - &cli.BoolFlag{ - Name: "override-peer-id", - Usage: "if present, forces a change to boost's peer id for bitswap", - Value: false, - }, }, Action: func(cctx *cli.Context) error { if cctx.Bool("pprof") { @@ -90,20 +83,23 @@ var runCmd = &cli.Command{ remoteStore := remoteblockstore.NewRemoteBlockstore(bapi) // Create the server API port := cctx.Int("port") - server := NewBitswapServer(port, remoteStore, bapi) + repoDir := cctx.String(FlagRepo.Name) + host, err := setupHost(ctx, repoDir, port, bapi) + + // Start the server + server := NewBitswapServer(remoteStore, host) addrs, err := bapi.NetAddrsListen(ctx) if err != nil { return fmt.Errorf("getting boost API addrs: %w", err) } - overrideExistingPeerID := cctx.Bool("override-peer-id") - // Start the server log.Infof("Starting booster-bitswap node on port %d", port) - err = server.Start(ctx, dataDirPath(cctx), addrs, overrideExistingPeerID) + err = server.Start(ctx, addrs) if err != nil { return err } + // Monitor for shutdown. <-ctx.Done() @@ -138,18 +134,3 @@ func getBoostAPI(ctx context.Context, ai string) (api.Boost, jsonrpc.ClientClose return api, closer, nil } - -func dataDirPath(ctx *cli.Context) string { - dataDir := ctx.String("data-dir") - - if dataDir == "" { - homeDir, err := os.UserHomeDir() - if err != nil { - homeDir = "./" - } - - dataDir = path.Join(homeDir, "/.booster-bitswap") - } - - return dataDir -} diff --git a/cmd/booster-bitswap/server.go b/cmd/booster-bitswap/server.go index 973bcfde3..24a2d4567 100644 --- a/cmd/booster-bitswap/server.go +++ b/cmd/booster-bitswap/server.go @@ -2,100 +2,33 @@ package main import ( "context" - "crypto/rand" - "errors" - "fmt" - "os" - "path/filepath" - "github.com/filecoin-project/boost/loadbalancer" + "github.com/filecoin-project/boost/protocolproxy" bsnetwork "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-bitswap/server" blockstore "github.com/ipfs/go-ipfs-blockstore" nilrouting "github.com/ipfs/go-ipfs-routing/none" - "github.com/libp2p/go-libp2p" - crypto "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/host" peer "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p/p2p/muxer/mplex" - "github.com/libp2p/go-libp2p/p2p/muxer/yamux" - quic "github.com/libp2p/go-libp2p/p2p/transport/quic" - "github.com/libp2p/go-libp2p/p2p/transport/tcp" ) type BitswapServer struct { - port int remoteStore blockstore.Blockstore - papi PeerIDAPI ctx context.Context cancel context.CancelFunc server *server.Server + host host.Host } -type PeerIDAPI interface { - DealsGetBitswapPeerID(ctx context.Context) (peer.ID, error) - DealsSetBitswapPeerID(ctx context.Context, p peer.ID) error +func NewBitswapServer(remoteStore blockstore.Blockstore, host host.Host) *BitswapServer { + return &BitswapServer{remoteStore: remoteStore, host: host} } -func NewBitswapServer(port int, remoteStore blockstore.Blockstore, papi PeerIDAPI) *BitswapServer { - return &BitswapServer{port: port, remoteStore: remoteStore, papi: papi} -} - -func (s *BitswapServer) Start(ctx context.Context, dataDir string, balancer peer.AddrInfo, overrideExistingPeerID bool) error { +func (s *BitswapServer) Start(ctx context.Context, balancer peer.AddrInfo) error { s.ctx, s.cancel = context.WithCancel(ctx) - if dataDir == "" { - return fmt.Errorf("dataDir must be set") - } - - if err := os.MkdirAll(dataDir, 0744); err != nil { - return err - } - - peerkey, err := loadPeerKey(dataDir) - if err != nil { - return err - } - - selfPid, err := peer.IDFromPrivateKey(peerkey) - if err != nil { - return err - } - existingPid, err := s.papi.DealsGetBitswapPeerID(ctx) - peerIDNotSet := err != nil && err.Error() == "no bitswap peer id set" - if err != nil && !peerIDNotSet { - return err - } - matchesPid := existingPid == selfPid - log.Infow("get/set peer id of bitswap from boost", "local", selfPid.String(), "boost", existingPid.String(), "boost not set", peerIDNotSet, "override", overrideExistingPeerID) - // error if a peer id is set that is different and we aren't overriding - if !peerIDNotSet && !matchesPid && !overrideExistingPeerID { - return errors.New("bitswap peer id does not match boost node configuration. use --override-peer-id to force a change") - } - if peerIDNotSet || (!matchesPid && overrideExistingPeerID) { - err = s.papi.DealsSetBitswapPeerID(ctx, selfPid) - if err != nil { - return err - } - } - - host, err := libp2p.New( - libp2p.ListenAddrStrings( - fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", s.port), - fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", s.port), - ), - libp2p.Transport(tcp.NewTCPTransport), - libp2p.Transport(quic.NewTransport), - libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), - libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport), - libp2p.Identity(peerkey), - libp2p.ResourceManager(network.NullResourceManager), - ) - if err != nil { - return err - } - host, err = loadbalancer.NewServiceNode(ctx, host, balancer) + host, err := protocolproxy.NewForwardingHost(ctx, s.host, balancer) if err != nil { return err } @@ -118,44 +51,3 @@ func (s *BitswapServer) Stop() error { s.cancel() return s.server.Close() } - -func loadPeerKey(dataDir string) (crypto.PrivKey, error) { - var peerkey crypto.PrivKey - keyPath := filepath.Join(dataDir, "peerkey") - keyFile, err := os.ReadFile(keyPath) - if err != nil { - if !os.IsNotExist(err) { - return nil, err - } - - log.Infof("Generating new peer key...") - - key, _, err := crypto.GenerateEd25519Key(rand.Reader) - if err != nil { - return nil, err - } - peerkey = key - - data, err := crypto.MarshalPrivateKey(key) - if err != nil { - return nil, err - } - - if err := os.WriteFile(keyPath, data, 0600); err != nil { - return nil, err - } - } else { - key, err := crypto.UnmarshalPrivateKey(keyFile) - if err != nil { - return nil, err - } - - peerkey = key - } - - if peerkey == nil { - panic("sanity check: peer key is uninitialized") - } - - return peerkey, nil -} diff --git a/loadbalancer/errors.go b/loadbalancer/errors.go deleted file mode 100644 index 3f1e0e978..000000000 --- a/loadbalancer/errors.go +++ /dev/null @@ -1,97 +0,0 @@ -package loadbalancer - -import ( - "errors" - "fmt" - - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/protocol" -) - -// ErrAlreadyRegistered indicates a protocol has already been registered as a route -type ErrAlreadyRegistered struct { - protocolID protocol.ID -} - -func (e ErrAlreadyRegistered) Error() string { - return fmt.Sprintf("protocol already registered: %s", e.protocolID) -} - -// ErrNotRegistered indicates a peer has not registered a given protocol but is -// trying to extend or terminate the registration -type ErrNotRegistered struct { - p peer.ID - protocolID protocol.ID -} - -func (e ErrNotRegistered) Error() string { - return fmt.Sprintf("protocol %s is not registered to peer %s", e.protocolID, e.p) -} - -type ErrRouteClosing struct { - protocolID protocol.ID -} - -func (e ErrRouteClosing) Error() string { - return fmt.Sprintf("route is closing for protocol: %s", e.protocolID) -} - -// ErrNoInboundRequests is thrown by the load balancer when it receives and inbound request -var ErrNoInboundRequests = errors.New("inbound requests not accepted") - -// ErrNoOutboundRequests is thrown by the service node when it receives and outbound request -var ErrNoOutboundRequests = errors.New("outbound requests not accepted") - -// ErrInboundRequestsAreSingleProtocol is thrown by the service node when it receives and outbound request -var ErrInboundRequestsAreSingleProtocol = errors.New("inbound requests are single protocol") - -// ErrUnableToOpenStream occurs when we're unable to reach the load balancer -type ErrUnableToOpenStream struct { - original error - p peer.ID -} - -func (e ErrUnableToOpenStream) Unwrap() error { - return e.original -} -func (e ErrUnableToOpenStream) Error() string { - return fmt.Sprintf("opening stream to load balancer %s: %s", e.p, e.original) -} - -// ErrWritingRoutingRequest occurs when we're unable to write a routing request -type ErrWritingRoutingRequest struct { - original error - pid protocol.ID -} - -func (e ErrWritingRoutingRequest) Unwrap() error { - return e.original -} - -func (e ErrWritingRoutingRequest) Error() string { - return fmt.Sprintf("writing routing request for protocol %s: %s", e.pid, e.original) -} - -// ErrReadingRoutingResponse occurs when we're unable to read a routing response -type ErrReadingRoutingResponse struct { - original error - pid protocol.ID -} - -func (e ErrReadingRoutingResponse) Unwrap() error { - return e.original -} - -func (e ErrReadingRoutingResponse) Error() string { - return fmt.Sprintf("reading routing response for protocol %s: %s", e.pid, e.original) -} - -// ErrRejectedRouting indicates a routing request was rejected -type ErrRejectedRouting struct { - pid protocol.ID - message string -} - -func (e ErrRejectedRouting) Error() string { - return fmt.Sprintf("rejected routing for protocol %s, reason %s", e.pid, e.message) -} diff --git a/node/builder.go b/node/builder.go index e8c5189ec..404a492c9 100644 --- a/node/builder.go +++ b/node/builder.go @@ -13,12 +13,12 @@ import ( "github.com/filecoin-project/boost/fundmanager" "github.com/filecoin-project/boost/gql" "github.com/filecoin-project/boost/indexprovider" - "github.com/filecoin-project/boost/loadbalancer" "github.com/filecoin-project/boost/node/config" "github.com/filecoin-project/boost/node/impl" "github.com/filecoin-project/boost/node/impl/common" "github.com/filecoin-project/boost/node/modules" "github.com/filecoin-project/boost/node/modules/dtypes" + "github.com/filecoin-project/boost/protocolproxy" "github.com/filecoin-project/boost/retrievalmarket/lp2pimpl" "github.com/filecoin-project/boost/sealingpipeline" "github.com/filecoin-project/boost/storagemanager" @@ -143,7 +143,7 @@ const ( HandleDealsKey HandleRetrievalKey HandleRetrievalTransportsKey - HandleLoadBalancerKey + HandleProtocolProxyKey RunSectorServiceKey // boost should be started after legacy markets (HandleDealsKey) @@ -527,9 +527,9 @@ func ConfigBoost(cfg *config.Boost) Option { Override(new(retrievalmarket.RetrievalProvider), lotus_modules.RetrievalProvider), Override(HandleRetrievalKey, lotus_modules.HandleRetrieval), Override(new(*lp2pimpl.TransportsListener), modules.NewTransportsListener(cfg)), - Override(new(*loadbalancer.LoadBalancer), modules.NewLoadBalancer(cfg)), + Override(new(*protocolproxy.ProtocolProxy), modules.NewProtocolProxy(cfg)), Override(HandleRetrievalTransportsKey, modules.HandleRetrievalTransports), - Override(HandleLoadBalancerKey, modules.HandleLoadBalancer), + Override(HandleProtocolProxyKey, modules.HandleProtocolProxy), Override(new(idxprov.MeshCreator), idxprov.NewMeshCreator), Override(new(provider.Interface), modules.IndexProvider(cfg.IndexProvider)), diff --git a/node/modules/retrieval.go b/node/modules/retrieval.go index c82edc806..bb3640f98 100644 --- a/node/modules/retrieval.go +++ b/node/modules/retrieval.go @@ -4,9 +4,9 @@ import ( "context" "fmt" - "github.com/filecoin-project/boost/loadbalancer" "github.com/filecoin-project/boost/node/config" "github.com/filecoin-project/boost/node/modules/dtypes" + "github.com/filecoin-project/boost/protocolproxy" "github.com/filecoin-project/boost/retrievalmarket/lp2pimpl" "github.com/filecoin-project/boost/retrievalmarket/types" lotus_repo "github.com/filecoin-project/lotus/node/repo" @@ -18,6 +18,13 @@ import ( "go.uber.org/fx" ) +var bitswapProtocols = []protocol.ID{ + network.ProtocolBitswap, + network.ProtocolBitswapNoVers, + network.ProtocolBitswapOneOne, + network.ProtocolBitswapOneZero, +} + func NewTransportsListener(cfg *config.Boost) func(h host.Host) (*lp2pimpl.TransportsListener, error) { return func(h host.Host) (*lp2pimpl.TransportsListener, error) { protos := []types.Protocol{} @@ -71,55 +78,40 @@ func HandleRetrievalTransports(lc fx.Lifecycle, l *lp2pimpl.TransportsListener) }) } -func NewLoadBalancer(cfg *config.Boost) func(h host.Host) (*loadbalancer.LoadBalancer, error) { - return func(h host.Host) (*loadbalancer.LoadBalancer, error) { +func NewProtocolProxy(cfg *config.Boost) func(h host.Host) (*protocolproxy.ProtocolProxy, error) { + return func(h host.Host) (*protocolproxy.ProtocolProxy, error) { peerConfig := map[peer.ID][]protocol.ID{} if cfg.Dealmaking.BitswapPeerID != "" { bsPeerID, err := peer.Decode(cfg.Dealmaking.BitswapPeerID) if err != nil { return nil, err } - peerConfig[bsPeerID] = []protocol.ID{ - network.ProtocolBitswap, - network.ProtocolBitswapNoVers, - network.ProtocolBitswapOneOne, - network.ProtocolBitswapOneZero, - } + peerConfig[bsPeerID] = bitswapProtocols } - return loadbalancer.NewLoadBalancer(h, peerConfig) + return protocolproxy.NewProtocolProxy(h, peerConfig) } } -func HandleLoadBalancer(lc fx.Lifecycle, lb *loadbalancer.LoadBalancer) { +func HandleProtocolProxy(lc fx.Lifecycle, pp *protocolproxy.ProtocolProxy) { lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - log.Debug("starting load balancer") - lb.Start(ctx) + log.Info("starting load balancer") + pp.Start(ctx) return nil }, OnStop: func(context.Context) error { - log.Debug("stopping load balancer") - return lb.Close() + log.Info("stopping load balancer") + pp.Close() + return nil }, }) } -func NewSetBitswapPeerIDFunc(r lotus_repo.LockedRepo, lb *loadbalancer.LoadBalancer) (dtypes.SetBitswapPeerIDFunc, error) { +func NewSetBitswapPeerIDFunc(r lotus_repo.LockedRepo) (dtypes.SetBitswapPeerIDFunc, error) { return func(p peer.ID) (err error) { err = mutateCfg(r, func(cfg *config.Boost) { cfg.Dealmaking.BitswapPeerID = peer.Encode(p) }) - if err != nil { - return - } - peerConfig := map[peer.ID][]protocol.ID{} - peerConfig[p] = []protocol.ID{ - network.ProtocolBitswap, - network.ProtocolBitswapNoVers, - network.ProtocolBitswapOneOne, - network.ProtocolBitswapOneZero, - } - err = lb.UpdatePeerConfig(peerConfig) return }, nil } @@ -127,14 +119,14 @@ func NewSetBitswapPeerIDFunc(r lotus_repo.LockedRepo, lb *loadbalancer.LoadBalan func NewGetBitswapPeerIDFunc(r lotus_repo.LockedRepo) (dtypes.GetBitswapPeerIDFunc, error) { return func() (p peer.ID, err error) { var pString string - err = mutateCfg(r, func(cfg *config.Boost) { + err = readCfg(r, func(cfg *config.Boost) { pString = cfg.Dealmaking.BitswapPeerID }) if err != nil { return } if pString == "" { - err = fmt.Errorf("no bitswap peer id set") + p = peer.ID("") return } p, err = peer.Decode(pString) diff --git a/protocolproxy/errors.go b/protocolproxy/errors.go new file mode 100644 index 000000000..38661f7af --- /dev/null +++ b/protocolproxy/errors.go @@ -0,0 +1,29 @@ +package protocolproxy + +import ( + "errors" + "fmt" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" +) + +// ErrNotRegistered indicates a peer has not registered a given protocol but is +// trying to extend or terminate the registration +type ErrNotRegistered struct { + p peer.ID + protocolID protocol.ID +} + +func (e ErrNotRegistered) Error() string { + return fmt.Sprintf("protocol %s is not registered to peer %s", e.protocolID, e.p) +} + +// ErrNoInboundRequests is thrown by the load balancer when it receives and inbound request +var ErrNoInboundRequests = errors.New("inbound requests not accepted") + +// ErrNoOutboundRequests is thrown by the service node when it receives and outbound request +var ErrNoOutboundRequests = errors.New("outbound requests not accepted") + +// ErrInboundRequestsAreSingleProtocol is thrown by the service node when it receives and outbound request +var ErrInboundRequestsAreSingleProtocol = errors.New("inbound requests are single protocol") diff --git a/loadbalancer/servicenode.go b/protocolproxy/forwardinghost.go similarity index 70% rename from loadbalancer/servicenode.go rename to protocolproxy/forwardinghost.go index 69f772149..2ee663073 100644 --- a/loadbalancer/servicenode.go +++ b/protocolproxy/forwardinghost.go @@ -1,58 +1,58 @@ -package loadbalancer +package protocolproxy import ( "context" "fmt" "sync" - "github.com/filecoin-project/boost/loadbalancer/messages" + "github.com/filecoin-project/boost/protocolproxy/messages" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" ) -// ServiceNode is a host that behaves as a service node connected to a load balancer +// ForwardingHost is a host that behaves as a service node connected to a load balancer // -- all traffic is routed through the load balancer for each registered protocol -type ServiceNode struct { +type ForwardingHost struct { host.Host balancer peer.ID handlersLk sync.RWMutex handlers map[protocol.ID]network.StreamHandler } -// NewServiceNode node constructs a service node connected to the given load balancer on the passed +// NewForwardingHost node constructs a service node connected to the given load balancer on the passed // in host. A service node behaves exactly like a host.Host but setting up new protocol handlers // registers routes on the load balancer -func NewServiceNode(ctx context.Context, h host.Host, balancer peer.AddrInfo) (host.Host, error) { +func NewForwardingHost(ctx context.Context, h host.Host, balancer peer.AddrInfo) (host.Host, error) { err := h.Connect(ctx, balancer) if err != nil { return nil, err } - sn := &ServiceNode{ + fh := &ForwardingHost{ Host: h, balancer: balancer.ID, handlers: make(map[protocol.ID]network.StreamHandler), } - sn.Host.SetStreamHandler(ForwardingProtocolID, sn.handleForwarding) - return sn, nil + fh.Host.SetStreamHandler(ForwardingProtocolID, fh.handleForwarding) + return fh, nil } // Close shuts down a service node host's forwarding -func (sn *ServiceNode) Close() error { - sn.Host.RemoveStreamHandler(ForwardingProtocolID) - return sn.Host.Close() +func (fh *ForwardingHost) Close() error { + fh.Host.RemoveStreamHandler(ForwardingProtocolID) + return fh.Host.Close() } // SetStreamHandler interrupts the normal process of setting up stream handlers by instead // registering a route on the connected load balancer. All traffic for this protocol // will go through the forwarding handshake with load balancer, then the native handler will // be called -func (sn *ServiceNode) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) { +func (fh *ForwardingHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) { // only set the handler if we are successful in registering the route - sn.handlersLk.Lock() - sn.handlers[pid] = handler - sn.handlersLk.Unlock() + fh.handlersLk.Lock() + fh.handlers[pid] = handler + fh.handlersLk.Unlock() } // these wrappings on the stream or conn make it SEEM like the request is coming @@ -82,9 +82,9 @@ func (wc *wrappedConn) RemotePeer() peer.ID { } // handle inbound forwarding requests -func (sn *ServiceNode) handleForwarding(s network.Stream) { +func (fh *ForwardingHost) handleForwarding(s network.Stream) { // only accept requests from the load balancer - if s.Conn().RemotePeer() != sn.balancer { + if s.Conn().RemotePeer() != fh.balancer { _ = s.Reset() return } @@ -100,7 +100,7 @@ func (sn *ServiceNode) handleForwarding(s network.Stream) { log.Debugw("received forwarding request for protocol", "protocols", request.Protocols, "remote", request.Remote) // validate the request - handler, responseErr := sn.validateForwardingRequest(request) + handler, responseErr := fh.validateForwardingRequest(request) if responseErr != nil { log.Infof("rejected forwarding request: %s", responseErr) @@ -113,9 +113,9 @@ func (sn *ServiceNode) handleForwarding(s network.Stream) { } // validates a forwarding request is one we can accept -func (sn *ServiceNode) validateForwardingRequest(request *messages.ForwardingRequest) (network.StreamHandler, error) { - sn.handlersLk.RLock() - defer sn.handlersLk.RUnlock() +func (fh *ForwardingHost) validateForwardingRequest(request *messages.ForwardingRequest) (network.StreamHandler, error) { + fh.handlersLk.RLock() + defer fh.handlersLk.RUnlock() // only accept inbound requests if request.Kind != messages.ForwardingInbound { @@ -128,11 +128,11 @@ func (sn *ServiceNode) validateForwardingRequest(request *messages.ForwardingReq } // check for a registered handler - registeredHandler, ok := sn.handlers[request.Protocols[0]] + registeredHandler, ok := fh.handlers[request.Protocols[0]] // don't accept inbound requests on protocols we didn't setup routing for if !ok { - return nil, ErrNotRegistered{sn.ID(), request.Protocols[0]} + return nil, ErrNotRegistered{fh.ID(), request.Protocols[0]} } // return the registered handler @@ -141,10 +141,10 @@ func (sn *ServiceNode) validateForwardingRequest(request *messages.ForwardingReq // Calls to "NewStream" open an outbound forwarding request to the load balancer, that is then sent on // the the specified peer -func (sn *ServiceNode) NewStream(ctx context.Context, p peer.ID, protocols ...protocol.ID) (network.Stream, error) { +func (fh *ForwardingHost) NewStream(ctx context.Context, p peer.ID, protocols ...protocol.ID) (network.Stream, error) { // open a forwarding stream - routedStream, err := sn.Host.NewStream(ctx, sn.balancer, ForwardingProtocolID) + routedStream, err := fh.Host.NewStream(ctx, fh.balancer, ForwardingProtocolID) if err != nil { return nil, err } @@ -174,15 +174,15 @@ func (sn *ServiceNode) NewStream(ctx context.Context, p peer.ID, protocols ...pr } // RemoveStreamHandler removes a stream handler by shutting down registered route with the original host -func (sn *ServiceNode) RemoveStreamHandler(pid protocol.ID) { +func (fh *ForwardingHost) RemoveStreamHandler(pid protocol.ID) { // check if the handler is exists - sn.handlersLk.Lock() - delete(sn.handlers, pid) - sn.handlersLk.Unlock() + fh.handlersLk.Lock() + delete(fh.handlers, pid) + fh.handlersLk.Unlock() } // Connect for now does nothing -func (sn *ServiceNode) Connect(ctx context.Context, pi peer.AddrInfo) error { +func (fh *ForwardingHost) Connect(ctx context.Context, pi peer.AddrInfo) error { // for now, this does nothing -- see discussion/improvements return nil } diff --git a/loadbalancer/servicenode_test.go b/protocolproxy/forwardinghost_test.go similarity index 75% rename from loadbalancer/servicenode_test.go rename to protocolproxy/forwardinghost_test.go index b629a40d5..5316f739e 100644 --- a/loadbalancer/servicenode_test.go +++ b/protocolproxy/forwardinghost_test.go @@ -1,4 +1,4 @@ -package loadbalancer +package protocolproxy import ( "context" @@ -8,7 +8,7 @@ import ( "testing" "time" - "github.com/filecoin-project/boost/loadbalancer/messages" + "github.com/filecoin-project/boost/protocolproxy/messages" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" @@ -43,13 +43,13 @@ func TestSetStreamHandler(t *testing.T) { tn := setupTestNet(ctx, t, peers) // setup a mock load balancer for routing - sn, err := NewServiceNode(ctx, tn.serviceNode, peer.AddrInfo{ - ID: peers.loadBalancer.id, - Addrs: []multiaddr.Multiaddr{peers.loadBalancer.multiAddr}, + fh, err := NewForwardingHost(ctx, tn.serviceNode, peer.AddrInfo{ + ID: peers.proxyNode.id, + Addrs: []multiaddr.Multiaddr{peers.proxyNode.multiAddr}, }) require.NoError(t, err) - sn.SetStreamHandler(testProtocol, func(s network.Stream) { + fh.SetStreamHandler(testProtocol, func(s network.Stream) { defer s.Close() require.Equal(t, s.Protocol(), testProtocol) require.Equal(t, s.Conn().RemotePeer(), peers.publicNode.id) @@ -59,7 +59,7 @@ func TestSetStreamHandler(t *testing.T) { _, err = s.Write([]byte("response")) require.NoError(t, err) }) - s, err := tn.loadBalancer.NewStream(ctx, tn.serviceNode.ID(), ForwardingProtocolID) + s, err := tn.proxyNode.NewStream(ctx, tn.serviceNode.ID(), ForwardingProtocolID) require.NoError(t, err) defer s.Close() err = messages.WriteInboundForwardingRequest(s, tn.publicNode.ID(), testCase.protocol) @@ -84,10 +84,10 @@ func TestNewStream(t *testing.T) { ctx := context.Background() peers := makePeers(t) testCases := []struct { - name string - loadBalancerForwardingError error - writeForwardingResponse func(io.Writer) error - expectedDataResponse []byte + name string + protocolProxyForwardingError error + writeForwardingResponse func(io.Writer) error + expectedDataResponse []byte }{ { name: "outbound - simple success", @@ -100,7 +100,7 @@ func TestNewStream(t *testing.T) { writeForwardingResponse: func(w io.Writer) error { return messages.WriteForwardingResponseError(w, errors.New("something went wrong")) }, - loadBalancerForwardingError: fmt.Errorf("opening forwarded stream: something went wrong"), + protocolProxyForwardingError: fmt.Errorf("opening forwarded stream: something went wrong"), }, } for _, testCase := range testCases { @@ -109,7 +109,7 @@ func TestNewStream(t *testing.T) { defer cancel() tn := setupTestNet(ctx, t, peers) - tn.loadBalancer.SetStreamHandler(ForwardingProtocolID, func(s network.Stream) { + tn.proxyNode.SetStreamHandler(ForwardingProtocolID, func(s network.Stream) { defer s.Close() request, err := messages.ReadForwardingRequest(s) require.NoError(t, err) @@ -120,7 +120,7 @@ func TestNewStream(t *testing.T) { err = testCase.writeForwardingResponse(s) require.NoError(t, err) } - if testCase.loadBalancerForwardingError == nil { + if testCase.protocolProxyForwardingError == nil { data, err := io.ReadAll(s) require.NoError(t, err) require.Equal(t, []byte("request"), data) @@ -128,14 +128,14 @@ func TestNewStream(t *testing.T) { require.NoError(t, err) } }) - sn, err := NewServiceNode(ctx, tn.serviceNode, peer.AddrInfo{ - ID: peers.loadBalancer.id, - Addrs: []multiaddr.Multiaddr{peers.loadBalancer.multiAddr}, + fh, err := NewForwardingHost(ctx, tn.serviceNode, peer.AddrInfo{ + ID: peers.proxyNode.id, + Addrs: []multiaddr.Multiaddr{peers.proxyNode.multiAddr}, }) require.NoError(t, err) - s, err := sn.NewStream(ctx, tn.publicNode.ID(), testProtocol) - if testCase.loadBalancerForwardingError != nil { - require.EqualError(t, err, testCase.loadBalancerForwardingError.Error()) + s, err := fh.NewStream(ctx, tn.publicNode.ID(), testProtocol) + if testCase.protocolProxyForwardingError != nil { + require.EqualError(t, err, testCase.protocolProxyForwardingError.Error()) } else { require.NoError(t, err) defer s.Close() diff --git a/loadbalancer/messages/messages.go b/protocolproxy/messages/messages.go similarity index 100% rename from loadbalancer/messages/messages.go rename to protocolproxy/messages/messages.go diff --git a/loadbalancer/messages/messages.ipldsch b/protocolproxy/messages/messages.ipldsch similarity index 100% rename from loadbalancer/messages/messages.ipldsch rename to protocolproxy/messages/messages.ipldsch diff --git a/loadbalancer/messages/messages_test.go b/protocolproxy/messages/messages_test.go similarity index 98% rename from loadbalancer/messages/messages_test.go rename to protocolproxy/messages/messages_test.go index 0d85726d6..37a590459 100644 --- a/loadbalancer/messages/messages_test.go +++ b/protocolproxy/messages/messages_test.go @@ -7,7 +7,7 @@ import ( "io" "testing" - "github.com/filecoin-project/boost/loadbalancer/messages" + "github.com/filecoin-project/boost/protocolproxy/messages" "github.com/ipld/go-ipld-prime/codec/dagcbor" "github.com/libp2p/go-libp2p-core/crypto" peer "github.com/libp2p/go-libp2p-core/peer" diff --git a/loadbalancer/messages/utils.go b/protocolproxy/messages/utils.go similarity index 100% rename from loadbalancer/messages/utils.go rename to protocolproxy/messages/utils.go diff --git a/loadbalancer/loadbalancer.go b/protocolproxy/protocolproxy.go similarity index 54% rename from loadbalancer/loadbalancer.go rename to protocolproxy/protocolproxy.go index 0254d8541..fe29c048b 100644 --- a/loadbalancer/loadbalancer.go +++ b/protocolproxy/protocolproxy.go @@ -1,4 +1,4 @@ -package loadbalancer +package protocolproxy import ( "context" @@ -6,9 +6,8 @@ import ( "fmt" "io" "sync" - "time" - "github.com/filecoin-project/boost/loadbalancer/messages" + "github.com/filecoin-project/boost/protocolproxy/messages" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" @@ -17,93 +16,71 @@ import ( "github.com/multiformats/go-multiaddr" ) -const DefaultDuration = time.Hour +var log = logging.Logger("protocolproxy") -var log = logging.Logger("loadbalancer") - -type LoadBalancer struct { - ctx context.Context - h host.Host - peerConfig map[peer.ID][]protocol.ID - activeRoutesLk sync.RWMutex - activeRoutes map[protocol.ID]peer.ID -} - -func NewLoadBalancer(h host.Host, peerConfig map[peer.ID][]protocol.ID) (*LoadBalancer, error) { - err := validatePeerConfig(peerConfig) - if err != nil { - return nil, err - } - return &LoadBalancer{ - h: h, - activeRoutes: make(map[protocol.ID]peer.ID), - peerConfig: peerConfig, - }, nil +type ProtocolProxy struct { + ctx context.Context + h host.Host + peerConfig map[peer.ID][]protocol.ID + supportedProtocols map[protocol.ID]peer.ID + activeRoutesLk sync.RWMutex + activeRoutes map[protocol.ID]peer.ID } -func validatePeerConfig(peerConfig map[peer.ID][]protocol.ID) error { +func NewProtocolProxy(h host.Host, peerConfig map[peer.ID][]protocol.ID) (*ProtocolProxy, error) { // for now, double check no peers overlap in config // TODO: support multiple peers owning a config - routesSet := map[protocol.ID]struct{}{} - for _, protocols := range peerConfig { + routesSet := map[protocol.ID]peer.ID{} + for p, protocols := range peerConfig { for _, protocol := range protocols { _, existing := routesSet[protocol] if existing { - return errors.New("Route registered for multiple peers") + return nil, errors.New("Route registered for multiple peers") } + routesSet[protocol] = p } } - return nil -} - -func (lb *LoadBalancer) Start(ctx context.Context) { - lb.ctx = ctx - lb.h.SetStreamHandler(ForwardingProtocolID, lb.handleForwarding) - lb.h.Network().Notify(lb) + return &ProtocolProxy{ + h: h, + activeRoutes: make(map[protocol.ID]peer.ID), + supportedProtocols: routesSet, + peerConfig: peerConfig, + }, nil } -func (lb *LoadBalancer) Close() error { - lb.h.RemoveStreamHandler(ForwardingProtocolID) - - lb.activeRoutesLk.Lock() - for id := range lb.activeRoutes { - lb.h.RemoveStreamHandler(id) +func (pp *ProtocolProxy) Start(ctx context.Context) { + pp.ctx = ctx + pp.h.SetStreamHandler(ForwardingProtocolID, pp.handleForwarding) + for id := range pp.supportedProtocols { + pp.h.SetStreamHandler(id, pp.handleIncoming) } - lb.activeRoutes = map[protocol.ID]peer.ID{} - lb.activeRoutesLk.Unlock() - lb.h.Network().StopNotify(lb) - return nil + pp.h.Network().Notify(pp) } -// UpdatePeerConfig updates the load balancer with a new peer config -// existing nodes will have to disconnect and reconnect -func (lb *LoadBalancer) UpdatePeerConfig(peerConfig map[peer.ID][]protocol.ID) error { - err := validatePeerConfig(peerConfig) - if err != nil { - return err - } - err = lb.Close() - if err != nil { - return err +func (pp *ProtocolProxy) Close() { + pp.h.RemoveStreamHandler(ForwardingProtocolID) + for id := range pp.supportedProtocols { + pp.h.RemoveStreamHandler(id) } - lb.peerConfig = peerConfig - lb.Start(lb.ctx) - return nil + pp.activeRoutesLk.Lock() + pp.activeRoutes = map[protocol.ID]peer.ID{} + pp.activeRoutesLk.Unlock() + pp.h.Network().StopNotify(pp) } // Listen satifies the network.Notifee interface but does nothing -func (lb *LoadBalancer) Listen(network.Network, multiaddr.Multiaddr) {} // called when network starts listening on an addr +func (pp *ProtocolProxy) Listen(network.Network, multiaddr.Multiaddr) {} // called when network starts listening on an addr // ListenClose satifies the network.Notifee interface but does nothing -func (lb *LoadBalancer) ListenClose(network.Network, multiaddr.Multiaddr) {} // called when network stops listening on an addr +func (pp *ProtocolProxy) ListenClose(network.Network, multiaddr.Multiaddr) {} // called when network stops listening on an addr // Connected checks the peersConfig and begins listening any time a service node connects -func (lb *LoadBalancer) Connected(n network.Network, c network.Conn) { +func (pp *ProtocolProxy) Connected(n network.Network, c network.Conn) { // read the peer that just connected p := c.RemotePeer() // check if they are in the peer config - protocols, isServiceNode := lb.peerConfig[p] + protocols, isServiceNode := pp.peerConfig[p] if !isServiceNode { return @@ -112,25 +89,20 @@ func (lb *LoadBalancer) Connected(n network.Network, c network.Conn) { log.Infow("service node connected activating peer protocols", "peerID", p, "protocols", protocols) // if they are in the peer config, listen on all protocols they are setup for - lb.activeRoutesLk.Lock() - defer lb.activeRoutesLk.Unlock() + pp.activeRoutesLk.Lock() + defer pp.activeRoutesLk.Unlock() for _, id := range protocols { - // check if we already registered this protocol - if _, ok := lb.activeRoutes[id]; ok { - continue - } - lb.activeRoutes[id] = p - lb.h.SetStreamHandler(id, lb.handleIncoming) + pp.activeRoutes[id] = p } } // Disconnected checks the peersConfig and removes listening when a service node disconnects -func (lb *LoadBalancer) Disconnected(n network.Network, c network.Conn) { // called when a connection closed +func (pp *ProtocolProxy) Disconnected(n network.Network, c network.Conn) { // called when a connection closed // read the peer that just connected p := c.RemotePeer() // check if they are in the peer config - protocols, isServiceNode := lb.peerConfig[p] + protocols, isServiceNode := pp.peerConfig[p] if !isServiceNode { return @@ -139,20 +111,15 @@ func (lb *LoadBalancer) Disconnected(n network.Network, c network.Conn) { // cal log.Infow("service node disconnected deactivating peer protocols", "peerID", p, "protocols", protocols) // if they are in the peer config, 'un'-listen on all protocols they are setup for - lb.activeRoutesLk.Lock() - defer lb.activeRoutesLk.Unlock() + pp.activeRoutesLk.Lock() + defer pp.activeRoutesLk.Unlock() for _, id := range protocols { - // check if we already de-registered this protocol - if _, ok := lb.activeRoutes[id]; !ok { - continue - } - delete(lb.activeRoutes, id) - lb.h.RemoveStreamHandler(id) + delete(pp.activeRoutes, id) } } // handle a request from a routed peer to make an external ougoing connection -func (lb *LoadBalancer) handleForwarding(s network.Stream) { +func (pp *ProtocolProxy) handleForwarding(s network.Stream) { defer s.Close() p := s.Conn().RemotePeer() request, err := messages.ReadForwardingRequest(s) @@ -174,7 +141,7 @@ func (lb *LoadBalancer) handleForwarding(s network.Stream) { log.Debugw("outgoing forwarding stream", "protocols", request.Protocols, "routed peer", p, "remote peer", request.Remote) // open the forwarding stream - outgoingStream, streamErr := lb.processForwardingRequest(p, request.Remote, request.Protocols) + outgoingStream, streamErr := pp.processForwardingRequest(p, request.Remote, request.Protocols) // if we failed to open the stream, write the response and return if streamErr != nil { @@ -195,30 +162,30 @@ func (lb *LoadBalancer) handleForwarding(s network.Stream) { } // bridge the streams together - lb.bridgeStreams(s, outgoingStream) + pp.bridgeStreams(s, outgoingStream) } -func (lb *LoadBalancer) processForwardingRequest(p peer.ID, remote peer.ID, protocols []protocol.ID) (network.Stream, error) { - lb.activeRoutesLk.RLock() +func (pp *ProtocolProxy) processForwardingRequest(p peer.ID, remote peer.ID, protocols []protocol.ID) (network.Stream, error) { + pp.activeRoutesLk.RLock() // check routes to verify ownership for _, id := range protocols { - registeredPeer, ok := lb.activeRoutes[id] + registeredPeer, ok := pp.activeRoutes[id] if !ok || p != registeredPeer { - lb.activeRoutesLk.RUnlock() + pp.activeRoutesLk.RUnlock() // error if this protocol not registered to this peer return nil, ErrNotRegistered{p, id} } } - lb.activeRoutesLk.RUnlock() - s, err := lb.h.NewStream(lb.ctx, remote, protocols...) + pp.activeRoutesLk.RUnlock() + s, err := pp.h.NewStream(pp.ctx, remote, protocols...) if err != nil { return nil, fmt.Errorf("remote peer: %w", err) } return s, nil } -// pipe a stream through the LB -func (lb *LoadBalancer) bridgeStreams(s1, s2 network.Stream) { +// pipe a stream through the PP +func (pp *ProtocolProxy) bridgeStreams(s1, s2 network.Stream) { var wg sync.WaitGroup wg.Add(2) go func() { @@ -250,25 +217,25 @@ func (lb *LoadBalancer) bridgeStreams(s1, s2 network.Stream) { wg.Wait() } -func (lb *LoadBalancer) handleIncoming(s network.Stream) { +func (pp *ProtocolProxy) handleIncoming(s network.Stream) { defer s.Close() // check routed peer for this stream - lb.activeRoutesLk.RLock() - routedPeer, ok := lb.activeRoutes[s.Protocol()] - lb.activeRoutesLk.RUnlock() - - log.Debugw("incoming stream, reforwarding to peer", "protocol", s.Protocol(), "routed peer", routedPeer, "remote peer", s.Conn().RemotePeer()) + pp.activeRoutesLk.RLock() + routedPeer, ok := pp.activeRoutes[s.Protocol()] + pp.activeRoutesLk.RUnlock() if !ok { // if none exists, return - log.Warnf("received protocol request for protocol '%s' with no router peer", s.Protocol()) + log.Infof("received protocol request for protocol '%s' with no active peer", s.Protocol()) _ = s.Reset() return } + log.Debugw("incoming stream, reforwarding to peer", "protocol", s.Protocol(), "routed peer", routedPeer, "remote peer", s.Conn().RemotePeer()) + // open a forwarding stream - routedStream, err := lb.h.NewStream(lb.ctx, routedPeer, ForwardingProtocolID) + routedStream, err := pp.h.NewStream(pp.ctx, routedPeer, ForwardingProtocolID) if err != nil { log.Warnf("unable to open forwarding stream for protocol '%s' with peer %s", s.Protocol(), routedPeer) _ = s.Reset() @@ -285,5 +252,5 @@ func (lb *LoadBalancer) handleIncoming(s network.Stream) { return } - lb.bridgeStreams(s, routedStream) + pp.bridgeStreams(s, routedStream) } diff --git a/loadbalancer/loadbalancer_test.go b/protocolproxy/protocolproxy_test.go similarity index 90% rename from loadbalancer/loadbalancer_test.go rename to protocolproxy/protocolproxy_test.go index 925436767..f2141794d 100644 --- a/loadbalancer/loadbalancer_test.go +++ b/protocolproxy/protocolproxy_test.go @@ -1,4 +1,4 @@ -package loadbalancer +package protocolproxy import ( "context" @@ -11,7 +11,7 @@ import ( "time" "github.com/benbjohnson/clock" - "github.com/filecoin-project/boost/loadbalancer/messages" + "github.com/filecoin-project/boost/protocolproxy/messages" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" @@ -85,11 +85,11 @@ func TestOutboundForwarding(t *testing.T) { tn := setupTestNet(ctx, t, peers) testClock := clock.NewMock() testClock.Set(startTime) - lb, err := NewLoadBalancer(tn.loadBalancer, map[peer.ID][]protocol.ID{ + pp, err := NewProtocolProxy(tn.proxyNode, map[peer.ID][]protocol.ID{ peers.serviceNode.id: testProtocols, }) require.NoError(t, err) - lb.Start(ctx) + pp.Start(ctx) if testCase.registerHandler { handler := func(s network.Stream) { @@ -115,8 +115,7 @@ func TestOutboundForwarding(t *testing.T) { require.Equal(t, "response", string(streamResponse)) s.Close() } - err = lb.Close() - require.NoError(t, err) + pp.Close() }) } } @@ -145,10 +144,10 @@ func TestInboundForwarding(t *testing.T) { }, { name: "error - not connected to service node", - protocols: []protocol.ID{otherProtocol}, + protocols: testProtocols, doNotConnect: true, - willErrorOpening: true, - willErrorReading: false, + willErrorOpening: false, + willErrorReading: true, registerHandler: true, }, { @@ -181,11 +180,11 @@ func TestInboundForwarding(t *testing.T) { tn := setupTestNet(ctx, t, peers) testClock := clock.NewMock() testClock.Set(startTime) - lb, err := NewLoadBalancer(tn.loadBalancer, map[peer.ID][]protocol.ID{ + pp, err := NewProtocolProxy(tn.proxyNode, map[peer.ID][]protocol.ID{ peers.serviceNode.id: testProtocols, }) require.NoError(t, err) - lb.Start(ctx) + pp.Start(ctx) if testCase.registerHandler { handler := func(s network.Stream) { defer s.Close() @@ -210,14 +209,14 @@ func TestInboundForwarding(t *testing.T) { } if !testCase.doNotConnect { err = tn.serviceNode.Connect(ctx, peer.AddrInfo{ - ID: peers.loadBalancer.id, + ID: peers.proxyNode.id, Addrs: []multiaddr.Multiaddr{ - peers.loadBalancer.multiAddr, + peers.proxyNode.multiAddr, }, }) require.NoError(t, err) } - s, err := tn.publicNode.NewStream(tn.ctx, tn.loadBalancer.ID(), testCase.protocols...) + s, err := tn.publicNode.NewStream(tn.ctx, tn.proxyNode.ID(), testCase.protocols...) if testCase.willErrorOpening { require.Error(t, err) } else { @@ -236,8 +235,7 @@ func TestInboundForwarding(t *testing.T) { s.Close() } } - err = lb.Close() - require.NoError(t, err) + pp.Close() }) } } @@ -245,7 +243,7 @@ func TestInboundForwarding(t *testing.T) { type testNet struct { ctx context.Context t *testing.T - loadBalancer host.Host + proxyNode host.Host serviceNode host.Host otherServiceNode host.Host publicNode host.Host @@ -259,7 +257,7 @@ type peerInfo struct { } type peerInfos struct { - loadBalancer peerInfo + proxyNode peerInfo serviceNode peerInfo otherServiceNode peerInfo publicNode peerInfo @@ -267,7 +265,7 @@ type peerInfos struct { func setupTestNet(ctx context.Context, t *testing.T, pis peerInfos) *testNet { mn := mocknet.New() - lb, err := mn.AddPeer(pis.loadBalancer.key, pis.loadBalancer.multiAddr) + prn, err := mn.AddPeer(pis.proxyNode.key, pis.proxyNode.multiAddr) require.NoError(t, err) sn, err := mn.AddPeer(pis.serviceNode.key, pis.serviceNode.multiAddr) require.NoError(t, err) @@ -280,7 +278,7 @@ func setupTestNet(ctx context.Context, t *testing.T, pis peerInfos) *testNet { tn := &testNet{ t: t, ctx: ctx, - loadBalancer: lb, + proxyNode: prn, serviceNode: sn, otherServiceNode: osn, publicNode: pn, @@ -289,7 +287,7 @@ func setupTestNet(ctx context.Context, t *testing.T, pis peerInfos) *testNet { } func (tn *testNet) openOutboundForwardingRequest(write func(io.Writer) error) (*messages.ForwardingResponse, network.Stream) { - s, err := tn.serviceNode.NewStream(tn.ctx, tn.loadBalancer.ID(), ForwardingProtocolID) + s, err := tn.serviceNode.NewStream(tn.ctx, tn.proxyNode.ID(), ForwardingProtocolID) require.NoError(tn.t, err) err = write(s) require.NoError(tn.t, err) @@ -318,7 +316,7 @@ func makePeer(t *testing.T) peerInfo { func makePeers(t *testing.T) peerInfos { return peerInfos{ - loadBalancer: makePeer(t), + proxyNode: makePeer(t), serviceNode: makePeer(t), otherServiceNode: makePeer(t), publicNode: makePeer(t), diff --git a/loadbalancer/protocols.go b/protocolproxy/protocols.go similarity index 54% rename from loadbalancer/protocols.go rename to protocolproxy/protocols.go index 6986a14e5..4025c9b69 100644 --- a/loadbalancer/protocols.go +++ b/protocolproxy/protocols.go @@ -1,11 +1,8 @@ -package loadbalancer +package protocolproxy import ( "github.com/libp2p/go-libp2p-core/protocol" ) -// RegisterRoutingProtocolID identifies the protocol for registering routes with a libp2p load balancer -const RegisterRoutingProtocolID protocol.ID = "/libp2p/balancer/register-routing/0.0.1" - // ForwardingProtocolID identifies the protocol for requesting forwarding of a protocol for the libp2p load balancer const ForwardingProtocolID protocol.ID = "/libp2p/balancer/forwarding/0.0.1"