Skip to content

Commit

Permalink
Merge pull request ethereum#68 from ethersphere/swarm-pss
Browse files Browse the repository at this point in the history
swarm, swarm/network, cmd/swarm: pss included in swarm, binary
  • Loading branch information
zelig authored Apr 29, 2017
2 parents f1389e7 + 128faf2 commit ac88a5a
Show file tree
Hide file tree
Showing 16 changed files with 1,629 additions and 230 deletions.
10 changes: 8 additions & 2 deletions cmd/swarm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ var (
SwarmUploadMimeType = cli.StringFlag{
Name: "mime",
Usage: "force mime type",
PssEnabledFlag = cli.BoolFlag{
Name: "pss",
Usage: "Enable pss (message passing over swarm)",
}
CorsStringFlag = cli.StringFlag{
Name: "corsdomain",
Expand Down Expand Up @@ -260,6 +263,8 @@ Cleans database of corrupted entries.
SwarmUploadDefaultPath,
SwarmUpFromStdinFlag,
SwarmUploadMimeType,
// pss flags
PssEnabledFlag,
}
app.Flags = append(app.Flags, debug.Flags...)
app.Before = func(ctx *cli.Context) error {
Expand Down Expand Up @@ -347,7 +352,8 @@ func registerBzzService(ctx *cli.Context, stack *node.Node) {
}
swapEnabled := ctx.GlobalBool(SwarmSwapEnabledFlag.Name)
syncEnabled := ctx.GlobalBoolT(SwarmSyncEnabledFlag.Name)

pssEnabled := ctx.GlobalBool(PssEnabledFlag.Name)

ethapi := ctx.GlobalString(EthAPIFlag.Name)
cors := ctx.GlobalString(CorsStringFlag.Name)

Expand All @@ -361,7 +367,7 @@ func registerBzzService(ctx *cli.Context, stack *node.Node) {
} else {
swapEnabled = false
}
return swarm.NewSwarm(ctx, client, bzzconfig, swapEnabled, syncEnabled, cors)
return swarm.NewSwarm(ctx, client, bzzconfig, swapEnabled, syncEnabled, cors, pssEnabled)
}
if err := stack.Register(boot); err != nil {
utils.Fatalf("Failed to register the Swarm service: %v", err)
Expand Down
15 changes: 15 additions & 0 deletions common/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
"time"
)

const (
LabelLength = 8
)

// PrettyDuration is a pretty printed version of a time.Duration value that cuts
// the unnecessary precision off from the formatted textual representation.
type PrettyDuration time.Duration
Expand All @@ -38,3 +42,14 @@ func (d PrettyDuration) String() string {
}
return label
}

// useful for segfault safe reduction of log clutter
func ByteLabel(addr []byte) (b [LabelLength]byte) {
//b = make([LabelLength]byte)
if len(addr) < LabelLength {
copy(b[LabelLength - len(addr) - 1:len(addr)], addr[:])
} else {
copy(b[:], addr[:LabelLength])
}
return b
}
27 changes: 11 additions & 16 deletions p2p/protocols/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ package protocols
import (
"fmt"
"reflect"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -122,6 +121,15 @@ type CodeMap struct {
messages map[reflect.Type]uint64 // index of types to codes, for sending by type
}

func (self *CodeMap) GetInterface(code uint64) (interface{}, bool) {
if int(code) > len(self.codes)-1 {
return nil, false
}
typ := self.codes[code]
val := reflect.New(typ)
return val.Interface(), true
}

func (self *CodeMap) GetCode(msg interface{}) (uint64, bool) {
code, found := self.messages[reflect.TypeOf(msg)]
return code, found
Expand Down Expand Up @@ -268,21 +276,8 @@ func (self *Peer) Send(msg interface{}) error {
return errorf(ErrInvalidMsgType, "%v", code)
}
log.Trace(fmt.Sprintf("=> msg #%d TO %v : %v", code, self.ID(), msg))
go func() {
self.wErrc <- p2p.Send(self.rw, uint64(code), msg)
}()
var err error
select {
case err = <-self.wErrc:
if err == nil {
return nil
}
case <-time.NewTimer(3000 * time.Millisecond).C:
err = fmt.Errorf("write timeout")
}
err = errorf(ErrWrite, "(msg code: %v): %v", code, err)
self.Drop(err)
return err
go p2p.Send(self.rw, uint64(code), msg)
return nil
}

func (self *Peer) DisconnectHook(f func(error)) {
Expand Down
44 changes: 42 additions & 2 deletions p2p/testing/protocolsession.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package testing

import (
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -13,6 +16,7 @@ import (
type ProtocolSession struct {
TestNodeAdapter
Ids []*adapters.NodeId
ignore []uint64
}

type TestMessenger interface {
Expand All @@ -21,6 +25,7 @@ type TestMessenger interface {
}

type TestNodeAdapter interface {
p2p.Server
GetPeer(id *adapters.NodeId) *adapters.Peer
}

Expand Down Expand Up @@ -64,6 +69,10 @@ func NewProtocolSession(na TestNodeAdapter, ids []*adapters.NodeId) *ProtocolSes
return ps
}

func (self *ProtocolSession) SetIgnoreCodes(ignore ...uint64) {
self.ignore = ignore
}

// trigger sends messages from peers
func (self *ProtocolSession) trigger(trig Trigger) error {
peer := self.GetPeer(trig.Peer)
Expand Down Expand Up @@ -109,8 +118,39 @@ func (self *ProtocolSession) expect(exp Expect) error {

errc := make(chan error)
go func() {
log.Trace(fmt.Sprintf("waiting for msg, %v", exp.Msg))
errc <- p2p.ExpectMsg(peer, exp.Code, exp.Msg)
var err error
ignored := true
log.Trace("waiting for msg", "code", exp.Code, "msg", exp.Msg)
for ignored {
ignored = false
err = p2p.ExpectMsg(peer, exp.Code, exp.Msg)
// frail, but we can't know what code expectmsg got otherwise
// can we do better error reporting in p2p.ExpectMsg()?
if err != nil {
if strings.Contains(err.Error(), "code") {
re, _ := regexp.Compile("got ([0-9]+),")
match := re.FindStringSubmatch(err.Error())
if len(match) > 1 {
for _, codetoignore := range self.ignore {
codewegot, err := strconv.ParseUint(match[1], 10, 64)
if err == nil {
if codetoignore == codewegot {
ignored = true
log.Trace("ignore msg with wrong code", "received", codewegot, "expected", exp.Code)
break
}
} else {
log.Warn("expectmsg errormsg parse error?!")
}
}
} else {
log.Warn("expectmsg errormsg parse error?!")
break
}
}
}
}
errc <- err
}()

t := exp.Timeout
Expand Down
8 changes: 5 additions & 3 deletions swarm/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ type Config struct {
*storage.ChunkerParams
*network.HiveParams
Swap *swap.SwapParams
*network.SyncParams
//*network.SyncParams
Path string
Port string
PublicKey string
BzzKey string
EnsRoot common.Address
NetworkId uint64
*network.PssParams
}

// config is agnostic to where private key is coming from
Expand All @@ -72,10 +73,11 @@ func NewConfig(path string, contract common.Address, prvKey *ecdsa.PrivateKey, n
keyhex := crypto.Keccak256Hash(pubkey).Hex()

self = &Config{
SyncParams: network.NewSyncParams(dirpath),
HiveParams: network.NewHiveParams(dirpath),
//SyncParams: network.NewSyncParams(dirpath),
HiveParams: network.NewHiveParams(),
ChunkerParams: storage.NewChunkerParams(),
StoreParams: storage.NewStoreParams(dirpath),
PssParams: network.NewPssParams(),
Port: port,
Path: dirpath,
Swap: swap.DefaultSwapParams(contract, prvKey),
Expand Down
12 changes: 0 additions & 12 deletions swarm/api/testapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,6 @@ func NewControl(api *Api, hive *network.Hive) *Control {
return &Control{api, hive}
}

func (self *Control) BlockNetworkRead(on bool) {
self.hive.BlockNetworkRead(on)
}

func (self *Control) SyncEnabled(on bool) {
self.hive.SyncEnabled(on)
}

func (self *Control) SwapEnabled(on bool) {
self.hive.SwapEnabled(on)
}

func (self *Control) Hive() string {
return self.hive.String()
}
54 changes: 54 additions & 0 deletions swarm/api/ws/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package ws

import (
"net"

"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc"
)

// Server is the basic configuration needs for the HTTP server and also
// includes CORS settings.
type Server struct {
//Addr string
CorsString string
Endpoint string
}

// startWS initializes and starts the websocket RPC endpoint.
func StartWSServer(apis []rpc.API, server *Server) error {

// Generate the whitelist based on the allowed modules
/*whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}*/
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
//if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
glog.V(logger.Debug).Infof("WebSocket registered %T under '%s'", api.Service, api.Namespace)
//}
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", server.Endpoint); err != nil {
return err
}
rpc.NewWSServer(server.CorsString, handler).Serve(listener)
glog.V(logger.Info).Infof("WebSocket endpoint opened: ws://%s", server.Endpoint)

// All listeners booted successfully
//n.wsEndpoint = endpoint
//n.wsListener = listener
//n.wsHandler = handler

return nil
}
62 changes: 62 additions & 0 deletions swarm/api/ws/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package ws

import (
"context"
"testing"
"time"

"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc"
)

func init() {
glog.SetV(logger.Detail)
glog.SetToStderr(true)
}

type TestResult struct {
Foo string `json:"foo"`
}

func TestStartWSServer(t *testing.T) {
ep := "localhost:8099"
server := &Server{
Endpoint: ep,
CorsString: "*",
}
apis := []rpc.API{
{
Namespace: "pss",
Version: "0.1",
Service: makeFakeAPIHandler(),
Public: true,
},
}
go func() {
err := StartWSServer(apis, server)
t.Logf("wsserver exited: %v", err)
}()

time.Sleep(time.Second)

client, err := rpc.DialWebsocket(context.Background(), "ws://" + ep, "ws://localhost")
if err != nil {
t.Fatalf("could not connect: %v", err)
} else {
t.Logf("client: %v", client)
client.Call(&TestResult{}, "pss_test")
}

}

func makeFakeAPIHandler() *FakeAPIHandler {
return &FakeAPIHandler{}
}

type FakeAPIHandler struct {
}

func (self *FakeAPIHandler) Test() {
glog.V(logger.Detail).Infof("in fakehandler Test()")
}
Loading

0 comments on commit ac88a5a

Please sign in to comment.