Skip to content

Commit

Permalink
Deprecate support of reverse catchup protocol ( part 3 ) (algorand#1916)
Browse files Browse the repository at this point in the history
In this PR, network v1 cleanup 3rd iteration:
- Simplified requestBlock return
- Stopped the use of UniCatchupReqTag in the requests (to eventually eliminate it in the next version)
- Storing the tag in the fetcher structs and passing it via NewOverGossip is not needed, because the fetcher will not use only a single tag value, and have it hard coded in the request function.
   - Removed the tag parameter from NewOverGossip interface 
   - Removed the tag field from the structs WsFetcher and wsFetcherClient
  • Loading branch information
algonautshant authored Feb 18, 2021
1 parent 05cab8e commit 6f996c5
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 48 deletions.
7 changes: 3 additions & 4 deletions catchup/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type FetcherFactory interface {
// Create a new fetcher
New() Fetcher
// Create a new fetcher that also fetches from backup peers over gossip network utilising given message tag
NewOverGossip(requestTag protocol.Tag) Fetcher
NewOverGossip() Fetcher
}

// NetworkFetcherFactory creates network fetchers
Expand Down Expand Up @@ -117,15 +117,14 @@ func (factory NetworkFetcherFactory) New() Fetcher {
// NewOverGossip returns a fetcher using the given message tag.
// If there are gossip peers, then it returns a fetcher over gossip
// Otherwise, it returns an HTTP fetcher
// We should never build two fetchers utilising the same tag. Why?
func (factory NetworkFetcherFactory) NewOverGossip(tag protocol.Tag) Fetcher {
func (factory NetworkFetcherFactory) NewOverGossip() Fetcher {
gossipPeers := factory.net.GetPeers(network.PeersConnectedIn)
factory.log.Debugf("%d gossip peers", len(gossipPeers))
if len(gossipPeers) == 0 {
factory.log.Info("no gossip peers for NewOverGossip")
return factory.New()
}
f := MakeWsFetcher(factory.log, tag, gossipPeers, factory.cfg)
f := MakeWsFetcher(factory.log, gossipPeers, factory.cfg)
return &ComposedFetcher{fetchers: []Fetcher{factory.New(), f}}
}

Expand Down
4 changes: 2 additions & 2 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ func makeTestUnicastPeer(gn network.GossipNode, version string, t *testing.T) ne
// A quick GetBlock over websockets test hitting a mocked websocket server (no actual connection)
func TestGetBlockWS(t *testing.T) {
// test the WS fetcher:
// 1. fetcher sends UniCatchupReqTag to http peer
// 1. fetcher sends UniEnsBlockReqTag to http peer
// 2. peer send message to gossip node
// 3. gossip node send message to ledger service
// 4. ledger service responds with UniCatchupResTag sending it back to the http peer
Expand Down Expand Up @@ -885,7 +885,7 @@ func TestGetBlockWS(t *testing.T) {
require.True(t, ok)
factory := MakeNetworkFetcherFactory(net, numberOfPeers, &cfg)
factory.log = logging.TestingLog(t)
fetcher := factory.NewOverGossip(protocol.UniCatchupReqTag)
fetcher := factory.NewOverGossip()
// we have one peer, the Ws block server
require.Equal(t, fetcher.NumPeers(), 1)

Expand Down
6 changes: 3 additions & 3 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (s *Service) pipelineCallback(fetcher Fetcher, r basics.Round, thisFetchCom

// TODO the following code does not handle the following case: seedLookback upgrades during fetch
func (s *Service) pipelinedFetch(seedLookback uint64) {
fetcher := s.fetcherFactory.NewOverGossip(protocol.UniCatchupReqTag)
fetcher := s.fetcherFactory.NewOverGossip()
defer fetcher.Close()

// make sure that we have at least one peer
Expand Down Expand Up @@ -557,7 +557,7 @@ func (s *Service) syncCert(cert *PendingUnmatchedCertificate) {
// TODO this doesn't actually use the digest from cert!
func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.AsyncVoteVerifier) {
blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest
fetcher := s.latestRoundFetcherFactory.NewOverGossip(protocol.UniEnsBlockReqTag)
fetcher := s.latestRoundFetcherFactory.NewOverGossip()
defer func() {
fetcher.Close()
}()
Expand All @@ -567,7 +567,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
// refresh peers and try again
logging.Base().Warn("fetchRound found no outgoing peers")
s.net.RequestConnectOutgoing(true, s.ctx.Done())
fetcher = s.latestRoundFetcherFactory.NewOverGossip(protocol.UniEnsBlockReqTag)
fetcher = s.latestRoundFetcherFactory.NewOverGossip()
}
// Ask the fetcher to get the block somehow
block, fetchedCert, rpcc, err := s.innerFetch(fetcher, cert.Round)
Expand Down
2 changes: 1 addition & 1 deletion catchup/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (factory *MockedFetcherFactory) New() Fetcher {
return factory.fetcher
}

func (factory *MockedFetcherFactory) NewOverGossip(tag protocol.Tag) Fetcher {
func (factory *MockedFetcherFactory) NewOverGossip() Fetcher {
return factory.New()
}

Expand Down
38 changes: 12 additions & 26 deletions catchup/wsFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ const numBufferedInternalMsg = 1
// a custom websockets interface (bidirectional). Internally it keeps track
// of multiple peers and handles dropping them appropriately using a NetworkFetcher.
type WsFetcher struct {
tag protocol.Tag // domain separation per request

f *NetworkFetcher
clients map[network.Peer]*wsFetcherClient
config *config.Local
Expand All @@ -53,20 +51,18 @@ type WsFetcher struct {
}

// MakeWsFetcher creates a fetcher that fetches over the gossip network.
// It instantiates a NetworkFetcher under the hood, registers as a handler for the given message tag,
// It instantiates a NetworkFetcher under the hood,
// and demuxes messages appropriately to the corresponding fetcher clients.
func MakeWsFetcher(log logging.Logger, tag protocol.Tag, peers []network.Peer, cfg *config.Local) Fetcher {
func MakeWsFetcher(log logging.Logger, peers []network.Peer, cfg *config.Local) Fetcher {
f := &WsFetcher{
log: log,
tag: tag,
config: cfg,
}
f.clients = make(map[network.Peer]*wsFetcherClient)
p := make([]FetcherClient, len(peers))
for i, peer := range peers {
fc := &wsFetcherClient{
target: peer.(network.UnicastPeer),
tag: f.tag,
pendingCtxs: make(map[context.Context]context.CancelFunc),
config: cfg,
}
Expand Down Expand Up @@ -105,7 +101,6 @@ func (wsf *WsFetcher) Close() {
// a stub fetcherClient to satisfy the NetworkFetcher interface
type wsFetcherClient struct {
target network.UnicastPeer // the peer where we're going to send the request.
tag protocol.Tag // the tag that is associated with the request/
pendingCtxs map[context.Context]context.CancelFunc // a map of all the current pending contexts.
config *config.Local

Expand Down Expand Up @@ -134,17 +129,14 @@ func (w *wsFetcherClient) GetBlockBytes(ctx context.Context, r basics.Round) ([]
delete(w.pendingCtxs, childCtx)
}()

resp, err := w.requestBlock(childCtx, r)
blockBytes, err := w.requestBlock(childCtx, r)
if err != nil {
return nil, err
}
if resp.Error != "" {
return nil, fmt.Errorf("wsFetcherClient(%d): server error, %v", r, resp.Error)
}
if len(resp.BlockBytes) == 0 {
if len(blockBytes) == 0 {
return nil, fmt.Errorf("wsFetcherClient(%d): empty response", r)
}
return resp.BlockBytes, nil
return blockBytes, nil
}

// Address implements FetcherClient
Expand All @@ -165,7 +157,7 @@ func (w *wsFetcherClient) Close() error {
}

// requestBlock send a request for block <round> and wait until it receives a response or a context expires.
func (w *wsFetcherClient) requestBlock(ctx context.Context, round basics.Round) (rpcs.WsGetBlockOut, error) {
func (w *wsFetcherClient) requestBlock(ctx context.Context, round basics.Round) ([]byte, error) {
roundBin := make([]byte, binary.MaxVarintLen64)
binary.PutUvarint(roundBin, uint64(round))
topics := network.Topics{
Expand All @@ -175,33 +167,27 @@ func (w *wsFetcherClient) requestBlock(ctx context.Context, round basics.Round)
rpcs.RoundKey,
roundBin),
}
resp, err := w.target.Request(ctx, w.tag, topics)
resp, err := w.target.Request(ctx, protocol.UniEnsBlockReqTag, topics)
if err != nil {
return rpcs.WsGetBlockOut{}, fmt.Errorf("wsFetcherClient(%s).requestBlock(%d): Request failed, %v", w.target.GetAddress(), round, err)
return nil, fmt.Errorf("wsFetcherClient(%s).requestBlock(%d): Request failed, %v", w.target.GetAddress(), round, err)
}

if errMsg, found := resp.Topics.GetValue(network.ErrorKey); found {
return rpcs.WsGetBlockOut{}, fmt.Errorf("wsFetcherClient(%s).requestBlock(%d): Request failed, %s", w.target.GetAddress(), round, string(errMsg))
return nil, fmt.Errorf("wsFetcherClient(%s).requestBlock(%d): Request failed, %s", w.target.GetAddress(), round, string(errMsg))
}

blk, found := resp.Topics.GetValue(rpcs.BlockDataKey)
if !found {
return rpcs.WsGetBlockOut{}, fmt.Errorf("wsFetcherClient(%s): request failed: block data not found", w.target.GetAddress())
return nil, fmt.Errorf("wsFetcherClient(%s): request failed: block data not found", w.target.GetAddress())
}
cert, found := resp.Topics.GetValue(rpcs.CertDataKey)
if !found {
return rpcs.WsGetBlockOut{}, fmt.Errorf("wsFetcherClient(%s): request failed: cert data not found", w.target.GetAddress())
return nil, fmt.Errorf("wsFetcherClient(%s): request failed: cert data not found", w.target.GetAddress())
}

// For backward compatibility, the block and cert are repackaged here.
// This can be dropeed once the v1 is dropped.
blockCertBytes := protocol.EncodeReflect(rpcs.PreEncodedBlockCert{
Block: blk,
Certificate: cert})

wsBlockOut := rpcs.WsGetBlockOut{
Round: uint64(round),
BlockBytes: blockCertBytes,
}
return wsBlockOut, nil
return blockCertBytes, nil
}
2 changes: 1 addition & 1 deletion network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,7 @@ func handleTopicRequest(msg IncomingMessage) (out OutgoingMessage) {

// Set up two nodes, test topics send/recieve is working
func TestWebsocketNetworkTopicRoundtrip(t *testing.T) {
var topicMsgReqTag Tag = protocol.UniCatchupReqTag
var topicMsgReqTag Tag = protocol.UniEnsBlockReqTag
netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
Expand Down
2 changes: 1 addition & 1 deletion protocol/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
ProposalPayloadTag Tag = "PP"
TopicMsgRespTag Tag = "TS"
TxnTag Tag = "TX"
UniCatchupReqTag Tag = "UC"
UniCatchupReqTag Tag = "UC" //Replaced by UniEnsBlockReqTag. Only for backward compatibility.
UniEnsBlockReqTag Tag = "UE"
//UniEnsBlockResTag Tag = "US" was used for wsfetcherservice
//UniCatchupResTag Tag = "UT" was used for wsfetcherservice
Expand Down
13 changes: 3 additions & 10 deletions rpcs/blockService.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (bs *BlockService) Start() {
bs.net.RegisterHandlers(handlers)
}
bs.stop = make(chan struct{})
go bs.ListenForCatchupReq(bs.catchupReqs, bs.stop)
go bs.listenForCatchupReq(bs.catchupReqs, bs.stop)
}

// Stop servicing catchup requests over ws
Expand Down Expand Up @@ -210,13 +210,6 @@ func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Re
}
}

// WsGetBlockOut is a msgpack message delivered on responding to a block (not rpc-based though)
type WsGetBlockOut struct {
Round uint64
Error string
BlockBytes []byte `json:"blockbytes"`
}

func (bs *BlockService) processIncomingMessage(msg network.IncomingMessage) (n network.OutgoingMessage) {
// don't block - just stick in a slightly buffered channel if possible
select {
Expand All @@ -227,8 +220,8 @@ func (bs *BlockService) processIncomingMessage(msg network.IncomingMessage) (n n
return
}

// ListenForCatchupReq handles catchup getblock request
func (bs *BlockService) ListenForCatchupReq(reqs <-chan network.IncomingMessage, stop chan struct{}) {
// listenForCatchupReq handles catchup getblock request
func (bs *BlockService) listenForCatchupReq(reqs <-chan network.IncomingMessage, stop chan struct{}) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
Expand Down

0 comments on commit 6f996c5

Please sign in to comment.