From 347fcb31fafe77d9d7a355440ca3d982e15937af Mon Sep 17 00:00:00 2001 From: Vlad Date: Fri, 22 Dec 2017 19:11:36 +0200 Subject: [PATCH 1/6] whisper: bloom filter logic introduced --- whisper/whisperv6/api.go | 25 ++++++-- whisper/whisperv6/doc.go | 2 +- whisper/whisperv6/envelope.go | 35 +++++++++- whisper/whisperv6/peer.go | 16 ++++- whisper/whisperv6/whisper.go | 117 +++++++++++++++++++++++++++++----- 5 files changed, 170 insertions(+), 25 deletions(-) diff --git a/whisper/whisperv6/api.go b/whisper/whisperv6/api.go index 3dddb69539f0..f11e8b6dca5c 100644 --- a/whisper/whisperv6/api.go +++ b/whisper/whisperv6/api.go @@ -113,15 +113,32 @@ func (api *PublicWhisperAPI) Info(ctx context.Context) Info { // SetMaxMessageSize sets the maximum message size that is accepted. // Upper limit is defined by MaxMessageSize. func (api *PublicWhisperAPI) SetMaxMessageSize(ctx context.Context, size uint32) (bool, error) { - return true, api.w.SetMaxMessageSize(size) + err := api.w.SetMaxMessageSize(size) + if err != nil { + return false, err + } + return true, nil } -// SetMinPow sets the minimum PoW for a message before it is accepted. +// SetMinPow sets the minimum PoW, and notifies the peers. func (api *PublicWhisperAPI) SetMinPoW(ctx context.Context, pow float64) (bool, error) { - return true, api.w.SetMinimumPoW(pow) + err := api.w.SetMinimumPoW(pow) + if err != nil { + return false, err + } + return true, nil +} + +// SetBloomFilter sets the new value of bloom filter, and notifies the peers. +func (api *PublicWhisperAPI) SetBloomFilter(ctx context.Context, bloom hexutil.Bytes) (bool, error) { + err := api.w.SetBloomFilter(bloom) + if err != nil { + return false, err + } + return true, nil } -// MarkTrustedPeer marks a peer trusted. , which will allow it to send historic (expired) messages. +// MarkTrustedPeer marks a peer trusted, which will allow it to send historic (expired) messages. // Note: This function is not adding new nodes, the node needs to exists as a peer. func (api *PublicWhisperAPI) MarkTrustedPeer(ctx context.Context, enode string) (bool, error) { n, err := discover.ParseNode(enode) diff --git a/whisper/whisperv6/doc.go b/whisper/whisperv6/doc.go index 2a4911d65aa3..7d3bdbdcad92 100644 --- a/whisper/whisperv6/doc.go +++ b/whisper/whisperv6/doc.go @@ -35,7 +35,6 @@ import ( ) const ( - EnvelopeVersion = uint64(0) ProtocolVersion = uint64(6) ProtocolVersionStr = "6.0" ProtocolName = "shh" @@ -57,6 +56,7 @@ const ( aesKeyLength = 32 AESNonceLength = 12 keyIdSize = 32 + bloomFilterSize = 64 MaxMessageSize = uint32(10 * 1024 * 1024) // maximum accepted size of a message. DefaultMaxMessageSize = uint32(1024 * 1024) diff --git a/whisper/whisperv6/envelope.go b/whisper/whisperv6/envelope.go index 676df669bf7e..7f2532c2c905 100644 --- a/whisper/whisperv6/envelope.go +++ b/whisper/whisperv6/envelope.go @@ -42,9 +42,11 @@ type Envelope struct { Data []byte Nonce uint64 - pow float64 // Message-specific PoW as described in the Whisper specification. - hash common.Hash // Cached hash of the envelope to avoid rehashing every time. - // Don't access hash directly, use Hash() function instead. + pow float64 // Message-specific PoW as described in the Whisper specification. + + // the following variables should not be accessed directly, use the corresponding function instead: Hash(), Bloom() + hash common.Hash // Cached hash of the envelope to avoid rehashing every time. + bloom []byte } // size returns the size of envelope as it is sent (i.e. public fields only) @@ -227,3 +229,30 @@ func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) { } return msg } + +// Bloom maps 4-bytes Topic into 64-byte bloom filter with 3 bits set (at most). +func (e *Envelope) Bloom() []byte { + if e.bloom == nil { + e.bloom = TopicToBloom(e.Topic) + } + return e.bloom +} + +func TopicToBloom(topic TopicType) []byte { + var powers = [...]byte{1, 2, 4, 8, 16, 32, 64, 128} + b := make([]byte, bloomFilterSize) + var index [3]int + for j := 0; j < 3; j++ { + index[j] = int(topic[j]) + if (topic[3] & powers[j]) != 0 { + index[j] += 256 + } + } + + for j := 0; j < 3; j++ { + byteIndex := index[j] / 8 + bitIndex := index[j] % 8 + b[byteIndex] = powers[bitIndex] + } + return b +} diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go index 65e0c77b0c3f..7893e4a9ad63 100644 --- a/whisper/whisperv6/peer.go +++ b/whisper/whisperv6/peer.go @@ -36,6 +36,7 @@ type Peer struct { trusted bool powRequirement float64 + bloomFilter []byte known *set.Set // Messages already known by the peer to avoid wasting bandwidth @@ -156,7 +157,7 @@ func (p *Peer) broadcast() error { envelopes := p.host.Envelopes() bundle := make([]*Envelope, 0, len(envelopes)) for _, envelope := range envelopes { - if !p.marked(envelope) && envelope.PoW() >= p.powRequirement { + if !p.marked(envelope) && envelope.PoW() >= p.powRequirement && p.bloomMatch(envelope) { bundle = append(bundle, envelope) } } @@ -186,3 +187,16 @@ func (p *Peer) notifyAboutPowRequirementChange(pow float64) error { i := math.Float64bits(pow) return p2p.Send(p.ws, powRequirementCode, i) } + +func (p *Peer) notifyAboutBloomFilterChange(bloom []byte) error { + return p2p.Send(p.ws, bloomFilterExCode, bloom) +} + +func (p *Peer) bloomMatch(env *Envelope) bool { + if p.bloomFilter == nil { + // no filter - full node, accepts all envelops + return true + } + + return bloomFilterMatch(p.bloomFilter, env.Bloom()) +} diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index 492591486b94..99f0167274b6 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -48,9 +48,10 @@ type Statistics struct { } const ( - minPowIdx = iota // Minimal PoW required by the whisper node - maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node - overflowIdx = iota // Indicator of message queue overflow + minPowIdx = iota // Minimal PoW required by the whisper node + maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node + overflowIdx = iota // Indicator of message queue overflow + bloomFilterIdx = iota // Bloom filter for topics of interest for this node ) // Whisper represents a dark communication interface through the Ethereum @@ -131,6 +132,11 @@ func (w *Whisper) MinPow() float64 { return val.(float64) } +func (w *Whisper) BloomFilter() []byte { + val, _ := w.settings.Load(bloomFilterIdx) + return val.([]byte) +} + // MaxMessageSize returns the maximum accepted message size. func (w *Whisper) MaxMessageSize() uint32 { val, _ := w.settings.Load(maxMsgSizeIdx) @@ -180,6 +186,23 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error { return nil } +// SetBloomFilter sets the new bloom filter +func (w *Whisper) SetBloomFilter(bloom []byte) error { + if len(bloom) != bloomFilterSize { + return fmt.Errorf("invalid bloom filter size: %d", len(bloom)) + } + + w.notifyPeersAboutBloomFilterChange(bloom) + + go func() { + // allow some time before all the peers have processed the notification + time.Sleep(time.Duration(w.reactionAllowance) * time.Second) + w.settings.Store(bloomFilterIdx, bloom) + }() + + return nil +} + // SetMinimumPoW sets the minimal PoW required by this node func (w *Whisper) SetMinimumPoW(val float64) error { if val < 0.0 { @@ -203,17 +226,14 @@ func (w *Whisper) SetMinimumPowTest(val float64) { w.settings.Store(minPowIdx, val) } -func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { - arr := make([]*Peer, len(w.peers)) - i := 0 - - w.peerMu.Lock() - for p := range w.peers { - arr[i] = p - i++ - } - w.peerMu.Unlock() +// SetBloomFilterTest sets the Bloom Filter in test environment +func (w *Whisper) SetBloomFilterTest(bloom []byte) { + w.notifyPeersAboutBloomFilterChange(bloom) + w.settings.Store(minPowIdx, bloom) +} +func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { + arr := w.getPeers() for _, p := range arr { err := p.notifyAboutPowRequirementChange(pow) if err != nil { @@ -221,11 +241,37 @@ func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { err = p.notifyAboutPowRequirementChange(pow) } if err != nil { - log.Warn("oversized message received", "peer", p.ID(), "error", err) + log.Warn("failed to notify peer about new pow requirement", "peer", p.ID(), "error", err) + } + } +} + +func (w *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) { + arr := w.getPeers() + for _, p := range arr { + err := p.notifyAboutBloomFilterChange(bloom) + if err != nil { + // allow one retry + err = p.notifyAboutBloomFilterChange(bloom) + } + if err != nil { + log.Warn("failed to notify peer about new bloom filter", "peer", p.ID(), "error", err) } } } +func (w *Whisper) getPeers() []*Peer { + arr := make([]*Peer, len(w.peers)) + i := 0 + w.peerMu.Lock() + for p := range w.peers { + arr[i] = p + i++ + } + w.peerMu.Unlock() + return arr +} + // getPeer retrieves peer by ID func (w *Whisper) getPeer(peerID []byte) (*Peer, error) { w.peerMu.Lock() @@ -592,7 +638,21 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { } p.powRequirement = f case bloomFilterExCode: - // to be implemented + var bloom []byte + err := packet.Decode(&bloom) + if err == nil && len(bloom) != bloomFilterSize { + err = fmt.Errorf("wrong bloom filter size %d", len(bloom)) + } + + if err != nil { + log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid bloom filter exchange message") + } + if isFulNode(bloom) { + p.bloomFilter = nil + } else { + p.bloomFilter = bloom + } case p2pMessageCode: // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. // this message is not supposed to be forwarded to other peers, and @@ -659,7 +719,11 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { return false, nil // drop envelope without error for now // once the status message includes the PoW requirement, an error should be returned here: - //return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) + //return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) + } + + if !bloomFilterMatch(wh.BloomFilter(), envelope.Bloom()) { + return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v]", envelope.Hash().Hex()) } hash := envelope.Hash() @@ -897,3 +961,24 @@ func GenerateRandomID() (id string, err error) { id = common.Bytes2Hex(buf) return id, err } + +func isFulNode(bloom []byte) bool { + for _, b := range bloom { + if b != 255 { + return false + } + } + return true +} + +func bloomFilterMatch(filter, sample []byte) bool { + for i := 0; i < bloomFilterSize; i++ { + f := filter[i] + s := sample[i] + if ((f | s) ^ f) != 0 { + return false + } + } + + return true +} From 787054a378ea1d53ca80d9534db7d073a7643ee2 Mon Sep 17 00:00:00 2001 From: Vlad Date: Tue, 2 Jan 2018 12:59:15 +0200 Subject: [PATCH 2/6] whisper: pow exchange and bloom exchange protocols implemented --- whisper/whisperv6/doc.go | 4 +- whisper/whisperv6/peer.go | 2 +- whisper/whisperv6/whisper.go | 125 +++++++++++++++++++++++++++-------- 3 files changed, 102 insertions(+), 29 deletions(-) diff --git a/whisper/whisperv6/doc.go b/whisper/whisperv6/doc.go index 7d3bdbdcad92..062f3ba62c5e 100644 --- a/whisper/whisperv6/doc.go +++ b/whisper/whisperv6/doc.go @@ -68,8 +68,8 @@ const ( expirationCycle = time.Second transmissionCycle = 300 * time.Millisecond - DefaultTTL = 50 // seconds - SynchAllowance = 10 // seconds + DefaultTTL = 50 // seconds + DefaultSyncAllowance = 10 // seconds EnvelopeHeaderLength = 20 ) diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go index 7893e4a9ad63..b21f07e03183 100644 --- a/whisper/whisperv6/peer.go +++ b/whisper/whisperv6/peer.go @@ -36,7 +36,7 @@ type Peer struct { trusted bool powRequirement float64 - bloomFilter []byte + bloomFilter []byte // may contain nil in case of full node known *set.Set // Messages already known by the peer to avoid wasting bandwidth diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index 99f0167274b6..9ffd7faa2468 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -48,10 +48,12 @@ type Statistics struct { } const ( - minPowIdx = iota // Minimal PoW required by the whisper node - maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node - overflowIdx = iota // Indicator of message queue overflow - bloomFilterIdx = iota // Bloom filter for topics of interest for this node + maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node + overflowIdx = iota // Indicator of message queue overflow + minPowIdx = iota // Minimal PoW required by the whisper node + minPowToleranceIdx = iota // Minimal PoW tolerated by the whisper node for a limited time + bloomFilterIdx = iota // Bloom filter for topics of interest for this node + bloomFilterToleranceIdx = iota // Bloom filter tolerated by the whisper node for a limited time ) // Whisper represents a dark communication interface through the Ethereum @@ -77,7 +79,7 @@ type Whisper struct { settings syncmap.Map // holds configuration settings that can be dynamically changed - reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages + syncAllowance int // maximum time in seconds allowed to process the whisper-related messages statsMu sync.Mutex // guard stats stats Statistics // Statistics of whisper node @@ -92,15 +94,15 @@ func New(cfg *Config) *Whisper { } whisper := &Whisper{ - privateKeys: make(map[string]*ecdsa.PrivateKey), - symKeys: make(map[string][]byte), - envelopes: make(map[common.Hash]*Envelope), - expirations: make(map[uint32]*set.SetNonTS), - peers: make(map[*Peer]struct{}), - messageQueue: make(chan *Envelope, messageQueueLimit), - p2pMsgQueue: make(chan *Envelope, messageQueueLimit), - quit: make(chan struct{}), - reactionAllowance: SynchAllowance, + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopes: make(map[common.Hash]*Envelope), + expirations: make(map[uint32]*set.SetNonTS), + peers: make(map[*Peer]struct{}), + messageQueue: make(chan *Envelope, messageQueueLimit), + p2pMsgQueue: make(chan *Envelope, messageQueueLimit), + quit: make(chan struct{}), + syncAllowance: DefaultSyncAllowance, } whisper.filters = NewFilters(whisper) @@ -129,11 +131,33 @@ func New(cfg *Config) *Whisper { func (w *Whisper) MinPow() float64 { val, _ := w.settings.Load(minPowIdx) + if val == nil { + return DefaultMinimumPoW + } + return val.(float64) +} + +func (w *Whisper) MinPowTolerance() float64 { + val, _ := w.settings.Load(minPowToleranceIdx) + if val == nil { + return DefaultMinimumPoW + } return val.(float64) } func (w *Whisper) BloomFilter() []byte { val, _ := w.settings.Load(bloomFilterIdx) + if val == nil { + return nil + } + return val.([]byte) +} + +func (w *Whisper) BloomFilterTolerance() []byte { + val, _ := w.settings.Load(bloomFilterToleranceIdx) + if val == nil { + return nil + } return val.([]byte) } @@ -192,12 +216,13 @@ func (w *Whisper) SetBloomFilter(bloom []byte) error { return fmt.Errorf("invalid bloom filter size: %d", len(bloom)) } + w.settings.Store(bloomFilterIdx, bloom) w.notifyPeersAboutBloomFilterChange(bloom) go func() { // allow some time before all the peers have processed the notification - time.Sleep(time.Duration(w.reactionAllowance) * time.Second) - w.settings.Store(bloomFilterIdx, bloom) + time.Sleep(time.Duration(w.syncAllowance) * time.Second) + w.settings.Store(bloomFilterToleranceIdx, bloom) }() return nil @@ -209,12 +234,13 @@ func (w *Whisper) SetMinimumPoW(val float64) error { return fmt.Errorf("invalid PoW: %f", val) } + w.settings.Store(minPowIdx, val) w.notifyPeersAboutPowRequirementChange(val) go func() { // allow some time before all the peers have processed the notification - time.Sleep(time.Duration(w.reactionAllowance) * time.Second) - w.settings.Store(minPowIdx, val) + time.Sleep(time.Duration(w.syncAllowance) * time.Second) + w.settings.Store(minPowToleranceIdx, val) }() return nil @@ -222,14 +248,16 @@ func (w *Whisper) SetMinimumPoW(val float64) error { // SetMinimumPoW sets the minimal PoW in test environment func (w *Whisper) SetMinimumPowTest(val float64) { - w.notifyPeersAboutPowRequirementChange(val) w.settings.Store(minPowIdx, val) + w.notifyPeersAboutPowRequirementChange(val) + w.settings.Store(minPowToleranceIdx, val) } // SetBloomFilterTest sets the Bloom Filter in test environment func (w *Whisper) SetBloomFilterTest(bloom []byte) { + w.settings.Store(bloomFilterIdx, bloom) w.notifyPeersAboutBloomFilterChange(bloom) - w.settings.Store(minPowIdx, bloom) + w.settings.Store(bloomFilterToleranceIdx, bloom) } func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { @@ -505,7 +533,28 @@ func (w *Whisper) GetSymKey(id string) ([]byte, error) { // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. func (w *Whisper) Subscribe(f *Filter) (string, error) { - return w.filters.Install(f) + s, err := w.filters.Install(f) + if err == nil { + w.updateBloomFilter(f) + } + return s, err +} + +// updateBloomFilter recalculates the new value of bloom filter, +// and informs the peers if necessary. +func (w *Whisper) updateBloomFilter(f *Filter) { + aggregate := make([]byte, bloomFilterSize) + for _, t := range f.Topics { + top := BytesToTopic(t) + b := TopicToBloom(top) + aggregate = addBloom(aggregate, b) + } + + if !bloomFilterMatch(w.BloomFilter(), aggregate) { + // existing bloom filter must be updated + aggregate = addBloom(w.BloomFilter(), aggregate) + w.SetBloomFilter(aggregate) + } } // GetFilter returns the filter by id. @@ -693,7 +742,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { sent := envelope.Expiry - envelope.TTL if sent > now { - if sent-SynchAllowance > now { + if sent-DefaultSyncAllowance > now { return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) } else { // recalculate PoW, adjusted for the time difference, plus one second for latency @@ -702,7 +751,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { } if envelope.Expiry < now { - if envelope.Expiry+SynchAllowance*2 < now { + if envelope.Expiry+DefaultSyncAllowance*2 < now { return false, fmt.Errorf("very old message") } else { log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) @@ -715,15 +764,23 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { } if envelope.PoW() < wh.MinPow() { - log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex()) - return false, nil // drop envelope without error for now + // maybe the value was recently changed, and the peers did not adjust yet. + // some tolerance might be still allowed for a short period of adjustment time. + if envelope.PoW() < wh.MinPowTolerance() { + log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex()) + return false, nil // drop envelope without error for now + } // once the status message includes the PoW requirement, an error should be returned here: //return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) } if !bloomFilterMatch(wh.BloomFilter(), envelope.Bloom()) { - return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v]", envelope.Hash().Hex()) + // maybe the value was recently changed, and the peers did not adjust yet. + // some tolerance might be still allowed for a short period of adjustment time. + if !bloomFilterMatch(wh.BloomFilterTolerance(), envelope.Bloom()) { + return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v]", envelope.Hash().Hex()) + } } hash := envelope.Hash() @@ -963,6 +1020,9 @@ func GenerateRandomID() (id string, err error) { } func isFulNode(bloom []byte) bool { + if bloom == nil { + return true + } for _, b := range bloom { if b != 255 { return false @@ -972,6 +1032,11 @@ func isFulNode(bloom []byte) bool { } func bloomFilterMatch(filter, sample []byte) bool { + if filter == nil { + // full node, accepts all messages + return true + } + for i := 0; i < bloomFilterSize; i++ { f := filter[i] s := sample[i] @@ -982,3 +1047,11 @@ func bloomFilterMatch(filter, sample []byte) bool { return true } + +func addBloom(a, b []byte) []byte { + c := make([]byte, bloomFilterSize) + for i := 0; i < bloomFilterSize; i++ { + c[i] = a[i] | b[i] + } + return c +} From becaadd13da0b7671710fdf6b589bbc0e2227309 Mon Sep 17 00:00:00 2001 From: Vlad Date: Wed, 3 Jan 2018 21:16:48 +0200 Subject: [PATCH 3/6] whisper: Status message changed, more params added --- whisper/whisperv6/peer.go | 38 +++++++++- whisper/whisperv6/peer_test.go | 114 ++++++++++++++++++++++++------ whisper/whisperv6/whisper.go | 51 ++++++------- whisper/whisperv6/whisper_test.go | 61 ++++++++++++++++ 4 files changed, 213 insertions(+), 51 deletions(-) diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go index b21f07e03183..08071c0f77be 100644 --- a/whisper/whisperv6/peer.go +++ b/whisper/whisperv6/peer.go @@ -75,8 +75,12 @@ func (p *Peer) handshake() error { // Send the handshake status message asynchronously errc := make(chan error, 1) go func() { - errc <- p2p.Send(p.ws, statusCode, ProtocolVersion) + pow := p.host.MinPow() + powConverted := math.Float64bits(pow) + bloom := p.host.BloomFilter() + errc <- p2p.SendItems(p.ws, statusCode, ProtocolVersion, powConverted, bloom) }() + // Fetch the remote status packet and verify protocol match packet, err := p.ws.ReadMsg() if err != nil { @@ -86,14 +90,42 @@ func (p *Peer) handshake() error { return fmt.Errorf("peer [%x] sent packet %x before status packet", p.ID(), packet.Code) } s := rlp.NewStream(packet.Payload, uint64(packet.Size)) - peerVersion, err := s.Uint() + _, err = s.List() if err != nil { return fmt.Errorf("peer [%x] sent bad status message: %v", p.ID(), err) } + peerVersion, err := s.Uint() + if err != nil { + return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", p.ID(), err) + } if peerVersion != ProtocolVersion { return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", p.ID(), peerVersion, ProtocolVersion) } - // Wait until out own status is consumed too + + // only version is mandatory, subsequent parameters are optional + powRaw, err := s.Uint() + if err == nil { + pow := math.Float64frombits(powRaw) + if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 { + return fmt.Errorf("peer [%x] sent bad status message: invalid pow", p.ID()) + } + p.powRequirement = pow + + var bloom []byte + err = s.Decode(&bloom) + if err == nil { + sz := len(bloom) + if sz != bloomFilterSize && sz != 0 { + return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", p.ID(), sz) + } + if isFullNode(bloom) { + p.bloomFilter = nil + } else { + p.bloomFilter = bloom + } + } + } + if err := <-errc; err != nil { return fmt.Errorf("peer [%x] failed to send status packet: %v", p.ID(), err) } diff --git a/whisper/whisperv6/peer_test.go b/whisper/whisperv6/peer_test.go index 599a479be490..b6d0dd02fa48 100644 --- a/whisper/whisperv6/peer_test.go +++ b/whisper/whisperv6/peer_test.go @@ -20,6 +20,7 @@ import ( "bytes" "crypto/ecdsa" "fmt" + mrand "math/rand" "net" "sync" "testing" @@ -87,6 +88,9 @@ var nodes [NumNodes]*TestNode var sharedKey []byte = []byte("some arbitrary data here") var sharedTopic TopicType = TopicType{0xF, 0x1, 0x2, 0} var expectedMessage []byte = []byte("per rectum ad astra") +var masterBloomFilter []byte +var masterPow = 0.00000001 +var round int = 1 func TestSimulation(t *testing.T) { // create a chain of whisper nodes, @@ -104,8 +108,13 @@ func TestSimulation(t *testing.T) { // check if each node have received and decrypted exactly one message checkPropagation(t, true) - // send protocol-level messages (powRequirementCode) and check the new PoW requirement values - powReqExchange(t) + // check if Status message was correctly decoded + checkBloomFilterExchange(t) + checkPowExchange(t) + + // send new pow and bloom exchange messages + resetParams(t) + round++ // node #1 sends one expected (decryptable) message sendMsg(t, true, 1) @@ -113,18 +122,65 @@ func TestSimulation(t *testing.T) { // check if each node (except node #0) have received and decrypted exactly one message checkPropagation(t, false) + for i := 1; i < NumNodes; i++ { + time.Sleep(20 * time.Millisecond) + sendMsg(t, true, i) + } + + // check if corresponding protocol-level messages were correctly decoded + checkPowExchangeForNodeZero(t) + checkBloomFilterExchange(t) + stopServers() } +func resetParams(t *testing.T) { + // change pow only for node zero + masterPow = 7777777.0 + nodes[0].shh.SetMinimumPoW(masterPow) + + // change bloom for all nodes + masterBloomFilter = TopicToBloom(sharedTopic) + for i := 0; i < NumNodes; i++ { + nodes[i].shh.SetBloomFilter(masterBloomFilter) + } +} + +func initBloom(t *testing.T) { + masterBloomFilter = make([]byte, bloomFilterSize) + _, err := mrand.Read(masterBloomFilter) + if err != nil { + t.Fatalf("rand failed: %s.", err) + } + + msgBloom := TopicToBloom(sharedTopic) + masterBloomFilter = addBloom(masterBloomFilter, msgBloom) + for i := 0; i < 32; i++ { + masterBloomFilter[i] = 0xFF + } + + if !bloomFilterMatch(masterBloomFilter, msgBloom) { + t.Fatalf("bloom mismatch on initBloom.") + } +} + func initialize(t *testing.T) { + initBloom(t) + var err error ip := net.IPv4(127, 0, 0, 1) port0 := 30303 for i := 0; i < NumNodes; i++ { var node TestNode + b := make([]byte, bloomFilterSize) + copy(b, masterBloomFilter) node.shh = New(&DefaultConfig) - node.shh.SetMinimumPowTest(0.00000001) + node.shh.SetMinimumPoW(masterPow) + node.shh.SetBloomFilter(b) + if !isBloomFilterEqual(node.shh.BloomFilter(), masterBloomFilter) { + t.Fatalf("bloom mismatch on init.") + } node.shh.Start(nil) topics := make([]TopicType, 0) topics = append(topics, sharedTopic) @@ -206,7 +262,7 @@ func checkPropagation(t *testing.T, includingNodeZero bool) { for i := first; i < NumNodes; i++ { f := nodes[i].shh.GetFilter(nodes[i].filerId) if f == nil { - t.Fatalf("failed to get filterId %s from node %d.", nodes[i].filerId, i) + t.Fatalf("failed to get filterId %s from node %d, round %d.", nodes[i].filerId, i, round) } mail := f.Retrieve() @@ -332,34 +388,52 @@ func TestPeerBasic(t *testing.T) { } } -func powReqExchange(t *testing.T) { +func checkPowExchangeForNodeZero(t *testing.T) { + cnt := 0 for i, node := range nodes { for peer := range node.shh.peers { - if peer.powRequirement > 1000.0 { - t.Fatalf("node %d: one of the peers' pow requirement is too big (%f).", i, peer.powRequirement) + if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) { + cnt++ + if peer.powRequirement != masterPow { + t.Fatalf("node %d: failed to set the new pow requirement.", i) + } } } } + if cnt == 0 { + t.Fatalf("no matching peers found.") + } +} - const pow float64 = 7777777.0 - nodes[0].shh.SetMinimumPoW(pow) - - // wait until all the messages are delivered - time.Sleep(64 * time.Millisecond) - - cnt := 0 +func checkPowExchange(t *testing.T) { for i, node := range nodes { for peer := range node.shh.peers { - if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) { - cnt++ - if peer.powRequirement != pow { - t.Fatalf("node %d: failed to set the new pow requirement.", i) + if peer.peer.ID() != discover.PubkeyID(&nodes[0].id.PublicKey) { + if peer.powRequirement != masterPow { + t.Fatalf("node %d: failed to exchange pow requirement in round %d; expected %f, got %f", + i, round, masterPow, peer.powRequirement) } } } } +} + +func checkBloomFilterExchange(t *testing.T) { + for i, node := range nodes { + for peer := range node.shh.peers { + if !isBloomFilterEqual(peer.bloomFilter, masterBloomFilter) { + t.Fatalf("node %d: failed to exchange bloom filter requirement in round %d. \n%x expected \n%x got", + i, round, masterBloomFilter, peer.bloomFilter) + } + } + } +} - if cnt == 0 { - t.Fatalf("no matching peers found.") +func isBloomFilterEqual(a, b []byte) bool { + for i := 0; i < bloomFilterSize; i++ { + if a[i] != b[i] { + return false + } } + return true } diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index 9ffd7faa2468..dd99bc3aae32 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -130,32 +130,32 @@ func New(cfg *Config) *Whisper { } func (w *Whisper) MinPow() float64 { - val, _ := w.settings.Load(minPowIdx) - if val == nil { + val, exist := w.settings.Load(minPowIdx) + if !exist || val == nil { return DefaultMinimumPoW } return val.(float64) } func (w *Whisper) MinPowTolerance() float64 { - val, _ := w.settings.Load(minPowToleranceIdx) - if val == nil { + val, exist := w.settings.Load(minPowToleranceIdx) + if !exist || val == nil { return DefaultMinimumPoW } return val.(float64) } func (w *Whisper) BloomFilter() []byte { - val, _ := w.settings.Load(bloomFilterIdx) - if val == nil { + val, exist := w.settings.Load(bloomFilterIdx) + if !exist || val == nil { return nil } return val.([]byte) } func (w *Whisper) BloomFilterTolerance() []byte { - val, _ := w.settings.Load(bloomFilterToleranceIdx) - if val == nil { + val, exist := w.settings.Load(bloomFilterToleranceIdx) + if !exist || val == nil { return nil } return val.([]byte) @@ -216,13 +216,16 @@ func (w *Whisper) SetBloomFilter(bloom []byte) error { return fmt.Errorf("invalid bloom filter size: %d", len(bloom)) } - w.settings.Store(bloomFilterIdx, bloom) - w.notifyPeersAboutBloomFilterChange(bloom) + b := make([]byte, bloomFilterSize) + copy(b, bloom) + + w.settings.Store(bloomFilterIdx, b) + w.notifyPeersAboutBloomFilterChange(b) go func() { // allow some time before all the peers have processed the notification time.Sleep(time.Duration(w.syncAllowance) * time.Second) - w.settings.Store(bloomFilterToleranceIdx, bloom) + w.settings.Store(bloomFilterToleranceIdx, b) }() return nil @@ -253,13 +256,6 @@ func (w *Whisper) SetMinimumPowTest(val float64) { w.settings.Store(minPowToleranceIdx, val) } -// SetBloomFilterTest sets the Bloom Filter in test environment -func (w *Whisper) SetBloomFilterTest(bloom []byte) { - w.settings.Store(bloomFilterIdx, bloom) - w.notifyPeersAboutBloomFilterChange(bloom) - w.settings.Store(bloomFilterToleranceIdx, bloom) -} - func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { arr := w.getPeers() for _, p := range arr { @@ -697,7 +693,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid bloom filter exchange message") } - if isFulNode(bloom) { + if isFullNode(bloom) { p.bloomFilter = nil } else { p.bloomFilter = bloom @@ -765,21 +761,20 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { if envelope.PoW() < wh.MinPow() { // maybe the value was recently changed, and the peers did not adjust yet. - // some tolerance might be still allowed for a short period of adjustment time. + // in this case the previous value is retrieved by MinPowTolerance() + // for a short period of peer synchronization. if envelope.PoW() < wh.MinPowTolerance() { - log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex()) - return false, nil // drop envelope without error for now + return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) } - - // once the status message includes the PoW requirement, an error should be returned here: - //return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) } if !bloomFilterMatch(wh.BloomFilter(), envelope.Bloom()) { // maybe the value was recently changed, and the peers did not adjust yet. - // some tolerance might be still allowed for a short period of adjustment time. + // in this case the previous value is retrieved by BloomFilterTolerance() + // for a short period of peer synchronization. if !bloomFilterMatch(wh.BloomFilterTolerance(), envelope.Bloom()) { - return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v]", envelope.Hash().Hex()) + return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x", + envelope.Hash().Hex(), wh.BloomFilter(), envelope.Bloom(), envelope.Topic) } } @@ -1019,7 +1014,7 @@ func GenerateRandomID() (id string, err error) { return id, err } -func isFulNode(bloom []byte) bool { +func isFullNode(bloom []byte) bool { if bloom == nil { return true } diff --git a/whisper/whisperv6/whisper_test.go b/whisper/whisperv6/whisper_test.go index b391a1161b45..fa14acb1b1b4 100644 --- a/whisper/whisperv6/whisper_test.go +++ b/whisper/whisperv6/whisper_test.go @@ -843,3 +843,64 @@ func TestSymmetricSendKeyMismatch(t *testing.T) { t.Fatalf("received a message when keys weren't matching") } } + +func TestBloom(t *testing.T) { + topic := TopicType{0, 0, 255, 6} + b := TopicToBloom(topic) + x := make([]byte, bloomFilterSize) + x[0] = byte(1) + x[32] = byte(1) + x[bloomFilterSize-1] = byte(128) + if !bloomFilterMatch(x, b) || !bloomFilterMatch(b, x) { + t.Fatalf("bloom filter does not match the mask") + } + + _, err := mrand.Read(b) + if err != nil { + t.Fatalf("math rand error") + } + _, err = mrand.Read(x) + if err != nil { + t.Fatalf("math rand error") + } + if !bloomFilterMatch(b, b) { + t.Fatalf("bloom filter does not match self") + } + x = addBloom(x, b) + if !bloomFilterMatch(x, b) { + t.Fatalf("bloom filter does not match combined bloom") + } + if !isFullNode(nil) { + t.Fatalf("isFullNode did not recognize nil as full node") + } + x[17] = 254 + if isFullNode(x) { + t.Fatalf("isFullNode false positive") + } + for i := 0; i < bloomFilterSize; i++ { + b[i] = byte(255) + } + if !isFullNode(b) { + t.Fatalf("isFullNode false negative") + } + if bloomFilterMatch(x, b) { + t.Fatalf("bloomFilterMatch false positive") + } + if !bloomFilterMatch(b, x) { + t.Fatalf("bloomFilterMatch false negative") + } + + w := New(&DefaultConfig) + f := w.BloomFilter() + if f != nil { + t.Fatalf("wrong bloom on creation") + } + err = w.SetBloomFilter(x) + if err != nil { + t.Fatalf("failed to set bloom filter: %s", err) + } + f = w.BloomFilter() + if !bloomFilterMatch(f, x) || !bloomFilterMatch(x, f) { + t.Fatalf("retireved wrong bloom filter") + } +} From 5d71ee3a2d97d8edd2eaffab2fedfe1eccdcd378 Mon Sep 17 00:00:00 2001 From: Vlad Date: Sun, 7 Jan 2018 21:12:05 +0200 Subject: [PATCH 4/6] whisper: minor refactoring --- whisper/whisperv6/doc.go | 14 +++++------- whisper/whisperv6/envelope.go | 7 +++--- whisper/whisperv6/peer_test.go | 9 -------- whisper/whisperv6/whisper.go | 40 ++++++++++++++++++++++++++++------ 4 files changed, 43 insertions(+), 27 deletions(-) diff --git a/whisper/whisperv6/doc.go b/whisper/whisperv6/doc.go index 062f3ba62c5e..040e1b0dfc01 100644 --- a/whisper/whisperv6/doc.go +++ b/whisper/whisperv6/doc.go @@ -51,12 +51,12 @@ const ( paddingMask = byte(3) signatureFlag = byte(4) - TopicLength = 4 - signatureLength = 65 - aesKeyLength = 32 - AESNonceLength = 12 - keyIdSize = 32 - bloomFilterSize = 64 + TopicLength = 4 // in bytes + signatureLength = 65 // in bytes + aesKeyLength = 32 // in bytes + AESNonceLength = 12 // in bytes + keyIdSize = 32 // in bytes + bloomFilterSize = 64 // in bytes MaxMessageSize = uint32(10 * 1024 * 1024) // maximum accepted size of a message. DefaultMaxMessageSize = uint32(1024 * 1024) @@ -70,8 +70,6 @@ const ( DefaultTTL = 50 // seconds DefaultSyncAllowance = 10 // seconds - - EnvelopeHeaderLength = 20 ) type unknownVersionError uint64 diff --git a/whisper/whisperv6/envelope.go b/whisper/whisperv6/envelope.go index 7f2532c2c905..304608fdeb3c 100644 --- a/whisper/whisperv6/envelope.go +++ b/whisper/whisperv6/envelope.go @@ -51,6 +51,7 @@ type Envelope struct { // size returns the size of envelope as it is sent (i.e. public fields only) func (e *Envelope) size() int { + const EnvelopeHeaderLength = 20 return EnvelopeHeaderLength + len(e.Data) } @@ -238,13 +239,13 @@ func (e *Envelope) Bloom() []byte { return e.bloom } +// TopicToBloom converts the topic (4 bytes) to the bloom filter (64 bytes) func TopicToBloom(topic TopicType) []byte { - var powers = [...]byte{1, 2, 4, 8, 16, 32, 64, 128} b := make([]byte, bloomFilterSize) var index [3]int for j := 0; j < 3; j++ { index[j] = int(topic[j]) - if (topic[3] & powers[j]) != 0 { + if (topic[3] & (1 << uint(j))) != 0 { index[j] += 256 } } @@ -252,7 +253,7 @@ func TopicToBloom(topic TopicType) []byte { for j := 0; j < 3; j++ { byteIndex := index[j] / 8 bitIndex := index[j] % 8 - b[byteIndex] = powers[bitIndex] + b[byteIndex] = (1 << uint(bitIndex)) } return b } diff --git a/whisper/whisperv6/peer_test.go b/whisper/whisperv6/peer_test.go index b6d0dd02fa48..9a1f9dc8e64f 100644 --- a/whisper/whisperv6/peer_test.go +++ b/whisper/whisperv6/peer_test.go @@ -428,12 +428,3 @@ func checkBloomFilterExchange(t *testing.T) { } } } - -func isBloomFilterEqual(a, b []byte) bool { - for i := 0; i < bloomFilterSize; i++ { - if a[i] != b[i] { - return false - } - } - return true -} diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index dd99bc3aae32..e4fb273005a6 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -49,11 +49,11 @@ type Statistics struct { const ( maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node - overflowIdx = iota // Indicator of message queue overflow - minPowIdx = iota // Minimal PoW required by the whisper node - minPowToleranceIdx = iota // Minimal PoW tolerated by the whisper node for a limited time - bloomFilterIdx = iota // Bloom filter for topics of interest for this node - bloomFilterToleranceIdx = iota // Bloom filter tolerated by the whisper node for a limited time + overflowIdx // Indicator of message queue overflow + minPowIdx // Minimal PoW required by the whisper node + minPowToleranceIdx // Minimal PoW tolerated by the whisper node for a limited time + bloomFilterIdx // Bloom filter for topics of interest for this node + bloomFilterToleranceIdx // Bloom filter tolerated by the whisper node for a limited time ) // Whisper represents a dark communication interface through the Ethereum @@ -129,14 +129,23 @@ func New(cfg *Config) *Whisper { return whisper } +// MinPow returns the PoW value required by this node. func (w *Whisper) MinPow() float64 { val, exist := w.settings.Load(minPowIdx) if !exist || val == nil { return DefaultMinimumPoW } - return val.(float64) + v, ok := val.(float64) + if !ok { + log.Error("Error loading minPowIdx, using default") + return DefaultMinimumPoW + } + return v } +// MinPowTolerance returns the value of minimum PoW which is tolerated for a limited +// time after PoW was changed. If sufficient time have elapsed or no change of PoW +// have ever occurred, the return value will be the same as return value of MinPow(). func (w *Whisper) MinPowTolerance() float64 { val, exist := w.settings.Load(minPowToleranceIdx) if !exist || val == nil { @@ -145,6 +154,10 @@ func (w *Whisper) MinPowTolerance() float64 { return val.(float64) } +// BloomFilter returns the aggregated bloom filter for all the topics of interest. +// The nodes are required to send only messages that match the advertised bloom filter. +// If a message does not match the bloom, it will tantamount to spam, and the peer will +// be disconnected. func (w *Whisper) BloomFilter() []byte { val, exist := w.settings.Load(bloomFilterIdx) if !exist || val == nil { @@ -153,6 +166,10 @@ func (w *Whisper) BloomFilter() []byte { return val.([]byte) } +// BloomFilterTolerance returns the bloom filter which is tolerated for a limited +// time after new bloom was advertised to the peers. If sufficient time have elapsed +// or no change of bloom filter have ever occurred, the return value will be the same +// as return value of BloomFilter(). func (w *Whisper) BloomFilterTolerance() []byte { val, exist := w.settings.Load(bloomFilterToleranceIdx) if !exist || val == nil { @@ -1035,7 +1052,7 @@ func bloomFilterMatch(filter, sample []byte) bool { for i := 0; i < bloomFilterSize; i++ { f := filter[i] s := sample[i] - if ((f | s) ^ f) != 0 { + if (f | s) != f { return false } } @@ -1050,3 +1067,12 @@ func addBloom(a, b []byte) []byte { } return c } + +func isBloomFilterEqual(a, b []byte) bool { + for i := 0; i < bloomFilterSize; i++ { + if a[i] != b[i] { + return false + } + } + return true +} From e33ef228f4571685eb5bae445c13b538ccde77ec Mon Sep 17 00:00:00 2001 From: Vlad Date: Tue, 9 Jan 2018 00:09:53 +0200 Subject: [PATCH 5/6] whisper: moved a constant --- whisper/whisperv6/doc.go | 2 ++ whisper/whisperv6/envelope.go | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/whisper/whisperv6/doc.go b/whisper/whisperv6/doc.go index 040e1b0dfc01..da1b4ee5bae9 100644 --- a/whisper/whisperv6/doc.go +++ b/whisper/whisperv6/doc.go @@ -58,6 +58,8 @@ const ( keyIdSize = 32 // in bytes bloomFilterSize = 64 // in bytes + EnvelopeHeaderLength = 20 + MaxMessageSize = uint32(10 * 1024 * 1024) // maximum accepted size of a message. DefaultMaxMessageSize = uint32(1024 * 1024) DefaultMinimumPoW = 0.2 diff --git a/whisper/whisperv6/envelope.go b/whisper/whisperv6/envelope.go index 304608fdeb3c..9ed712b93446 100644 --- a/whisper/whisperv6/envelope.go +++ b/whisper/whisperv6/envelope.go @@ -51,7 +51,6 @@ type Envelope struct { // size returns the size of envelope as it is sent (i.e. public fields only) func (e *Envelope) size() int { - const EnvelopeHeaderLength = 20 return EnvelopeHeaderLength + len(e.Data) } From c27badee7ebb27f0298d7dc67f4f5b24806033c7 Mon Sep 17 00:00:00 2001 From: Vlad Date: Tue, 9 Jan 2018 13:39:36 +0200 Subject: [PATCH 6/6] whisper: minor refactoring --- whisper/whisperv6/api.go | 18 +++--------------- whisper/whisperv6/peer_test.go | 4 ++-- whisper/whisperv6/whisper.go | 9 --------- 3 files changed, 5 insertions(+), 26 deletions(-) diff --git a/whisper/whisperv6/api.go b/whisper/whisperv6/api.go index f11e8b6dca5c..0e8490b41956 100644 --- a/whisper/whisperv6/api.go +++ b/whisper/whisperv6/api.go @@ -113,29 +113,17 @@ func (api *PublicWhisperAPI) Info(ctx context.Context) Info { // SetMaxMessageSize sets the maximum message size that is accepted. // Upper limit is defined by MaxMessageSize. func (api *PublicWhisperAPI) SetMaxMessageSize(ctx context.Context, size uint32) (bool, error) { - err := api.w.SetMaxMessageSize(size) - if err != nil { - return false, err - } - return true, nil + return true, api.w.SetMaxMessageSize(size) } // SetMinPow sets the minimum PoW, and notifies the peers. func (api *PublicWhisperAPI) SetMinPoW(ctx context.Context, pow float64) (bool, error) { - err := api.w.SetMinimumPoW(pow) - if err != nil { - return false, err - } - return true, nil + return true, api.w.SetMinimumPoW(pow) } // SetBloomFilter sets the new value of bloom filter, and notifies the peers. func (api *PublicWhisperAPI) SetBloomFilter(ctx context.Context, bloom hexutil.Bytes) (bool, error) { - err := api.w.SetBloomFilter(bloom) - if err != nil { - return false, err - } - return true, nil + return true, api.w.SetBloomFilter(bloom) } // MarkTrustedPeer marks a peer trusted, which will allow it to send historic (expired) messages. diff --git a/whisper/whisperv6/peer_test.go b/whisper/whisperv6/peer_test.go index 9a1f9dc8e64f..8a65cb71432b 100644 --- a/whisper/whisperv6/peer_test.go +++ b/whisper/whisperv6/peer_test.go @@ -178,7 +178,7 @@ func initialize(t *testing.T) { node.shh = New(&DefaultConfig) node.shh.SetMinimumPoW(masterPow) node.shh.SetBloomFilter(b) - if !isBloomFilterEqual(node.shh.BloomFilter(), masterBloomFilter) { + if !bytes.Equal(node.shh.BloomFilter(), masterBloomFilter) { t.Fatalf("bloom mismatch on init.") } node.shh.Start(nil) @@ -421,7 +421,7 @@ func checkPowExchange(t *testing.T) { func checkBloomFilterExchange(t *testing.T) { for i, node := range nodes { for peer := range node.shh.peers { - if !isBloomFilterEqual(peer.bloomFilter, masterBloomFilter) { + if !bytes.Equal(peer.bloomFilter, masterBloomFilter) { t.Fatalf("node %d: failed to exchange bloom filter requirement in round %d. \n%x expected \n%x got", i, round, masterBloomFilter, peer.bloomFilter) } diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index e4fb273005a6..bc89aadccd9f 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -1067,12 +1067,3 @@ func addBloom(a, b []byte) []byte { } return c } - -func isBloomFilterEqual(a, b []byte) bool { - for i := 0; i < bloomFilterSize; i++ { - if a[i] != b[i] { - return false - } - } - return true -}