Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

whisper: PoW requirement #15701

Merged
merged 12 commits into from
Dec 21, 2017
2 changes: 1 addition & 1 deletion whisper/whisperv6/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (api *PublicWhisperAPI) SetMaxMessageSize(ctx context.Context, size uint32)

// SetMinPow sets the minimum PoW for a message before it is accepted.
func (api *PublicWhisperAPI) SetMinPoW(ctx context.Context, pow float64) (bool, error) {
return true, api.w.SetMinimumPoW(pow)
return true, api.w.SetMinimumPoW(pow, false)
}

// MarkTrustedPeer marks a peer trusted. , which will allow it to send historic (expired) messages.
Expand Down
11 changes: 7 additions & 4 deletions whisper/whisperv6/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ const (
ProtocolVersionStr = "6.0"
ProtocolName = "shh"

statusCode = 0 // used by whisper protocol
messagesCode = 1 // normal whisper message
p2pCode = 2 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
p2pRequestCode = 3 // peer-to-peer message, used by Dapp protocol
// whisper protocol message codes, according to EIP-627
statusCode = 0 // used by whisper protocol
messagesCode = 1 // normal whisper message
powRequirementCode = 2 // PoW requirement
bloomFilterExCode = 3 // bloom filter exchange
p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
NumberOfMessageCodes = 128

paddingMask = byte(3)
Expand Down
31 changes: 20 additions & 11 deletions whisper/whisperv6/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package whisperv6

import (
"fmt"
"math"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -29,10 +30,12 @@ import (

// peer represents a whisper protocol peer connection.
type Peer struct {
host *Whisper
peer *p2p.Peer
ws p2p.MsgReadWriter
trusted bool
host *Whisper
peer *p2p.Peer
ws p2p.MsgReadWriter

trusted bool
powRequirement float64

known *set.Set // Messages already known by the peer to avoid wasting bandwidth

Expand All @@ -42,12 +45,13 @@ type Peer struct {
// newPeer creates a new whisper peer object, but does not run the handshake itself.
func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
return &Peer{
host: host,
peer: remote,
ws: rw,
trusted: false,
known: set.New(),
quit: make(chan struct{}),
host: host,
peer: remote,
ws: rw,
trusted: false,
powRequirement: 0.0,
known: set.New(),
quit: make(chan struct{}),
}
}

Expand Down Expand Up @@ -152,7 +156,7 @@ func (p *Peer) broadcast() error {
var cnt int
envelopes := p.host.Envelopes()
for _, envelope := range envelopes {
if !p.marked(envelope) {
if !p.marked(envelope) && envelope.PoW() >= p.powRequirement {
err := p2p.Send(p.ws, messagesCode, envelope)
if err != nil {
return err
Expand All @@ -172,3 +176,8 @@ func (p *Peer) ID() []byte {
id := p.peer.ID()
return id[:]
}

func (p *Peer) notifyAboutPowRequirementChange(pow float64) error {
i := math.Float64bits(pow)
return p2p.Send(p.ws, powRequirementCode, i)
}
97 changes: 78 additions & 19 deletions whisper/whisperv6/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,31 @@ var sharedKey []byte = []byte("some arbitrary data here")
var sharedTopic TopicType = TopicType{0xF, 0x1, 0x2, 0}
var expectedMessage []byte = []byte("per rectum ad astra")

// This test does the following:
// 1. creates a chain of whisper nodes,
// 2. installs the filters with shared (predefined) parameters,
// 3. each node sends a number of random (undecryptable) messages,
// 4. first node sends one expected (decryptable) message,
// 5. checks if each node have received and decrypted exactly one message.
func TestSimulation(t *testing.T) {
// create a chain of whisper nodes,
// installs the filters with shared (predefined) parameters
initialize(t)

// each node sends a number of random (undecryptable) messages
for i := 0; i < NumNodes; i++ {
sendMsg(t, false, i)
}

// node #0 sends one expected (decryptable) message
sendMsg(t, true, 0)
checkPropagation(t)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should create two unit tests, both with a specific name. You want tests to be as specific as possible, to pinpoint the root cause quicker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally yes, but this case is special. this is already the worst whisper test (in terms of duration), because initialization takes too long. if i splitted the test, it would take double to run it. and it is already quite bad.


// check if each node have received and decrypted exactly one message
checkPropagation(t, true)

// send protocol-level messages (powRequirementCode) and check the new PoW requirement values
powReqExchange(t)

// node #1 sends one expected (decryptable) message
sendMsg(t, true, 1)

// check if each node (except node #0) have received and decrypted exactly one message
checkPropagation(t, false)

stopServers()
}

Expand All @@ -114,7 +124,7 @@ func initialize(t *testing.T) {
for i := 0; i < NumNodes; i++ {
var node TestNode
node.shh = New(&DefaultConfig)
node.shh.SetMinimumPoW(0.00000001)
node.shh.SetMinimumPoW(0.00000001, true)
node.shh.Start(nil)
topics := make([]TopicType, 0)
topics = append(topics, sharedTopic)
Expand Down Expand Up @@ -154,13 +164,18 @@ func initialize(t *testing.T) {
},
}

err = node.server.Start()
if err != nil {
t.Fatalf("failed to start server %d.", i)
}

nodes[i] = &node
}

for i := 1; i < NumNodes; i++ {
go nodes[i].server.Start()
}

// we need to wait until the first node actually starts
err = nodes[0].server.Start()
if err != nil {
t.Fatalf("failed to start the fisrt server.")
}
}

func stopServers() {
Expand All @@ -174,18 +189,21 @@ func stopServers() {
}
}

func checkPropagation(t *testing.T) {
func checkPropagation(t *testing.T, includingNodeZero bool) {
if t.Failed() {
return
}

const cycle = 100
const iterations = 100
const cycle = 50
const iterations = 200

for j := 0; j < iterations; j++ {
time.Sleep(cycle * time.Millisecond)
first := 0
if !includingNodeZero {
first = 1
}

for i := 0; i < NumNodes; i++ {
for j := 0; j < iterations; j++ {
for i := first; i < NumNodes; i++ {
f := nodes[i].shh.GetFilter(nodes[i].filerId)
if f == nil {
t.Fatalf("failed to get filterId %s from node %d.", nodes[i].filerId, i)
Expand All @@ -200,9 +218,18 @@ func checkPropagation(t *testing.T) {
return
}
}

time.Sleep(cycle * time.Millisecond)
}

t.Fatalf("Test was not complete: timeout %d seconds.", iterations*cycle/1000)

if !includingNodeZero {
f := nodes[0].shh.GetFilter(nodes[0].filerId)
if f != nil {
t.Fatalf("node zero received a message with low PoW.")
}
}
}

func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool {
Expand Down Expand Up @@ -304,3 +331,35 @@ func TestPeerBasic(t *testing.T) {
t.Fatalf("failed mark with seed %d.", seed)
}
}

func powReqExchange(t *testing.T) {
for i, node := range nodes {
for peer, _ := range node.shh.peers {
if peer.powRequirement > 1000.0 {
t.Fatalf("node %d: one of the peers' pow requirement is too big (%f).", i, peer.powRequirement)
}
}
}

const pow float64 = 7777777.0
nodes[0].shh.SetMinimumPoW(pow, true)

// wait until all the messages are delivered
time.Sleep(64 * time.Millisecond)

cnt := 0
for i, node := range nodes {
for peer, _ := range node.shh.peers {
if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) {
cnt++
if peer.powRequirement != pow {
t.Fatalf("node %d: failed to set the new pow requirement.", i)
}
}
}
}

if cnt == 0 {
t.Fatalf("no matching peers found.")
}
}
76 changes: 63 additions & 13 deletions whisper/whisperv6/whisper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
crand "crypto/rand"
"crypto/sha256"
"fmt"
"math"
"runtime"
"sync"
"time"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/syndtr/goleveldb/leveldb/errors"
"golang.org/x/crypto/pbkdf2"
Expand Down Expand Up @@ -74,6 +76,8 @@ type Whisper struct {

settings syncmap.Map // holds configuration settings that can be dynamically changed

reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages

statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node

Expand All @@ -87,14 +91,15 @@ func New(cfg *Config) *Whisper {
}

whisper := &Whisper{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
expirations: make(map[uint32]*set.SetNonTS),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
expirations: make(map[uint32]*set.SetNonTS),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
reactionAllowance: SynchAllowance,
}

whisper.filters = NewFilters(whisper)
Expand Down Expand Up @@ -176,14 +181,41 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error {
}

// SetMinimumPoW sets the minimal PoW required by this node
func (w *Whisper) SetMinimumPoW(val float64) error {
func (w *Whisper) SetMinimumPoW(val float64, testMode bool) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would make more sense to Mock SetMinimumPoW during your tests instead of explicitly changing the function interface to specify if this is a test or not. Many things can slip through the cracks this way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean, i should create another function, e.g. SetMinimumPowTest?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, have a Whisper as an interface that implements SetMinimumPoW. Then, in tests, implement the Whisper interface as a WhisperMock structure and implement a different SetMinimumPoW for this. However, as I'm saying below, I think this is overkill when you can simply use the same function in both cases.

if val <= 0.0 {
return fmt.Errorf("invalid PoW: %f", val)
}
w.settings.Store(minPowIdx, val)

w.notifyPeersAboutPowRequirementChange(val)

if testMode {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why making such a big difference between the tests and the regular code? If the real function waits, the tests should too.

Copy link
Contributor Author

@gluk256 gluk256 Dec 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

real function waits for a reason, which is related to syncronization in real-life environment. in test environment it does not make much sense. but if this test will run longer than all other tests together, than it will be really painful to run the tests every time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand it might take longer to run the test, at the same time running the whole list of tests is really long because of swarm so I would not lose sleep over this. What you can also do, is configure the waiting time to be different in the case of a test environment, so the code remains the same but the wait time is shorter for tests. That would ensure that the code that is being tested is the same as that which runs in production.

I have been looking for a way to distinguish a normal build from a test build in the doc, haven't found anything so far. Anything else that the powers that be have decided you don't need?

If you manage to do that, then my recommendation in the previous comment doesn't need to happen.

w.settings.Store(minPowIdx, val)
} else {
go func() {
// // allow some time before all the peers have processed the notification
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment / uncomment error :)

time.Sleep(time.Duration(w.reactionAllowance) * time.Second)
w.settings.Store(minPowIdx, val)
}()
}

return nil
}

func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
w.peerMu.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing network stuff in a critical section, that's a long waiting time while holdig a mutex. Could you do something lighter, e.g. starting a goroutine to do this? Or copy the list ? How contentious do we expect that list to be, and how much of a problem is it going to be if a peer is removed/added?

defer w.peerMu.Unlock()
for p := range w.peers {
err := p.notifyAboutPowRequirementChange(pow)
if err != nil {
// allow one retry
err = p.notifyAboutPowRequirementChange(pow)
}
if err != nil {
fmt.Errorf("Error sending PoW notification to peer [%x]: %s", p.ID(), err)
}
}
}

// getPeer retrieves peer by ID
func (w *Whisper) getPeer(peerID []byte) (*Peer, error) {
w.peerMu.Lock()
Expand Down Expand Up @@ -233,7 +265,7 @@ func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {

// SendP2PDirect sends a peer-to-peer message to a specific peer.
func (w *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error {
return p2p.Send(peer.ws, p2pCode, envelope)
return p2p.Send(peer.ws, p2pMessageCode, envelope)
}

// NewKeyPair generates a new cryptographic identity for the client, and injects
Expand Down Expand Up @@ -528,7 +560,22 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
if cached {
p.mark(&envelope)
}
case p2pCode:
case powRequirementCode:
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
i, err := s.Uint()
if err != nil {
log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid powRequirementCode message")
}
f := math.Float64frombits(i)
if math.IsInf(f, 0) || math.IsNaN(f) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And negative/zero values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zero allowed, but not negative

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I misread what IsInf was doing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, no I didn't. You also need to make sure that the PoW is positive

log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid value in powRequirementCode message")
}
p.powRequirement = float64(f)
case bloomFilterExCode:
// to be implemented
case p2pMessageCode:
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
// this message is not supposed to be forwarded to other peers, and
// therefore might not satisfy the PoW, expiry and other requirements.
Expand Down Expand Up @@ -591,7 +638,10 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {

if envelope.PoW() < wh.MinPow() {
log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex())
return false, nil // drop envelope without error
return false, nil // drop envelope without error for now

// after the Status message will include the PoW requirement, it should return an error here:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once the status message includes the PoW requirement, an error should be returned here

//return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
}

hash := envelope.Hash()
Expand Down
Loading