From 9020bfbbe5b1b5a0ea4924ac77f1ad6aeffcb40a Mon Sep 17 00:00:00 2001 From: Vlad Date: Wed, 13 Dec 2017 15:37:54 +0200 Subject: [PATCH 1/3] whisper: messge bundling --- whisper/whisperv6/peer.go | 25 +++++++++++++++---------- whisper/whisperv6/whisper.go | 24 +++++++++++++----------- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go index ac7b3b12b659..be6fc68c3ba2 100644 --- a/whisper/whisperv6/peer.go +++ b/whisper/whisperv6/peer.go @@ -149,21 +149,26 @@ func (peer *Peer) expire() { // broadcast iterates over the collection of envelopes and transmits yet unknown // ones over the network. func (p *Peer) broadcast() error { - var cnt int envelopes := p.host.Envelopes() + bundle := make([]*Envelope, 0, len(envelopes)) for _, envelope := range envelopes { if !p.marked(envelope) { - err := p2p.Send(p.ws, messagesCode, envelope) - if err != nil { - return err - } else { - p.mark(envelope) - cnt++ - } + bundle = append(bundle, envelope) } } - if cnt > 0 { - log.Trace("broadcast", "num. messages", cnt) + + // transmit the unknown batch (potentially empty) + if err := p2p.Send(p.ws, messagesCode, bundle); err != nil { + return err + } + + // mark envelopes only if they were successfully sent + for _, e := range bundle { + p.mark(e) + } + + if len(bundle) > 0 { + log.Trace("broadcast", "num. messages", len(bundle)) } return nil } diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index d09baab3fddb..c7f1fa13c343 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -515,18 +515,20 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("unxepected status message received", "peer", p.peer.ID()) case messagesCode: // decode the contained envelopes - var envelope Envelope - if err := packet.Decode(&envelope); err != nil { - log.Warn("failed to decode envelope, peer will be disconnected", "peer", p.peer.ID(), "err", err) - return errors.New("invalid envelope") + var envelopes []*Envelope + if err := packet.Decode(&envelopes); err != nil { + log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid envelopes") } - cached, err := wh.add(&envelope) - if err != nil { - log.Warn("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) - return errors.New("invalid envelope") - } - if cached { - p.mark(&envelope) + for _, env := range envelopes { + cached, err := wh.add(env) + if err != nil { + log.Warn("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid envelope") + } + if cached { + p.mark(env) + } } case p2pCode: // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. From b7ecc495c309e91ba8514a3aa9a38d92a89057f0 Mon Sep 17 00:00:00 2001 From: Vlad Date: Wed, 20 Dec 2017 09:18:48 +0200 Subject: [PATCH 2/3] whisper: process all received envelopes, do not send empty bundle --- whisper/whisperv6/peer.go | 18 +++++++++--------- whisper/whisperv6/whisper.go | 13 ++++++++++--- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go index be6fc68c3ba2..ffc39505effa 100644 --- a/whisper/whisperv6/peer.go +++ b/whisper/whisperv6/peer.go @@ -157,17 +157,17 @@ func (p *Peer) broadcast() error { } } - // transmit the unknown batch (potentially empty) - if err := p2p.Send(p.ws, messagesCode, bundle); err != nil { - return err - } + if len(bundle) > 0 { + // transmit the batch of envelopes + if err := p2p.Send(p.ws, messagesCode, bundle); err != nil { + return err + } - // mark envelopes only if they were successfully sent - for _, e := range bundle { - p.mark(e) - } + // mark envelopes only if they were successfully sent + for _, e := range bundle { + p.mark(e) + } - if len(bundle) > 0 { log.Trace("broadcast", "num. messages", len(bundle)) } return nil diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index c7f1fa13c343..307d2abe3444 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -520,16 +520,23 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid envelopes") } + + var trouble error for _, env := range envelopes { cached, err := wh.add(env) - if err != nil { - log.Warn("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) - return errors.New("invalid envelope") + if err != nil && trouble == nil { + // only report the first occurring error + trouble = err } if cached { p.mark(env) } } + + if trouble != nil { + log.Warn("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", trouble) + return errors.New("invalid envelope") + } case p2pCode: // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. // this message is not supposed to be forwarded to other peers, and From b044e9a0aa1e1dc3ce49b1556f4a53e4ae8285a8 Mon Sep 17 00:00:00 2001 From: Vlad Date: Wed, 20 Dec 2017 15:39:09 +0200 Subject: [PATCH 3/3] whisper: error logging changed --- whisper/whisperv6/whisper.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index 307d2abe3444..2cc1e64f53f6 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -521,20 +521,19 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { return errors.New("invalid envelopes") } - var trouble error + trouble := false for _, env := range envelopes { cached, err := wh.add(env) - if err != nil && trouble == nil { - // only report the first occurring error - trouble = err + if err != nil { + trouble = true + log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) } if cached { p.mark(env) } } - if trouble != nil { - log.Warn("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", trouble) + if trouble { return errors.New("invalid envelope") } case p2pCode: