Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

whisper: PoW requirement #15701

Merged
merged 12 commits into from
Dec 21, 2017
11 changes: 7 additions & 4 deletions whisper/whisperv6/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ const (
ProtocolVersionStr = "6.0"
ProtocolName = "shh"

statusCode = 0 // used by whisper protocol
messagesCode = 1 // normal whisper message
p2pCode = 2 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
p2pRequestCode = 3 // peer-to-peer message, used by Dapp protocol
// whisper protocol message codes, according to EIP-627
statusCode = 0 // used by whisper protocol
messagesCode = 1 // normal whisper message
powRequirementCode = 2 // PoW requirement
bloomFilterExCode = 3 // bloom filter exchange
p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
NumberOfMessageCodes = 128

paddingMask = byte(3)
Expand Down
31 changes: 20 additions & 11 deletions whisper/whisperv6/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package whisperv6

import (
"fmt"
"math"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -29,10 +30,12 @@ import (

// peer represents a whisper protocol peer connection.
type Peer struct {
host *Whisper
peer *p2p.Peer
ws p2p.MsgReadWriter
trusted bool
host *Whisper
peer *p2p.Peer
ws p2p.MsgReadWriter

trusted bool
powRequirement float64

known *set.Set // Messages already known by the peer to avoid wasting bandwidth

Expand All @@ -42,12 +45,13 @@ type Peer struct {
// newPeer creates a new whisper peer object, but does not run the handshake itself.
func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
return &Peer{
host: host,
peer: remote,
ws: rw,
trusted: false,
known: set.New(),
quit: make(chan struct{}),
host: host,
peer: remote,
ws: rw,
trusted: false,
powRequirement: 0.0,
known: set.New(),
quit: make(chan struct{}),
}
}

Expand Down Expand Up @@ -152,7 +156,7 @@ func (p *Peer) broadcast() error {
envelopes := p.host.Envelopes()
bundle := make([]*Envelope, 0, len(envelopes))
for _, envelope := range envelopes {
if !p.marked(envelope) {
if !p.marked(envelope) && envelope.PoW() >= p.powRequirement {
bundle = append(bundle, envelope)
}
}
Expand All @@ -177,3 +181,8 @@ func (p *Peer) ID() []byte {
id := p.peer.ID()
return id[:]
}

func (p *Peer) notifyAboutPowRequirementChange(pow float64) error {
i := math.Float64bits(pow)
return p2p.Send(p.ws, powRequirementCode, i)
}
97 changes: 78 additions & 19 deletions whisper/whisperv6/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,31 @@ var sharedKey []byte = []byte("some arbitrary data here")
var sharedTopic TopicType = TopicType{0xF, 0x1, 0x2, 0}
var expectedMessage []byte = []byte("per rectum ad astra")

// This test does the following:
// 1. creates a chain of whisper nodes,
// 2. installs the filters with shared (predefined) parameters,
// 3. each node sends a number of random (undecryptable) messages,
// 4. first node sends one expected (decryptable) message,
// 5. checks if each node have received and decrypted exactly one message.
func TestSimulation(t *testing.T) {
// create a chain of whisper nodes,
// installs the filters with shared (predefined) parameters
initialize(t)

// each node sends a number of random (undecryptable) messages
for i := 0; i < NumNodes; i++ {
sendMsg(t, false, i)
}

// node #0 sends one expected (decryptable) message
sendMsg(t, true, 0)
checkPropagation(t)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should create two unit tests, both with a specific name. You want tests to be as specific as possible, to pinpoint the root cause quicker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally yes, but this case is special. this is already the worst whisper test (in terms of duration), because initialization takes too long. if i splitted the test, it would take double to run it. and it is already quite bad.


// check if each node have received and decrypted exactly one message
checkPropagation(t, true)

// send protocol-level messages (powRequirementCode) and check the new PoW requirement values
powReqExchange(t)

// node #1 sends one expected (decryptable) message
sendMsg(t, true, 1)

// check if each node (except node #0) have received and decrypted exactly one message
checkPropagation(t, false)

stopServers()
}

Expand All @@ -114,7 +124,7 @@ func initialize(t *testing.T) {
for i := 0; i < NumNodes; i++ {
var node TestNode
node.shh = New(&DefaultConfig)
node.shh.SetMinimumPoW(0.00000001)
node.shh.SetMinimumPowTest(0.00000001)
node.shh.Start(nil)
topics := make([]TopicType, 0)
topics = append(topics, sharedTopic)
Expand Down Expand Up @@ -154,13 +164,18 @@ func initialize(t *testing.T) {
},
}

err = node.server.Start()
if err != nil {
t.Fatalf("failed to start server %d.", i)
}

nodes[i] = &node
}

for i := 1; i < NumNodes; i++ {
go nodes[i].server.Start()
}

// we need to wait until the first node actually starts
err = nodes[0].server.Start()
if err != nil {
t.Fatalf("failed to start the fisrt server.")
}
}

func stopServers() {
Expand All @@ -174,18 +189,21 @@ func stopServers() {
}
}

func checkPropagation(t *testing.T) {
func checkPropagation(t *testing.T, includingNodeZero bool) {
if t.Failed() {
return
}

const cycle = 100
const iterations = 100
const cycle = 50
const iterations = 200

for j := 0; j < iterations; j++ {
time.Sleep(cycle * time.Millisecond)
first := 0
if !includingNodeZero {
first = 1
}

for i := 0; i < NumNodes; i++ {
for j := 0; j < iterations; j++ {
for i := first; i < NumNodes; i++ {
f := nodes[i].shh.GetFilter(nodes[i].filerId)
if f == nil {
t.Fatalf("failed to get filterId %s from node %d.", nodes[i].filerId, i)
Expand All @@ -200,9 +218,18 @@ func checkPropagation(t *testing.T) {
return
}
}

time.Sleep(cycle * time.Millisecond)
}

t.Fatalf("Test was not complete: timeout %d seconds.", iterations*cycle/1000)

if !includingNodeZero {
f := nodes[0].shh.GetFilter(nodes[0].filerId)
if f != nil {
t.Fatalf("node zero received a message with low PoW.")
}
}
}

func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool {
Expand Down Expand Up @@ -304,3 +331,35 @@ func TestPeerBasic(t *testing.T) {
t.Fatalf("failed mark with seed %d.", seed)
}
}

func powReqExchange(t *testing.T) {
for i, node := range nodes {
for peer := range node.shh.peers {
if peer.powRequirement > 1000.0 {
t.Fatalf("node %d: one of the peers' pow requirement is too big (%f).", i, peer.powRequirement)
}
}
}

const pow float64 = 7777777.0
nodes[0].shh.SetMinimumPoW(pow)

// wait until all the messages are delivered
time.Sleep(64 * time.Millisecond)

cnt := 0
for i, node := range nodes {
for peer := range node.shh.peers {
if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) {
cnt++
if peer.powRequirement != pow {
t.Fatalf("node %d: failed to set the new pow requirement.", i)
}
}
}
}

if cnt == 0 {
t.Fatalf("no matching peers found.")
}
}
86 changes: 73 additions & 13 deletions whisper/whisperv6/whisper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
crand "crypto/rand"
"crypto/sha256"
"fmt"
"math"
"runtime"
"sync"
"time"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/syndtr/goleveldb/leveldb/errors"
"golang.org/x/crypto/pbkdf2"
Expand Down Expand Up @@ -74,6 +76,8 @@ type Whisper struct {

settings syncmap.Map // holds configuration settings that can be dynamically changed

reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages

statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node

Expand All @@ -87,14 +91,15 @@ func New(cfg *Config) *Whisper {
}

whisper := &Whisper{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
expirations: make(map[uint32]*set.SetNonTS),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
expirations: make(map[uint32]*set.SetNonTS),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
reactionAllowance: SynchAllowance,
}

whisper.filters = NewFilters(whisper)
Expand Down Expand Up @@ -177,13 +182,50 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error {

// SetMinimumPoW sets the minimal PoW required by this node
func (w *Whisper) SetMinimumPoW(val float64) error {
if val <= 0.0 {
if val < 0.0 {
return fmt.Errorf("invalid PoW: %f", val)
}
w.settings.Store(minPowIdx, val)

w.notifyPeersAboutPowRequirementChange(val)

go func() {
// allow some time before all the peers have processed the notification
time.Sleep(time.Duration(w.reactionAllowance) * time.Second)
w.settings.Store(minPowIdx, val)
}()

return nil
}

// SetMinimumPoW sets the minimal PoW in test environment
func (w *Whisper) SetMinimumPowTest(val float64) {
w.notifyPeersAboutPowRequirementChange(val)
w.settings.Store(minPowIdx, val)
}

func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
arr := make([]*Peer, len(w.peers))
i := 0

w.peerMu.Lock()
for p := range w.peers {
arr[i] = p
i++
}
w.peerMu.Unlock()

for _, p := range arr {
err := p.notifyAboutPowRequirementChange(pow)
if err != nil {
// allow one retry
err = p.notifyAboutPowRequirementChange(pow)
}
if err != nil {
log.Warn("oversized message received", "peer", p.ID(), "error", err)
}
}
}

// getPeer retrieves peer by ID
func (w *Whisper) getPeer(peerID []byte) (*Peer, error) {
w.peerMu.Lock()
Expand Down Expand Up @@ -233,7 +275,7 @@ func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {

// SendP2PDirect sends a peer-to-peer message to a specific peer.
func (w *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error {
return p2p.Send(peer.ws, p2pCode, envelope)
return p2p.Send(peer.ws, p2pMessageCode, envelope)
}

// NewKeyPair generates a new cryptographic identity for the client, and injects
Expand Down Expand Up @@ -536,7 +578,22 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
if trouble {
return errors.New("invalid envelope")
}
case p2pCode:
case powRequirementCode:
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
i, err := s.Uint()
if err != nil {
log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid powRequirementCode message")
}
f := math.Float64frombits(i)
if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 {
log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid value in powRequirementCode message")
}
p.powRequirement = f
case bloomFilterExCode:
// to be implemented
case p2pMessageCode:
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
// this message is not supposed to be forwarded to other peers, and
// therefore might not satisfy the PoW, expiry and other requirements.
Expand Down Expand Up @@ -599,7 +656,10 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {

if envelope.PoW() < wh.MinPow() {
log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex())
return false, nil // drop envelope without error
return false, nil // drop envelope without error for now

// once the status message includes the PoW requirement, an error should be returned here:
//return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
}

hash := envelope.Hash()
Expand Down
Loading