Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Simplify code in fetch and syncer pkgs #5453

Closed
wants to merge 12 commits into from
73 changes: 30 additions & 43 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,7 @@
// unprocessed contains requests that are not processed
unprocessed map[types.Hash32]*request
// ongoing contains requests that have been processed and are waiting for responses
ongoing map[types.Hash32]*request
// batched contains batched ongoing requests.
batched map[types.Hash32]*batchInfo
ongoing map[types.Hash32]*request
batchTimeout *time.Ticker
mu sync.Mutex
onlyOnce sync.Once
Expand Down Expand Up @@ -236,7 +234,6 @@
servers: map[string]requester{},
unprocessed: make(map[types.Hash32]*request),
ongoing: make(map[types.Hash32]*request),
batched: make(map[types.Hash32]*batchInfo),
hashToPeers: NewHashPeersCache(cacheSize),
}
for _, opt := range opts {
Expand Down Expand Up @@ -399,7 +396,7 @@
}

// receive Data from message server and call response handlers accordingly.
func (f *Fetch) receiveResponse(data []byte) {
func (f *Fetch) receiveResponse(data []byte, batch *batchInfo) {
if f.stopped() {
return
}
Expand All @@ -414,14 +411,13 @@
log.Stringer("batch_hash", response.ID),
log.Int("num_hashes", len(response.Responses)),
)
f.mu.Lock()
batch, ok := f.batched[response.ID]
delete(f.batched, response.ID)
f.mu.Unlock()

if !ok {
f.logger.With().Warning("unknown batch response received, or already received",
log.Stringer("batch_hash", response.ID))
if batch.ID != response.ID {
f.logger.With().Warning(
"unknown batch response received",
log.Stringer("expected", batch.ID),
log.Stringer("response", response.ID),
)

Check warning on line 420 in fetch/fetch.go

View check run for this annotation

Codecov / codecov/patch

fetch/fetch.go#L416-L420

Added lines #L416 - L420 were not covered by tests
return
}

Expand Down Expand Up @@ -620,51 +616,43 @@
if f.stopped() {
return
}
f.mu.Lock()
f.batched[batch.ID] = batch
f.mu.Unlock()
f.logger.With().Debug("sending batched request to peer",
log.Stringer("batch_hash", batch.ID),
log.Int("num_requests", len(batch.Requests)),
log.Stringer("peer", peer),
)
// Request is asynchronous,
// Request is synchronous,
poszu marked this conversation as resolved.
Show resolved Hide resolved
// it will return errors only if size of the bytes buffer is large
// or target peer is not connected
start := time.Now()
errf := func(err error) {
f.logger.With().Warning("failed to send batch",
log.Stringer("batch_hash", peer), log.Err(err),
go func() {
start := time.Now()
data, err := f.servers[hashProtocol].Request(
f.shutdownCtx,
peer,
codec.MustEncode(&batch.RequestBatch),
)
f.peers.OnFailure(peer)
f.handleHashError(batch.ID, err)
}
err := f.servers[hashProtocol].Request(
f.shutdownCtx,
peer,
codec.MustEncode(&batch.RequestBatch),
func(buf []byte) {
if err == nil {
f.peers.OnLatency(peer, time.Since(start))
f.receiveResponse(buf)
},
errf,
)
if err != nil {
errf(err)
}
f.receiveResponse(data, batch)
} else {
f.logger.With().Warning(
"failed to send batch",
log.Stringer("batch", batch.ID),
log.Stringer("peer", peer),
log.Err(err),
)
f.peers.OnFailure(peer)
f.handleHashError(batch, err)
}
}()
}

// handleHashError is called when an error occurred processing batches of the following hashes.
func (f *Fetch) handleHashError(batchHash types.Hash32, err error) {
func (f *Fetch) handleHashError(batch *batchInfo, err error) {
f.mu.Lock()
defer f.mu.Unlock()

f.logger.With().Debug("failed batch fetch", log.Stringer("batch_hash", batchHash), log.Err(err))
batch, ok := f.batched[batchHash]
if !ok {
f.logger.With().Error("batch not found", log.Stringer("batch_hash", batchHash))
return
}
f.logger.With().Debug("failed batch fetch", log.Stringer("batch_hash", batch.ID), log.Err(err))
for _, br := range batch.Requests {
req, ok := f.ongoing[br.Hash]
if !ok {
Expand All @@ -680,7 +668,6 @@
close(req.promise.completed)
delete(f.ongoing, req.hash)
}
delete(f.batched, batchHash)
}

// getHash is the regular buffered call to get a specific hash, using provided hash, h as hint the receiving end will
Expand Down
76 changes: 36 additions & 40 deletions fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,24 +186,22 @@ func TestFetch_RequestHashBatchFromPeers(t *testing.T) {
Data: []byte("b"),
}
f.mHashS.EXPECT().
Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(
func(_ context.Context, _ p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error {
if tc.nErr != nil {
return tc.nErr
}
var rb RequestBatch
err := codec.Decode(req, &rb)
require.NoError(t, err)
resBatch := ResponseBatch{
ID: rb.ID,
Responses: []ResponseMessage{res0, res1},
}
bts, err := codec.Encode(&resBatch)
require.NoError(t, err)
okFunc(bts)
return nil
})
Request(gomock.Any(), peer, gomock.Any()).
DoAndReturn(func(_ context.Context, _ p2p.Peer, req []byte) ([]byte, error) {
if tc.nErr != nil {
return nil, tc.nErr
}
var rb RequestBatch
err := codec.Decode(req, &rb)
require.NoError(t, err)
resBatch := ResponseBatch{
ID: rb.ID,
Responses: []ResponseMessage{res0, res1},
}
bts, err := codec.Encode(&resBatch)
require.NoError(t, err)
return bts, nil
})

var p0, p1 []*promise
// query each hash twice
Expand Down Expand Up @@ -253,28 +251,26 @@ func TestFetch_Loop_BatchRequestMax(t *testing.T) {
h2 := types.RandomHash()
h3 := types.RandomHash()
f.mHashS.EXPECT().
Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(
func(_ context.Context, _ p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error {
var rb RequestBatch
err := codec.Decode(req, &rb)
require.NoError(t, err)
resps := make([]ResponseMessage, 0, len(rb.Requests))
for _, r := range rb.Requests {
resps = append(resps, ResponseMessage{
Hash: r.Hash,
Data: []byte("a"),
})
}
resBatch := ResponseBatch{
ID: rb.ID,
Responses: resps,
}
bts, err := codec.Encode(&resBatch)
require.NoError(t, err)
okFunc(bts)
return nil
}).
Request(gomock.Any(), peer, gomock.Any()).
DoAndReturn(func(_ context.Context, _ p2p.Peer, req []byte) ([]byte, error) {
var rb RequestBatch
err := codec.Decode(req, &rb)
require.NoError(t, err)
resps := make([]ResponseMessage, 0, len(rb.Requests))
for _, r := range rb.Requests {
resps = append(resps, ResponseMessage{
Hash: r.Hash,
Data: []byte("a"),
})
}
resBatch := ResponseBatch{
ID: rb.ID,
Responses: resps,
}
bts, err := codec.Encode(&resBatch)
require.NoError(t, err)
return bts, nil
}).
Times(2)
// 3 requests with batch size 2 -> 2 sends

Expand Down
2 changes: 1 addition & 1 deletion fetch/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type requester interface {
Run(context.Context) error
Request(context.Context, p2p.Peer, []byte, func([]byte), func(error)) error
Request(context.Context, p2p.Peer, []byte) ([]byte, error)
}

// The ValidatorFunc type is an adapter to allow the use of functions as
Expand Down
Loading