Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blocks service: avoid serving blocks past service shutdown #3303

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions rpcs/blockService.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package rpcs
import (
"context"
"encoding/binary"
"errors"
"net/http"
"path"
"strconv"
Expand All @@ -29,6 +30,8 @@ import (

"github.com/algorand/go-codec/codec"

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
Expand Down Expand Up @@ -61,6 +64,8 @@ const (
BlockAndCertValue = "blockAndCert" // block+cert request data (as the value of requestDataTypeKey)
)

var errBlockServiceClosed = errors.New("block service is shutting down")

// BlockService represents the Block RPC API
type BlockService struct {
ledger *data.Ledger
Expand All @@ -74,6 +79,7 @@ type BlockService struct {
enableArchiverFallback bool
log logging.Logger
closeWaitGroup sync.WaitGroup
mu deadlock.Mutex
}

// EncodedBlockCert defines how GetBlockBytes encodes a block and its certificate
Expand Down Expand Up @@ -118,6 +124,8 @@ func MakeBlockService(log logging.Logger, config config.Local, ledger *data.Ledg

// Start listening to catchup requests over ws
func (bs *BlockService) Start() {
bs.mu.Lock()
defer bs.mu.Unlock()
if bs.enableServiceOverGossip {
handlers := []network.TaggedMessageHandler{
{Tag: protocol.UniCatchupReqTag, MessageHandler: network.HandlerFunc(bs.processIncomingMessage)},
Expand All @@ -133,12 +141,14 @@ func (bs *BlockService) Start() {

// Stop servicing catchup requests over ws
func (bs *BlockService) Stop() {
bs.mu.Lock()
close(bs.stop)
bs.mu.Unlock()
bs.closeWaitGroup.Wait()
}

// ServerHTTP returns blocks
// Either /v{version}/block/{round} or ?b={round}&v={version}
// Either /v{version}/{genesisID}/block/{round} or ?b={round}&v={version}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't genesisID also be in ?b={round}&v={version}&g={genesisID} ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't genesisID also be in ?b={round}&v={version}&g={genesisID} ?

I'm pretty sure that the latter form is not being used. But the former was obviously incorrect, so I fixed it..
Feel free to fix the latter.

// Uses gorilla/mux for path argument parsing.
func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Request) {
pathVars := mux.Vars(request)
Expand Down Expand Up @@ -200,7 +210,7 @@ func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Re
response.WriteHeader(http.StatusBadRequest)
return
}
encodedBlockCert, err := RawBlockBytes(bs.ledger, basics.Round(round))
encodedBlockCert, err := bs.rawBlockBytes(basics.Round(round))
if err != nil {
switch err.(type) {
case ledgercore.ErrNoEntry:
Expand Down Expand Up @@ -321,7 +331,7 @@ func (bs *BlockService) redirectRequest(round uint64, response http.ResponseWrit
bs.log.Debugf("redirectRequest: %s", err.Error())
return false
}
parsedURL.Path = FormatBlockQuery(round, parsedURL.Path, bs.net)
parsedURL.Path = strings.Replace(FormatBlockQuery(round, parsedURL.Path, bs.net), "{genesisID}", bs.genesisID, 1)
http.Redirect(response, request, parsedURL.String(), http.StatusTemporaryRedirect)
bs.log.Debugf("redirectRequest: redirected block request to %s", parsedURL.String())
return true
Expand Down Expand Up @@ -356,6 +366,22 @@ func (bs *BlockService) getRandomArchiver() (endpointAddress string) {
return
}

// rawBlockBytes returns the block/cert for a given round, while taking the lock
// to ensure the block service is currently active.
func (bs *BlockService) rawBlockBytes(round basics.Round) ([]byte, error) {
bs.mu.Lock()
defer bs.mu.Unlock()
select {
case _, ok := <-bs.stop:
if !ok {
// service is closed.
return nil, errBlockServiceClosed
}
default:
}
return RawBlockBytes(bs.ledger, round)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wonder why is RawBlockBytes a part of block service since it has nothing to do with bs... but this is a separate topic

}

func topicBlockBytes(log logging.Logger, dataLedger *data.Ledger, round basics.Round, requestType string) network.Topics {
blk, cert, err := dataLedger.EncodedBlockCert(round)
if err != nil {
Expand Down
61 changes: 58 additions & 3 deletions rpcs/blockService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package rpcs
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -118,7 +120,7 @@ func TestHandleCatchupReqNegative(t *testing.T) {
require.Equal(t, roundNumberParseErrMsg, string(val))
}

// TestRedirectBasic tests the case when the block service redirects the request to elsewhere
// TestRedirectFallbackArchiver tests the case when the block service fallback to another in the absense of a given block.
func TestRedirectFallbackArchiver(t *testing.T) {
partitiontest.PartitionTest(t)

Expand All @@ -136,8 +138,8 @@ func TestRedirectFallbackArchiver(t *testing.T) {
net2 := &httpTestPeerSource{}

config := config.GetDefaultLocal()
bs1 := MakeBlockService(log, config, ledger1, net1, "{genesisID}")
bs2 := MakeBlockService(log, config, ledger2, net2, "{genesisID}")
bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID")
bs2 := MakeBlockService(log, config, ledger2, net2, "test-genesis-ID")

nodeA := &basicRPCNode{}
nodeB := &basicRPCNode{}
Expand All @@ -159,6 +161,7 @@ func TestRedirectFallbackArchiver(t *testing.T) {

ctx := context.Background()
parsedURL.Path = FormatBlockQuery(uint64(2), parsedURL.Path, net1)
parsedURL.Path = strings.Replace(parsedURL.Path, "{genesisID}", "test-genesis-ID", 1)
blockURL := parsedURL.String()
request, err := http.NewRequest("GET", blockURL, nil)
require.NoError(t, err)
Expand All @@ -170,6 +173,58 @@ func TestRedirectFallbackArchiver(t *testing.T) {
require.NoError(t, err)

require.Equal(t, http.StatusOK, response.StatusCode)
bodyData, err := ioutil.ReadAll(response.Body)
require.NoError(t, err)
require.NotEqual(t, 0, len(bodyData))
}

// TestBlockServiceShutdown tests that the block service is shutting down correctly.
func TestBlockServiceShutdown(t *testing.T) {
partitiontest.PartitionTest(t)

log := logging.TestingLog(t)

ledger1 := makeLedger(t, "l1")
addBlock(t, ledger1)

net1 := &httpTestPeerSource{}

config := config.GetDefaultLocal()
bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID")
bs1.Start()

nodeA := &basicRPCNode{}

nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
nodeA.start()
defer nodeA.stop()

parsedURL, err := network.ParseHostOrURL(nodeA.rootURL())
require.NoError(t, err)

client := http.Client{}

ctx := context.Background()
parsedURL.Path = FormatBlockQuery(uint64(1), parsedURL.Path, net1)
parsedURL.Path = strings.Replace(parsedURL.Path, "{genesisID}", "test-genesis-ID", 1)
blockURL := parsedURL.String()
request, err := http.NewRequest("GET", blockURL, nil)
require.NoError(t, err)
requestCtx, requestCancel := context.WithTimeout(ctx, time.Duration(config.CatchupHTTPBlockFetchTimeoutSec)*time.Second)
defer requestCancel()
request = request.WithContext(requestCtx)
network.SetUserAgentHeader(request.Header)

requestDone := make(chan struct{})
go func() {
defer close(requestDone)
client.Do(request)
}()

bs1.Stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe stop first and then start a goroutine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent here is to create the "contention" so that the request would be made before/while/after the Stop. i.e. this is where the bug was, so I was trying to reproduce it. Moving the Stop before the go routine eliminate some of that randomness..

ledger1.Close()

<-requestDone
}

// TestRedirectBasic tests the case when the block service redirects the request to elsewhere
Expand Down