Skip to content

Commit

Permalink
chore: rollback modification to network pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
EclesioMeloJunior committed Apr 26, 2023
1 parent 16e8ee6 commit 3eba164
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 138 deletions.
8 changes: 2 additions & 6 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@ const (

// the following are sub-protocols used by the node
syncID = "/sync/2"
warpSyncID = "/sync/warp"
lightID = "/light/2"
blockAnnounceID = "/block-announces/1"
transactionsID = "/transactions/1"

warpSyncMaxResponseSize = 16 * 1024 * 1024
maxMessageSize = 1024 * 64 // 64kb for now
maxMessageSize = 1024 * 64 // 64kb for now
)

var (
Expand Down Expand Up @@ -249,8 +247,6 @@ func (s *Service) Start() error {
}

s.host.registerStreamHandler(s.host.protocolID+syncID, s.handleSyncStream)
// TODO: enable this protocol to receive request from other nodes
//s.host.registerStreamHandler(s.host.protocolID+warpSync, s.handleSyncStream)
s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream)

// register block announce protocol
Expand Down Expand Up @@ -590,7 +586,7 @@ func (s *Service) NetworkState() common.NetworkState {
}
}

func (s *Service) TotalConnectedPeers() []peer.ID {
func (s *Service) AllConnectedPeers() []peer.ID {
return s.host.p2pHost.Network().Peers()
}

Expand Down
61 changes: 2 additions & 59 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

var (
blockRequestTimeout = time.Second * 20
)

func (s *Service) RequestWarpProof(to peer.ID, request *WarpSyncProofRequestMessage) (warpSyncResponse interface{}, err error) {
legacyWarpSyncID := s.host.protocolID + warpSyncID

s.host.p2pHost.ConnManager().Protect(to, "")
defer s.host.p2pHost.ConnManager().Unprotect(to, "")

ctx, cancel := context.WithTimeout(s.ctx, blockRequestTimeout)
defer cancel()

stream, err := s.host.p2pHost.NewStream(ctx, to, legacyWarpSyncID)
if err != nil {
return nil, err
}

defer func() {
err := stream.Close()
if err != nil {
logger.Warnf("failed to close stream: %s", err)
}
}()

if err = s.host.writeToStream(stream, request); err != nil {
return nil, err
}

return s.handleWarpSyncProofResponse(stream)
}
var blockRequestTimeout = time.Second * 20
var ErrReceivedEmptyMessage = errors.New("received empty message")

// DoBlockRequest sends a request to the given peer.
// If a response is received within a certain time period, it is returned,
Expand Down Expand Up @@ -77,34 +48,6 @@ func (s *Service) DoBlockRequest(to peer.ID, req *BlockRequestMessage) (*BlockRe
return s.receiveBlockResponse(stream)
}

func (s *Service) handleWarpSyncProofResponse(stream libp2pnetwork.Stream) (interface{}, error) {
s.blockResponseBufMu.Lock()
defer s.blockResponseBufMu.Unlock()

// TODO: should we create another buffer pool for warp proof response buffers?
buf := s.blockResponseBuf

n, err := readStream(stream, &buf, warpSyncMaxResponseSize)
if err != nil {
return nil, fmt.Errorf("reading warp sync stream: %w", err)
}

if n == 0 {
return nil, fmt.Errorf("empty warp sync proof")
}

fmt.Printf("WARP PROOF BYTES ---> %v\n", buf[:n])
warpProof := new(WarpSyncProofResponse)
err = warpProof.Decode(buf[:n])
if err != nil {
panic(fmt.Sprintf("failed to decode warp proof: %s", err))
}
fmt.Printf("WARP PROOF ---> %v\n", warpProof)
return nil, nil
}

var ErrReceivedEmptyMessage = errors.New("received empty message")

func (s *Service) receiveBlockResponse(stream libp2pnetwork.Stream) (*BlockResponseMessage, error) {
// allocating a new (large) buffer every time slows down the syncing by a dramatic amount,
// as malloc is one of the most CPU intensive tasks.
Expand Down
65 changes: 0 additions & 65 deletions dot/network/warp_sync_message.go

This file was deleted.

2 changes: 1 addition & 1 deletion dot/sync/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type Network interface {
// ReportPeer reports peer based on the peer behaviour.
ReportPeer(change peerset.ReputationChange, p peer.ID)

TotalConnectedPeers() []peer.ID
AllConnectedPeers() []peer.ID
}

// Telemetry is the telemetry client to send telemetry messages.
Expand Down
12 changes: 6 additions & 6 deletions dot/sync/mocks_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dot/sync/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newSyncWorkerPool(net Network) *syncWorkerPool {
const ignorePeerTimeout = 2 * time.Minute

func (s *syncWorkerPool) useConnectedPeers() {
connectedPeers := s.network.TotalConnectedPeers()
connectedPeers := s.network.AllConnectedPeers()

s.l.Lock()
defer s.l.Unlock()
Expand Down

0 comments on commit 3eba164

Please sign in to comment.