Skip to content

Commit

Permalink
Merge pull request ethereum#92 from ethersphere/swarm-pss-rpcclient
Browse files Browse the repository at this point in the history
swarm/pss: rpcclient + pss test fixes
  • Loading branch information
zelig authored May 18, 2017
2 parents 63e84d4 + c15eba2 commit ebd6199
Show file tree
Hide file tree
Showing 58 changed files with 3,349 additions and 2,302 deletions.
2 changes: 1 addition & 1 deletion cmd/swarm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func getPassPhrase(prompt string, i int, passwords []string) string {
return password
}

func injectBootnodes(srv p2p.Server, nodes []string) {
func injectBootnodes(srv *p2p.Server, nodes []string) {
for _, url := range nodes {
n, err := discover.ParseNode(url)
if err != nil {
Expand Down
26 changes: 14 additions & 12 deletions cmd/wnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const quitCommand = "~Q"

// singletons
var (
server p2p.Server
server *p2p.Server
shh *whisper.Whisper
done chan struct{}
mailServer mailserver.WMailServer
Expand Down Expand Up @@ -253,17 +253,19 @@ func initialize() {
maxPeers = 800
}

server = p2p.NewServer(p2p.Config{
PrivateKey: nodeid,
MaxPeers: maxPeers,
Name: common.MakeName("wnode", "5.0"),
Protocols: shh.Protocols(),
ListenAddr: *argIP,
NAT: nat.Any(),
BootstrapNodes: peers,
StaticNodes: peers,
TrustedNodes: peers,
})
server = &p2p.Server{
Config: p2p.Config{
PrivateKey: nodeid,
MaxPeers: maxPeers,
Name: common.MakeName("wnode", "5.0"),
Protocols: shh.Protocols(),
ListenAddr: *argIP,
NAT: nat.Any(),
BootstrapNodes: peers,
StaticNodes: peers,
TrustedNodes: peers,
},
}
}

func startServer() {
Expand Down
2 changes: 1 addition & 1 deletion contracts/release/release.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (r *ReleaseService) Protocols() []p2p.Protocol { return nil }
func (r *ReleaseService) APIs() []rpc.API { return nil }

// Start spawns the periodic version checker goroutine
func (r *ReleaseService) Start(server p2p.Server) error {
func (r *ReleaseService) Start(server *p2p.Server) error {
go r.checker()
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (
)

type LesServer interface {
Start(srvr p2p.Server)
Start(srvr *p2p.Server)
Stop()
Protocols() []p2p.Protocol
}
Expand Down Expand Up @@ -362,7 +362,7 @@ func (s *Ethereum) Protocols() []p2p.Protocol {

// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start(srvr p2p.Server) error {
func (s *Ethereum) Start(srvr *p2p.Server) error {
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())

s.protocolManager.Start()
Expand Down
4 changes: 2 additions & 2 deletions ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const historyUpdateRange = 50
type Service struct {
stack *node.Node // Temporary workaround, remove when API finalized

server p2p.Server // Peer-to-peer server to retrieve networking infos
server *p2p.Server // Peer-to-peer server to retrieve networking infos
eth *eth.Ethereum // Full Ethereum service if monitoring a full node
les *les.LightEthereum // Light Ethereum service if monitoring a light node
engine consensus.Engine // Consensus engine to retrieve variadic block fields
Expand Down Expand Up @@ -101,7 +101,7 @@ func (s *Service) Protocols() []p2p.Protocol { return nil }
func (s *Service) APIs() []rpc.API { return nil }

// Start implements node.Service, starting up the monitoring and reporting daemon.
func (s *Service) Start(server p2p.Server) error {
func (s *Service) Start(server *p2p.Server) error {
s.server = server
go s.loop()

Expand Down
4 changes: 2 additions & 2 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,12 +1434,12 @@ func (api *PrivateDebugAPI) SetHead(number hexutil.Uint64) {

// PublicNetAPI offers network related RPC methods
type PublicNetAPI struct {
net p2p.Server
net *p2p.Server
networkVersion uint64
}

// NewPublicNetAPI creates a new net API instance.
func NewPublicNetAPI(net p2p.Server, networkVersion uint64) *PublicNetAPI {
func NewPublicNetAPI(net *p2p.Server, networkVersion uint64) *PublicNetAPI {
return &PublicNetAPI{net, networkVersion}
}

Expand Down
2 changes: 1 addition & 1 deletion les/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {

// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *LightEthereum) Start(srvr p2p.Server) error {
func (s *LightEthereum) Start(srvr *p2p.Server) error {
log.Warn("Light client mode is an experimental feature")
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.networkId)
s.protocolManager.Start(srvr)
Expand Down
10 changes: 3 additions & 7 deletions les/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ const (
disableClientRemovePeer = false
)

type discV5Server interface {
DiscV5() *discv5.Network
}

// errIncompatibleConfig is returned if the requested protocols and configs are
// not compatible (low protocol version restrictions and high requirements).
var errIncompatibleConfig = errors.New("incompatible configuration")
Expand Down Expand Up @@ -260,10 +256,10 @@ func (pm *ProtocolManager) removePeer(id string) {
}
}

func (pm *ProtocolManager) Start(srvr p2p.Server) {
func (pm *ProtocolManager) Start(srvr *p2p.Server) {
var topicDisc *discv5.Network
if v, ok := srvr.(discV5Server); ok {
topicDisc = v.DiscV5()
if srvr != nil {
topicDisc = srvr.DiscV5
}
lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
if pm.lightSync {
Expand Down
2 changes: 1 addition & 1 deletion les/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *LesServer) Protocols() []p2p.Protocol {
}

// Start starts the LES server
func (s *LesServer) Start(srvr p2p.Server) {
func (s *LesServer) Start(srvr *p2p.Server) {
s.protocolManager.Start(srvr)
}

Expand Down
8 changes: 4 additions & 4 deletions les/serverpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ const (
type serverPool struct {
db ethdb.Database
dbKey []byte
server p2p.Server
server *p2p.Server
quit chan struct{}
wg *sync.WaitGroup
connWg sync.WaitGroup
Expand All @@ -118,7 +118,7 @@ type serverPool struct {
}

// newServerPool creates a new serverPool instance
func newServerPool(db ethdb.Database, dbPrefix []byte, server p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
pool := &serverPool{
db: db,
dbKey: append(dbPrefix, []byte(topic)...),
Expand All @@ -139,11 +139,11 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server p2p.Server, topic
pool.loadNodes()
pool.checkDial()

if srv, ok := pool.server.(discV5Server); ok && srv.DiscV5() != nil {
if pool.server.DiscV5 != nil {
pool.discSetPeriod = make(chan time.Duration, 1)
pool.discNodes = make(chan *discv5.Node, 100)
pool.discLookups = make(chan bool, 100)
go srv.DiscV5().SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
}

go pool.eventLoop()
Expand Down
2 changes: 1 addition & 1 deletion node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func NewPublicWeb3API(stack *Node) *PublicWeb3API {

// ClientVersion returns the node name
func (s *PublicWeb3API) ClientVersion() string {
return s.stack.serverConfig.Name
return s.stack.Server().Name
}

// Sha3 applies the ethereum sha3 implementation on the input.
Expand Down
10 changes: 5 additions & 5 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Node struct {
instanceDirLock storage.Storage // prevents concurrent use of instance directory

serverConfig p2p.Config
server p2p.Server // Currently running P2P networking layer
server *p2p.Server // Currently running P2P networking layer

serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
services map[reflect.Type]Service // Currently running services
Expand Down Expand Up @@ -165,6 +165,8 @@ func (n *Node) Start() error {
if n.serverConfig.NodeDatabase == "" {
n.serverConfig.NodeDatabase = n.config.NodeDB()
}
running := &p2p.Server{Config: n.serverConfig}
log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)

// Otherwise copy and specialize the P2P configuration
services := make(map[reflect.Type]Service)
Expand Down Expand Up @@ -192,10 +194,8 @@ func (n *Node) Start() error {
}
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
n.serverConfig.Protocols = append(n.serverConfig.Protocols, service.Protocols()...)
running.Protocols = append(running.Protocols, service.Protocols()...)
}
running := p2p.NewServer(n.serverConfig)
log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
if err := running.Start(); err != nil {
if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
return ErrDatadirUsed
Expand Down Expand Up @@ -582,7 +582,7 @@ func (n *Node) RPCHandler() (*rpc.Server, error) {
// Server retrieves the currently running P2P network layer. This method is meant
// only to inspect fields of the currently running server, life cycle management
// should be left to this Node entity.
func (n *Node) Server() p2p.Server {
func (n *Node) Server() *p2p.Server {
n.lock.RLock()
defer n.lock.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion node/node_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type SampleService struct{}

func (s *SampleService) Protocols() []p2p.Protocol { return nil }
func (s *SampleService) APIs() []rpc.API { return nil }
func (s *SampleService) Start(p2p.Server) error { fmt.Println("Service starting..."); return nil }
func (s *SampleService) Start(*p2p.Server) error { fmt.Println("Service starting..."); return nil }
func (s *SampleService) Stop() error { fmt.Println("Service stopping..."); return nil }

func ExampleService() {
Expand Down
10 changes: 5 additions & 5 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestServiceLifeCycle(t *testing.T) {
id := id // Closure for the constructor
constructor := func(*ServiceContext) (Service, error) {
return &InstrumentedService{
startHook: func(p2p.Server) { started[id] = true },
startHook: func(*p2p.Server) { started[id] = true },
stopHook: func() { stopped[id] = true },
}, nil
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestServiceRestarts(t *testing.T) {
running = false

return &InstrumentedService{
startHook: func(p2p.Server) {
startHook: func(*p2p.Server) {
if running {
panic("already running")
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestServiceConstructionAbortion(t *testing.T) {
id := id // Closure for the constructor
constructor := func(*ServiceContext) (Service, error) {
return &InstrumentedService{
startHook: func(p2p.Server) { started[id] = true },
startHook: func(*p2p.Server) { started[id] = true },
}, nil
}
if err := stack.Register(maker(constructor)); err != nil {
Expand Down Expand Up @@ -299,7 +299,7 @@ func TestServiceStartupAbortion(t *testing.T) {
id := id // Closure for the constructor
constructor := func(*ServiceContext) (Service, error) {
return &InstrumentedService{
startHook: func(p2p.Server) { started[id] = true },
startHook: func(*p2p.Server) { started[id] = true },
stopHook: func() { stopped[id] = true },
}, nil
}
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestServiceTerminationGuarantee(t *testing.T) {
id := id // Closure for the constructor
constructor := func(*ServiceContext) (Service, error) {
return &InstrumentedService{
startHook: func(p2p.Server) { started[id] = true },
startHook: func(*p2p.Server) { started[id] = true },
stopHook: func() { stopped[id] = true },
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type Service interface {

// Start is called after all services have been constructed and the networking
// layer was also initialized to spawn any goroutines required by the service.
Start(server p2p.Server) error
Start(server *p2p.Server) error

// Stop terminates all goroutines belonging to the service, blocking until they
// are all terminated.
Expand Down
6 changes: 3 additions & 3 deletions node/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type NoopService struct{}

func (s *NoopService) Protocols() []p2p.Protocol { return nil }
func (s *NoopService) APIs() []rpc.API { return nil }
func (s *NoopService) Start(p2p.Server) error { return nil }
func (s *NoopService) Start(*p2p.Server) error { return nil }
func (s *NoopService) Stop() error { return nil }

func NewNoopService(*ServiceContext) (Service, error) { return new(NoopService), nil }
Expand All @@ -57,7 +57,7 @@ type InstrumentedService struct {
stop error

protocolsHook func()
startHook func(p2p.Server)
startHook func(*p2p.Server)
stopHook func()
}

Expand All @@ -74,7 +74,7 @@ func (s *InstrumentedService) APIs() []rpc.API {
return s.apis
}

func (s *InstrumentedService) Start(server p2p.Server) error {
func (s *InstrumentedService) Start(server *p2p.Server) error {
if s.startHook != nil {
s.startHook(server)
}
Expand Down
Loading

0 comments on commit ebd6199

Please sign in to comment.