diff --git a/rpcs/blockService.go b/rpcs/blockService.go index 2e77aba8e5..698751b257 100644 --- a/rpcs/blockService.go +++ b/rpcs/blockService.go @@ -19,6 +19,7 @@ package rpcs import ( "context" "encoding/binary" + "errors" "net/http" "path" "strconv" @@ -29,6 +30,8 @@ import ( "github.com/algorand/go-codec/codec" + "github.com/algorand/go-deadlock" + "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" @@ -61,6 +64,8 @@ const ( BlockAndCertValue = "blockAndCert" // block+cert request data (as the value of requestDataTypeKey) ) +var errBlockServiceClosed = errors.New("block service is shutting down") + // BlockService represents the Block RPC API type BlockService struct { ledger *data.Ledger @@ -74,6 +79,7 @@ type BlockService struct { enableArchiverFallback bool log logging.Logger closeWaitGroup sync.WaitGroup + mu deadlock.Mutex } // EncodedBlockCert defines how GetBlockBytes encodes a block and its certificate @@ -118,6 +124,8 @@ func MakeBlockService(log logging.Logger, config config.Local, ledger *data.Ledg // Start listening to catchup requests over ws func (bs *BlockService) Start() { + bs.mu.Lock() + defer bs.mu.Unlock() if bs.enableServiceOverGossip { handlers := []network.TaggedMessageHandler{ {Tag: protocol.UniCatchupReqTag, MessageHandler: network.HandlerFunc(bs.processIncomingMessage)}, @@ -133,12 +141,14 @@ func (bs *BlockService) Start() { // Stop servicing catchup requests over ws func (bs *BlockService) Stop() { + bs.mu.Lock() close(bs.stop) + bs.mu.Unlock() bs.closeWaitGroup.Wait() } // ServerHTTP returns blocks -// Either /v{version}/block/{round} or ?b={round}&v={version} +// Either /v{version}/{genesisID}/block/{round} or ?b={round}&v={version} // Uses gorilla/mux for path argument parsing. func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Request) { pathVars := mux.Vars(request) @@ -200,7 +210,7 @@ func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Re response.WriteHeader(http.StatusBadRequest) return } - encodedBlockCert, err := RawBlockBytes(bs.ledger, basics.Round(round)) + encodedBlockCert, err := bs.rawBlockBytes(basics.Round(round)) if err != nil { switch err.(type) { case ledgercore.ErrNoEntry: @@ -321,7 +331,7 @@ func (bs *BlockService) redirectRequest(round uint64, response http.ResponseWrit bs.log.Debugf("redirectRequest: %s", err.Error()) return false } - parsedURL.Path = FormatBlockQuery(round, parsedURL.Path, bs.net) + parsedURL.Path = strings.Replace(FormatBlockQuery(round, parsedURL.Path, bs.net), "{genesisID}", bs.genesisID, 1) http.Redirect(response, request, parsedURL.String(), http.StatusTemporaryRedirect) bs.log.Debugf("redirectRequest: redirected block request to %s", parsedURL.String()) return true @@ -356,6 +366,22 @@ func (bs *BlockService) getRandomArchiver() (endpointAddress string) { return } +// rawBlockBytes returns the block/cert for a given round, while taking the lock +// to ensure the block service is currently active. +func (bs *BlockService) rawBlockBytes(round basics.Round) ([]byte, error) { + bs.mu.Lock() + defer bs.mu.Unlock() + select { + case _, ok := <-bs.stop: + if !ok { + // service is closed. + return nil, errBlockServiceClosed + } + default: + } + return RawBlockBytes(bs.ledger, round) +} + func topicBlockBytes(log logging.Logger, dataLedger *data.Ledger, round basics.Round, requestType string) network.Topics { blk, cert, err := dataLedger.EncodedBlockCert(round) if err != nil { diff --git a/rpcs/blockService_test.go b/rpcs/blockService_test.go index 542e8783ed..828f2265a9 100644 --- a/rpcs/blockService_test.go +++ b/rpcs/blockService_test.go @@ -19,7 +19,9 @@ package rpcs import ( "context" "fmt" + "io/ioutil" "net/http" + "strings" "testing" "time" @@ -118,7 +120,7 @@ func TestHandleCatchupReqNegative(t *testing.T) { require.Equal(t, roundNumberParseErrMsg, string(val)) } -// TestRedirectBasic tests the case when the block service redirects the request to elsewhere +// TestRedirectFallbackArchiver tests the case when the block service fallback to another in the absense of a given block. func TestRedirectFallbackArchiver(t *testing.T) { partitiontest.PartitionTest(t) @@ -136,8 +138,8 @@ func TestRedirectFallbackArchiver(t *testing.T) { net2 := &httpTestPeerSource{} config := config.GetDefaultLocal() - bs1 := MakeBlockService(log, config, ledger1, net1, "{genesisID}") - bs2 := MakeBlockService(log, config, ledger2, net2, "{genesisID}") + bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID") + bs2 := MakeBlockService(log, config, ledger2, net2, "test-genesis-ID") nodeA := &basicRPCNode{} nodeB := &basicRPCNode{} @@ -159,6 +161,7 @@ func TestRedirectFallbackArchiver(t *testing.T) { ctx := context.Background() parsedURL.Path = FormatBlockQuery(uint64(2), parsedURL.Path, net1) + parsedURL.Path = strings.Replace(parsedURL.Path, "{genesisID}", "test-genesis-ID", 1) blockURL := parsedURL.String() request, err := http.NewRequest("GET", blockURL, nil) require.NoError(t, err) @@ -170,6 +173,58 @@ func TestRedirectFallbackArchiver(t *testing.T) { require.NoError(t, err) require.Equal(t, http.StatusOK, response.StatusCode) + bodyData, err := ioutil.ReadAll(response.Body) + require.NoError(t, err) + require.NotEqual(t, 0, len(bodyData)) +} + +// TestBlockServiceShutdown tests that the block service is shutting down correctly. +func TestBlockServiceShutdown(t *testing.T) { + partitiontest.PartitionTest(t) + + log := logging.TestingLog(t) + + ledger1 := makeLedger(t, "l1") + addBlock(t, ledger1) + + net1 := &httpTestPeerSource{} + + config := config.GetDefaultLocal() + bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID") + bs1.Start() + + nodeA := &basicRPCNode{} + + nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1) + nodeA.start() + defer nodeA.stop() + + parsedURL, err := network.ParseHostOrURL(nodeA.rootURL()) + require.NoError(t, err) + + client := http.Client{} + + ctx := context.Background() + parsedURL.Path = FormatBlockQuery(uint64(1), parsedURL.Path, net1) + parsedURL.Path = strings.Replace(parsedURL.Path, "{genesisID}", "test-genesis-ID", 1) + blockURL := parsedURL.String() + request, err := http.NewRequest("GET", blockURL, nil) + require.NoError(t, err) + requestCtx, requestCancel := context.WithTimeout(ctx, time.Duration(config.CatchupHTTPBlockFetchTimeoutSec)*time.Second) + defer requestCancel() + request = request.WithContext(requestCtx) + network.SetUserAgentHeader(request.Header) + + requestDone := make(chan struct{}) + go func() { + defer close(requestDone) + client.Do(request) + }() + + bs1.Stop() + ledger1.Close() + + <-requestDone } // TestRedirectBasic tests the case when the block service redirects the request to elsewhere