-
Notifications
You must be signed in to change notification settings - Fork 493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
blocks service: avoid serving blocks past service shutdown #3303
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ package rpcs | |
import ( | ||
"context" | ||
"encoding/binary" | ||
"errors" | ||
"net/http" | ||
"path" | ||
"strconv" | ||
|
@@ -29,6 +30,8 @@ import ( | |
|
||
"github.com/algorand/go-codec/codec" | ||
|
||
"github.com/algorand/go-deadlock" | ||
|
||
"github.com/algorand/go-algorand/agreement" | ||
"github.com/algorand/go-algorand/config" | ||
"github.com/algorand/go-algorand/crypto" | ||
|
@@ -61,6 +64,8 @@ const ( | |
BlockAndCertValue = "blockAndCert" // block+cert request data (as the value of requestDataTypeKey) | ||
) | ||
|
||
var errBlockServiceClosed = errors.New("block service is shutting down") | ||
|
||
// BlockService represents the Block RPC API | ||
type BlockService struct { | ||
ledger *data.Ledger | ||
|
@@ -74,6 +79,7 @@ type BlockService struct { | |
enableArchiverFallback bool | ||
log logging.Logger | ||
closeWaitGroup sync.WaitGroup | ||
mu deadlock.Mutex | ||
} | ||
|
||
// EncodedBlockCert defines how GetBlockBytes encodes a block and its certificate | ||
|
@@ -118,6 +124,8 @@ func MakeBlockService(log logging.Logger, config config.Local, ledger *data.Ledg | |
|
||
// Start listening to catchup requests over ws | ||
func (bs *BlockService) Start() { | ||
bs.mu.Lock() | ||
defer bs.mu.Unlock() | ||
if bs.enableServiceOverGossip { | ||
handlers := []network.TaggedMessageHandler{ | ||
{Tag: protocol.UniCatchupReqTag, MessageHandler: network.HandlerFunc(bs.processIncomingMessage)}, | ||
|
@@ -133,12 +141,14 @@ func (bs *BlockService) Start() { | |
|
||
// Stop servicing catchup requests over ws | ||
func (bs *BlockService) Stop() { | ||
bs.mu.Lock() | ||
close(bs.stop) | ||
bs.mu.Unlock() | ||
bs.closeWaitGroup.Wait() | ||
} | ||
|
||
// ServerHTTP returns blocks | ||
// Either /v{version}/block/{round} or ?b={round}&v={version} | ||
// Either /v{version}/{genesisID}/block/{round} or ?b={round}&v={version} | ||
// Uses gorilla/mux for path argument parsing. | ||
func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Request) { | ||
pathVars := mux.Vars(request) | ||
|
@@ -200,7 +210,7 @@ func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Re | |
response.WriteHeader(http.StatusBadRequest) | ||
return | ||
} | ||
encodedBlockCert, err := RawBlockBytes(bs.ledger, basics.Round(round)) | ||
encodedBlockCert, err := bs.rawBlockBytes(basics.Round(round)) | ||
if err != nil { | ||
switch err.(type) { | ||
case ledgercore.ErrNoEntry: | ||
|
@@ -321,7 +331,7 @@ func (bs *BlockService) redirectRequest(round uint64, response http.ResponseWrit | |
bs.log.Debugf("redirectRequest: %s", err.Error()) | ||
return false | ||
} | ||
parsedURL.Path = FormatBlockQuery(round, parsedURL.Path, bs.net) | ||
parsedURL.Path = strings.Replace(FormatBlockQuery(round, parsedURL.Path, bs.net), "{genesisID}", bs.genesisID, 1) | ||
http.Redirect(response, request, parsedURL.String(), http.StatusTemporaryRedirect) | ||
bs.log.Debugf("redirectRequest: redirected block request to %s", parsedURL.String()) | ||
return true | ||
|
@@ -356,6 +366,22 @@ func (bs *BlockService) getRandomArchiver() (endpointAddress string) { | |
return | ||
} | ||
|
||
// rawBlockBytes returns the block/cert for a given round, while taking the lock | ||
// to ensure the block service is currently active. | ||
func (bs *BlockService) rawBlockBytes(round basics.Round) ([]byte, error) { | ||
bs.mu.Lock() | ||
defer bs.mu.Unlock() | ||
select { | ||
case _, ok := <-bs.stop: | ||
if !ok { | ||
// service is closed. | ||
return nil, errBlockServiceClosed | ||
} | ||
default: | ||
} | ||
return RawBlockBytes(bs.ledger, round) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wonder why is |
||
} | ||
|
||
func topicBlockBytes(log logging.Logger, dataLedger *data.Ledger, round basics.Round, requestType string) network.Topics { | ||
blk, cert, err := dataLedger.EncodedBlockCert(round) | ||
if err != nil { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,9 @@ package rpcs | |
import ( | ||
"context" | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
|
@@ -118,7 +120,7 @@ func TestHandleCatchupReqNegative(t *testing.T) { | |
require.Equal(t, roundNumberParseErrMsg, string(val)) | ||
} | ||
|
||
// TestRedirectBasic tests the case when the block service redirects the request to elsewhere | ||
// TestRedirectFallbackArchiver tests the case when the block service fallback to another in the absense of a given block. | ||
func TestRedirectFallbackArchiver(t *testing.T) { | ||
partitiontest.PartitionTest(t) | ||
|
||
|
@@ -136,8 +138,8 @@ func TestRedirectFallbackArchiver(t *testing.T) { | |
net2 := &httpTestPeerSource{} | ||
|
||
config := config.GetDefaultLocal() | ||
bs1 := MakeBlockService(log, config, ledger1, net1, "{genesisID}") | ||
bs2 := MakeBlockService(log, config, ledger2, net2, "{genesisID}") | ||
bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID") | ||
bs2 := MakeBlockService(log, config, ledger2, net2, "test-genesis-ID") | ||
|
||
nodeA := &basicRPCNode{} | ||
nodeB := &basicRPCNode{} | ||
|
@@ -159,6 +161,7 @@ func TestRedirectFallbackArchiver(t *testing.T) { | |
|
||
ctx := context.Background() | ||
parsedURL.Path = FormatBlockQuery(uint64(2), parsedURL.Path, net1) | ||
parsedURL.Path = strings.Replace(parsedURL.Path, "{genesisID}", "test-genesis-ID", 1) | ||
blockURL := parsedURL.String() | ||
request, err := http.NewRequest("GET", blockURL, nil) | ||
require.NoError(t, err) | ||
|
@@ -170,6 +173,58 @@ func TestRedirectFallbackArchiver(t *testing.T) { | |
require.NoError(t, err) | ||
|
||
require.Equal(t, http.StatusOK, response.StatusCode) | ||
bodyData, err := ioutil.ReadAll(response.Body) | ||
require.NoError(t, err) | ||
require.NotEqual(t, 0, len(bodyData)) | ||
} | ||
|
||
// TestBlockServiceShutdown tests that the block service is shutting down correctly. | ||
func TestBlockServiceShutdown(t *testing.T) { | ||
partitiontest.PartitionTest(t) | ||
|
||
log := logging.TestingLog(t) | ||
|
||
ledger1 := makeLedger(t, "l1") | ||
addBlock(t, ledger1) | ||
|
||
net1 := &httpTestPeerSource{} | ||
|
||
config := config.GetDefaultLocal() | ||
bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID") | ||
bs1.Start() | ||
|
||
nodeA := &basicRPCNode{} | ||
|
||
nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1) | ||
nodeA.start() | ||
defer nodeA.stop() | ||
|
||
parsedURL, err := network.ParseHostOrURL(nodeA.rootURL()) | ||
require.NoError(t, err) | ||
|
||
client := http.Client{} | ||
|
||
ctx := context.Background() | ||
parsedURL.Path = FormatBlockQuery(uint64(1), parsedURL.Path, net1) | ||
parsedURL.Path = strings.Replace(parsedURL.Path, "{genesisID}", "test-genesis-ID", 1) | ||
blockURL := parsedURL.String() | ||
request, err := http.NewRequest("GET", blockURL, nil) | ||
require.NoError(t, err) | ||
requestCtx, requestCancel := context.WithTimeout(ctx, time.Duration(config.CatchupHTTPBlockFetchTimeoutSec)*time.Second) | ||
defer requestCancel() | ||
request = request.WithContext(requestCtx) | ||
network.SetUserAgentHeader(request.Header) | ||
|
||
requestDone := make(chan struct{}) | ||
go func() { | ||
defer close(requestDone) | ||
client.Do(request) | ||
}() | ||
|
||
bs1.Stop() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe stop first and then start a goroutine? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The intent here is to create the "contention" so that the request would be made before/while/after the |
||
ledger1.Close() | ||
|
||
<-requestDone | ||
} | ||
|
||
// TestRedirectBasic tests the case when the block service redirects the request to elsewhere | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't genesisID also be in ?b={round}&v={version}&g={genesisID} ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty sure that the latter form is not being used. But the former was obviously incorrect, so I fixed it..
Feel free to fix the latter.